better event sending logic

This commit is contained in:
catvayor 2024-06-02 15:22:59 +02:00
parent 4f7dda439a
commit e289b8850b
3 changed files with 114 additions and 28 deletions

View file

@ -6,13 +6,16 @@ use rocket::{
response::stream::{Event, EventStream},
serde::Serialize,
tokio::{
select,
time::{self, Duration},
self, select,
time::{self, Duration, Instant},
},
Shutdown, State,
};
use rocket_dyn_templates::{context, Template};
use std::{collections::HashMap, sync::RwLock};
use std::{
collections::{HashMap, VecDeque},
sync::{Arc, RwLock},
};
enum TrackedState {
Conscrit {
@ -130,7 +133,32 @@ fn build_vieux(id: String, name: String) -> Tracked {
}
}
type Tracking = RwLock<HashMap<String, RwLock<Tracked>>>;
struct QueuedEvent {
date: Instant,
evt: Event,
}
impl QueuedEvent {
fn expired(&self) -> bool {
self.date.elapsed() >= Duration::from_millis(100)
}
}
impl From<Event> for QueuedEvent {
fn from(evt: Event) -> QueuedEvent {
QueuedEvent {
date: Instant::now(),
evt,
}
}
}
impl From<QueuedEvent> for Event {
fn from(queued_evt: QueuedEvent) -> Event {
queued_evt.evt
}
}
type Tracking = Arc<RwLock<HashMap<String, RwLock<Tracked>>>>;
type TrackingEventQueue = Arc<RwLock<HashMap<String, RwLock<VecDeque<QueuedEvent>>>>>;
#[derive(Serialize)]
#[serde(crate = "rocket::serde")]
@ -216,31 +244,39 @@ fn tracked_view(id: &str, gpslog: Option<bool>, tracking: &State<Tracking>) -> O
}
}
fn info_to_send(id: &String, tracking: &Tracking) -> Vec<TrackedInfo> {
let tracking_lock = tracking.read().unwrap();
let watcher = tracking_lock.get(id).unwrap().read().unwrap();
let mut infos: Vec<TrackedInfo> = Vec::new();
for (_, tracked) in tracking_lock.iter() {
if let Some(info) = apparent_info(&watcher, &tracked.read().unwrap()) {
infos.push(info);
}
fn evts_to_send(id: &str, evt_queue: &TrackingEventQueue) -> Vec<Event> {
let mut evts = Vec::new();
for qevt in evt_queue
.read()
.unwrap()
.get(&id.to_string())
.unwrap()
.read()
.unwrap()
.iter()
.filter(|qevt| !qevt.expired())
{
evts.push(qevt.evt.clone());
}
infos
evts
}
#[get("/track/<id>/events")]
fn tracked_events<'a>(
id: &'a str,
tracking: &'a State<Tracking>,
evt_queue: &'a State<TrackingEventQueue>,
mut shutdown: Shutdown,
) -> Option<EventStream![Event + 'a]> {
if tracking.read().unwrap().contains_key(&id.to_string()) {
if evt_queue.read().unwrap().contains_key(&id.to_string()) {
Some(EventStream! {
let mut interval = time::interval(Duration::from_secs(5));
let mut interval = time::interval(Duration::from_millis(100));
loop {
select!{
_ = interval.tick() =>
yield Event::json(&info_to_send(&id.to_string(), &tracking)).event("coords"),
_ = interval.tick() =>{
for evt in evts_to_send(id, evt_queue){
yield evt;
}
},
_ = &mut shutdown => break
}
}
@ -250,14 +286,14 @@ fn tracked_events<'a>(
}
}
#[put("/track/<id>?<lat>&<long>", rank = 0)]
#[put("/track/<id>/pos?<lat>&<long>")]
fn store_pos(id: &str, lat: f32, long: f32, tracking: &State<Tracking>) {
if let Some(tracked) = tracking.read().unwrap().get(&id.to_string()) {
tracked.write().unwrap().pos = (lat, long);
}
}
#[put("/track/<id>?<inv>&<col>", rank = 1)]
#[put("/track/<id>/state?<inv>&<col>")]
fn set_state(id: &str, inv: bool, col: u8, tracking: &State<Tracking>) -> Option<()> {
let tracking_lock = tracking.read().unwrap();
let state = &mut tracking_lock
@ -284,9 +320,39 @@ fn index() -> &'static str {
"Hello, world!"
}
fn send_coords(tracking: &Tracking, evt_queue: &TrackingEventQueue) {
let tracking_lock = tracking.read().unwrap();
for (id, queue) in evt_queue.read().unwrap().iter() {
let watcher = tracking_lock.get(id).unwrap().read().unwrap();
let mut infos: Vec<TrackedInfo> = Vec::new();
for (_, tracked) in tracking_lock.iter() {
if let Some(info) = apparent_info(&watcher, &tracked.read().unwrap()) {
infos.push(info);
}
}
queue
.write()
.unwrap()
.push_back(Event::json(&infos).event("coords").into());
}
}
fn clean_expired_evt(evt_queues: &TrackingEventQueue) {
for (_, queue) in evt_queues.read().unwrap().iter() {
let queue = &mut queue.write().unwrap();
while let Some(queued_evt) = queue.front() {
if queued_evt.expired() {
queue.pop_front();
} else {
break;
}
}
}
}
#[launch]
fn rocket() -> _ {
let tracking = HashMap::<String, RwLock<Tracked>>::from([
async fn rocket() -> _ {
let tracking: Tracking = Arc::new(RwLock::new(HashMap::from([
(
"team00".to_string(),
RwLock::new(build_conscrit("team00".to_string(), "Équipe 0".to_string())),
@ -303,13 +369,33 @@ fn rocket() -> _ {
"npc1".to_string(),
RwLock::new(build_vieux("npc1".to_string(), "PNJ 1".to_string())),
),
]);
rocket::build()
])));
let evt_queue: TrackingEventQueue = Arc::new(RwLock::new(
tracking
.read()
.unwrap()
.iter()
.map(|(id, _)| (id.clone(), RwLock::new(VecDeque::new())))
.collect(),
));
let rocket = rocket::build()
.attach(Template::fairing())
.manage(RwLock::new(tracking))
.manage(tracking.clone())
.manage(evt_queue.clone())
.mount(
"/",
routes![index, store_pos, tracked_view, tracked_events, set_state],
)
.mount("/", FileServer::from(relative!("static")))
.mount("/", FileServer::from(relative!("static")));
tokio::spawn(async move {
let mut clean_interval = time::interval(Duration::from_millis(500));
let mut coord_interval = time::interval(Duration::from_millis(3000));
loop {
select! {
_ = coord_interval.tick() => send_coords(&tracking, &evt_queue),
_ = clean_interval.tick() => clean_expired_evt(&evt_queue),
}
}
});
rocket
}

View file

@ -125,7 +125,7 @@ function setup_geoLoc(){
const requestOptions = { method: 'PUT' };
function geoLoc_success(pos) {
fetch("/log/"+id+"?lat="+pos.coords.latitude+"&long="+pos.coords.longitude, requestOptions);
fetch("/log/"+id+"/pos?lat="+pos.coords.latitude+"&long="+pos.coords.longitude, requestOptions);
}
function geoLoc_error(err) {

View file

@ -84,7 +84,7 @@
for(entry of data)
nState[entry[0]] = entry[1];
nState.invisibility = "invisibility" in nState;
fetch("/track/{{id}}?inv="+nState.invisibility+
fetch("/track/{{id}}/state?inv="+nState.invisibility+
"&col="+nState.color,
{ method: 'PUT' }
);