watch add/remove -> subscribe/unsubscribe:

rename the ADD_WATCH/REMOVE_WATCH messages to SUBSCRIBE/UNSUBSCRIBE and change
the message format and libubus API in preparation for adding object notifications

Signed-off-by: Felix Fietkau <nbd@openwrt.org>
This commit is contained in:
Felix Fietkau 2012-12-13 18:44:15 +01:00
parent 43d6047c40
commit d366a6de83
10 changed files with 164 additions and 143 deletions

View file

@ -12,7 +12,7 @@ IF(APPLE)
LINK_DIRECTORIES(/opt/local/lib) LINK_DIRECTORIES(/opt/local/lib)
ENDIF() ENDIF()
ADD_LIBRARY(ubus SHARED libubus.c libubus-io.c libubus-obj.c) ADD_LIBRARY(ubus SHARED libubus.c libubus-io.c libubus-obj.c libubus-sub.c)
TARGET_LINK_LIBRARIES(ubus ubox) TARGET_LINK_LIBRARIES(ubus ubox)
ADD_EXECUTABLE(ubusd ubusd.c ubusd_id.c ubusd_obj.c ubusd_proto.c ubusd_event.c) ADD_EXECUTABLE(ubusd ubusd.c ubusd_id.c ubusd_obj.c ubusd_proto.c ubusd_event.c)

View file

@ -16,7 +16,7 @@
#include "libubus.h" #include "libubus.h"
static struct ubus_context *ctx; static struct ubus_context *ctx;
static struct ubus_watch_object test_event; static struct ubus_subscriber test_event;
static struct blob_buf b; static struct blob_buf b;
enum { enum {
@ -79,8 +79,9 @@ static const struct blobmsg_policy watch_policy[__WATCH_MAX] = {
[WATCH_ID] = { .name = "id", .type = BLOBMSG_TYPE_INT32 }, [WATCH_ID] = { .name = "id", .type = BLOBMSG_TYPE_INT32 },
}; };
static void test_handle_event(struct ubus_context *ctx, struct ubus_watch_object *w, static void
uint32_t id) test_handle_remove(struct ubus_context *ctx, struct ubus_subscriber *s,
uint32_t id)
{ {
fprintf(stderr, "Object %08x went away\n", id); fprintf(stderr, "Object %08x went away\n", id);
} }
@ -96,8 +97,8 @@ static int test_watch(struct ubus_context *ctx, struct ubus_object *obj,
if (!tb[WATCH_ID]) if (!tb[WATCH_ID])
return UBUS_STATUS_INVALID_ARGUMENT; return UBUS_STATUS_INVALID_ARGUMENT;
test_event.cb = test_handle_event; test_event.remove_cb = test_handle_remove;
ret = ubus_watch_object_add(ctx, &test_event, blobmsg_get_u32(tb[WATCH_ID])); ret = ubus_subscribe(ctx, &test_event, blobmsg_get_u32(tb[WATCH_ID]));
fprintf(stderr, "Watching object %08x: %s\n", blobmsg_get_u32(tb[WATCH_ID]), ubus_strerror(ret)); fprintf(stderr, "Watching object %08x: %s\n", blobmsg_get_u32(tb[WATCH_ID]), ubus_strerror(ret));
return ret; return ret;
} }
@ -125,7 +126,7 @@ static void server_main(void)
if (ret) if (ret)
fprintf(stderr, "Failed to add object: %s\n", ubus_strerror(ret)); fprintf(stderr, "Failed to add object: %s\n", ubus_strerror(ret));
ret = ubus_register_watch_object(ctx, &test_event); ret = ubus_register_subscriber(ctx, &test_event);
if (ret) if (ret)
fprintf(stderr, "Failed to add watch handler: %s\n", ubus_strerror(ret)); fprintf(stderr, "Failed to add watch handler: %s\n", ubus_strerror(ret));

View file

