diff --git a/ubusd.c b/ubusd.c index 5993653..c324c70 100644 --- a/ubusd.c +++ b/ubusd.c @@ -133,13 +133,25 @@ ssize_t ubus_msg_writev(int fd, struct ubus_msg_buf *ub, size_t offset) return ret; } +void ubus_msg_list_free(struct ubus_msg_buf_list *ubl) +{ + list_del_init(&ubl->list); + ubus_msg_free(ubl->msg); + free(ubl); +} + static void ubus_msg_enqueue(struct ubus_client *cl, struct ubus_msg_buf *ub) { - if (cl->tx_queue[cl->txq_tail]) + struct ubus_msg_buf_list *ubl; + + ubl = calloc(1, sizeof(struct ubus_msg_buf_list)); + if (!ubl) return; - cl->tx_queue[cl->txq_tail] = ubus_msg_ref(ub); - cl->txq_tail = (cl->txq_tail + 1) % ARRAY_SIZE(cl->tx_queue); + INIT_LIST_HEAD(&ubl->list); + ubl->msg = ubus_msg_ref(ub); + + list_add_tail(&cl->tx_queue, &ubl->list); } /* takes the msgbuf reference */ @@ -150,7 +162,7 @@ void ubus_msg_send(struct ubus_client *cl, struct ubus_msg_buf *ub) if (ub->hdr.type != UBUS_MSG_MONITOR) ubusd_monitor_message(cl, ub, true); - if (!cl->tx_queue[cl->txq_cur]) { + if (list_empty(&cl->tx_queue)) { written = ubus_msg_writev(cl->sock.fd, ub, 0); if (written < 0) diff --git a/ubusd.h b/ubusd.h index 923e43d..f34cba1 100644 --- a/ubusd.h +++ b/ubusd.h @@ -23,7 +23,6 @@ #include "ubusmsg.h" #include "ubusd_acl.h" -#define UBUSD_CLIENT_BACKLOG 32 #define UBUS_OBJ_HASH_BITS 4 extern struct blob_buf b; @@ -36,6 +35,11 @@ struct ubus_msg_buf { int len; }; +struct ubus_msg_buf_list { + struct list_head list; + struct ubus_msg_buf *msg; +}; + struct ubus_client { struct ubus_id id; struct uloop_fd sock; @@ -48,8 +52,8 @@ struct ubus_client { struct list_head objects; - struct ubus_msg_buf *tx_queue[UBUSD_CLIENT_BACKLOG]; - unsigned int txq_cur, txq_tail, txq_ofs; + struct list_head tx_queue; + unsigned int txq_ofs; struct ubus_msg_buf *pending_msg; struct ubus_msg_buf *retmsg; @@ -72,6 +76,7 @@ struct ubus_msg_buf *ubus_msg_new(void *data, int len, bool shared); void ubus_msg_send(struct ubus_client *cl, struct ubus_msg_buf *ub); ssize_t ubus_msg_writev(int fd, struct ubus_msg_buf *ub, size_t offset); void ubus_msg_free(struct ubus_msg_buf *ub); +void ubus_msg_list_free(struct ubus_msg_buf_list *ubl); struct blob_attr **ubus_parse_msg(struct blob_attr *msg, size_t len); struct ubus_client *ubusd_proto_new_client(int fd, uloop_fd_handler cb); diff --git a/ubusd_main.c b/ubusd_main.c index c3d8049..3728a42 100644 --- a/ubusd_main.c +++ b/ubusd_main.c @@ -17,28 +17,11 @@ #include "ubusd.h" -static struct ubus_msg_buf *ubus_msg_head(struct ubus_client *cl) -{ - return cl->tx_queue[cl->txq_cur]; -} - -static void ubus_msg_dequeue(struct ubus_client *cl) -{ - struct ubus_msg_buf *ub = ubus_msg_head(cl); - - if (!ub) - return; - - ubus_msg_free(ub); - cl->txq_ofs = 0; - cl->tx_queue[cl->txq_cur] = NULL; - cl->txq_cur = (cl->txq_cur + 1) % ARRAY_SIZE(cl->tx_queue); -} - static void handle_client_disconnect(struct ubus_client *cl) { - while (ubus_msg_head(cl)) - ubus_msg_dequeue(cl); + struct ubus_msg_buf_list *ubl, *ubl2; + list_for_each_entry_safe(ubl, ubl2, &cl->tx_queue, list) + ubus_msg_list_free(ubl); ubusd_monitor_disconnect(cl); ubusd_proto_free_client(cl); @@ -55,6 +38,7 @@ static void client_cb(struct uloop_fd *sock, unsigned int events) uint8_t fd_buf[CMSG_SPACE(sizeof(int))] = { 0 }; struct msghdr msghdr = { 0 }; struct ubus_msg_buf *ub; + struct ubus_msg_buf_list *ubl, *ubl2; static struct iovec iov; struct cmsghdr *cmsg; int *pfd; @@ -73,9 +57,10 @@ static void client_cb(struct uloop_fd *sock, unsigned int events) msghdr.msg_controllen = cmsg->cmsg_len; /* first try to tx more pending data */ - while ((ub = ubus_msg_head(cl))) { + list_for_each_entry_safe(ubl, ubl2, &cl->tx_queue, list) { ssize_t written; + ub = ubl->msg; written = ubus_msg_writev(sock->fd, ub, cl->txq_ofs); if (written < 0) { switch(errno) { @@ -92,12 +77,12 @@ static void client_cb(struct uloop_fd *sock, unsigned int events) if (cl->txq_ofs < ub->len + sizeof(ub->hdr)) break; - ubus_msg_dequeue(cl); + ubus_msg_list_free(ubl); } /* prevent further ULOOP_WRITE events if we don't have data * to send anymore */ - if (!ubus_msg_head(cl) && (events & ULOOP_WRITE)) + if (list_empty(&cl->tx_queue) && (events & ULOOP_WRITE)) uloop_fd_add(sock, ULOOP_READ | ULOOP_EDGE_TRIGGER); retry: @@ -171,7 +156,7 @@ retry: } out: - if (!sock->eof || ubus_msg_head(cl)) + if (!sock->eof || !list_empty(&cl->tx_queue)) return; disconnect: diff --git a/ubusd_proto.c b/ubusd_proto.c index 4746605..b20f91c 100644 --- a/ubusd_proto.c +++ b/ubusd_proto.c @@ -495,6 +495,7 @@ struct ubus_client *ubusd_proto_new_client(int fd, uloop_fd_handler cb) goto free; INIT_LIST_HEAD(&cl->objects); + INIT_LIST_HEAD(&cl->tx_queue); cl->sock.fd = fd; cl->sock.cb = cb; cl->pending_msg_fd = -1;