diff --git a/src/main.rs b/src/main.rs index 169b4f2..5c7b0b9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,13 +6,16 @@ use rocket::{ response::stream::{Event, EventStream}, serde::Serialize, tokio::{ - select, - time::{self, Duration}, + self, select, + time::{self, Duration, Instant}, }, Shutdown, State, }; use rocket_dyn_templates::{context, Template}; -use std::{collections::HashMap, sync::RwLock}; +use std::{ + collections::{HashMap, VecDeque}, + sync::{Arc, RwLock}, +}; enum TrackedState { Conscrit { @@ -130,7 +133,32 @@ fn build_vieux(id: String, name: String) -> Tracked { } } -type Tracking = RwLock>>; +struct QueuedEvent { + date: Instant, + evt: Event, +} + +impl QueuedEvent { + fn expired(&self) -> bool { + self.date.elapsed() >= Duration::from_millis(100) + } +} +impl From for QueuedEvent { + fn from(evt: Event) -> QueuedEvent { + QueuedEvent { + date: Instant::now(), + evt, + } + } +} +impl From for Event { + fn from(queued_evt: QueuedEvent) -> Event { + queued_evt.evt + } +} + +type Tracking = Arc>>>; +type TrackingEventQueue = Arc>>>>; #[derive(Serialize)] #[serde(crate = "rocket::serde")] @@ -216,31 +244,39 @@ fn tracked_view(id: &str, gpslog: Option, tracking: &State) -> O } } -fn info_to_send(id: &String, tracking: &Tracking) -> Vec { - let tracking_lock = tracking.read().unwrap(); - let watcher = tracking_lock.get(id).unwrap().read().unwrap(); - let mut infos: Vec = Vec::new(); - for (_, tracked) in tracking_lock.iter() { - if let Some(info) = apparent_info(&watcher, &tracked.read().unwrap()) { - infos.push(info); - } +fn evts_to_send(id: &str, evt_queue: &TrackingEventQueue) -> Vec { + let mut evts = Vec::new(); + for qevt in evt_queue + .read() + .unwrap() + .get(&id.to_string()) + .unwrap() + .read() + .unwrap() + .iter() + .filter(|qevt| !qevt.expired()) + { + evts.push(qevt.evt.clone()); } - infos + evts } #[get("/track//events")] fn tracked_events<'a>( id: &'a str, - tracking: &'a State, + evt_queue: &'a State, mut shutdown: Shutdown, ) -> Option { - if tracking.read().unwrap().contains_key(&id.to_string()) { + if evt_queue.read().unwrap().contains_key(&id.to_string()) { Some(EventStream! { - let mut interval = time::interval(Duration::from_secs(5)); + let mut interval = time::interval(Duration::from_millis(100)); loop { select!{ - _ = interval.tick() => - yield Event::json(&info_to_send(&id.to_string(), &tracking)).event("coords"), + _ = interval.tick() =>{ + for evt in evts_to_send(id, evt_queue){ + yield evt; + } + }, _ = &mut shutdown => break } } @@ -250,14 +286,14 @@ fn tracked_events<'a>( } } -#[put("/track/?&", rank = 0)] +#[put("/track//pos?&")] fn store_pos(id: &str, lat: f32, long: f32, tracking: &State) { if let Some(tracked) = tracking.read().unwrap().get(&id.to_string()) { tracked.write().unwrap().pos = (lat, long); } } -#[put("/track/?&", rank = 1)] +#[put("/track//state?&")] fn set_state(id: &str, inv: bool, col: u8, tracking: &State) -> Option<()> { let tracking_lock = tracking.read().unwrap(); let state = &mut tracking_lock @@ -284,9 +320,39 @@ fn index() -> &'static str { "Hello, world!" } +fn send_coords(tracking: &Tracking, evt_queue: &TrackingEventQueue) { + let tracking_lock = tracking.read().unwrap(); + for (id, queue) in evt_queue.read().unwrap().iter() { + let watcher = tracking_lock.get(id).unwrap().read().unwrap(); + let mut infos: Vec = Vec::new(); + for (_, tracked) in tracking_lock.iter() { + if let Some(info) = apparent_info(&watcher, &tracked.read().unwrap()) { + infos.push(info); + } + } + queue + .write() + .unwrap() + .push_back(Event::json(&infos).event("coords").into()); + } +} + +fn clean_expired_evt(evt_queues: &TrackingEventQueue) { + for (_, queue) in evt_queues.read().unwrap().iter() { + let queue = &mut queue.write().unwrap(); + while let Some(queued_evt) = queue.front() { + if queued_evt.expired() { + queue.pop_front(); + } else { + break; + } + } + } +} + #[launch] -fn rocket() -> _ { - let tracking = HashMap::>::from([ +async fn rocket() -> _ { + let tracking: Tracking = Arc::new(RwLock::new(HashMap::from([ ( "team00".to_string(), RwLock::new(build_conscrit("team00".to_string(), "Équipe 0".to_string())), @@ -303,13 +369,33 @@ fn rocket() -> _ { "npc1".to_string(), RwLock::new(build_vieux("npc1".to_string(), "PNJ 1".to_string())), ), - ]); - rocket::build() + ]))); + let evt_queue: TrackingEventQueue = Arc::new(RwLock::new( + tracking + .read() + .unwrap() + .iter() + .map(|(id, _)| (id.clone(), RwLock::new(VecDeque::new()))) + .collect(), + )); + let rocket = rocket::build() .attach(Template::fairing()) - .manage(RwLock::new(tracking)) + .manage(tracking.clone()) + .manage(evt_queue.clone()) .mount( "/", routes![index, store_pos, tracked_view, tracked_events, set_state], ) - .mount("/", FileServer::from(relative!("static"))) + .mount("/", FileServer::from(relative!("static"))); + tokio::spawn(async move { + let mut clean_interval = time::interval(Duration::from_millis(500)); + let mut coord_interval = time::interval(Duration::from_millis(3000)); + loop { + select! { + _ = coord_interval.tick() => send_coords(&tracking, &evt_queue), + _ = clean_interval.tick() => clean_expired_evt(&evt_queue), + } + } + }); + rocket } diff --git a/static/utils.js b/static/utils.js index 54d93b7..4830334 100644 --- a/static/utils.js +++ b/static/utils.js @@ -125,7 +125,7 @@ function setup_geoLoc(){ const requestOptions = { method: 'PUT' }; function geoLoc_success(pos) { - fetch("/log/"+id+"?lat="+pos.coords.latitude+"&long="+pos.coords.longitude, requestOptions); + fetch("/log/"+id+"/pos?lat="+pos.coords.latitude+"&long="+pos.coords.longitude, requestOptions); } function geoLoc_error(err) { diff --git a/templates/vieux.html.hbs b/templates/vieux.html.hbs index ecdd215..2ba3542 100644 --- a/templates/vieux.html.hbs +++ b/templates/vieux.html.hbs @@ -84,7 +84,7 @@ for(entry of data) nState[entry[0]] = entry[1]; nState.invisibility = "invisibility" in nState; - fetch("/track/{{id}}?inv="+nState.invisibility+ + fetch("/track/{{id}}/state?inv="+nState.invisibility+ "&col="+nState.color, { method: 'PUT' } );