forked from DGNum/liminix
devout: add event loop and main run
function
This commit is contained in:
parent
52eb283a26
commit
97a8ae1c84
3 changed files with 130 additions and 5 deletions
|
@ -6,6 +6,7 @@
|
||||||
, fennel
|
, fennel
|
||||||
, stdenv
|
, stdenv
|
||||||
, fennelrepl
|
, fennelrepl
|
||||||
|
, minisock
|
||||||
}:
|
}:
|
||||||
stdenv.mkDerivation {
|
stdenv.mkDerivation {
|
||||||
name = "devout";
|
name = "devout";
|
||||||
|
@ -14,11 +15,12 @@ stdenv.mkDerivation {
|
||||||
installPhase = ''
|
installPhase = ''
|
||||||
mkdir -p $out/bin
|
mkdir -p $out/bin
|
||||||
cp -p ${writeFennel "devout" {
|
cp -p ${writeFennel "devout" {
|
||||||
packages = [fennel anoia nellie lua.pkgs.luafilesystem];
|
packages = [fennel anoia nellie lua.pkgs.luafilesystem minisock];
|
||||||
mainFunction = "run";
|
mainFunction = "run";
|
||||||
} ./devout.fnl} $out/bin/devout
|
} ./devout.fnl} $out/bin/devout
|
||||||
'';
|
'';
|
||||||
checkPhase = ''
|
checkPhase = ''
|
||||||
|
LUA_CPATH=${minisock}/lib/lua/5.3/?.so\;$LUA_CPATH \
|
||||||
fennelrepl ./test.fnl
|
fennelrepl ./test.fnl
|
||||||
'';
|
'';
|
||||||
doCheck = true;
|
doCheck = true;
|
||||||
|
|
|
@ -1,3 +1,9 @@
|
||||||
|
(local sock (require :minisock))
|
||||||
|
(local { : view } (require :fennel))
|
||||||
|
|
||||||
|
(fn trace [expr]
|
||||||
|
(doto expr (print :TRACE (view expr))))
|
||||||
|
|
||||||
(fn parse-uevent [s]
|
(fn parse-uevent [s]
|
||||||
(let [at (string.find s "@" 1 true)
|
(let [at (string.find s "@" 1 true)
|
||||||
(nl nxt) (string.find s "\0" 1 true)]
|
(nl nxt) (string.find s "\0" 1 true)]
|
||||||
|
@ -42,8 +48,80 @@
|
||||||
:subscribe (fn [_ id callback terms]
|
:subscribe (fn [_ id callback terms]
|
||||||
(tset subscribers id {: callback : terms }))
|
(tset subscribers id {: callback : terms }))
|
||||||
:unsubscribe (fn [_ id] (tset subscribers id nil))
|
:unsubscribe (fn [_ id] (tset subscribers id nil))
|
||||||
|
|
||||||
}))
|
}))
|
||||||
|
|
||||||
|
;; #define POLLIN 0x0001
|
||||||
|
;; #define POLLPRI 0x0002
|
||||||
|
;; #define POLLOUT 0x0004
|
||||||
|
;; #define POLLERR 0x0008
|
||||||
|
;; #define POLLHUP 0x0010
|
||||||
|
;; #define POLLNVAL 0x0020
|
||||||
|
|
||||||
{ : database }
|
(fn unix-socket [name]
|
||||||
|
(let [addr (.. "\1\0" name "\0\0\0\0\0")
|
||||||
|
(sock err) (sock.bind addr)]
|
||||||
|
(assert sock err)))
|
||||||
|
|
||||||
|
(fn pollfds-for [fds]
|
||||||
|
(table.concat (icollect [_ v (ipairs fds)] (string.pack "iHH" v 1 0))))
|
||||||
|
|
||||||
|
(fn unpack-pollfds [pollfds]
|
||||||
|
(var i 1)
|
||||||
|
(let [fds {}]
|
||||||
|
(while (< i (# pollfds))
|
||||||
|
(let [(fd _ revents i_) (string.unpack "iHH" pollfds i)]
|
||||||
|
(if (> revents 0) (tset fds fd revents))
|
||||||
|
(set i i_)))
|
||||||
|
fds))
|
||||||
|
|
||||||
|
(fn parse-terms [str]
|
||||||
|
(print :terms str)
|
||||||
|
(collect [n (string.gmatch str "([^ ]+)")]
|
||||||
|
(string.match n "(.-)=(.+)")))
|
||||||
|
|
||||||
|
(fn handle-client [db client]
|
||||||
|
(match (trace (sock.read client))
|
||||||
|
"" (do
|
||||||
|
(db:unsubscribe client)
|
||||||
|
false)
|
||||||
|
s (do
|
||||||
|
(db:subscribe
|
||||||
|
client
|
||||||
|
(fn [e]
|
||||||
|
(sock.write client (view e)))
|
||||||
|
(parse-terms s))
|
||||||
|
true)
|
||||||
|
(nil err) (do (print err) false)))
|
||||||
|
|
||||||
|
(fn event-loop []
|
||||||
|
(let [fds {}]
|
||||||
|
{
|
||||||
|
:register #(tset fds $2 $3)
|
||||||
|
:feed (fn [_ revents]
|
||||||
|
(each [fd revent (pairs revents)]
|
||||||
|
(when (not ((. fds fd) fd))
|
||||||
|
(tset fds fd nil)
|
||||||
|
(sock.close fd))))
|
||||||
|
:fds #(icollect [fd _ (pairs fds)] fd)
|
||||||
|
:_tbl #(do fds) ;exposed for tests
|
||||||
|
}))
|
||||||
|
|
||||||
|
(fn run []
|
||||||
|
(let [[sockname] arg
|
||||||
|
s (unix-socket sockname)
|
||||||
|
db (database)
|
||||||
|
loop (event-loop)]
|
||||||
|
(loop:register
|
||||||
|
s
|
||||||
|
#(match (sock.accept s)
|
||||||
|
(client addr)
|
||||||
|
(do
|
||||||
|
(loop:register client (partial handle-client db))
|
||||||
|
true)))
|
||||||
|
(while true
|
||||||
|
(let [pollfds (pollfds-for (loop:fds))
|
||||||
|
(rpollfds numfds) (sock.poll pollfds 1000)]
|
||||||
|
(when (> numfds 0)
|
||||||
|
(loop:feed (unpack-pollfds rpollfds)))))))
|
||||||
|
|
||||||
|
{ : database : run : event-loop }
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
(local { : database } (require :devout))
|
(local { : database : event-loop } (require :devout))
|
||||||
(local { : view } (require :fennel))
|
(local { : view } (require :fennel))
|
||||||
(import-macros { : expect= } :anoia.assert)
|
(local sock (require :minisock))
|
||||||
|
(import-macros { : expect : expect= } :anoia.assert)
|
||||||
|
|
||||||
(var failed false)
|
(var failed false)
|
||||||
(fn fail [d msg] (set failed true) (print :FAIL d (.. "\n" msg)))
|
(fn fail [d msg] (set failed true) (print :FAIL d (.. "\n" msg)))
|
||||||
|
@ -137,4 +138,48 @@ MINOR=17")
|
||||||
(db:add sdb1-remove)
|
(db:add sdb1-remove)
|
||||||
(expect= (# received) 0)))
|
(expect= (# received) 0)))
|
||||||
|
|
||||||
|
|
||||||
|
;;; test for event loop
|
||||||
|
|
||||||
|
(example
|
||||||
|
"I can register a fd with a callback"
|
||||||
|
(let [loop (event-loop)
|
||||||
|
cb #(print $1)]
|
||||||
|
(loop:register 3 cb)
|
||||||
|
(expect= (. (loop:_tbl) 3) cb)))
|
||||||
|
|
||||||
|
(example
|
||||||
|
"when the fd is ready, my callback is called"
|
||||||
|
(let [loop (event-loop)]
|
||||||
|
(var ran? false)
|
||||||
|
(loop:register 3 #(set ran? true))
|
||||||
|
(loop:feed {3 1})
|
||||||
|
(expect= ran? true)
|
||||||
|
))
|
||||||
|
|
||||||
|
(example
|
||||||
|
"when the callback returns true it remains registered"
|
||||||
|
(let [loop (event-loop)]
|
||||||
|
(loop:register 3 #true)
|
||||||
|
(loop:feed {3 1})
|
||||||
|
(expect (. (loop:_tbl) 3))
|
||||||
|
))
|
||||||
|
|
||||||
|
(fn new-fd []
|
||||||
|
(let [fd (sock.bind (.. "\1\0" "/tmp/test-socket" "\0\0\0\0\0"))]
|
||||||
|
(os.remove "/tmp/test-socket")
|
||||||
|
fd))
|
||||||
|
|
||||||
|
(example
|
||||||
|
"when the callback returns false it is unregistered and the fd is closed"
|
||||||
|
(let [loop (event-loop)
|
||||||
|
fd (new-fd)]
|
||||||
|
(expect (> fd 2))
|
||||||
|
(loop:register 3 #false)
|
||||||
|
(loop:feed {3 1})
|
||||||
|
(expect (not (. (loop:_tbl) 3)))
|
||||||
|
(assert (not (os.execute (string.format "test -e /dev/fd/%d" fd))))
|
||||||
|
))
|
||||||
|
|
||||||
|
|
||||||
(if failed (os.exit 1) (print "OK"))
|
(if failed (os.exit 1) (print "OK"))
|
||||||
|
|
Loading…
Reference in a new issue