ubusd: add lookup command queuing support

Defers and continues a client's lookup command to avoid unnecessary
buffering under load.

Signed-off-by: Daniel Danzberger <daniel@dd-wrt.com>
Signed-off-by: Felix Fietkau <nbd@nbd.name>
This commit is contained in:
Daniel Danzberger 2022-06-08 13:12:29 +02:00 committed by Felix Fietkau
parent 2bebf93cd3
commit 9913aa61de
3 changed files with 122 additions and 8 deletions

View file

@ -41,6 +41,12 @@ struct ubus_msg_buf_list {
struct ubus_msg_buf *msg;
};
struct ubus_client_cmd {
struct list_head list;
struct ubus_msg_buf *msg;
struct ubus_object *obj;
};
struct ubus_client {
struct ubus_id id;
struct uloop_fd sock;
@ -53,6 +59,7 @@ struct ubus_client {
struct list_head objects;
struct list_head cmd_queue;
struct list_head tx_queue;
unsigned int txq_ofs;
unsigned int txq_len;
@ -86,6 +93,7 @@ void ubusd_proto_receive_message(struct ubus_client *cl, struct ubus_msg_buf *ub
void ubusd_proto_free_client(struct ubus_client *cl);
void ubus_proto_send_msg_from_blob(struct ubus_client *cl, struct ubus_msg_buf *ub,
uint8_t type);
int ubusd_cmd_lookup(struct ubus_client *cl, struct ubus_client_cmd *cmd);
typedef struct ubus_msg_buf *(*event_fill_cb)(void *priv, const char *id);
void ubusd_event_init(void);

View file

@ -32,6 +32,28 @@ static void handle_client_disconnect(struct ubus_client *cl)
free(cl);
}
static void ubus_client_cmd_free(struct ubus_client_cmd *cmd)
{
list_del(&cmd->list);
ubus_msg_free(cmd->msg);
free(cmd);
}
static void ubus_client_cmd_queue_process(struct ubus_client *cl)
{
struct ubus_client_cmd *cmd, *tmp;
list_for_each_entry_safe(cmd, tmp, &cl->cmd_queue, list) {
int ret = ubusd_cmd_lookup(cl, cmd);
/* Stop if the last command caused buffering again */
if (ret == -2)
break;
ubus_client_cmd_free(cmd);
}
}
static void client_cb(struct uloop_fd *sock, unsigned int events)
{
struct ubus_client *cl = container_of(sock, struct ubus_client, sock);
@ -82,10 +104,15 @@ static void client_cb(struct uloop_fd *sock, unsigned int events)
ubus_msg_list_free(ubl);
}
/* prevent further ULOOP_WRITE events if we don't have data
* to send anymore */
if (list_empty(&cl->tx_queue) && (events & ULOOP_WRITE))
uloop_fd_add(sock, ULOOP_READ | ULOOP_EDGE_TRIGGER);
if (list_empty(&cl->tx_queue) && (events & ULOOP_WRITE)) {
/* Process queued commands */
ubus_client_cmd_queue_process(cl);
/* prevent further ULOOP_WRITE events if we don't have data
* to send anymore */
if (list_empty(&cl->tx_queue))
uloop_fd_add(sock, ULOOP_READ | ULOOP_EDGE_TRIGGER);
}
retry:
if (!sock->eof && cl->pending_msg_offset < (int) sizeof(cl->hdrbuf)) {

View file

@ -186,16 +186,56 @@ static void ubusd_send_obj(struct ubus_client *cl, struct ubus_msg_buf *ub, stru
ubus_proto_send_msg_from_blob(cl, ub, UBUS_MSG_DATA);
}
static int ubusd_handle_lookup(struct ubus_client *cl, struct ubus_msg_buf *ub, struct blob_attr **attr)
static int ubus_client_cmd_queue_add(struct ubus_client *cl,
struct ubus_msg_buf *msg,
struct ubus_object *obj)
{
struct ubus_object *obj;
struct ubus_client_cmd *cmd = malloc(sizeof(*cmd));
if (cmd) {
cmd->msg = msg;
cmd->obj = obj;
list_add_tail(&cmd->list, &cl->cmd_queue);
return -2;
}
return UBUS_STATUS_UNKNOWN_ERROR;
}
static int __ubusd_handle_lookup(struct ubus_client *cl,
struct ubus_msg_buf *ub,
struct blob_attr **attr,
struct ubus_client_cmd *cmd)
{
struct ubus_object *obj = NULL;
char *objpath;
bool found = false;
int len;
if (!attr[UBUS_ATTR_OBJPATH]) {
avl_for_each_element(&path, obj, path)
ubusd_send_obj(cl, ub, obj);
if (cmd)
obj = cmd->obj;
/* Start from beginning or continue from the last object */
if (obj == NULL)
obj = avl_first_element(&path, obj, path);
avl_for_element_range(obj, avl_last_element(&path, obj, path), obj, path) {
/* Keep sending objects until buffering starts */
if (list_empty(&cl->tx_queue)) {
ubusd_send_obj(cl, ub, obj);
} else {
/* Queue command and continue on the next call */
int ret;
if (cmd == NULL) {
ret = ubus_client_cmd_queue_add(cl, ub, obj);
} else {
cmd->obj = obj;
ret = -2;
}
return ret;
}
}
return 0;
}
@ -230,6 +270,40 @@ static int ubusd_handle_lookup(struct ubus_client *cl, struct ubus_msg_buf *ub,
return 0;
}
static int ubusd_handle_lookup(struct ubus_client *cl, struct ubus_msg_buf *ub, struct blob_attr **attr)
{
int rc;
if (list_empty(&cl->tx_queue))
rc = __ubusd_handle_lookup(cl, ub, attr, NULL);
else
rc = ubus_client_cmd_queue_add(cl, ub, NULL);
return rc;
}
int ubusd_cmd_lookup(struct ubus_client *cl, struct ubus_client_cmd *cmd)
{
struct ubus_msg_buf *ub = cmd->msg;
struct blob_attr **attr;
int ret;
attr = ubus_parse_msg(ub->data, blob_raw_len(ub->data));
ret = __ubusd_handle_lookup(cl, ub, attr, cmd);
if (ret != -2) {
struct ubus_msg_buf *retmsg = cl->retmsg;
int *retmsg_data = blob_data(blob_data(retmsg->data));
retmsg->hdr.seq = ub->hdr.seq;
retmsg->hdr.peer = ub->hdr.peer;
*retmsg_data = htonl(ret);
ubus_msg_send(cl, retmsg);
}
return ret;
}
static void
ubusd_forward_invoke(struct ubus_client *cl, struct ubus_object *obj,
const char *method, struct ubus_msg_buf *ub,
@ -458,6 +532,10 @@ void ubusd_proto_receive_message(struct ubus_client *cl, struct ubus_msg_buf *ub
else
ret = UBUS_STATUS_INVALID_COMMAND;
/* Command has not been completed yet and got queued */
if (ret == -2)
return;
ubus_msg_free(ub);
if (ret == -1)
@ -495,6 +573,7 @@ struct ubus_client *ubusd_proto_new_client(int fd, uloop_fd_handler cb)
goto free;
INIT_LIST_HEAD(&cl->objects);
INIT_LIST_HEAD(&cl->cmd_queue);
INIT_LIST_HEAD(&cl->tx_queue);
cl->sock.fd = fd;
cl->sock.cb = cb;