From c6f705451527e6e5e8f5a2715f11478eeb8799e4 Mon Sep 17 00:00:00 2001 From: Felix Fietkau Date: Sat, 19 May 2012 21:09:35 +0200 Subject: [PATCH] ubusd: add support for watching objects to get notified when they go away --- libubus.c | 87 +++++++++++++++++++++++++++++++++++++++++++++------ libubus.h | 16 ++++++++++ ubusd_obj.c | 33 +++++++++++++++++++ ubusd_obj.h | 13 ++++++++ ubusd_proto.c | 79 ++++++++++++++++++++++++++++++++++++++++++++++ ubusmsg.h | 5 +++ 6 files changed, 223 insertions(+), 10 deletions(-) diff --git a/libubus.c b/libubus.c index 4502f24..590a0fa 100644 --- a/libubus.c +++ b/libubus.c @@ -709,7 +709,7 @@ static bool ubus_push_object_type(const struct ubus_object_type *type) return true; } -static int __ubus_add_object(struct ubus_context *ctx, struct ubus_object *obj) +int ubus_add_object(struct ubus_context *ctx, struct ubus_object *obj) { struct ubus_request req; int ret; @@ -740,14 +740,6 @@ static int __ubus_add_object(struct ubus_context *ctx, struct ubus_object *obj) return 0; } -int ubus_add_object(struct ubus_context *ctx, struct ubus_object *obj) -{ - if (!obj->name || !obj->type) - return UBUS_STATUS_INVALID_ARGUMENT; - - return __ubus_add_object(ctx, obj); -} - static void ubus_remove_object_cb(struct ubus_request *req, int type, struct blob_attr *msg) { struct ubus_object *obj = req->priv; @@ -819,7 +811,7 @@ int ubus_register_event_handler(struct ubus_context *ctx, if (!!obj->name ^ !!obj->type) return UBUS_STATUS_INVALID_ARGUMENT; - ret = __ubus_add_object(ctx, obj); + ret = ubus_add_object(ctx, obj); if (ret) return ret; } @@ -835,6 +827,81 @@ int ubus_register_event_handler(struct ubus_context *ctx, NULL, NULL, 0); } +enum { + WATCH_ID, + WATCH_NOTIFY, + __WATCH_MAX +}; + +static const struct blobmsg_policy watch_policy[] = { + [WATCH_ID] = { .name = "id", .type = BLOBMSG_TYPE_INT32 }, + [WATCH_NOTIFY] = { .name = "notify", .type = BLOBMSG_TYPE_STRING }, +}; + + +static int ubus_watch_cb(struct ubus_context *ctx, struct ubus_object *obj, + struct ubus_request_data *req, + const char *method, struct blob_attr *msg) +{ + struct ubus_watch_object *w; + struct blob_attr *tb[__WATCH_MAX]; + + blobmsg_parse(watch_policy, ARRAY_SIZE(watch_policy), tb, blob_data(msg), blob_len(msg)); + + if (!tb[WATCH_ID] || !tb[WATCH_NOTIFY]) + return UBUS_STATUS_INVALID_ARGUMENT; + + if (req->peer) + return UBUS_STATUS_INVALID_ARGUMENT; + + w = container_of(obj, struct ubus_watch_object, obj); + w->cb(ctx, w, blobmsg_get_u32(tb[WATCH_ID])); + return 0; +} + +static const struct ubus_method watch_method = { + .name = NULL, + .handler = ubus_watch_cb, +}; + +int ubus_register_watch_object(struct ubus_context *ctx, struct ubus_watch_object *w_obj) +{ + struct ubus_object *obj = &w_obj->obj; + + obj->methods = &watch_method; + obj->n_methods = 1; + + return ubus_add_object(ctx, obj); +} + +static int +__ubus_watch_request(struct ubus_context *ctx, struct ubus_object *obj, uint32_t id, const char *method, int type) +{ + struct ubus_request req; + + blob_buf_init(&b, 0); + blob_put_int32(&b, UBUS_ATTR_OBJID, obj->id); + blob_put_int32(&b, UBUS_ATTR_TARGET, id); + if (method) + blob_put_string(&b, UBUS_ATTR_METHOD, method); + + if (ubus_start_request(ctx, &req, b.head, type, 0) < 0) + return UBUS_STATUS_INVALID_ARGUMENT; + + return ubus_complete_request(ctx, &req, 0); + +} + +int ubus_watch_object_add(struct ubus_context *ctx, struct ubus_watch_object *obj, uint32_t id) +{ + return __ubus_watch_request(ctx, &obj->obj, id, "event", UBUS_MSG_ADD_WATCH); +} + +int ubus_watch_object_remove(struct ubus_context *ctx, struct ubus_watch_object *obj, uint32_t id) +{ + return __ubus_watch_request(ctx, &obj->obj, id, NULL, UBUS_MSG_REMOVE_WATCH); +} + int ubus_send_event(struct ubus_context *ctx, const char *id, struct blob_attr *data) { diff --git a/libubus.h b/libubus.h index e0e418b..13e4c03 100644 --- a/libubus.h +++ b/libubus.h @@ -26,6 +26,7 @@ struct ubus_request; struct ubus_request_data; struct ubus_object_data; struct ubus_event_handler; +struct ubus_watch_object; typedef void (*ubus_lookup_handler_t)(struct ubus_context *ctx, struct ubus_object_data *obj, @@ -35,6 +36,8 @@ typedef int (*ubus_handler_t)(struct ubus_context *ctx, struct ubus_object *obj, const char *method, struct blob_attr *msg); typedef void (*ubus_event_handler_t)(struct ubus_context *ctx, struct ubus_event_handler *ev, const char *type, struct blob_attr *msg); +typedef void (*ubus_watch_handler_t)(struct ubus_context *ctx, struct ubus_watch_object *w, + uint32_t id); typedef void (*ubus_data_handler_t)(struct ubus_request *req, int type, struct blob_attr *msg); typedef void (*ubus_complete_handler_t)(struct ubus_request *req, int ret); @@ -84,6 +87,12 @@ struct ubus_object { int n_methods; }; +struct ubus_watch_object { + struct ubus_object obj; + + ubus_watch_handler_t cb; +}; + struct ubus_event_handler { struct ubus_object obj; @@ -185,6 +194,13 @@ int ubus_add_object(struct ubus_context *ctx, struct ubus_object *obj); /* remove the object from the ubus connection */ int ubus_remove_object(struct ubus_context *ctx, struct ubus_object *obj); +/* add an object for watching other object state changes */ +int ubus_register_watch_object(struct ubus_context *ctx, struct ubus_watch_object *obj); + +int ubus_watch_object_add(struct ubus_context *ctx, struct ubus_watch_object *obj, uint32_t id); + +int ubus_watch_object_remove(struct ubus_context *ctx, struct ubus_watch_object *obj, uint32_t id); + /* ----------- rpc ----------- */ /* invoke a method on a specific object */ diff --git a/ubusd_obj.c b/ubusd_obj.c index 2248a08..7ae9b5f 100644 --- a/ubusd_obj.c +++ b/ubusd_obj.c @@ -112,6 +112,8 @@ struct ubus_object *ubusd_create_object_internal(struct ubus_object_type *type, obj->type = type; INIT_LIST_HEAD(&obj->list); INIT_LIST_HEAD(&obj->events); + INIT_LIST_HEAD(&obj->watchers); + INIT_LIST_HEAD(&obj->watched); if (type) type->refcount++; @@ -162,8 +164,39 @@ free: return NULL; } +void ubus_watch_new(struct ubus_object *obj, struct ubus_object *target, const char *method) +{ + struct ubus_watch *w; + + w = calloc(1, sizeof(*w) + strlen(method) + 1); + if (!w) + return; + + w->watcher = obj; + w->watched = target; + list_add(&w->watcher_list, &target->watchers); + list_add(&w->watched_list, &obj->watched); + strcpy(w->method, method); +} + +void ubus_watch_free(struct ubus_watch *w) +{ + list_del(&w->watcher_list); + list_del(&w->watched_list); + free(w); +} + void ubusd_free_object(struct ubus_object *obj) { + struct ubus_watch *w, *tmp; + + list_for_each_entry_safe(w, tmp, &obj->watched, watched_list) { + ubus_watch_free(w); + } + list_for_each_entry_safe(w, tmp, &obj->watchers, watcher_list) { + ubus_proto_notify_watch(w); + } + ubusd_event_cleanup_object(obj); if (obj->path.key) { ubusd_send_obj_event(obj, false); diff --git a/ubusd_obj.h b/ubusd_obj.h index 83bc366..d0573f0 100644 --- a/ubusd_obj.h +++ b/ubusd_obj.h @@ -35,12 +35,20 @@ struct ubus_method { struct blob_attr data[]; }; +struct ubus_watch { + struct list_head watcher_list, watched_list; + struct ubus_object *watcher, *watched; + char method[]; +}; + struct ubus_object { struct ubus_id id; struct list_head list; struct list_head events; + struct list_head watchers, watched; + struct ubus_object_type *type; struct avl_node path; @@ -48,6 +56,7 @@ struct ubus_object { int (*recv_msg)(struct ubus_client *client, const char *method, struct blob_attr *msg); int event_seen; + unsigned int invoke_seq; }; struct ubus_object *ubusd_create_object(struct ubus_client *cl, struct blob_attr **attr); @@ -67,4 +76,8 @@ static inline struct ubus_object *ubusd_find_object(uint32_t objid) return obj; } +void ubus_watch_new(struct ubus_object *obj, struct ubus_object *target, const char *method); +void ubus_watch_free(struct ubus_watch *w); +void ubus_proto_notify_watch(struct ubus_watch *w); + #endif diff --git a/ubusd_proto.c b/ubusd_proto.c index d49ef48..f461e47 100644 --- a/ubusd_proto.c +++ b/ubusd_proto.c @@ -285,6 +285,61 @@ error: return -1; } +static int ubusd_handle_add_watch(struct ubus_client *cl, struct ubus_msg_buf *ub, struct blob_attr **attr) +{ + struct ubus_object *obj, *target; + + if (!attr[UBUS_ATTR_OBJID] || !attr[UBUS_ATTR_TARGET] || + !attr[UBUS_ATTR_METHOD]) { + return UBUS_STATUS_INVALID_ARGUMENT; + } + + obj = ubusd_find_object(blob_get_u32(attr[UBUS_ATTR_OBJID])); + if (!obj) + return UBUS_STATUS_NOT_FOUND; + + if (cl != obj->client) + return UBUS_STATUS_INVALID_ARGUMENT; + + target = ubusd_find_object(blob_get_u32(attr[UBUS_ATTR_TARGET])); + if (!target) + return UBUS_STATUS_NOT_FOUND; + + if (cl == target->client) + return UBUS_STATUS_INVALID_ARGUMENT; + + ubus_watch_new(obj, target, blob_data(attr[UBUS_ATTR_METHOD])); + return 0; +} + +static int ubusd_handle_remove_watch(struct ubus_client *cl, struct ubus_msg_buf *ub, struct blob_attr **attr) +{ + struct ubus_object *obj; + struct ubus_watch *w; + uint32_t id; + + if (!attr[UBUS_ATTR_OBJID] || !attr[UBUS_ATTR_TARGET]) + return UBUS_STATUS_INVALID_ARGUMENT; + + obj = ubusd_find_object(blob_get_u32(attr[UBUS_ATTR_OBJID])); + if (!obj) + return UBUS_STATUS_NOT_FOUND; + + if (cl != obj->client) + return UBUS_STATUS_INVALID_ARGUMENT; + + id = blob_get_u32(attr[UBUS_ATTR_TARGET]); + list_for_each_entry(w, &obj->watched, watched_list) { + if (w->watched->id.id != id) + continue; + + ubus_watch_free(w); + return 0; + } + + return UBUS_STATUS_NOT_FOUND; +} + static const ubus_cmd_cb handlers[__UBUS_MSG_LAST] = { [UBUS_MSG_PING] = ubusd_send_pong, [UBUS_MSG_ADD_OBJECT] = ubusd_handle_add_object, @@ -293,6 +348,8 @@ static const ubus_cmd_cb handlers[__UBUS_MSG_LAST] = { [UBUS_MSG_INVOKE] = ubusd_handle_invoke, [UBUS_MSG_STATUS] = ubusd_handle_response, [UBUS_MSG_DATA] = ubusd_handle_response, + [UBUS_MSG_ADD_WATCH] = ubusd_handle_add_watch, + [UBUS_MSG_REMOVE_WATCH] = ubusd_handle_remove_watch, }; void ubusd_proto_receive_message(struct ubus_client *cl, struct ubus_msg_buf *ub) @@ -359,6 +416,28 @@ void ubusd_proto_free_client(struct ubus_client *cl) ubus_free_id(&clients, &cl->id); } +void ubus_proto_notify_watch(struct ubus_watch *w) +{ + struct ubus_msg_buf *ub; + void *data; + + blob_buf_init(&b, 0); + blob_put_int32(&b, UBUS_ATTR_OBJID, w->watcher->id.id); + blob_put_string(&b, UBUS_ATTR_METHOD, w->method); + + data = blob_nest_start(&b, UBUS_ATTR_DATA); + blobmsg_add_string(&b, "notify", "remove"); + blobmsg_add_u32(&b, "id", w->watched->id.id); + blobmsg_add_u32(&b, "peer", w->watched->client->id.id); + blob_nest_end(&b, data); + + ub = ubus_msg_from_blob(false); + ubus_msg_init(ub, UBUS_MSG_INVOKE, ++w->watcher->invoke_seq, 0); + ubus_msg_send(w->watcher->client, ub, true); + + ubus_watch_free(w); +} + static void __init ubusd_proto_init(void) { ubus_init_id_tree(&clients); diff --git a/ubusmsg.h b/ubusmsg.h index a0681e1..607ecd4 100644 --- a/ubusmsg.h +++ b/ubusmsg.h @@ -54,6 +54,10 @@ enum ubus_msg_type { UBUS_MSG_ADD_OBJECT, UBUS_MSG_REMOVE_OBJECT, + /* watch an object, notify on remove */ + UBUS_MSG_ADD_WATCH, + UBUS_MSG_REMOVE_WATCH, + /* must be last */ __UBUS_MSG_LAST, }; @@ -71,6 +75,7 @@ enum ubus_msg_attr { UBUS_ATTR_SIGNATURE, UBUS_ATTR_DATA, + UBUS_ATTR_TARGET, /* must be last */ UBUS_ATTR_MAX,