diff --git a/backend/.gitignore b/backend/.gitignore new file mode 100644 index 0000000..8a9e73c --- /dev/null +++ b/backend/.gitignore @@ -0,0 +1 @@ +/data.json diff --git a/backend/rustfmt.toml b/backend/rustfmt.toml new file mode 100644 index 0000000..5cf55b7 --- /dev/null +++ b/backend/rustfmt.toml @@ -0,0 +1,3 @@ +unstable_features = true +group_imports = "StdExternalCrate" +imports_granularity = "Module" diff --git a/backend/src/authorization.rs b/backend/src/authorization.rs index f1a0647..52f1766 100644 --- a/backend/src/authorization.rs +++ b/backend/src/authorization.rs @@ -1,14 +1,14 @@ -use crate::model::DB; -use axum::{ - extract::{Request, State}, - http::StatusCode, - middleware::Next, - response::Response, -}; -use axum_extra::{headers, typed_header::TypedHeader}; +use axum::extract::{Request, State}; +use axum::http::StatusCode; +use axum::middleware::Next; +use axum::response::Response; +use axum_extra::headers; +use axum_extra::typed_header::TypedHeader; use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation}; use serde::{Deserialize, Serialize}; +use crate::model::DB; + #[derive(Debug, Serialize, Deserialize)] struct Claims { sub: String, @@ -32,7 +32,7 @@ fn check_token(token: &str, jwt: &str) -> Option { match decoded_token { Ok(token_data) => { let user = token_data.claims.user; - let is_cof =token_data.claims.is_cof; + let is_cof = token_data.claims.is_cof; if token_data.claims.scope == "modify" { tracing::info!("Successful auth {user:?}"); Some(User { name: user, is_cof }) @@ -41,7 +41,10 @@ fn check_token(token: &str, jwt: &str) -> Option { None } } - Err(err) => {tracing::debug!("Failed decoding token: {err:?}"); None}, + Err(err) => { + tracing::debug!("Failed decoding token: {err:?}"); + None + } } } @@ -69,9 +72,9 @@ pub async fn jwt_middleware_cof( let token = auth.token(); if let Some(user) = check_token(token, &state.static_state.jwt_key) { if user.is_cof { - request.extensions_mut().insert(user); - return Ok(next.run(request).await) + request.extensions_mut().insert(user); + return Ok(next.run(request).await); }; }; - Err(StatusCode::FORBIDDEN) + Err(StatusCode::FORBIDDEN) } diff --git a/backend/src/handler.rs b/backend/src/handler.rs index 1aadadb..44fe947 100644 --- a/backend/src/handler.rs +++ b/backend/src/handler.rs @@ -1,15 +1,16 @@ -use crate::authorization::User; -use crate::model::{ColorArray, DMXAtom, DMXBeam, DMXBeamChange, DMXColorAtom, DMXColor, DB}; -use axum::{ - debug_handler, - extract::{Path, State}, - http::StatusCode, - response::{sse::Event, IntoResponse, Sse}, - Extension, Json, -}; use std::time::{Duration, Instant}; -use tokio_stream::StreamExt; -use tokio_stream::{self as stream}; + +use axum::extract::{Path, State}; +use axum::http::StatusCode; +use axum::response::sse::Event; +use axum::response::{IntoResponse, Sse}; +use axum::{debug_handler, Extension, Json}; +use tokio_stream::{ + StreamExt, {self as stream}, +}; + +use crate::authorization::User; +use crate::model::{ColorArray, DMXAtom, DMXBeam, DMXBeamChange, DMXColor, DMXColorAtom, DB}; #[debug_handler] pub async fn healthcheck_handler() -> impl IntoResponse { @@ -33,7 +34,12 @@ pub async fn batch_edit_value_handler( Json(body): Json>, ) -> Result<(), StatusCode> { let mut lock = db.mut_state.write().await; - if lock.ratelimit_info.get(&user).map(|d| d.elapsed() <= Duration::from_millis(500)).unwrap_or(false){ + if lock + .ratelimit_info + .get(&user) + .map(|d| d.elapsed() <= Duration::from_millis(500)) + .unwrap_or(false) + { return Err(StatusCode::TOO_MANY_REQUESTS); } for i in &body { @@ -103,13 +109,11 @@ pub async fn edit_motor_value_handler( let _ = db .static_state .change_channel - .send(DMXAtom::Motor(lock.dmx.motor)) - ; + .send(DMXAtom::Motor(lock.dmx.motor)); Ok(()) } - #[debug_handler] pub async fn sse_handler(State(db): State) -> impl IntoResponse { let rx = db.static_state.change_channel.subscribe(); @@ -120,12 +124,16 @@ pub async fn sse_handler(State(db): State) -> impl IntoResponse { data = lock.dmx.colors.clone(); motor = lock.dmx.motor.clone(); } - let init_data = data.into_iter().enumerate().map(|(i, x)| { - Ok(DMXAtom::Color(DMXColorAtom { - address: i, - value: x, - })) - }).chain(std::iter::once(Ok(DMXAtom::Motor(motor)))); + let init_data = data + .into_iter() + .enumerate() + .map(|(i, x)| { + Ok(DMXAtom::Color(DMXColorAtom { + address: i, + value: x, + })) + }) + .chain(std::iter::once(Ok(DMXAtom::Motor(motor)))); let init = stream::iter(init_data); let stream = init .chain(stream::wrappers::BroadcastStream::new(rx)) diff --git a/backend/src/main.rs b/backend/src/main.rs index 1738f7e..fc279af 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -2,19 +2,24 @@ mod authorization; mod handler; mod model; mod route; +mod socket_listener; +mod state_recorder; use dotenvy; use route::create_router; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use tower_http::trace::TraceLayer; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; + +use crate::socket_listener::listen_socket; +use crate::state_recorder::record_state; #[tokio::main] async fn main() { tracing_subscriber::registry() .with( - tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| { - "info,axum::rejection=trace".into() - }), + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "info,axum::rejection=trace".into()), ) .with(tracing_subscriber::fmt::layer()) .init(); @@ -23,14 +28,17 @@ async fn main() { _ => (), } - let binding = dotenvy::var("BIND").unwrap_or(String::from("127.0.0.1:9999")); + let db = model::make_db(); + + tokio::spawn(listen_socket(db.clone())); + tokio::spawn(record_state(db.clone())); + + let binding = dotenvy::var("BIND_HTTP").unwrap_or(String::from("127.0.0.1:9999")); let bind = binding.trim(); tracing::debug!("Trying to bind at {bind}"); let listener = tokio::net::TcpListener::bind(bind).await.unwrap(); - let app = create_router() - .layer( - TraceLayer::new_for_http() - ); + + let app = create_router(db.clone()).layer(TraceLayer::new_for_http()); tracing::info!("🚀 Server started successfully"); axum::serve(listener, app).await.unwrap(); } diff --git a/backend/src/model.rs b/backend/src/model.rs index de82a59..0f4e3df 100644 --- a/backend/src/model.rs +++ b/backend/src/model.rs @@ -1,13 +1,15 @@ -use crate::authorization::User; -use dotenvy; -use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fs; use std::str::FromStr; use std::sync::Arc; use std::time::Instant; + +use dotenvy; +use serde::{Deserialize, Serialize}; use tokio::sync::{broadcast, RwLock}; +use crate::authorization::User; + #[derive(Debug, Default, Deserialize, Serialize, Copy, Clone)] pub struct DMXColor { pub red: u8, @@ -123,7 +125,7 @@ pub fn make_db() -> DB { .as_ref(), ) .expect("an JSON string"), - change_channel: broadcast::Sender::new(512), + change_channel: broadcast::Sender::new(5 * state_size), save_path, }, mut_state, diff --git a/backend/src/route.rs b/backend/src/route.rs index 2342ed5..94ad6d9 100644 --- a/backend/src/route.rs +++ b/backend/src/route.rs @@ -1,93 +1,16 @@ -use crate::authorization::{ jwt_middleware, jwt_middleware_cof }; -use crate::handler; -use crate::model; -use axum::{handler::Handler, middleware}; -use axum::{routing::get, Router}; -use serde_json::to_writer; -use std::fs::File; -use tokio::task; -use tokio::time::{sleep, Duration}; +use axum::handler::Handler; +use axum::routing::get; +use axum::{middleware, Router}; use tower_http::cors::{Any, CorsLayer}; -use axum::extract::State; -use serde::Deserialize; -use tokio::io::AsyncBufReadExt; -use tokio::io::BufReader; -use tokio::net::TcpListener; - -pub fn create_router() -> Router { - let db = model::make_db(); +use crate::authorization::{jwt_middleware, jwt_middleware_cof}; +use crate::{handler, model}; +pub fn create_router(db: model::DB) -> Router { let cors = CorsLayer::new() // allow requests from any origin .allow_origin(Any) .allow_headers(Any); - let db_to_save = db.clone(); - task::spawn(async move { - loop { - sleep(Duration::from_millis(1000)).await; - { - let save_path = &db_to_save.static_state.save_path; - let file = File::create(save_path); - match file { - Ok(f) => { - let db = db_to_save.mut_state.read().await; - match to_writer(f, &db.dmx) { - Ok(()) => tracing::trace!("Saved data at {save_path}"), - Err(e) => { - tracing::debug!("Failed to save data: {e:?}"); - () - } - } - } - Err(e) => { - tracing::debug!("Failed to save data: {e:?}"); - () - } - } - } - } - }); - - let db_listener = db.clone(); - task::spawn(async move { - //TODO: parametrisation - tracing::debug!("Trying to bind at 10.10.10.13:1235"); - let listener = TcpListener::bind("10.10.10.1:1235") - .await - .expect("Failed to listen for direct connection."); - - loop { - match listener.accept().await { - Ok((socket, addr)) => { - let db_socket = db_listener.clone(); - let mut buf_reader = BufReader::new(socket); - let mut buf = String::new(); - tracing::debug!("Accepted {addr:?}"); - task::spawn(async move { - loop { - buf.clear(); - let _ = buf_reader.read_line(&mut buf).await; - match model::DMXBeamChange::deserialize( - &mut serde_json::Deserializer::from_str(&buf), - ) { - Ok(data) => handler::edit_motor_value_handler( - State(db_socket.clone()), - axum::Json(data), - ) - .await - .unwrap_or(()), - Err(e) => tracing::error!("Error reading incoming data: {e:?}"), - } - } - }) - } - Err(e) => { - task::spawn(async move { tracing::error!("Failed to get client: {e:?}") }) - } - }; - } - }); Router::new() .route("/api/healthcheck", get(handler::healthcheck_handler)) @@ -108,10 +31,9 @@ pub fn create_router() -> Router { ) .route( "/api/control-box", - get(handler::get_motor_value_handler).post( - handler::edit_motor_value_handler - .layer(middleware::from_fn_with_state(db.clone(), jwt_middleware_cof)), - ), + get(handler::get_motor_value_handler).post(handler::edit_motor_value_handler.layer( + middleware::from_fn_with_state(db.clone(), jwt_middleware_cof), + )), ) .layer(cors) .with_state(db) diff --git a/backend/src/socket_listener.rs b/backend/src/socket_listener.rs new file mode 100644 index 0000000..bc725f6 --- /dev/null +++ b/backend/src/socket_listener.rs @@ -0,0 +1,51 @@ +use axum::extract::State; +use serde::Deserialize; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::net::TcpListener; +use tokio::task; + +use crate::{handler, model}; + +pub async fn listen_socket(db: model::DB) { + let binding = dotenvy::var("BIND_TCP").unwrap_or(String::from("127.0.0.1:1235")); + tracing::debug!("Trying to bind at {binding}"); + let listener = TcpListener::bind(binding) + .await + .expect("Failed to listen for direct connection."); + + loop { + match listener.accept().await { + Ok((socket, addr)) => { + let db_socket = db.clone(); + let mut buf_reader = BufReader::new(socket); + let mut buf = String::new(); + tracing::debug!("Accepted {addr:?}"); + task::spawn(async move { + loop { + buf.clear(); + match buf_reader.read_line(&mut buf).await { + Ok(0) => break, + Ok(_) => (), + Err(e) => tracing::error!("Error while reading tcp stream: {e:?}"), + }; + match model::DMXBeamChange::deserialize( + &mut serde_json::Deserializer::from_str(&buf), + ) { + Ok(data) => handler::edit_motor_value_handler( + State(db_socket.clone()), + axum::Json(data), + ) + .await + .unwrap_or(()), + Err(e) => tracing::error!("Error reading incoming data: {e:?}"), + } + } + tracing::debug!("Closing tcp connection {addr:?}"); + }); + } + Err(e) => { + tracing::error!("Failed to get client: {e:?}"); + } + }; + } +} diff --git a/backend/src/state_recorder.rs b/backend/src/state_recorder.rs new file mode 100644 index 0000000..1122604 --- /dev/null +++ b/backend/src/state_recorder.rs @@ -0,0 +1,32 @@ +use std::fs::File; + +use serde_json::to_writer; +use tokio::time::{sleep, Duration}; + +use crate::model; + +pub async fn record_state(db_to_save: model::DB) { + loop { + sleep(Duration::from_millis(1000)).await; + { + let save_path = &db_to_save.static_state.save_path; + let file = File::create(save_path); + match file { + Ok(f) => { + let db = db_to_save.mut_state.read().await; + match to_writer(f, &db.dmx) { + Ok(()) => tracing::trace!("Saved data at {save_path}"), + Err(e) => { + tracing::debug!("Failed to save data: {e:?}"); + () + } + } + } + Err(e) => { + tracing::debug!("Failed to save data: {e:?}"); + () + } + } + } + } +} diff --git a/provisioning/shell.nix b/provisioning/shell.nix index 54ae55a..1179fd7 100644 --- a/provisioning/shell.nix +++ b/provisioning/shell.nix @@ -6,6 +6,6 @@ pkgs.mkShell { pkgs.cargo-edit pkgs.rustc pkgs.rust-analyzer - pkgs.rustfmt + (pkgs.rustfmt.override { asNightly = true; }) ]; }