From 97a8ae1c84ce8a361afb1081a60008377c7bf93d Mon Sep 17 00:00:00 2001 From: Daniel Barlow Date: Tue, 23 Apr 2024 20:15:02 +0100 Subject: [PATCH] devout: add event loop and main `run` function --- pkgs/devout/default.nix | 4 +- pkgs/devout/devout.fnl | 82 ++++++++++++++++++++++++++++++++++++++++- pkgs/devout/test.fnl | 49 +++++++++++++++++++++++- 3 files changed, 130 insertions(+), 5 deletions(-) diff --git a/pkgs/devout/default.nix b/pkgs/devout/default.nix index df204bb..04a36b2 100644 --- a/pkgs/devout/default.nix +++ b/pkgs/devout/default.nix @@ -6,6 +6,7 @@ , fennel , stdenv , fennelrepl +, minisock }: stdenv.mkDerivation { name = "devout"; @@ -14,11 +15,12 @@ stdenv.mkDerivation { installPhase = '' mkdir -p $out/bin cp -p ${writeFennel "devout" { - packages = [fennel anoia nellie lua.pkgs.luafilesystem]; + packages = [fennel anoia nellie lua.pkgs.luafilesystem minisock]; mainFunction = "run"; } ./devout.fnl} $out/bin/devout ''; checkPhase = '' + LUA_CPATH=${minisock}/lib/lua/5.3/?.so\;$LUA_CPATH \ fennelrepl ./test.fnl ''; doCheck = true; diff --git a/pkgs/devout/devout.fnl b/pkgs/devout/devout.fnl index ea34409..b9ef9e4 100644 --- a/pkgs/devout/devout.fnl +++ b/pkgs/devout/devout.fnl @@ -1,3 +1,9 @@ +(local sock (require :minisock)) +(local { : view } (require :fennel)) + +(fn trace [expr] + (doto expr (print :TRACE (view expr)))) + (fn parse-uevent [s] (let [at (string.find s "@" 1 true) (nl nxt) (string.find s "\0" 1 true)] @@ -42,8 +48,80 @@ :subscribe (fn [_ id callback terms] (tset subscribers id {: callback : terms })) :unsubscribe (fn [_ id] (tset subscribers id nil)) - })) +;; #define POLLIN 0x0001 +;; #define POLLPRI 0x0002 +;; #define POLLOUT 0x0004 +;; #define POLLERR 0x0008 +;; #define POLLHUP 0x0010 +;; #define POLLNVAL 0x0020 -{ : database } +(fn unix-socket [name] + (let [addr (.. "\1\0" name "\0\0\0\0\0") + (sock err) (sock.bind addr)] + (assert sock err))) + +(fn pollfds-for [fds] + (table.concat (icollect [_ v (ipairs fds)] (string.pack "iHH" v 1 0)))) + +(fn unpack-pollfds [pollfds] + (var i 1) + (let [fds {}] + (while (< i (# pollfds)) + (let [(fd _ revents i_) (string.unpack "iHH" pollfds i)] + (if (> revents 0) (tset fds fd revents)) + (set i i_))) + fds)) + +(fn parse-terms [str] + (print :terms str) + (collect [n (string.gmatch str "([^ ]+)")] + (string.match n "(.-)=(.+)"))) + +(fn handle-client [db client] + (match (trace (sock.read client)) + "" (do + (db:unsubscribe client) + false) + s (do + (db:subscribe + client + (fn [e] + (sock.write client (view e))) + (parse-terms s)) + true) + (nil err) (do (print err) false))) + +(fn event-loop [] + (let [fds {}] + { + :register #(tset fds $2 $3) + :feed (fn [_ revents] + (each [fd revent (pairs revents)] + (when (not ((. fds fd) fd)) + (tset fds fd nil) + (sock.close fd)))) + :fds #(icollect [fd _ (pairs fds)] fd) + :_tbl #(do fds) ;exposed for tests + })) + +(fn run [] + (let [[sockname] arg + s (unix-socket sockname) + db (database) + loop (event-loop)] + (loop:register + s + #(match (sock.accept s) + (client addr) + (do + (loop:register client (partial handle-client db)) + true))) + (while true + (let [pollfds (pollfds-for (loop:fds)) + (rpollfds numfds) (sock.poll pollfds 1000)] + (when (> numfds 0) + (loop:feed (unpack-pollfds rpollfds))))))) + +{ : database : run : event-loop } diff --git a/pkgs/devout/test.fnl b/pkgs/devout/test.fnl index 2daff4d..a9f988c 100644 --- a/pkgs/devout/test.fnl +++ b/pkgs/devout/test.fnl @@ -1,6 +1,7 @@ -(local { : database } (require :devout)) +(local { : database : event-loop } (require :devout)) (local { : view } (require :fennel)) -(import-macros { : expect= } :anoia.assert) +(local sock (require :minisock)) +(import-macros { : expect : expect= } :anoia.assert) (var failed false) (fn fail [d msg] (set failed true) (print :FAIL d (.. "\n" msg))) @@ -137,4 +138,48 @@ MINOR=17") (db:add sdb1-remove) (expect= (# received) 0))) + +;;; test for event loop + +(example + "I can register a fd with a callback" + (let [loop (event-loop) + cb #(print $1)] + (loop:register 3 cb) + (expect= (. (loop:_tbl) 3) cb))) + +(example + "when the fd is ready, my callback is called" + (let [loop (event-loop)] + (var ran? false) + (loop:register 3 #(set ran? true)) + (loop:feed {3 1}) + (expect= ran? true) + )) + +(example + "when the callback returns true it remains registered" + (let [loop (event-loop)] + (loop:register 3 #true) + (loop:feed {3 1}) + (expect (. (loop:_tbl) 3)) + )) + +(fn new-fd [] + (let [fd (sock.bind (.. "\1\0" "/tmp/test-socket" "\0\0\0\0\0"))] + (os.remove "/tmp/test-socket") + fd)) + +(example + "when the callback returns false it is unregistered and the fd is closed" + (let [loop (event-loop) + fd (new-fd)] + (expect (> fd 2)) + (loop:register 3 #false) + (loop:feed {3 1}) + (expect (not (. (loop:_tbl) 3))) + (assert (not (os.execute (string.format "test -e /dev/fd/%d" fd)))) + )) + + (if failed (os.exit 1) (print "OK"))