Compare commits

...

4 commits

Author SHA1 Message Date
sinavir
700d967ff3 fix(backend): tcp error on closing + fmt 2024-10-13 20:11:10 +02:00
sinavir
84aaef1900 fix(gitignore): Add gitignore for backend 2024-10-13 19:51:48 +02:00
sinavir
9124550ad0 fix(rust-fmt): Better formatting 2024-10-13 19:51:08 +02:00
sinavir
a02bb292be chore: refactor task spawning 2024-10-13 19:45:43 +02:00
10 changed files with 165 additions and 135 deletions

1
backend/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
/data.json

3
backend/rustfmt.toml Normal file
View file

@ -0,0 +1,3 @@
unstable_features = true
group_imports = "StdExternalCrate"
imports_granularity = "Module"

View file

@ -1,14 +1,14 @@
use crate::model::DB; use axum::extract::{Request, State};
use axum::{ use axum::http::StatusCode;
extract::{Request, State}, use axum::middleware::Next;
http::StatusCode, use axum::response::Response;
middleware::Next, use axum_extra::headers;
response::Response, use axum_extra::typed_header::TypedHeader;
};
use axum_extra::{headers, typed_header::TypedHeader};
use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation}; use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::model::DB;
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
struct Claims { struct Claims {
sub: String, sub: String,
@ -32,7 +32,7 @@ fn check_token(token: &str, jwt: &str) -> Option<User> {
match decoded_token { match decoded_token {
Ok(token_data) => { Ok(token_data) => {
let user = token_data.claims.user; let user = token_data.claims.user;
let is_cof =token_data.claims.is_cof; let is_cof = token_data.claims.is_cof;
if token_data.claims.scope == "modify" { if token_data.claims.scope == "modify" {
tracing::info!("Successful auth {user:?}"); tracing::info!("Successful auth {user:?}");
Some(User { name: user, is_cof }) Some(User { name: user, is_cof })
@ -41,7 +41,10 @@ fn check_token(token: &str, jwt: &str) -> Option<User> {
None None
} }
} }
Err(err) => {tracing::debug!("Failed decoding token: {err:?}"); None}, Err(err) => {
tracing::debug!("Failed decoding token: {err:?}");
None
}
} }
} }
@ -70,7 +73,7 @@ pub async fn jwt_middleware_cof(
if let Some(user) = check_token(token, &state.static_state.jwt_key) { if let Some(user) = check_token(token, &state.static_state.jwt_key) {
if user.is_cof { if user.is_cof {
request.extensions_mut().insert(user); request.extensions_mut().insert(user);
return Ok(next.run(request).await) return Ok(next.run(request).await);
}; };
}; };
Err(StatusCode::FORBIDDEN) Err(StatusCode::FORBIDDEN)

View file

@ -1,15 +1,16 @@
use crate::authorization::User;
use crate::model::{ColorArray, DMXAtom, DMXBeam, DMXBeamChange, DMXColorAtom, DMXColor, DB};
use axum::{
debug_handler,
extract::{Path, State},
http::StatusCode,
response::{sse::Event, IntoResponse, Sse},
Extension, Json,
};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio_stream::StreamExt;
use tokio_stream::{self as stream}; use axum::extract::{Path, State};
use axum::http::StatusCode;
use axum::response::sse::Event;
use axum::response::{IntoResponse, Sse};
use axum::{debug_handler, Extension, Json};
use tokio_stream::{
StreamExt, {self as stream},
};
use crate::authorization::User;
use crate::model::{ColorArray, DMXAtom, DMXBeam, DMXBeamChange, DMXColor, DMXColorAtom, DB};
#[debug_handler] #[debug_handler]
pub async fn healthcheck_handler() -> impl IntoResponse { pub async fn healthcheck_handler() -> impl IntoResponse {
@ -33,7 +34,12 @@ pub async fn batch_edit_value_handler(
Json(body): Json<Vec<DMXColorAtom>>, Json(body): Json<Vec<DMXColorAtom>>,
) -> Result<(), StatusCode> { ) -> Result<(), StatusCode> {
let mut lock = db.mut_state.write().await; let mut lock = db.mut_state.write().await;
if lock.ratelimit_info.get(&user).map(|d| d.elapsed() <= Duration::from_millis(500)).unwrap_or(false){ if lock
.ratelimit_info
.get(&user)
.map(|d| d.elapsed() <= Duration::from_millis(500))
.unwrap_or(false)
{
return Err(StatusCode::TOO_MANY_REQUESTS); return Err(StatusCode::TOO_MANY_REQUESTS);
} }
for i in &body { for i in &body {
@ -103,13 +109,11 @@ pub async fn edit_motor_value_handler(
let _ = db let _ = db
.static_state .static_state
.change_channel .change_channel
.send(DMXAtom::Motor(lock.dmx.motor)) .send(DMXAtom::Motor(lock.dmx.motor));
;
Ok(()) Ok(())
} }
#[debug_handler] #[debug_handler]
pub async fn sse_handler(State(db): State<DB>) -> impl IntoResponse { pub async fn sse_handler(State(db): State<DB>) -> impl IntoResponse {
let rx = db.static_state.change_channel.subscribe(); let rx = db.static_state.change_channel.subscribe();
@ -120,12 +124,16 @@ pub async fn sse_handler(State(db): State<DB>) -> impl IntoResponse {
data = lock.dmx.colors.clone(); data = lock.dmx.colors.clone();
motor = lock.dmx.motor.clone(); motor = lock.dmx.motor.clone();
} }
let init_data = data.into_iter().enumerate().map(|(i, x)| { let init_data = data
.into_iter()
.enumerate()
.map(|(i, x)| {
Ok(DMXAtom::Color(DMXColorAtom { Ok(DMXAtom::Color(DMXColorAtom {
address: i, address: i,
value: x, value: x,
})) }))
}).chain(std::iter::once(Ok(DMXAtom::Motor(motor)))); })
.chain(std::iter::once(Ok(DMXAtom::Motor(motor))));
let init = stream::iter(init_data); let init = stream::iter(init_data);
let stream = init let stream = init
.chain(stream::wrappers::BroadcastStream::new(rx)) .chain(stream::wrappers::BroadcastStream::new(rx))

View file

@ -2,19 +2,24 @@ mod authorization;
mod handler; mod handler;
mod model; mod model;
mod route; mod route;
mod socket_listener;
mod state_recorder;
use dotenvy; use dotenvy;
use route::create_router; use route::create_router;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use tower_http::trace::TraceLayer; use tower_http::trace::TraceLayer;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use crate::socket_listener::listen_socket;
use crate::state_recorder::record_state;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
tracing_subscriber::registry() tracing_subscriber::registry()
.with( .with(
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| { tracing_subscriber::EnvFilter::try_from_default_env()
"info,axum::rejection=trace".into() .unwrap_or_else(|_| "info,axum::rejection=trace".into()),
}),
) )
.with(tracing_subscriber::fmt::layer()) .with(tracing_subscriber::fmt::layer())
.init(); .init();
@ -23,14 +28,17 @@ async fn main() {
_ => (), _ => (),
} }
let binding = dotenvy::var("BIND").unwrap_or(String::from("127.0.0.1:9999")); let db = model::make_db();
tokio::spawn(listen_socket(db.clone()));
tokio::spawn(record_state(db.clone()));
let binding = dotenvy::var("BIND_HTTP").unwrap_or(String::from("127.0.0.1:9999"));
let bind = binding.trim(); let bind = binding.trim();
tracing::debug!("Trying to bind at {bind}"); tracing::debug!("Trying to bind at {bind}");
let listener = tokio::net::TcpListener::bind(bind).await.unwrap(); let listener = tokio::net::TcpListener::bind(bind).await.unwrap();
let app = create_router()
.layer( let app = create_router(db.clone()).layer(TraceLayer::new_for_http());
TraceLayer::new_for_http()
);
tracing::info!("🚀 Server started successfully"); tracing::info!("🚀 Server started successfully");
axum::serve(listener, app).await.unwrap(); axum::serve(listener, app).await.unwrap();
} }

View file

@ -1,13 +1,15 @@
use crate::authorization::User;
use dotenvy;
use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::fs; use std::fs;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
use dotenvy;
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, RwLock}; use tokio::sync::{broadcast, RwLock};
use crate::authorization::User;
#[derive(Debug, Default, Deserialize, Serialize, Copy, Clone)] #[derive(Debug, Default, Deserialize, Serialize, Copy, Clone)]
pub struct DMXColor { pub struct DMXColor {
pub red: u8, pub red: u8,
@ -123,7 +125,7 @@ pub fn make_db() -> DB {
.as_ref(), .as_ref(),
) )
.expect("an JSON string"), .expect("an JSON string"),
change_channel: broadcast::Sender::new(512), change_channel: broadcast::Sender::new(5 * state_size),
save_path, save_path,
}, },
mut_state, mut_state,

View file

@ -1,93 +1,16 @@
use crate::authorization::{ jwt_middleware, jwt_middleware_cof }; use axum::handler::Handler;
use crate::handler; use axum::routing::get;
use crate::model; use axum::{middleware, Router};
use axum::{handler::Handler, middleware};
use axum::{routing::get, Router};
use serde_json::to_writer;
use std::fs::File;
use tokio::task;
use tokio::time::{sleep, Duration};
use tower_http::cors::{Any, CorsLayer}; use tower_http::cors::{Any, CorsLayer};
use axum::extract::State; use crate::authorization::{jwt_middleware, jwt_middleware_cof};
use serde::Deserialize; use crate::{handler, model};
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::net::TcpListener;
pub fn create_router() -> Router {
let db = model::make_db();
pub fn create_router(db: model::DB) -> Router {
let cors = CorsLayer::new() let cors = CorsLayer::new()
// allow requests from any origin // allow requests from any origin
.allow_origin(Any) .allow_origin(Any)
.allow_headers(Any); .allow_headers(Any);
let db_to_save = db.clone();
task::spawn(async move {
loop {
sleep(Duration::from_millis(1000)).await;
{
let save_path = &db_to_save.static_state.save_path;
let file = File::create(save_path);
match file {
Ok(f) => {
let db = db_to_save.mut_state.read().await;
match to_writer(f, &db.dmx) {
Ok(()) => tracing::trace!("Saved data at {save_path}"),
Err(e) => {
tracing::debug!("Failed to save data: {e:?}");
()
}
}
}
Err(e) => {
tracing::debug!("Failed to save data: {e:?}");
()
}
}
}
}
});
let db_listener = db.clone();
task::spawn(async move {
//TODO: parametrisation
tracing::debug!("Trying to bind at 10.10.10.13:1235");
let listener = TcpListener::bind("10.10.10.1:1235")
.await
.expect("Failed to listen for direct connection.");
loop {
match listener.accept().await {
Ok((socket, addr)) => {
let db_socket = db_listener.clone();
let mut buf_reader = BufReader::new(socket);
let mut buf = String::new();
tracing::debug!("Accepted {addr:?}");
task::spawn(async move {
loop {
buf.clear();
let _ = buf_reader.read_line(&mut buf).await;
match model::DMXBeamChange::deserialize(
&mut serde_json::Deserializer::from_str(&buf),
) {
Ok(data) => handler::edit_motor_value_handler(
State(db_socket.clone()),
axum::Json(data),
)
.await
.unwrap_or(()),
Err(e) => tracing::error!("Error reading incoming data: {e:?}"),
}
}
})
}
Err(e) => {
task::spawn(async move { tracing::error!("Failed to get client: {e:?}") })
}
};
}
});
Router::new() Router::new()
.route("/api/healthcheck", get(handler::healthcheck_handler)) .route("/api/healthcheck", get(handler::healthcheck_handler))
@ -108,10 +31,9 @@ pub fn create_router() -> Router {
) )
.route( .route(
"/api/control-box", "/api/control-box",
get(handler::get_motor_value_handler).post( get(handler::get_motor_value_handler).post(handler::edit_motor_value_handler.layer(
handler::edit_motor_value_handler middleware::from_fn_with_state(db.clone(), jwt_middleware_cof),
.layer(middleware::from_fn_with_state(db.clone(), jwt_middleware_cof)), )),
),
) )
.layer(cors) .layer(cors)
.with_state(db) .with_state(db)

View file

@ -0,0 +1,51 @@
use axum::extract::State;
use serde::Deserialize;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::net::TcpListener;
use tokio::task;
use crate::{handler, model};
pub async fn listen_socket(db: model::DB) {
let binding = dotenvy::var("BIND_TCP").unwrap_or(String::from("127.0.0.1:1235"));
tracing::debug!("Trying to bind at {binding}");
let listener = TcpListener::bind(binding)
.await
.expect("Failed to listen for direct connection.");
loop {
match listener.accept().await {
Ok((socket, addr)) => {
let db_socket = db.clone();
let mut buf_reader = BufReader::new(socket);
let mut buf = String::new();
tracing::debug!("Accepted {addr:?}");
task::spawn(async move {
loop {
buf.clear();
match buf_reader.read_line(&mut buf).await {
Ok(0) => break,
Ok(_) => (),
Err(e) => tracing::error!("Error while reading tcp stream: {e:?}"),
};
match model::DMXBeamChange::deserialize(
&mut serde_json::Deserializer::from_str(&buf),
) {
Ok(data) => handler::edit_motor_value_handler(
State(db_socket.clone()),
axum::Json(data),
)
.await
.unwrap_or(()),
Err(e) => tracing::error!("Error reading incoming data: {e:?}"),
}
}
tracing::debug!("Closing tcp connection {addr:?}");
});
}
Err(e) => {
tracing::error!("Failed to get client: {e:?}");
}
};
}
}

View file

@ -0,0 +1,32 @@
use std::fs::File;
use serde_json::to_writer;
use tokio::time::{sleep, Duration};
use crate::model;
pub async fn record_state(db_to_save: model::DB) {
loop {
sleep(Duration::from_millis(1000)).await;
{
let save_path = &db_to_save.static_state.save_path;
let file = File::create(save_path);
match file {
Ok(f) => {
let db = db_to_save.mut_state.read().await;
match to_writer(f, &db.dmx) {
Ok(()) => tracing::trace!("Saved data at {save_path}"),
Err(e) => {
tracing::debug!("Failed to save data: {e:?}");
()
}
}
}
Err(e) => {
tracing::debug!("Failed to save data: {e:?}");
()
}
}
}
}
}

View file

@ -6,6 +6,6 @@ pkgs.mkShell {
pkgs.cargo-edit pkgs.cargo-edit
pkgs.rustc pkgs.rustc
pkgs.rust-analyzer pkgs.rust-analyzer
pkgs.rustfmt (pkgs.rustfmt.override { asNightly = true; })
]; ];
} }