@ -26,5 +26,6 @@ void ubus_process_msg(struct ubus_context *ctx, struct ubus_msghdr *hdr);
void ubus_process_invoke(struct ubus_context *ctx, struct ubus_msghdr *hdr); void ubus_process_invoke(struct ubus_context *ctx, struct ubus_msghdr *hdr);
int __hidden ubus_start_request(struct ubus_context *ctx, struct ubus_request *req, int __hidden ubus_start_request(struct ubus_context *ctx, struct ubus_request *req,
struct blob_attr *msg, int cmd, uint32_t peer); struct blob_attr *msg, int cmd, uint32_t peer);
void ubus_process_unsubscribe(struct ubus_context *ctx, struct ubus_msghdr *hdr);
#endif #endif

94
libubus-sub.c Normal file
View file

@ -0,0 +1,94 @@
/*
* Copyright (C) 2011-2012 Felix Fietkau <nbd@openwrt.org>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License version 2.1
* as published by the Free Software Foundation
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*/
#include "libubus.h"
#include "libubus-internal.h"
static int ubus_subscriber_cb(struct ubus_context *ctx, struct ubus_object *obj,
struct ubus_request_data *req,
const char *method, struct blob_attr *msg)
{
struct ubus_subscriber *s;
s = container_of(obj, struct ubus_subscriber, obj);
s->cb(ctx, obj, req, method, msg);
return 0;
}
static const struct ubus_method watch_method = {
.name = NULL,
.handler = ubus_subscriber_cb,
};
int ubus_register_subscriber(struct ubus_context *ctx, struct ubus_subscriber *s)
{
struct ubus_object *obj = &s->obj;
obj->methods = &watch_method;
obj->n_methods = 1;
return ubus_add_object(ctx, obj);
}
static int
__ubus_subscribe_request(struct ubus_context *ctx, struct ubus_object *obj, uint32_t id, const char *method, int type)
{
struct ubus_request req;
blob_buf_init(&b, 0);
blob_put_int32(&b, UBUS_ATTR_OBJID, obj->id);
blob_put_int32(&b, UBUS_ATTR_TARGET, id);
if (method)
blob_put_string(&b, UBUS_ATTR_METHOD, method);
if (ubus_start_request(ctx, &req, b.head, type, 0) < 0)
return UBUS_STATUS_INVALID_ARGUMENT;
return ubus_complete_request(ctx, &req, 0);
}
int ubus_subscribe(struct ubus_context *ctx, struct ubus_subscriber *obj, uint32_t id)
{
return __ubus_subscribe_request(ctx, &obj->obj, id, "event", UBUS_MSG_SUBSCRIBE);
}
int ubus_unsubscribe(struct ubus_context *ctx, struct ubus_subscriber *obj, uint32_t id)
{
return __ubus_subscribe_request(ctx, &obj->obj, id, NULL, UBUS_MSG_UNSUBSCRIBE);
}
void __hidden ubus_process_unsubscribe(struct ubus_context *ctx, struct ubus_msghdr *hdr)
{
struct ubus_subscriber *s;
struct blob_attr **attrbuf;
struct ubus_object *obj;
uint32_t objid;
attrbuf = ubus_parse_msg(hdr->data);
if (!attrbuf[UBUS_ATTR_OBJID] || !attrbuf[UBUS_ATTR_TARGET])
return;
objid = blob_get_u32(attrbuf[UBUS_ATTR_OBJID]);
obj = avl_find_element(&ctx->objects, &objid, obj, avl);
if (!obj)
return;
if (obj->methods != &watch_method)
return;
s = container_of(obj, struct ubus_subscriber, obj);
s->remove_cb(ctx, s, blob_get_u32(attrbuf[UBUS_ATTR_TARGET]));
}

View file

