forked from DGNum/liminix
simplify protocol for watchers of service output directories
Previously: the service wrote a timestamp and the receiver read and parsed it to see if there was new data Now: the service writes and removes a .lock file to prevent the receiver reading partial data. The receiver is responsible for remembering the *previous* state and only updating if it's changed
This commit is contained in:
parent
5532144747
commit
3900683413
2 changed files with 83 additions and 51 deletions
|
@ -1,6 +1,39 @@
|
||||||
|
|
||||||
(local inotify (require :inotify))
|
(local inotify (require :inotify))
|
||||||
|
|
||||||
|
(fn merge [table1 table2]
|
||||||
|
(collect [k v (pairs table2) &into table1]
|
||||||
|
k v))
|
||||||
|
|
||||||
|
(fn split [sep string]
|
||||||
|
(icollect [v (string.gmatch string (.. "([^" sep "]+)"))]
|
||||||
|
v))
|
||||||
|
|
||||||
|
(fn parse-prefix [str]
|
||||||
|
(fn parse-extra [s]
|
||||||
|
(let [out {}]
|
||||||
|
(each [name val (string.gmatch s ",(.-)=([^,]+)")]
|
||||||
|
(tset out name val))
|
||||||
|
out))
|
||||||
|
(let [(prefix len preferred valid extra)
|
||||||
|
(string.match str "(.-)::/(%d+),(%d+),(%d+)(.*)$")]
|
||||||
|
(merge {: prefix : len : preferred : valid} (parse-extra extra))))
|
||||||
|
|
||||||
|
|
||||||
|
;; Format: <prefix>/<length>,preferred,valid[,excluded=<excluded-prefix>/<length>][,class=<prefix class #>]
|
||||||
|
|
||||||
|
;;(parse-prefix "2001:8b0:de3a:40dc::/64,7198,7198")
|
||||||
|
;;(parse-prefix "2001:8b0:de3a:1001::/64,7198,7188,excluded=1/2,thi=10")
|
||||||
|
|
||||||
|
|
||||||
|
(fn file-exists? [name]
|
||||||
|
(match (io.open name :r)
|
||||||
|
f (do (f:close) true)
|
||||||
|
_ false))
|
||||||
|
|
||||||
|
(fn read-line [name]
|
||||||
|
(with-open [f (assert (io.open name :r) (.. "can't open file " name))]
|
||||||
|
(f:read "*l")))
|
||||||
|
|
||||||
(fn watch-fsevents [directory-name]
|
(fn watch-fsevents [directory-name]
|
||||||
(let [handle (inotify.init)]
|
(let [handle (inotify.init)]
|
||||||
(handle:addwatch directory-name
|
(handle:addwatch directory-name
|
||||||
|
@ -13,58 +46,56 @@
|
||||||
inotify.IN_CLOSE_WRITE)
|
inotify.IN_CLOSE_WRITE)
|
||||||
handle))
|
handle))
|
||||||
|
|
||||||
|
(fn watch-directory [pathname]
|
||||||
|
(let [watcher (watch-fsevents pathname)]
|
||||||
|
{
|
||||||
|
:has-file? (fn [_ filename] (file-exists? (.. pathname "/" filename)))
|
||||||
|
:wait-events (fn [] (watcher:read))
|
||||||
|
:ready? (fn [self]
|
||||||
|
(and (self:has-file? "state") (not (self:has-file? ".lock"))))
|
||||||
|
:read-line (fn [_ filename] (read-line (.. pathname "/" filename)))
|
||||||
|
:close #(watcher:close)
|
||||||
|
}))
|
||||||
|
|
||||||
(fn merge [table1 table2]
|
(local bound-states
|
||||||
(collect [k v (pairs table2) &into table1]
|
{ :bound true
|
||||||
k v))
|
:rebound true
|
||||||
|
:informed true
|
||||||
|
:updated true
|
||||||
|
:ra-updated true
|
||||||
|
})
|
||||||
|
|
||||||
(fn parse-extra [s]
|
; (local { : view } (require :fennel))
|
||||||
(let [out {}]
|
|
||||||
(each [name val (string.gmatch s ",(.-)=([^,]+)")]
|
|
||||||
(tset out name val))
|
|
||||||
out))
|
|
||||||
|
|
||||||
(fn parse-prefixes [prefixes]
|
(fn changes [old-prefixes new-prefixes]
|
||||||
(icollect [val (string.gmatch prefixes "([^ ]+)")]
|
(let [added {}
|
||||||
(let [(prefix len preferred valid extra)
|
deleted {}
|
||||||
(string.match val "(.-)::/(%d+),(%d+),(%d+)(.*)$")]
|
old-set (collect [_ v (ipairs old-prefixes)] (values v true))
|
||||||
(merge {: prefix : len : preferred : valid} (parse-extra extra))
|
new-set (collect [_ v (ipairs new-prefixes)] (values v true))]
|
||||||
)))
|
(each [_ prefix (ipairs new-prefixes)]
|
||||||
|
(if (not (. old-set prefix))
|
||||||
|
(table.insert added (parse-prefix prefix))))
|
||||||
|
(each [_ prefix (ipairs old-prefixes)]
|
||||||
|
(if (not (. new-set prefix))
|
||||||
|
(table.insert deleted (parse-prefix prefix))))
|
||||||
|
(values added deleted)))
|
||||||
|
|
||||||
;; Format: <prefix>/<length>,preferred,valid[,excluded=<excluded-prefix>/<length>][,class=<prefix class #>]
|
;;(fn execute [s] (do (print s) true))
|
||||||
|
(fn execute [s] (assert (os.execute s)))
|
||||||
;; (parse-prefixes "2001:8b0:de3a:40dc::/64,7198,7198 2001:8b0:de3a:1001::/64,7198,7188,excluded=1/2,thi=10")
|
|
||||||
|
|
||||||
|
|
||||||
(fn file-exists? [name]
|
|
||||||
(match (io.open name :r)
|
|
||||||
f (do (f:close) true)
|
|
||||||
_ false))
|
|
||||||
|
|
||||||
|
|
||||||
(fn read-line [name]
|
|
||||||
(with-open [f (assert (io.open name :r) (.. "can't open file " name))]
|
|
||||||
(f:read "*l")))
|
|
||||||
|
|
||||||
(var last-update 0)
|
|
||||||
(fn event-time [directory]
|
|
||||||
(if (file-exists? (.. directory "/state"))
|
|
||||||
(tonumber (read-line (.. directory "/last-update")))
|
|
||||||
nil))
|
|
||||||
|
|
||||||
(fn wait-for-update [directory fsevents]
|
|
||||||
(while (<= (or (event-time directory) 0) last-update)
|
|
||||||
(fsevents:read))
|
|
||||||
(set last-update (event-time directory))
|
|
||||||
true)
|
|
||||||
|
|
||||||
(let [[state-directory lan-device] arg
|
(let [[state-directory lan-device] arg
|
||||||
fsevents (watch-fsevents state-directory)]
|
dir (watch-directory state-directory)]
|
||||||
(while (wait-for-update state-directory fsevents)
|
(var prefixes [])
|
||||||
(match (read-line (.. state-directory "/state"))
|
(while true
|
||||||
(where (or :bound :rebound :informed :updated :ra-updated))
|
(while (not (dir:ready?)) (dir:wait-events))
|
||||||
(let [[{ : prefix : len : preferred : valid }]
|
(if (. bound-states (dir:read-line "state"))
|
||||||
(parse-prefixes (read-line (.. state-directory "/prefixes")))]
|
(let [new-prefixes (split " " (dir:read-line "/prefixes"))
|
||||||
(os.execute (.. "ip address add " prefix "::1/" len
|
(added deleted) (changes prefixes new-prefixes)]
|
||||||
" dev " lan-device)))
|
(each [_ p (ipairs added)]
|
||||||
_ (os.exit 1))))
|
(execute
|
||||||
|
(.. "ip address add " p.prefix "::1/" p.len " dev " lan-device)))
|
||||||
|
(each [_ p (ipairs deleted)]
|
||||||
|
(execute
|
||||||
|
(.. "ip address del " p.prefix "::1/" p.len " dev " lan-device)))
|
||||||
|
(set prefixes new-prefixes)))
|
||||||
|
(dir:wait-events)))
|
||||||
|
|
|
@ -56,8 +56,9 @@
|
||||||
"unbound" false
|
"unbound" false
|
||||||
"stopped" false
|
"stopped" false
|
||||||
_ true)]
|
_ true)]
|
||||||
(write-value "last-update" (tostring (os.time)))
|
(write-value ".lock" (tostring (os.time)))
|
||||||
(write-value "ifname" ifname)
|
(write-value "ifname" ifname)
|
||||||
(write-value "state" state)
|
(write-value "state" state)
|
||||||
|
(os.remove (.. state-directory "/.lock"))
|
||||||
(when ready
|
(when ready
|
||||||
(with-open [fd (io.open "/proc/self/fd/10" :w)] (fd:write "\n"))))
|
(with-open [fd (io.open "/proc/self/fd/10" :w)] (fd:write "\n"))))
|
||||||
|
|
Loading…
Reference in a new issue