diff --git a/CMakeLists.txt b/CMakeLists.txt index a2c4101..b32cbb9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -12,7 +12,7 @@ IF(APPLE) LINK_DIRECTORIES(/opt/local/lib) ENDIF() -ADD_LIBRARY(ubus SHARED libubus.c libubus-io.c libubus-obj.c) +ADD_LIBRARY(ubus SHARED libubus.c libubus-io.c libubus-obj.c libubus-sub.c) TARGET_LINK_LIBRARIES(ubus ubox) ADD_EXECUTABLE(ubusd ubusd.c ubusd_id.c ubusd_obj.c ubusd_proto.c ubusd_event.c) diff --git a/examples/server.c b/examples/server.c index 97789d5..529059c 100644 --- a/examples/server.c +++ b/examples/server.c @@ -16,7 +16,7 @@ #include "libubus.h" static struct ubus_context *ctx; -static struct ubus_watch_object test_event; +static struct ubus_subscriber test_event; static struct blob_buf b; enum { @@ -79,8 +79,9 @@ static const struct blobmsg_policy watch_policy[__WATCH_MAX] = { [WATCH_ID] = { .name = "id", .type = BLOBMSG_TYPE_INT32 }, }; -static void test_handle_event(struct ubus_context *ctx, struct ubus_watch_object *w, - uint32_t id) +static void +test_handle_remove(struct ubus_context *ctx, struct ubus_subscriber *s, + uint32_t id) { fprintf(stderr, "Object %08x went away\n", id); } @@ -96,8 +97,8 @@ static int test_watch(struct ubus_context *ctx, struct ubus_object *obj, if (!tb[WATCH_ID]) return UBUS_STATUS_INVALID_ARGUMENT; - test_event.cb = test_handle_event; - ret = ubus_watch_object_add(ctx, &test_event, blobmsg_get_u32(tb[WATCH_ID])); + test_event.remove_cb = test_handle_remove; + ret = ubus_subscribe(ctx, &test_event, blobmsg_get_u32(tb[WATCH_ID])); fprintf(stderr, "Watching object %08x: %s\n", blobmsg_get_u32(tb[WATCH_ID]), ubus_strerror(ret)); return ret; } @@ -125,7 +126,7 @@ static void server_main(void) if (ret) fprintf(stderr, "Failed to add object: %s\n", ubus_strerror(ret)); - ret = ubus_register_watch_object(ctx, &test_event); + ret = ubus_register_subscriber(ctx, &test_event); if (ret) fprintf(stderr, "Failed to add watch handler: %s\n", ubus_strerror(ret)); diff --git a/libubus-internal.h b/libubus-internal.h index b58fb1d..f3e2a73 100644 --- a/libubus-internal.h +++ b/libubus-internal.h @@ -26,5 +26,6 @@ void ubus_process_msg(struct ubus_context *ctx, struct ubus_msghdr *hdr); void ubus_process_invoke(struct ubus_context *ctx, struct ubus_msghdr *hdr); int __hidden ubus_start_request(struct ubus_context *ctx, struct ubus_request *req, struct blob_attr *msg, int cmd, uint32_t peer); +void ubus_process_unsubscribe(struct ubus_context *ctx, struct ubus_msghdr *hdr); #endif diff --git a/libubus-sub.c b/libubus-sub.c new file mode 100644 index 0000000..2bfb483 --- /dev/null +++ b/libubus-sub.c @@ -0,0 +1,94 @@ +/* + * Copyright (C) 2011-2012 Felix Fietkau + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License version 2.1 + * as published by the Free Software Foundation + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + */ + +#include "libubus.h" +#include "libubus-internal.h" + +static int ubus_subscriber_cb(struct ubus_context *ctx, struct ubus_object *obj, + struct ubus_request_data *req, + const char *method, struct blob_attr *msg) +{ + struct ubus_subscriber *s; + + s = container_of(obj, struct ubus_subscriber, obj); + s->cb(ctx, obj, req, method, msg); + return 0; +} + +static const struct ubus_method watch_method = { + .name = NULL, + .handler = ubus_subscriber_cb, +}; + +int ubus_register_subscriber(struct ubus_context *ctx, struct ubus_subscriber *s) +{ + struct ubus_object *obj = &s->obj; + + obj->methods = &watch_method; + obj->n_methods = 1; + + return ubus_add_object(ctx, obj); +} + +static int +__ubus_subscribe_request(struct ubus_context *ctx, struct ubus_object *obj, uint32_t id, const char *method, int type) +{ + struct ubus_request req; + + blob_buf_init(&b, 0); + blob_put_int32(&b, UBUS_ATTR_OBJID, obj->id); + blob_put_int32(&b, UBUS_ATTR_TARGET, id); + if (method) + blob_put_string(&b, UBUS_ATTR_METHOD, method); + + if (ubus_start_request(ctx, &req, b.head, type, 0) < 0) + return UBUS_STATUS_INVALID_ARGUMENT; + + return ubus_complete_request(ctx, &req, 0); + +} + +int ubus_subscribe(struct ubus_context *ctx, struct ubus_subscriber *obj, uint32_t id) +{ + return __ubus_subscribe_request(ctx, &obj->obj, id, "event", UBUS_MSG_SUBSCRIBE); +} + +int ubus_unsubscribe(struct ubus_context *ctx, struct ubus_subscriber *obj, uint32_t id) +{ + return __ubus_subscribe_request(ctx, &obj->obj, id, NULL, UBUS_MSG_UNSUBSCRIBE); +} + +void __hidden ubus_process_unsubscribe(struct ubus_context *ctx, struct ubus_msghdr *hdr) +{ + struct ubus_subscriber *s; + struct blob_attr **attrbuf; + struct ubus_object *obj; + uint32_t objid; + + attrbuf = ubus_parse_msg(hdr->data); + if (!attrbuf[UBUS_ATTR_OBJID] || !attrbuf[UBUS_ATTR_TARGET]) + return; + + objid = blob_get_u32(attrbuf[UBUS_ATTR_OBJID]); + obj = avl_find_element(&ctx->objects, &objid, obj, avl); + if (!obj) + return; + + if (obj->methods != &watch_method) + return; + + s = container_of(obj, struct ubus_subscriber, obj); + s->remove_cb(ctx, s, blob_get_u32(attrbuf[UBUS_ATTR_TARGET])); +} + + diff --git a/libubus.c b/libubus.c index bcaf63d..03899d6 100644 --- a/libubus.c +++ b/libubus.c @@ -228,6 +228,10 @@ void __hidden ubus_process_msg(struct ubus_context *ctx, struct ubus_msghdr *hdr ubus_process_invoke(ctx, hdr); } break; + + case UBUS_MSG_UNSUBSCRIBE: + ubus_process_unsubscribe(ctx, hdr); + break; } } @@ -514,81 +518,6 @@ int ubus_register_event_handler(struct ubus_context *ctx, NULL, NULL, 0); } -enum { - WATCH_ID, - WATCH_NOTIFY, - __WATCH_MAX -}; - -static const struct blobmsg_policy watch_policy[] = { - [WATCH_ID] = { .name = "id", .type = BLOBMSG_TYPE_INT32 }, - [WATCH_NOTIFY] = { .name = "notify", .type = BLOBMSG_TYPE_STRING }, -}; - - -static int ubus_watch_cb(struct ubus_context *ctx, struct ubus_object *obj, - struct ubus_request_data *req, - const char *method, struct blob_attr *msg) -{ - struct ubus_watch_object *w; - struct blob_attr *tb[__WATCH_MAX]; - - blobmsg_parse(watch_policy, ARRAY_SIZE(watch_policy), tb, blob_data(msg), blob_len(msg)); - - if (!tb[WATCH_ID] || !tb[WATCH_NOTIFY]) - return UBUS_STATUS_INVALID_ARGUMENT; - - if (req->peer) - return UBUS_STATUS_INVALID_ARGUMENT; - - w = container_of(obj, struct ubus_watch_object, obj); - w->cb(ctx, w, blobmsg_get_u32(tb[WATCH_ID])); - return 0; -} - -static const struct ubus_method watch_method = { - .name = NULL, - .handler = ubus_watch_cb, -}; - -int ubus_register_watch_object(struct ubus_context *ctx, struct ubus_watch_object *w_obj) -{ - struct ubus_object *obj = &w_obj->obj; - - obj->methods = &watch_method; - obj->n_methods = 1; - - return ubus_add_object(ctx, obj); -} - -static int -__ubus_watch_request(struct ubus_context *ctx, struct ubus_object *obj, uint32_t id, const char *method, int type) -{ - struct ubus_request req; - - blob_buf_init(&b, 0); - blob_put_int32(&b, UBUS_ATTR_OBJID, obj->id); - blob_put_int32(&b, UBUS_ATTR_TARGET, id); - if (method) - blob_put_string(&b, UBUS_ATTR_METHOD, method); - - if (ubus_start_request(ctx, &req, b.head, type, 0) < 0) - return UBUS_STATUS_INVALID_ARGUMENT; - - return ubus_complete_request(ctx, &req, 0); - -} - -int ubus_watch_object_add(struct ubus_context *ctx, struct ubus_watch_object *obj, uint32_t id) -{ - return __ubus_watch_request(ctx, &obj->obj, id, "event", UBUS_MSG_ADD_WATCH); -} - -int ubus_watch_object_remove(struct ubus_context *ctx, struct ubus_watch_object *obj, uint32_t id) -{ - return __ubus_watch_request(ctx, &obj->obj, id, NULL, UBUS_MSG_REMOVE_WATCH); -} - int ubus_send_event(struct ubus_context *ctx, const char *id, struct blob_attr *data) { diff --git a/libubus.h b/libubus.h index 59d981e..17c4952 100644 --- a/libubus.h +++ b/libubus.h @@ -29,7 +29,7 @@ struct ubus_request; struct ubus_request_data; struct ubus_object_data; struct ubus_event_handler; -struct ubus_watch_object; +struct ubus_subscriber; typedef void (*ubus_lookup_handler_t)(struct ubus_context *ctx, struct ubus_object_data *obj, @@ -37,10 +37,10 @@ typedef void (*ubus_lookup_handler_t)(struct ubus_context *ctx, typedef int (*ubus_handler_t)(struct ubus_context *ctx, struct ubus_object *obj, struct ubus_request_data *req, const char *method, struct blob_attr *msg); +typedef void (*ubus_remove_handler_t)(struct ubus_context *ctx, + struct ubus_subscriber *obj, uint32_t id); typedef void (*ubus_event_handler_t)(struct ubus_context *ctx, struct ubus_event_handler *ev, const char *type, struct blob_attr *msg); -typedef void (*ubus_watch_handler_t)(struct ubus_context *ctx, struct ubus_watch_object *w, - uint32_t id); typedef void (*ubus_data_handler_t)(struct ubus_request *req, int type, struct blob_attr *msg); typedef void (*ubus_complete_handler_t)(struct ubus_request *req, int ret); @@ -90,10 +90,11 @@ struct ubus_object { int n_methods; }; -struct ubus_watch_object { +struct ubus_subscriber { struct ubus_object obj; - ubus_watch_handler_t cb; + ubus_handler_t cb; + ubus_remove_handler_t remove_cb; }; struct ubus_event_handler { @@ -199,12 +200,10 @@ int ubus_add_object(struct ubus_context *ctx, struct ubus_object *obj); /* remove the object from the ubus connection */ int ubus_remove_object(struct ubus_context *ctx, struct ubus_object *obj); -/* add an object for watching other object state changes */ -int ubus_register_watch_object(struct ubus_context *ctx, struct ubus_watch_object *obj); - -int ubus_watch_object_add(struct ubus_context *ctx, struct ubus_watch_object *obj, uint32_t id); - -int ubus_watch_object_remove(struct ubus_context *ctx, struct ubus_watch_object *obj, uint32_t id); +/* add a subscriber notifications from another object */ +int ubus_register_subscriber(struct ubus_context *ctx, struct ubus_subscriber *obj); +int ubus_subscribe(struct ubus_context *ctx, struct ubus_subscriber *obj, uint32_t id); +int ubus_unsubscribe(struct ubus_context *ctx, struct ubus_subscriber *obj, uint32_t id); /* ----------- rpc ----------- */ diff --git a/ubusd_obj.c b/ubusd_obj.c index 7ae9b5f..8b1b18f 100644 --- a/ubusd_obj.c +++ b/ubusd_obj.c @@ -112,8 +112,8 @@ struct ubus_object *ubusd_create_object_internal(struct ubus_object_type *type, obj->type = type; INIT_LIST_HEAD(&obj->list); INIT_LIST_HEAD(&obj->events); - INIT_LIST_HEAD(&obj->watchers); - INIT_LIST_HEAD(&obj->watched); + INIT_LIST_HEAD(&obj->subscribers); + INIT_LIST_HEAD(&obj->target_list); if (type) type->refcount++; @@ -164,37 +164,37 @@ free: return NULL; } -void ubus_watch_new(struct ubus_object *obj, struct ubus_object *target, const char *method) +void ubus_subscribe(struct ubus_object *obj, struct ubus_object *target, const char *method) { - struct ubus_watch *w; + struct ubus_subscription *s; - w = calloc(1, sizeof(*w) + strlen(method) + 1); - if (!w) + s = calloc(1, sizeof(*s) + strlen(method) + 1); + if (!s) return; - w->watcher = obj; - w->watched = target; - list_add(&w->watcher_list, &target->watchers); - list_add(&w->watched_list, &obj->watched); - strcpy(w->method, method); + s->subscriber = obj; + s->target = target; + list_add(&s->list, &target->subscribers); + list_add(&s->target_list, &obj->target_list); + strcpy(s->method, method); } -void ubus_watch_free(struct ubus_watch *w) +void ubus_unsubscribe(struct ubus_subscription *s) { - list_del(&w->watcher_list); - list_del(&w->watched_list); - free(w); + list_del(&s->list); + list_del(&s->target_list); + free(s); } void ubusd_free_object(struct ubus_object *obj) { - struct ubus_watch *w, *tmp; + struct ubus_subscription *s, *tmp; - list_for_each_entry_safe(w, tmp, &obj->watched, watched_list) { - ubus_watch_free(w); + list_for_each_entry_safe(s, tmp, &obj->target_list, target_list) { + ubus_unsubscribe(s); } - list_for_each_entry_safe(w, tmp, &obj->watchers, watcher_list) { - ubus_proto_notify_watch(w); + list_for_each_entry_safe(s, tmp, &obj->subscribers, list) { + ubus_notify_unsubscribe(s); } ubusd_event_cleanup_object(obj); diff --git a/ubusd_obj.h b/ubusd_obj.h index d0573f0..5a82e8d 100644 --- a/ubusd_obj.h +++ b/ubusd_obj.h @@ -35,9 +35,9 @@ struct ubus_method { struct blob_attr data[]; }; -struct ubus_watch { - struct list_head watcher_list, watched_list; - struct ubus_object *watcher, *watched; +struct ubus_subscription { + struct list_head list, target_list; + struct ubus_object *subscriber, *target; char method[]; }; @@ -47,7 +47,7 @@ struct ubus_object { struct list_head events; - struct list_head watchers, watched; + struct list_head subscribers, target_list; struct ubus_object_type *type; struct avl_node path; @@ -76,8 +76,8 @@ static inline struct ubus_object *ubusd_find_object(uint32_t objid) return obj; } -void ubus_watch_new(struct ubus_object *obj, struct ubus_object *target, const char *method); -void ubus_watch_free(struct ubus_watch *w); -void ubus_proto_notify_watch(struct ubus_watch *w); +void ubus_subscribe(struct ubus_object *obj, struct ubus_object *target, const char *method); +void ubus_unsubscribe(struct ubus_subscription *s); +void ubus_notify_unsubscribe(struct ubus_subscription *s); #endif diff --git a/ubusd_proto.c b/ubusd_proto.c index f461e47..59920ed 100644 --- a/ubusd_proto.c +++ b/ubusd_proto.c @@ -308,14 +308,14 @@ static int ubusd_handle_add_watch(struct ubus_client *cl, struct ubus_msg_buf *u if (cl == target->client) return UBUS_STATUS_INVALID_ARGUMENT; - ubus_watch_new(obj, target, blob_data(attr[UBUS_ATTR_METHOD])); + ubus_subscribe(obj, target, blob_data(attr[UBUS_ATTR_METHOD])); return 0; } static int ubusd_handle_remove_watch(struct ubus_client *cl, struct ubus_msg_buf *ub, struct blob_attr **attr) { struct ubus_object *obj; - struct ubus_watch *w; + struct ubus_subscription *s; uint32_t id; if (!attr[UBUS_ATTR_OBJID] || !attr[UBUS_ATTR_TARGET]) @@ -329,11 +329,11 @@ static int ubusd_handle_remove_watch(struct ubus_client *cl, struct ubus_msg_buf return UBUS_STATUS_INVALID_ARGUMENT; id = blob_get_u32(attr[UBUS_ATTR_TARGET]); - list_for_each_entry(w, &obj->watched, watched_list) { - if (w->watched->id.id != id) + list_for_each_entry(s, &obj->target_list, target_list) { + if (s->target->id.id != id) continue; - ubus_watch_free(w); + ubus_unsubscribe(s); return 0; } @@ -348,8 +348,8 @@ static const ubus_cmd_cb handlers[__UBUS_MSG_LAST] = { [UBUS_MSG_INVOKE] = ubusd_handle_invoke, [UBUS_MSG_STATUS] = ubusd_handle_response, [UBUS_MSG_DATA] = ubusd_handle_response, - [UBUS_MSG_ADD_WATCH] = ubusd_handle_add_watch, - [UBUS_MSG_REMOVE_WATCH] = ubusd_handle_remove_watch, + [UBUS_MSG_SUBSCRIBE] = ubusd_handle_add_watch, + [UBUS_MSG_UNSUBSCRIBE] = ubusd_handle_remove_watch, }; void ubusd_proto_receive_message(struct ubus_client *cl, struct ubus_msg_buf *ub) @@ -416,26 +416,19 @@ void ubusd_proto_free_client(struct ubus_client *cl) ubus_free_id(&clients, &cl->id); } -void ubus_proto_notify_watch(struct ubus_watch *w) +void ubus_notify_unsubscribe(struct ubus_subscription *s) { struct ubus_msg_buf *ub; - void *data; blob_buf_init(&b, 0); - blob_put_int32(&b, UBUS_ATTR_OBJID, w->watcher->id.id); - blob_put_string(&b, UBUS_ATTR_METHOD, w->method); - - data = blob_nest_start(&b, UBUS_ATTR_DATA); - blobmsg_add_string(&b, "notify", "remove"); - blobmsg_add_u32(&b, "id", w->watched->id.id); - blobmsg_add_u32(&b, "peer", w->watched->client->id.id); - blob_nest_end(&b, data); + blob_put_int32(&b, UBUS_ATTR_OBJID, s->subscriber->id.id); + blob_put_int32(&b, UBUS_ATTR_TARGET, s->target->id.id); ub = ubus_msg_from_blob(false); - ubus_msg_init(ub, UBUS_MSG_INVOKE, ++w->watcher->invoke_seq, 0); - ubus_msg_send(w->watcher->client, ub, true); + ubus_msg_init(ub, UBUS_MSG_UNSUBSCRIBE, ++s->subscriber->invoke_seq, 0); + ubus_msg_send(s->subscriber->client, ub, true); - ubus_watch_free(w); + ubus_unsubscribe(s); } static void __init ubusd_proto_init(void) diff --git a/ubusmsg.h b/ubusmsg.h index 10f84db..833d7bf 100644 --- a/ubusmsg.h +++ b/ubusmsg.h @@ -54,9 +54,13 @@ enum ubus_msg_type { UBUS_MSG_ADD_OBJECT, UBUS_MSG_REMOVE_OBJECT, - /* watch an object, notify on remove */ - UBUS_MSG_ADD_WATCH, - UBUS_MSG_REMOVE_WATCH, + /* + * subscribe/unsubscribe to object notifications + * The unsubscribe message is sent from ubusd when + * the object disappears + */ + UBUS_MSG_SUBSCRIBE, + UBUS_MSG_UNSUBSCRIBE, /* must be last */ __UBUS_MSG_LAST,