@ -228,6 +228,10 @@ void __hidden ubus_process_msg(struct ubus_context *ctx, struct ubus_msghdr *hdr
ubus_process_invoke(ctx, hdr); ubus_process_invoke(ctx, hdr);
} }
break; break;
case UBUS_MSG_UNSUBSCRIBE:
ubus_process_unsubscribe(ctx, hdr);
break;
} }
} }
@ -514,81 +518,6 @@ int ubus_register_event_handler(struct ubus_context *ctx,
NULL, NULL, 0); NULL, NULL, 0);
} }
enum {
WATCH_ID,
WATCH_NOTIFY,
__WATCH_MAX
};
static const struct blobmsg_policy watch_policy[] = {
[WATCH_ID] = { .name = "id", .type = BLOBMSG_TYPE_INT32 },
[WATCH_NOTIFY] = { .name = "notify", .type = BLOBMSG_TYPE_STRING },
};
static int ubus_watch_cb(struct ubus_context *ctx, struct ubus_object *obj,
struct ubus_request_data *req,
const char *method, struct blob_attr *msg)
{
struct ubus_watch_object *w;
struct blob_attr *tb[__WATCH_MAX];
blobmsg_parse(watch_policy, ARRAY_SIZE(watch_policy), tb, blob_data(msg), blob_len(msg));
if (!tb[WATCH_ID] || !tb[WATCH_NOTIFY])
return UBUS_STATUS_INVALID_ARGUMENT;
if (req->peer)
return UBUS_STATUS_INVALID_ARGUMENT;
w = container_of(obj, struct ubus_watch_object, obj);
w->cb(ctx, w, blobmsg_get_u32(tb[WATCH_ID]));
return 0;
}
static const struct ubus_method watch_method = {
.name = NULL,
.handler = ubus_watch_cb,
};
int ubus_register_watch_object(struct ubus_context *ctx, struct ubus_watch_object *w_obj)
{
struct ubus_object *obj = &w_obj->obj;
obj->methods = &watch_method;
obj->n_methods = 1;
return ubus_add_object(ctx, obj);
}
static int
__ubus_watch_request(struct ubus_context *ctx, struct ubus_object *obj, uint32_t id, const char *method, int type)
{
struct ubus_request req;
blob_buf_init(&b, 0);
blob_put_int32(&b, UBUS_ATTR_OBJID, obj->id);
blob_put_int32(&b, UBUS_ATTR_TARGET, id);
if (method)
blob_put_string(&b, UBUS_ATTR_METHOD, method);
if (ubus_start_request(ctx, &req, b.head, type, 0) < 0)
return UBUS_STATUS_INVALID_ARGUMENT;
return ubus_complete_request(ctx, &req, 0);
}
int ubus_watch_object_add(struct ubus_context *ctx, struct ubus_watch_object *obj, uint32_t id)
{
return __ubus_watch_request(ctx, &obj->obj, id, "event", UBUS_MSG_ADD_WATCH);
}
int ubus_watch_object_remove(struct ubus_context *ctx, struct ubus_watch_object *obj, uint32_t id)
{
return __ubus_watch_request(ctx, &obj->obj, id, NULL, UBUS_MSG_REMOVE_WATCH);
}
int ubus_send_event(struct ubus_context *ctx, const char *id, int ubus_send_event(struct ubus_context *ctx, const char *id,
struct blob_attr *data) struct blob_attr *data)
{ {

View file

@ -29,7 +29,7 @@ struct ubus_request;
struct ubus_request_data; struct ubus_request_data;
struct ubus_object_data; struct ubus_object_data;
struct ubus_event_handler; struct ubus_event_handler;
struct ubus_watch_object; struct ubus_subscriber;
typedef void (*ubus_lookup_handler_t)(struct ubus_context *ctx, typedef void (*ubus_lookup_handler_t)(struct ubus_context *ctx,
struct ubus_object_data *obj, struct ubus_object_data *obj,
@ -37,10 +37,10 @@ typedef void (*ubus_lookup_handler_t)(struct ubus_context *ctx,
typedef int (*ubus_handler_t)(struct ubus_context *ctx, struct ubus_object *obj, typedef int (*ubus_handler_t)(struct ubus_context *ctx, struct ubus_object *obj,
struct ubus_request_data *req, struct ubus_request_data *req,
const char *method, struct blob_attr *msg); const char *method, struct blob_attr *msg);
typedef void (*ubus_remove_handler_t)(struct ubus_context *ctx,
struct ubus_subscriber *obj, uint32_t id);
typedef void (*ubus_event_handler_t)(struct ubus_context *ctx, struct ubus_event_handler *ev, typedef void (*ubus_event_handler_t)(struct ubus_context *ctx, struct ubus_event_handler *ev,
const char *type, struct blob_attr *msg); const char *type, struct blob_attr *msg);
typedef void (*ubus_watch_handler_t)(struct ubus_context *ctx, struct ubus_watch_object *w,
uint32_t id);
typedef void (*ubus_data_handler_t)(struct ubus_request *req, typedef void (*ubus_data_handler_t)(struct ubus_request *req,
int type, struct blob_attr *msg); int type, struct blob_attr *msg);
typedef void (*ubus_complete_handler_t)(struct ubus_request *req, int ret); typedef void (*ubus_complete_handler_t)(struct ubus_request *req, int ret);
@ -90,10 +90,11 @@ struct ubus_object {
int n_methods; int n_methods;
}; };
struct ubus_watch_object { struct ubus_subscriber {
struct ubus_object obj; struct ubus_object obj;
ubus_watch_handler_t cb; ubus_handler_t cb;
ubus_remove_handler_t remove_cb;
}; };
struct ubus_event_handler { struct ubus_event_handler {
@ -199,12 +200,10 @@ int ubus_add_object(struct ubus_context *ctx, struct ubus_object *obj);
/* remove the object from the ubus connection */ /* remove the object from the ubus connection */
int ubus_remove_object(struct ubus_context *ctx, struct ubus_object *obj); int ubus_remove_object(struct ubus_context *ctx, struct ubus_object *obj);
/* add an object for watching other object state changes */ /* add a subscriber notifications from another object */
int ubus_register_watch_object(struct ubus_context *ctx, struct ubus_watch_object *obj); int ubus_register_subscriber(struct ubus_context *ctx, struct ubus_subscriber *obj);
int ubus_subscribe(struct ubus_context *ctx, struct ubus_subscriber *obj, uint32_t id);
int ubus_watch_object_add(struct ubus_context *ctx, struct ubus_watch_object *obj, uint32_t id); int ubus_unsubscribe(struct ubus_context *ctx, struct ubus_subscriber *obj, uint32_t id);
int ubus_watch_object_remove(struct ubus_context *ctx, struct ubus_watch_object *obj, uint32_t id);
/* ----------- rpc ----------- */ /* ----------- rpc ----------- */

View file

@ -112,8 +112,8 @@ struct ubus_object *ubusd_create_object_internal(struct ubus_object_type *type,
obj->type = type; obj->type = type;
INIT_LIST_HEAD(&obj->list); INIT_LIST_HEAD(&obj->list);
INIT_LIST_HEAD(&obj->events); INIT_LIST_HEAD(&obj->events);
INIT_LIST_HEAD(&obj->watchers); INIT_LIST_HEAD(&obj->subscribers);
INIT_LIST_HEAD(&obj->watched); INIT_LIST_HEAD(&obj->target_list);
if (type) if (type)
type->refcount++; type->refcount++;
@ -164,37 +164,37 @@ free:
return NULL; return NULL;
} }
void ubus_watch_new(struct ubus_object *obj, struct ubus_object *target, const char *method) void ubus_subscribe(struct ubus_object *obj, struct ubus_object *target, const char *method)
{ {
struct ubus_watch *w; struct ubus_subscription *s;
w = calloc(1, sizeof(*w) + strlen(method) + 1); s = calloc(1, sizeof(*s) + strlen(method) + 1);
if (!w) if (!s)
return; return;
w->watcher = obj; s->subscriber = obj;
w->watched = target; s->target = target;
list_add(&w->watcher_list, &target->watchers); list_add(&s->list, &target->subscribers);
list_add(&w->watched_list, &obj->watched); list_add(&s->target_list, &obj->target_list);
strcpy(w->method, method); strcpy(s->method, method);
} }
void ubus_watch_free(struct ubus_watch *w) void ubus_unsubscribe(struct ubus_subscription *s)
{ {
list_del(&w->watcher_list); list_del(&s->list);
list_del(&w->watched_list); list_del(&s->target_list);
free(w); free(s);
} }
void ubusd_free_object(struct ubus_object *obj) void ubusd_free_object(struct ubus_object *obj)
{ {
struct ubus_watch *w, *tmp; struct ubus_subscription *s, *tmp;
list_for_each_entry_safe(w, tmp, &obj->watched, watched_list) { list_for_each_entry_safe(s, tmp, &obj->target_list, target_list) {
ubus_watch_free(w); ubus_unsubscribe(s);
} }
list_for_each_entry_safe(w, tmp, &obj->watchers, watcher_list) { list_for_each_entry_safe(s, tmp, &obj->subscribers, list) {
ubus_proto_notify_watch(w); ubus_notify_unsubscribe(s);
} }
ubusd_event_cleanup_object(obj); ubusd_event_cleanup_object(obj);

View file

@ -35,9 +35,9 @@ struct ubus_method {
struct blob_attr data[]; struct blob_attr data[];
}; };
struct ubus_watch { struct ubus_subscription {
struct list_head watcher_list, watched_list; struct list_head list, target_list;
struct ubus_object *watcher, *watched; struct ubus_object *subscriber, *target;
char method[]; char method[];
}; };
@ -47,7 +47,7 @@ struct ubus_object {
struct list_head events; struct list_head events;
struct list_head watchers, watched; struct list_head subscribers, target_list;
struct ubus_object_type *type; struct ubus_object_type *type;
struct avl_node path; struct avl_node path;
@ -76,8 +76,8 @@ static inline struct ubus_object *ubusd_find_object(uint32_t objid)
return obj; return obj;
} }
void ubus_watch_new(struct ubus_object *obj, struct ubus_object *target, const char *method); void ubus_subscribe(struct ubus_object *obj, struct ubus_object *target, const char *method);
void ubus_watch_free(struct ubus_watch *w); void ubus_unsubscribe(struct ubus_subscription *s);
void ubus_proto_notify_watch(struct ubus_watch *w); void ubus_notify_unsubscribe(struct ubus_subscription *s);
#endif #endif

View file

@ -308,14 +308,14 @@ static int ubusd_handle_add_watch(struct ubus_client *cl, struct ubus_msg_buf *u
if (cl == target->client) if (cl == target->client)
return UBUS_STATUS_INVALID_ARGUMENT; return UBUS_STATUS_INVALID_ARGUMENT;
ubus_watch_new(obj, target, blob_data(attr[UBUS_ATTR_METHOD])); ubus_subscribe(obj, target, blob_data(attr[UBUS_ATTR_METHOD]));
return 0; return 0;
} }
static int ubusd_handle_remove_watch(struct ubus_client *cl, struct ubus_msg_buf *ub, struct blob_attr **attr) static int ubusd_handle_remove_watch(struct ubus_client *cl, struct ubus_msg_buf *ub, struct blob_attr **attr)
{ {
struct ubus_object *obj; struct ubus_object *obj;
struct ubus_watch *w; struct ubus_subscription *s;
uint32_t id; uint32_t id;
if (!attr[UBUS_ATTR_OBJID] || !attr[UBUS_ATTR_TARGET]) if (!attr[UBUS_ATTR_OBJID] || !attr[UBUS_ATTR_TARGET])
@ -329,11 +329,11 @@ static int ubusd_handle_remove_watch(struct ubus_client *cl, struct ubus_msg_buf
return UBUS_STATUS_INVALID_ARGUMENT; return UBUS_STATUS_INVALID_ARGUMENT;
id = blob_get_u32(attr[UBUS_ATTR_TARGET]); id = blob_get_u32(attr[UBUS_ATTR_TARGET]);
list_for_each_entry(w, &obj->watched, watched_list) { list_for_each_entry(s, &obj->target_list, target_list) {
if (w->watched->id.id != id) if (s->target->id.id != id)
continue; continue;
ubus_watch_free(w); ubus_unsubscribe(s);
return 0; return 0;
} }
@ -348,8 +348,8 @@ static const ubus_cmd_cb handlers[__UBUS_MSG_LAST] = {
[UBUS_MSG_INVOKE] = ubusd_handle_invoke, [UBUS_MSG_INVOKE] = ubusd_handle_invoke,
[UBUS_MSG_STATUS] = ubusd_handle_response, [UBUS_MSG_STATUS] = ubusd_handle_response,
[UBUS_MSG_DATA] = ubusd_handle_response, [UBUS_MSG_DATA] = ubusd_handle_response,
[UBUS_MSG_ADD_WATCH] = ubusd_handle_add_watch, [UBUS_MSG_SUBSCRIBE] = ubusd_handle_add_watch,
[UBUS_MSG_REMOVE_WATCH] = ubusd_handle_remove_watch, [UBUS_MSG_UNSUBSCRIBE] = ubusd_handle_remove_watch,
}; };
void ubusd_proto_receive_message(struct ubus_client *cl, struct ubus_msg_buf *ub) void ubusd_proto_receive_message(struct ubus_client *cl, struct ubus_msg_buf *ub)
@ -416,26 +416,19 @@ void ubusd_proto_free_client(struct ubus_client *cl)
ubus_free_id(&clients, &cl->id); ubus_free_id(&clients, &cl->id);
} }
void ubus_proto_notify_watch(struct ubus_watch *w) void ubus_notify_unsubscribe(struct ubus_subscription *s)
{ {
struct ubus_msg_buf *ub; struct ubus_msg_buf *ub;
void *data;
blob_buf_init(&b, 0); blob_buf_init(&b, 0);
blob_put_int32(&b, UBUS_ATTR_OBJID, w->watcher->id.id); blob_put_int32(&b, UBUS_ATTR_OBJID, s->subscriber->id.id);
blob_put_string(&b, UBUS_ATTR_METHOD, w->method); blob_put_int32(&b, UBUS_ATTR_TARGET, s->target->id.id);
data = blob_nest_start(&b, UBUS_ATTR_DATA);
blobmsg_add_string(&b, "notify", "remove");
blobmsg_add_u32(&b, "id", w->watched->id.id);
blobmsg_add_u32(&b, "peer", w->watched->client->id.id);
blob_nest_end(&b, data);
ub = ubus_msg_from_blob(false); ub = ubus_msg_from_blob(false);
ubus_msg_init(ub, UBUS_MSG_INVOKE, ++w->watcher->invoke_seq, 0); ubus_msg_init(ub, UBUS_MSG_UNSUBSCRIBE, ++s->subscriber->invoke_seq, 0);
ubus_msg_send(w->watcher->client, ub, true); ubus_msg_send(s->subscriber->client, ub, true);
ubus_watch_free(w); ubus_unsubscribe(s);
} }
static void __init ubusd_proto_init(void) static void __init ubusd_proto_init(void)

View file

@ -54,9 +54,13 @@ enum ubus_msg_type {
UBUS_MSG_ADD_OBJECT, UBUS_MSG_ADD_OBJECT,
UBUS_MSG_REMOVE_OBJECT, UBUS_MSG_REMOVE_OBJECT,
/* watch an object, notify on remove */ /*
UBUS_MSG_ADD_WATCH, * subscribe/unsubscribe to object notifications
UBUS_MSG_REMOVE_WATCH, * The unsubscribe message is sent from ubusd when
* the object disappears
*/
UBUS_MSG_SUBSCRIBE,
UBUS_MSG_UNSUBSCRIBE,
/* must be last */ /* must be last */
__UBUS_MSG_LAST, __UBUS_MSG_LAST,