From cb6c1718e711db77f7b6a06c8bc51fe392509bbc Mon Sep 17 00:00:00 2001 From: Felix Fietkau Date: Thu, 9 May 2013 01:04:49 +0200 Subject: [PATCH] runqueue: add a simple task queueing/completion tracking implementation Signed-off-by: Felix Fietkau --- CMakeLists.txt | 2 +- examples/CMakeLists.txt | 3 + examples/runqueue-example.c | 112 +++++++++++++++ runqueue.c | 267 ++++++++++++++++++++++++++++++++++++ runqueue.h | 113 +++++++++++++++ 5 files changed, 496 insertions(+), 1 deletion(-) create mode 100644 examples/runqueue-example.c create mode 100644 runqueue.c create mode 100644 runqueue.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 015b8bd..bd3e2b5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -19,7 +19,7 @@ IF(JSONC_FOUND) INCLUDE_DIRECTORIES(${JSONC_INCLUDE_DIRS}) ENDIF() -SET(SOURCES avl.c avl-cmp.c blob.c blobmsg.c uloop.c usock.c ustream.c ustream-fd.c vlist.c utils.c safe_list.c) +SET(SOURCES avl.c avl-cmp.c blob.c blobmsg.c uloop.c usock.c ustream.c ustream-fd.c vlist.c utils.c safe_list.c runqueue.c) ADD_LIBRARY(ubox SHARED ${SOURCES}) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 8727f35..51b97df 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -17,3 +17,6 @@ TARGET_LINK_LIBRARIES(blobmsg-example ubox blobmsg_json json) ADD_EXECUTABLE(ustream-example ustream-example.c) TARGET_LINK_LIBRARIES(ustream-example ubox) +ADD_EXECUTABLE(runqueue-example runqueue-example.c) +TARGET_LINK_LIBRARIES(runqueue-example ubox) + diff --git a/examples/runqueue-example.c b/examples/runqueue-example.c new file mode 100644 index 0000000..727463f --- /dev/null +++ b/examples/runqueue-example.c @@ -0,0 +1,112 @@ +/* + * runqueue-example.c + * + * Copyright (C) 2013 Felix Fietkau + * + * Permission to use, copy, modify, and/or distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include +#include +#include +#include + +#include "runqueue.h" + +static struct runqueue q; + +struct sleeper { + struct runqueue_process proc; + int val; +}; + +static void q_empty(struct runqueue *q) +{ + fprintf(stderr, "All done!\n"); + uloop_end(); +} + +static void q_sleep_run(struct runqueue *q, struct runqueue_task *t) +{ + struct sleeper *s = container_of(t, struct sleeper, proc.task); + char str[32]; + pid_t pid; + + fprintf(stderr, "[%d/%d] start 'sleep %d'\n", q->running_tasks, q->max_running_tasks, s->val); + + pid = fork(); + if (pid < 0) + return; + + if (pid) { + runqueue_process_add(q, &s->proc, pid); + return; + } + + sprintf(str, "%d", s->val); + execlp("sleep", "sleep", str, NULL); + exit(1); +} + +static void q_sleep_cancel(struct runqueue *q, struct runqueue_task *t, int type) +{ + struct sleeper *s = container_of(t, struct sleeper, proc.task); + + fprintf(stderr, "[%d/%d] cancel 'sleep %d'\n", q->running_tasks, q->max_running_tasks, s->val); + runqueue_process_cancel_cb(q, t, type); +} + +static void q_sleep_complete(struct runqueue *q, struct runqueue_process *p, int ret) +{ + struct sleeper *s = container_of(p, struct sleeper, proc); + + fprintf(stderr, "[%d/%d] finish 'sleep %d'\n", q->running_tasks, q->max_running_tasks, s->val); + free(s); +} + +static void add_sleeper(int val) +{ + static const struct runqueue_task_type sleeper_type = { + .run = q_sleep_run, + .cancel = q_sleep_cancel, + .kill = runqueue_process_kill_cb, + }; + struct sleeper *s; + + s = calloc(1, sizeof(*s)); + s->proc.task.type = &sleeper_type; + s->proc.task.run_timeout = 500; + s->proc.complete = q_sleep_complete; + s->val = val; + runqueue_task_add(&q, &s->proc.task, false); +} + +int main(int argc, char **argv) +{ + uloop_init(); + + runqueue_init(&q); + q.empty_cb = q_empty; + q.max_running_tasks = 1; + + if (argc > 1) + q.max_running_tasks = atoi(argv[1]); + + add_sleeper(1); + add_sleeper(1); + add_sleeper(1); + uloop_run(); + uloop_done(); + + return 0; +} diff --git a/runqueue.c b/runqueue.c new file mode 100644 index 0000000..5cc37bb --- /dev/null +++ b/runqueue.c @@ -0,0 +1,267 @@ +/* + * runqueue.c - a simple task queueing/completion tracking helper + * + * Copyright (C) 2013 Felix Fietkau + * + * Permission to use, copy, modify, and/or distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include +#include +#include "runqueue.h" + +static void +__runqueue_empty_cb(struct uloop_timeout *timeout) +{ + struct runqueue *q = container_of(timeout, struct runqueue, timeout); + + q->empty_cb(q); +} + +void runqueue_init(struct runqueue *q) +{ + INIT_SAFE_LIST(&q->tasks_active); + INIT_SAFE_LIST(&q->tasks_inactive); +} + +static void __runqueue_start_next(struct uloop_timeout *timeout) +{ + struct runqueue *q = container_of(timeout, struct runqueue, timeout); + struct runqueue_task *t; + + do { + if (q->stopped) + break; + + if (list_empty(&q->tasks_inactive.list)) + break; + + if (q->max_running_tasks && q->running_tasks >= q->max_running_tasks) + break; + + t = list_first_entry(&q->tasks_inactive.list, struct runqueue_task, list.list); + safe_list_del(&t->list); + safe_list_add(&t->list, &q->tasks_active); + t->running = true; + q->running_tasks++; + if (t->run_timeout) + uloop_timeout_set(&t->timeout, t->run_timeout); + t->type->run(q, t); + } while (1); + + if (!q->empty && + list_empty(&q->tasks_active.list) && + list_empty(&q->tasks_inactive.list)) { + q->empty = true; + if (q->empty_cb) { + q->timeout.cb = __runqueue_empty_cb; + uloop_timeout_set(&q->timeout, 1); + } + } +} + +static void runqueue_start_next(struct runqueue *q) +{ + if (q->empty) + return; + + q->timeout.cb = __runqueue_start_next; + uloop_timeout_set(&q->timeout, 1); +} + +static int __runqueue_cancel(void *ctx, struct safe_list *list) +{ + struct runqueue_task *t; + + t = container_of(list, struct runqueue_task, list); + runqueue_task_cancel(t, 0); + + return 0; +} + +void runqueue_cancel_active(struct runqueue *q) +{ + safe_list_for_each(&q->tasks_active, __runqueue_cancel, NULL); +} + +void runqueue_cancel_pending(struct runqueue *q) +{ + safe_list_for_each(&q->tasks_inactive, __runqueue_cancel, NULL); +} + +void runqueue_cancel(struct runqueue *q) +{ + runqueue_cancel_pending(q); + runqueue_cancel_active(q); +} + +void runqueue_kill(struct runqueue *q) +{ + struct runqueue_task *t; + + while (!list_empty(&q->tasks_active.list)) { + t = list_first_entry(&q->tasks_active.list, struct runqueue_task, list.list); + runqueue_task_kill(t); + } + runqueue_cancel_pending(q); + uloop_timeout_cancel(&q->timeout); +} + +void runqueue_task_cancel(struct runqueue_task *t, int type) +{ + if (!t->queued) + return; + + if (!t->running) { + runqueue_task_complete(t); + return; + } + + t->cancelled = true; + if (t->cancel_timeout) + uloop_timeout_set(&t->timeout, t->cancel_timeout); + if (t->type->cancel) + t->type->cancel(t->q, t, type); +} + +static void +__runqueue_task_timeout(struct uloop_timeout *timeout) +{ + struct runqueue_task *t = container_of(timeout, struct runqueue_task, timeout); + + if (t->cancelled) + runqueue_task_kill(t); + else + runqueue_task_cancel(t, t->cancel_type); +} + +void runqueue_task_add(struct runqueue *q, struct runqueue_task *t, bool running) +{ + struct safe_list *head; + + if (t->queued) + return; + + if (!t->type->run && !running) { + fprintf(stderr, "BUG: inactive task added without run() callback\n"); + return; + } + + if (running) { + q->running_tasks++; + head = &q->tasks_active; + } else { + head = &q->tasks_inactive; + } + + t->timeout.cb = __runqueue_task_timeout; + t->q = q; + safe_list_add(&t->list, head); + t->cancelled = false; + t->queued = true; + t->running = running; + q->empty = false; + + runqueue_start_next(q); +} + +void runqueue_task_kill(struct runqueue_task *t) +{ + struct runqueue *q = t->q; + bool running = t->running; + + if (!t->queued) + return; + + runqueue_task_complete(t); + if (running && t->type->kill) + t->type->kill(q, t); + + runqueue_start_next(q); +} + +void runqueue_stop(struct runqueue *q) +{ + q->stopped = true; +} + +void runqueue_resume(struct runqueue *q) +{ + q->stopped = false; + runqueue_start_next(q); +} + +void runqueue_task_complete(struct runqueue_task *t) +{ + if (!t->queued) + return; + + if (t->running) + t->q->running_tasks--; + + safe_list_del(&t->list); + t->queued = false; + t->running = false; + t->cancelled = false; + runqueue_start_next(t->q); +} + +static void +__runqueue_proc_cb(struct uloop_process *p, int ret) +{ + struct runqueue_process *t = container_of(p, struct runqueue_process, proc); + struct runqueue *q = t->task.q; + + runqueue_task_complete(&t->task); + if (t->complete) + t->complete(q, t, ret); +} + +void runqueue_process_cancel_cb(struct runqueue *q, struct runqueue_task *t, int type) +{ + struct runqueue_process *p = container_of(t, struct runqueue_process, task); + + if (!type) + type = SIGTERM; + + kill(p->proc.pid, type); +} + +void runqueue_process_kill_cb(struct runqueue *q, struct runqueue_task *t) +{ + struct runqueue_process *p = container_of(t, struct runqueue_process, task); + + uloop_process_delete(&p->proc); + kill(p->proc.pid, SIGKILL); + __runqueue_proc_cb(&p->proc, -1); +} + +static const struct runqueue_task_type runqueue_proc_type = { + .name = "process", + .cancel = runqueue_process_cancel_cb, + .kill = runqueue_process_kill_cb, +}; + +void runqueue_process_add(struct runqueue *q, struct runqueue_process *p, pid_t pid) +{ + if (p->proc.pending) + return; + + p->proc.pid = pid; + p->proc.cb = __runqueue_proc_cb; + if (!p->task.type) + p->task.type = &runqueue_proc_type; + uloop_process_add(&p->proc); + if (!p->task.running) + runqueue_task_add(q, &p->task, true); +} diff --git a/runqueue.h b/runqueue.h new file mode 100644 index 0000000..ad64f3d --- /dev/null +++ b/runqueue.h @@ -0,0 +1,113 @@ +/* + * runqueue.c - a simple task queueing/completion tracking helper + * + * Copyright (C) 2013 Felix Fietkau + * + * Permission to use, copy, modify, and/or distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#ifndef __LIBUBOX_RUNQUEUE_H +#define __LIBUBOX_RUNQUEUE_H + +#include +#include +#include + +struct runqueue; +struct runqueue_task; +struct runqueue_task_type; + +struct runqueue { + struct safe_list tasks_active; + struct safe_list tasks_inactive; + struct uloop_timeout timeout; + + int running_tasks; + int max_running_tasks; + bool stopped; + bool empty; + + /* called when the runqueue is emptied */ + void (*empty_cb)(struct runqueue *q); +}; + +struct runqueue_task_type { + const char *name; + + /* + * called when a task is requested to run + * + * The task is removed from the list before this callback is run. It + * can re-arm itself using runqueue_task_add. + */ + void (*run)(struct runqueue *q, struct runqueue_task *t); + + /* + * called to request cancelling a task + * + * int type is used as an optional hint for the method to be used when + * cancelling the task, e.g. a signal number for processes. Calls + * runqueue_task_complete when done. + */ + void (*cancel)(struct runqueue *q, struct runqueue_task *t, int type); + + /* + * called to kill a task. must not make any calls to runqueue_task_complete, + * it has already been removed from the list. + */ + void (*kill)(struct runqueue *q, struct runqueue_task *t); +}; + +struct runqueue_task { + struct safe_list list; + const struct runqueue_task_type *type; + struct runqueue *q; + + struct uloop_timeout timeout; + int run_timeout; + int cancel_timeout; + int cancel_type; + + bool queued; + bool running; + bool cancelled; +}; + +struct runqueue_process { + struct runqueue_task task; + struct uloop_process proc; + void (*complete)(struct runqueue *q, struct runqueue_process *p, int ret); +}; + +void runqueue_init(struct runqueue *q); +void runqueue_cancel(struct runqueue *q); +void runqueue_cancel_active(struct runqueue *q); +void runqueue_cancel_pending(struct runqueue *q); +void runqueue_kill(struct runqueue *q); + +void runqueue_stop(struct runqueue *q); +void runqueue_resume(struct runqueue *q); + +void runqueue_task_add(struct runqueue *q, struct runqueue_task *t, bool running); +void runqueue_task_complete(struct runqueue_task *t); + +void runqueue_task_cancel(struct runqueue_task *t, int type); +void runqueue_task_kill(struct runqueue_task *t); + +void runqueue_process_add(struct runqueue *q, struct runqueue_process *p, pid_t pid); + +/* to be used only from runqueue_process callbacks */ +void runqueue_process_cancel_cb(struct runqueue *q, struct runqueue_task *t, int type); +void runqueue_process_kill_cb(struct runqueue *q, struct runqueue_task *t); + +#endif