diff --git a/backend/src/authorization.rs b/backend/src/authorization.rs index f1a0647..52f1766 100644 --- a/backend/src/authorization.rs +++ b/backend/src/authorization.rs @@ -1,14 +1,14 @@ -use crate::model::DB; -use axum::{ - extract::{Request, State}, - http::StatusCode, - middleware::Next, - response::Response, -}; -use axum_extra::{headers, typed_header::TypedHeader}; +use axum::extract::{Request, State}; +use axum::http::StatusCode; +use axum::middleware::Next; +use axum::response::Response; +use axum_extra::headers; +use axum_extra::typed_header::TypedHeader; use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation}; use serde::{Deserialize, Serialize}; +use crate::model::DB; + #[derive(Debug, Serialize, Deserialize)] struct Claims { sub: String, @@ -32,7 +32,7 @@ fn check_token(token: &str, jwt: &str) -> Option { match decoded_token { Ok(token_data) => { let user = token_data.claims.user; - let is_cof =token_data.claims.is_cof; + let is_cof = token_data.claims.is_cof; if token_data.claims.scope == "modify" { tracing::info!("Successful auth {user:?}"); Some(User { name: user, is_cof }) @@ -41,7 +41,10 @@ fn check_token(token: &str, jwt: &str) -> Option { None } } - Err(err) => {tracing::debug!("Failed decoding token: {err:?}"); None}, + Err(err) => { + tracing::debug!("Failed decoding token: {err:?}"); + None + } } } @@ -69,9 +72,9 @@ pub async fn jwt_middleware_cof( let token = auth.token(); if let Some(user) = check_token(token, &state.static_state.jwt_key) { if user.is_cof { - request.extensions_mut().insert(user); - return Ok(next.run(request).await) + request.extensions_mut().insert(user); + return Ok(next.run(request).await); }; }; - Err(StatusCode::FORBIDDEN) + Err(StatusCode::FORBIDDEN) } diff --git a/backend/src/handler.rs b/backend/src/handler.rs index 1aadadb..44fe947 100644 --- a/backend/src/handler.rs +++ b/backend/src/handler.rs @@ -1,15 +1,16 @@ -use crate::authorization::User; -use crate::model::{ColorArray, DMXAtom, DMXBeam, DMXBeamChange, DMXColorAtom, DMXColor, DB}; -use axum::{ - debug_handler, - extract::{Path, State}, - http::StatusCode, - response::{sse::Event, IntoResponse, Sse}, - Extension, Json, -}; use std::time::{Duration, Instant}; -use tokio_stream::StreamExt; -use tokio_stream::{self as stream}; + +use axum::extract::{Path, State}; +use axum::http::StatusCode; +use axum::response::sse::Event; +use axum::response::{IntoResponse, Sse}; +use axum::{debug_handler, Extension, Json}; +use tokio_stream::{ + StreamExt, {self as stream}, +}; + +use crate::authorization::User; +use crate::model::{ColorArray, DMXAtom, DMXBeam, DMXBeamChange, DMXColor, DMXColorAtom, DB}; #[debug_handler] pub async fn healthcheck_handler() -> impl IntoResponse { @@ -33,7 +34,12 @@ pub async fn batch_edit_value_handler( Json(body): Json>, ) -> Result<(), StatusCode> { let mut lock = db.mut_state.write().await; - if lock.ratelimit_info.get(&user).map(|d| d.elapsed() <= Duration::from_millis(500)).unwrap_or(false){ + if lock + .ratelimit_info + .get(&user) + .map(|d| d.elapsed() <= Duration::from_millis(500)) + .unwrap_or(false) + { return Err(StatusCode::TOO_MANY_REQUESTS); } for i in &body { @@ -103,13 +109,11 @@ pub async fn edit_motor_value_handler( let _ = db .static_state .change_channel - .send(DMXAtom::Motor(lock.dmx.motor)) - ; + .send(DMXAtom::Motor(lock.dmx.motor)); Ok(()) } - #[debug_handler] pub async fn sse_handler(State(db): State) -> impl IntoResponse { let rx = db.static_state.change_channel.subscribe(); @@ -120,12 +124,16 @@ pub async fn sse_handler(State(db): State) -> impl IntoResponse { data = lock.dmx.colors.clone(); motor = lock.dmx.motor.clone(); } - let init_data = data.into_iter().enumerate().map(|(i, x)| { - Ok(DMXAtom::Color(DMXColorAtom { - address: i, - value: x, - })) - }).chain(std::iter::once(Ok(DMXAtom::Motor(motor)))); + let init_data = data + .into_iter() + .enumerate() + .map(|(i, x)| { + Ok(DMXAtom::Color(DMXColorAtom { + address: i, + value: x, + })) + }) + .chain(std::iter::once(Ok(DMXAtom::Motor(motor)))); let init = stream::iter(init_data); let stream = init .chain(stream::wrappers::BroadcastStream::new(rx)) diff --git a/backend/src/main.rs b/backend/src/main.rs index aa32370..fc279af 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -2,23 +2,24 @@ mod authorization; mod handler; mod model; mod route; -mod state_recorder; mod socket_listener; +mod state_recorder; use dotenvy; -use crate::state_recorder::record_state; -use crate::socket_listener::listen_socket; use route::create_router; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use tower_http::trace::TraceLayer; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; + +use crate::socket_listener::listen_socket; +use crate::state_recorder::record_state; #[tokio::main] async fn main() { tracing_subscriber::registry() .with( - tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| { - "info,axum::rejection=trace".into() - }), + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "info,axum::rejection=trace".into()), ) .with(tracing_subscriber::fmt::layer()) .init(); @@ -37,11 +38,7 @@ async fn main() { tracing::debug!("Trying to bind at {bind}"); let listener = tokio::net::TcpListener::bind(bind).await.unwrap(); - - let app = create_router(db.clone()) - .layer( - TraceLayer::new_for_http() - ); + let app = create_router(db.clone()).layer(TraceLayer::new_for_http()); tracing::info!("🚀 Server started successfully"); axum::serve(listener, app).await.unwrap(); } diff --git a/backend/src/model.rs b/backend/src/model.rs index e2428f4..0f4e3df 100644 --- a/backend/src/model.rs +++ b/backend/src/model.rs @@ -1,13 +1,15 @@ -use crate::authorization::User; -use dotenvy; -use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fs; use std::str::FromStr; use std::sync::Arc; use std::time::Instant; + +use dotenvy; +use serde::{Deserialize, Serialize}; use tokio::sync::{broadcast, RwLock}; +use crate::authorization::User; + #[derive(Debug, Default, Deserialize, Serialize, Copy, Clone)] pub struct DMXColor { pub red: u8, @@ -123,7 +125,7 @@ pub fn make_db() -> DB { .as_ref(), ) .expect("an JSON string"), - change_channel: broadcast::Sender::new(5*state_size), + change_channel: broadcast::Sender::new(5 * state_size), save_path, }, mut_state, diff --git a/backend/src/route.rs b/backend/src/route.rs index f455a51..94ad6d9 100644 --- a/backend/src/route.rs +++ b/backend/src/route.rs @@ -1,10 +1,11 @@ -use crate::authorization::{jwt_middleware, jwt_middleware_cof}; -use crate::handler; -use crate::model; -use axum::{handler::Handler, middleware}; -use axum::{routing::get, Router}; +use axum::handler::Handler; +use axum::routing::get; +use axum::{middleware, Router}; use tower_http::cors::{Any, CorsLayer}; +use crate::authorization::{jwt_middleware, jwt_middleware_cof}; +use crate::{handler, model}; + pub fn create_router(db: model::DB) -> Router { let cors = CorsLayer::new() // allow requests from any origin diff --git a/backend/src/socket_listener.rs b/backend/src/socket_listener.rs index 3d8bf96..bc725f6 100644 --- a/backend/src/socket_listener.rs +++ b/backend/src/socket_listener.rs @@ -1,47 +1,51 @@ -use tokio::task; - use axum::extract::State; use serde::Deserialize; -use tokio::io::AsyncBufReadExt; -use tokio::io::BufReader; +use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::net::TcpListener; -use crate::{handler, model }; +use tokio::task; + +use crate::{handler, model}; pub async fn listen_socket(db: model::DB) { - let binding = dotenvy::var("BIND_TCP").unwrap_or(String::from("127.0.0.1:1235")); - tracing::debug!("Trying to bind at {binding}"); - let listener = TcpListener::bind(binding) - .await - .expect("Failed to listen for direct connection."); + let binding = dotenvy::var("BIND_TCP").unwrap_or(String::from("127.0.0.1:1235")); + tracing::debug!("Trying to bind at {binding}"); + let listener = TcpListener::bind(binding) + .await + .expect("Failed to listen for direct connection."); - loop { - match listener.accept().await { - Ok((socket, addr)) => { - let db_socket = db.clone(); - let mut buf_reader = BufReader::new(socket); - let mut buf = String::new(); - tracing::debug!("Accepted {addr:?}"); - task::spawn(async move { - loop { - buf.clear(); - let _ = buf_reader.read_line(&mut buf).await; - match model::DMXBeamChange::deserialize( - &mut serde_json::Deserializer::from_str(&buf), - ) { - Ok(data) => handler::edit_motor_value_handler( - State(db_socket.clone()), - axum::Json(data), - ) - .await - .unwrap_or(()), - Err(e) => tracing::error!("Error reading incoming data: {e:?}"), - } + loop { + match listener.accept().await { + Ok((socket, addr)) => { + let db_socket = db.clone(); + let mut buf_reader = BufReader::new(socket); + let mut buf = String::new(); + tracing::debug!("Accepted {addr:?}"); + task::spawn(async move { + loop { + buf.clear(); + match buf_reader.read_line(&mut buf).await { + Ok(0) => break, + Ok(_) => (), + Err(e) => tracing::error!("Error while reading tcp stream: {e:?}"), + }; + match model::DMXBeamChange::deserialize( + &mut serde_json::Deserializer::from_str(&buf), + ) { + Ok(data) => handler::edit_motor_value_handler( + State(db_socket.clone()), + axum::Json(data), + ) + .await + .unwrap_or(()), + Err(e) => tracing::error!("Error reading incoming data: {e:?}"), } - }); - } - Err(e) => { - tracing::error!("Failed to get client: {e:?}"); - } - }; - } + } + tracing::debug!("Closing tcp connection {addr:?}"); + }); + } + Err(e) => { + tracing::error!("Failed to get client: {e:?}"); + } + }; } +} diff --git a/backend/src/state_recorder.rs b/backend/src/state_recorder.rs index a6497a7..1122604 100644 --- a/backend/src/state_recorder.rs +++ b/backend/src/state_recorder.rs @@ -1,7 +1,8 @@ +use std::fs::File; use serde_json::to_writer; -use std::fs::File; use tokio::time::{sleep, Duration}; + use crate::model; pub async fn record_state(db_to_save: model::DB) {