diff --git a/examples/uloop-example.lua b/examples/uloop-example.lua index 511b9ea..f3aef60 100755 --- a/examples/uloop-example.lua +++ b/examples/uloop-example.lua @@ -24,6 +24,23 @@ uloop.timer(function() print("2000 ms timer run"); end, 2000) -- timer example 3 (will never run) uloop.timer(function() print("3000 ms timer run"); end, 3000):cancel() +-- periodic interval timer +local intv +intv = uloop.interval(function() + print(string.format("Interval expiration #%d - %dms until next expiration", + intv:expirations(), intv:remaining())) + + -- after 5 expirations, lower interval to 500ms + if intv:expirations() >= 5 then + intv:set(500) + end + + -- cancel after 10 expirations + if intv:expirations() >= 10 then + intv:cancel() + end +end, 1000) + -- process function p1(r) print("Process 1 completed") diff --git a/lua/uloop.c b/lua/uloop.c index 7e9ed10..45c9bc7 100644 --- a/lua/uloop.c +++ b/lua/uloop.c @@ -41,6 +41,11 @@ struct lua_uloop_process { int r; }; +struct lua_uloop_interval { + struct uloop_interval i; + int r; +}; + static lua_State *state; static void * @@ -382,6 +387,112 @@ static int ul_process(lua_State *L) return 1; } +static void ul_interval_cb(struct uloop_interval *i) +{ + struct lua_uloop_interval *intv = container_of(i, struct lua_uloop_interval, i); + + lua_getglobal(state, "__uloop_cb"); + lua_rawgeti(state, -1, intv->r); + lua_remove(state, -2); + + lua_call(state, 0, 0); +} + +static int ul_interval_set(lua_State *L) +{ + struct lua_uloop_interval *intv; + double set; + + if (!lua_isnumber(L, -1)) { + lua_pushstring(L, "invalid arg list"); + lua_error(L); + + return 0; + } + + set = lua_tointeger(L, -1); + intv = lua_touserdata(L, 1); + uloop_interval_set(&intv->i, set); + + return 1; +} + +static int ul_interval_expirations(lua_State *L) +{ + struct lua_uloop_interval *intv; + + intv = lua_touserdata(L, 1); + lua_pushinteger(L, intv->i.expirations); + return 1; +} + +static int ul_interval_remaining(lua_State *L) +{ + struct lua_uloop_interval *intv; + + intv = lua_touserdata(L, 1); + lua_pushnumber(L, uloop_interval_remaining(&intv->i)); + return 1; +} + +static int ul_interval_free(lua_State *L) +{ + struct lua_uloop_interval *intv = lua_touserdata(L, 1); + + uloop_interval_cancel(&intv->i); + + /* obj.__index.__gc = nil , make sure executing only once*/ + lua_getfield(L, -1, "__index"); + lua_pushstring(L, "__gc"); + lua_pushnil(L); + lua_settable(L, -3); + + lua_getglobal(state, "__uloop_cb"); + luaL_unref(state, -1, intv->r); + + return 1; +} + +static const luaL_Reg interval_m[] = { + { "set", ul_interval_set }, + { "expirations", ul_interval_expirations }, + { "remaining", ul_interval_remaining }, + { "cancel", ul_interval_free }, + { NULL, NULL } +}; + +static int ul_interval(lua_State *L) +{ + struct lua_uloop_interval *intv; + unsigned int set = 0; + int ref; + + if (lua_isnumber(L, -1)) { + set = lua_tointeger(L, -1); + lua_pop(L, 1); + } + + if (!lua_isfunction(L, -1)) { + lua_pushstring(L, "invalid arg list"); + lua_error(L); + + return 0; + } + + lua_getglobal(L, "__uloop_cb"); + lua_pushvalue(L, -2); + ref = luaL_ref(L, -2); + + intv = ul_create_userdata(L, sizeof(*intv), interval_m, ul_interval_free); + intv->r = ref; + intv->i.cb = ul_interval_cb; + + if (set) + uloop_interval_set(&intv->i, set); + + return 1; +} + static int ul_init(lua_State *L) { uloop_init(); @@ -410,6 +521,7 @@ static luaL_reg uloop_func[] = { {"timer", ul_timer}, {"process", ul_process}, {"fd_add", ul_ufd_add}, + {"interval", ul_interval}, {"cancel", ul_end}, {NULL, NULL}, }; diff --git a/uloop-epoll.c b/uloop-epoll.c index 70e45e4..cf5733f 100644 --- a/uloop-epoll.c +++ b/uloop-epoll.c @@ -104,3 +104,83 @@ static int uloop_fetch_events(int timeout) return nfds; } + +static void dispatch_timer(struct uloop_fd *u, unsigned int events) +{ + if (!(events & ULOOP_READ)) + return; + + uint64_t fired; + + if (read(u->fd, &fired, sizeof(fired)) != sizeof(fired)) + return; + + struct uloop_interval *tm = container_of(u, struct uloop_interval, private.ufd); + + tm->expirations += fired; + tm->cb(tm); +} + +static int timer_register(struct uloop_interval *tm, unsigned int msecs) +{ + if (!tm->private.ufd.registered) { + int fd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC|TFD_NONBLOCK); + + if (fd == -1) + return -1; + + tm->private.ufd.fd = fd; + tm->private.ufd.cb = dispatch_timer; + } + + struct itimerspec spec = { + .it_value = { + .tv_sec = msecs / 1000, + .tv_nsec = (msecs % 1000) * 1000000 + }, + .it_interval = { + .tv_sec = msecs / 1000, + .tv_nsec = (msecs % 1000) * 1000000 + } + }; + + if (timerfd_settime(tm->private.ufd.fd, 0, &spec, NULL) == -1) + goto err; + + if (uloop_fd_add(&tm->private.ufd, ULOOP_READ) == -1) + goto err; + + return 0; + +err: + uloop_fd_delete(&tm->private.ufd); + close(tm->private.ufd.fd); + memset(&tm->private.ufd, 0, sizeof(tm->private.ufd)); + + return -1; +} + +static int timer_remove(struct uloop_interval *tm) +{ + int ret = __uloop_fd_delete(&tm->private.ufd); + + if (ret == 0) { + close(tm->private.ufd.fd); + memset(&tm->private.ufd, 0, sizeof(tm->private.ufd)); + } + + return ret; +} + +static int64_t timer_next(struct uloop_interval *tm) +{ + struct itimerspec spec; + + if (!tm->private.ufd.registered) + return -1; + + if (timerfd_gettime(tm->private.ufd.fd, &spec) == -1) + return -1; + + return spec.it_value.tv_sec * 1000 + spec.it_value.tv_nsec / 1000000; +} diff --git a/uloop-kqueue.c b/uloop-kqueue.c index c1275b0..a48cca0 100644 --- a/uloop-kqueue.c +++ b/uloop-kqueue.c @@ -103,6 +103,23 @@ static int __uloop_fd_delete(struct uloop_fd *fd) return register_poll(fd, 0); } +static int64_t get_timestamp_us(void) +{ +#ifdef CLOCK_MONOTONIC + struct timespec ts = { 0, 0 }; + + clock_gettime(CLOCK_MONOTONIC, &ts); + + return ts.tv_sec * 1000000 + ts.tv_nsec / 1000; +#else + struct timeval tv = { 0, 0 }; + + gettimeofday(&tv, NULL); + + return tv.tv_sec * 1000000 + tv.tv_usec; +#endif +} + static int uloop_fetch_events(int timeout) { struct timespec ts; @@ -115,6 +132,16 @@ static int uloop_fetch_events(int timeout) nfds = kevent(poll_fd, NULL, 0, events, ARRAY_SIZE(events), timeout >= 0 ? &ts : NULL); for (n = 0; n < nfds; n++) { + if (events[n].filter == EVFILT_TIMER) { + struct uloop_interval *tm = events[n].udata; + + tm->private.time.fired = get_timestamp_us(); + tm->expirations += events[n].data; + tm->cb(tm); + + continue; + } + struct uloop_fd_event *cur = &cur_fds[n]; struct uloop_fd *u = events[n].udata; unsigned int ev = 0; @@ -148,3 +175,35 @@ static int uloop_fetch_events(int timeout) } return nfds; } + +static int timer_register(struct uloop_interval *tm, unsigned int msecs) +{ + struct kevent ev; + + tm->private.time.msecs = msecs; + tm->private.time.fired = get_timestamp_us(); + + EV_SET(&ev, (uintptr_t)tm, EVFILT_TIMER, EV_ADD, NOTE_USECONDS, msecs * 1000, tm); + + return kevent(poll_fd, &ev, 1, NULL, 0, NULL); +} + +static int timer_remove(struct uloop_interval *tm) +{ + struct kevent ev; + + EV_SET(&ev, (uintptr_t)tm, EVFILT_TIMER, EV_DELETE, 0, 0, NULL); + + return kevent(poll_fd, &ev, 1, NULL, 0, NULL); +} + +static int64_t timer_next(struct uloop_interval *tm) +{ + int64_t t1 = tm->private.time.fired; + int64_t t2 = get_timestamp_us(); + + while (t1 < t2) + t1 += tm->private.time.msecs * 1000; + + return (t1 - t2) / 1000; +} diff --git a/uloop.c b/uloop.c index 8fc5aee..a3d3712 100644 --- a/uloop.c +++ b/uloop.c @@ -36,6 +36,7 @@ #endif #ifdef USE_EPOLL #include +#include #endif #include @@ -422,6 +423,21 @@ static void uloop_handle_processes(void) } +int uloop_interval_set(struct uloop_interval *timer, unsigned int msecs) +{ + return timer_register(timer, msecs); +} + +int uloop_interval_cancel(struct uloop_interval *timer) +{ + return timer_remove(timer); +} + +int64_t uloop_interval_remaining(struct uloop_interval *timer) +{ + return timer_next(timer); +} + static void uloop_signal_wake(void) { do { diff --git a/uloop.h b/uloop.h index 4fdd43f..b3f268c 100644 --- a/uloop.h +++ b/uloop.h @@ -35,10 +35,12 @@ struct uloop_fd; struct uloop_timeout; struct uloop_process; +struct uloop_interval; typedef void (*uloop_fd_handler)(struct uloop_fd *u, unsigned int events); typedef void (*uloop_timeout_handler)(struct uloop_timeout *t); typedef void (*uloop_process_handler)(struct uloop_process *c, int ret); +typedef void (*uloop_interval_handler)(struct uloop_interval *t); #define ULOOP_READ (1 << 0) #define ULOOP_WRITE (1 << 1) @@ -83,6 +85,20 @@ struct uloop_process pid_t pid; }; +struct uloop_interval +{ + uloop_interval_handler cb; + uint64_t expirations; + + union { + struct uloop_fd ufd; + struct { + int64_t fired; + unsigned int msecs; + } time; + } private; +}; + extern bool uloop_cancelled; extern bool uloop_handle_sigchld; extern uloop_fd_handler uloop_fd_set_cb; @@ -100,6 +116,10 @@ int64_t uloop_timeout_remaining64(struct uloop_timeout *timeout); int uloop_process_add(struct uloop_process *p); int uloop_process_delete(struct uloop_process *p); +int uloop_interval_set(struct uloop_interval *timer, unsigned int msecs); +int uloop_interval_cancel(struct uloop_interval *timer); +int64_t uloop_interval_remaining(struct uloop_interval *timer); + bool uloop_cancelling(void); static inline void uloop_end(void)