chore: refactor task spawning

This commit is contained in:
sinavir 2024-10-13 19:21:19 +02:00
parent 2067df8968
commit a02bb292be
5 changed files with 97 additions and 87 deletions

View file

@ -2,8 +2,12 @@ mod authorization;
mod handler; mod handler;
mod model; mod model;
mod route; mod route;
mod state_recorder;
mod socket_listener;
use dotenvy; use dotenvy;
use crate::state_recorder::record_state;
use crate::socket_listener::listen_socket;
use route::create_router; use route::create_router;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use tower_http::trace::TraceLayer; use tower_http::trace::TraceLayer;
@ -23,11 +27,18 @@ async fn main() {
_ => (), _ => (),
} }
let binding = dotenvy::var("BIND").unwrap_or(String::from("127.0.0.1:9999")); let db = model::make_db();
tokio::spawn(listen_socket(db.clone()));
tokio::spawn(record_state(db.clone()));
let binding = dotenvy::var("BIND_HTTP").unwrap_or(String::from("127.0.0.1:9999"));
let bind = binding.trim(); let bind = binding.trim();
tracing::debug!("Trying to bind at {bind}"); tracing::debug!("Trying to bind at {bind}");
let listener = tokio::net::TcpListener::bind(bind).await.unwrap(); let listener = tokio::net::TcpListener::bind(bind).await.unwrap();
let app = create_router()
let app = create_router(db.clone())
.layer( .layer(
TraceLayer::new_for_http() TraceLayer::new_for_http()
); );

View file

@ -123,7 +123,7 @@ pub fn make_db() -> DB {
.as_ref(), .as_ref(),
) )
.expect("an JSON string"), .expect("an JSON string"),
change_channel: broadcast::Sender::new(512), change_channel: broadcast::Sender::new(5*state_size),
save_path, save_path,
}, },
mut_state, mut_state,

View file

@ -1,93 +1,15 @@
use crate::authorization::{ jwt_middleware, jwt_middleware_cof }; use crate::authorization::{jwt_middleware, jwt_middleware_cof};
use crate::handler; use crate::handler;
use crate::model; use crate::model;
use axum::{handler::Handler, middleware}; use axum::{handler::Handler, middleware};
use axum::{routing::get, Router}; use axum::{routing::get, Router};
use serde_json::to_writer;
use std::fs::File;
use tokio::task;
use tokio::time::{sleep, Duration};
use tower_http::cors::{Any, CorsLayer}; use tower_http::cors::{Any, CorsLayer};
use axum::extract::State; pub fn create_router(db: model::DB) -> Router {
use serde::Deserialize;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::net::TcpListener;
pub fn create_router() -> Router {
let db = model::make_db();
let cors = CorsLayer::new() let cors = CorsLayer::new()
// allow requests from any origin // allow requests from any origin
.allow_origin(Any) .allow_origin(Any)
.allow_headers(Any); .allow_headers(Any);
let db_to_save = db.clone();
task::spawn(async move {
loop {
sleep(Duration::from_millis(1000)).await;
{
let save_path = &db_to_save.static_state.save_path;
let file = File::create(save_path);
match file {
Ok(f) => {
let db = db_to_save.mut_state.read().await;
match to_writer(f, &db.dmx) {
Ok(()) => tracing::trace!("Saved data at {save_path}"),
Err(e) => {
tracing::debug!("Failed to save data: {e:?}");
()
}
}
}
Err(e) => {
tracing::debug!("Failed to save data: {e:?}");
()
}
}
}
}
});
let db_listener = db.clone();
task::spawn(async move {
//TODO: parametrisation
tracing::debug!("Trying to bind at 10.10.10.13:1235");
let listener = TcpListener::bind("10.10.10.1:1235")
.await
.expect("Failed to listen for direct connection.");
loop {
match listener.accept().await {
Ok((socket, addr)) => {
let db_socket = db_listener.clone();
let mut buf_reader = BufReader::new(socket);
let mut buf = String::new();
tracing::debug!("Accepted {addr:?}");
task::spawn(async move {
loop {
buf.clear();
let _ = buf_reader.read_line(&mut buf).await;
match model::DMXBeamChange::deserialize(
&mut serde_json::Deserializer::from_str(&buf),
) {
Ok(data) => handler::edit_motor_value_handler(
State(db_socket.clone()),
axum::Json(data),
)
.await
.unwrap_or(()),
Err(e) => tracing::error!("Error reading incoming data: {e:?}"),
}
}
})
}
Err(e) => {
task::spawn(async move { tracing::error!("Failed to get client: {e:?}") })
}
};
}
});
Router::new() Router::new()
.route("/api/healthcheck", get(handler::healthcheck_handler)) .route("/api/healthcheck", get(handler::healthcheck_handler))
@ -108,10 +30,9 @@ pub fn create_router() -> Router {
) )
.route( .route(
"/api/control-box", "/api/control-box",
get(handler::get_motor_value_handler).post( get(handler::get_motor_value_handler).post(handler::edit_motor_value_handler.layer(
handler::edit_motor_value_handler middleware::from_fn_with_state(db.clone(), jwt_middleware_cof),
.layer(middleware::from_fn_with_state(db.clone(), jwt_middleware_cof)), )),
),
) )
.layer(cors) .layer(cors)
.with_state(db) .with_state(db)

View file

@ -0,0 +1,47 @@
use tokio::task;
use axum::extract::State;
use serde::Deserialize;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::net::TcpListener;
use crate::{handler, model };
pub async fn listen_socket(db: model::DB) {
let binding = dotenvy::var("BIND_TCP").unwrap_or(String::from("127.0.0.1:1235"));
tracing::debug!("Trying to bind at {binding}");
let listener = TcpListener::bind(binding)
.await
.expect("Failed to listen for direct connection.");
loop {
match listener.accept().await {
Ok((socket, addr)) => {
let db_socket = db.clone();
let mut buf_reader = BufReader::new(socket);
let mut buf = String::new();
tracing::debug!("Accepted {addr:?}");
task::spawn(async move {
loop {
buf.clear();
let _ = buf_reader.read_line(&mut buf).await;
match model::DMXBeamChange::deserialize(
&mut serde_json::Deserializer::from_str(&buf),
) {
Ok(data) => handler::edit_motor_value_handler(
State(db_socket.clone()),
axum::Json(data),
)
.await
.unwrap_or(()),
Err(e) => tracing::error!("Error reading incoming data: {e:?}"),
}
}
});
}
Err(e) => {
tracing::error!("Failed to get client: {e:?}");
}
};
}
}

View file

@ -0,0 +1,31 @@
use serde_json::to_writer;
use std::fs::File;
use tokio::time::{sleep, Duration};
use crate::model;
pub async fn record_state(db_to_save: model::DB) {
loop {
sleep(Duration::from_millis(1000)).await;
{
let save_path = &db_to_save.static_state.save_path;
let file = File::create(save_path);
match file {
Ok(f) => {
let db = db_to_save.mut_state.read().await;
match to_writer(f, &db.dmx) {
Ok(()) => tracing::trace!("Saved data at {save_path}"),
Err(e) => {
tracing::debug!("Failed to save data: {e:?}");
()
}
}
}
Err(e) => {
tracing::debug!("Failed to save data: {e:?}");
()
}
}
}
}
}