Compare commits
4 commits
2067df8968
...
700d967ff3
Author | SHA1 | Date | |
---|---|---|---|
|
700d967ff3 | ||
|
84aaef1900 | ||
|
9124550ad0 | ||
|
a02bb292be |
10 changed files with 165 additions and 135 deletions
1
backend/.gitignore
vendored
Normal file
1
backend/.gitignore
vendored
Normal file
|
@ -0,0 +1 @@
|
|||
/data.json
|
3
backend/rustfmt.toml
Normal file
3
backend/rustfmt.toml
Normal file
|
@ -0,0 +1,3 @@
|
|||
unstable_features = true
|
||||
group_imports = "StdExternalCrate"
|
||||
imports_granularity = "Module"
|
|
@ -1,14 +1,14 @@
|
|||
use crate::model::DB;
|
||||
use axum::{
|
||||
extract::{Request, State},
|
||||
http::StatusCode,
|
||||
middleware::Next,
|
||||
response::Response,
|
||||
};
|
||||
use axum_extra::{headers, typed_header::TypedHeader};
|
||||
use axum::extract::{Request, State};
|
||||
use axum::http::StatusCode;
|
||||
use axum::middleware::Next;
|
||||
use axum::response::Response;
|
||||
use axum_extra::headers;
|
||||
use axum_extra::typed_header::TypedHeader;
|
||||
use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::model::DB;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct Claims {
|
||||
sub: String,
|
||||
|
@ -32,7 +32,7 @@ fn check_token(token: &str, jwt: &str) -> Option<User> {
|
|||
match decoded_token {
|
||||
Ok(token_data) => {
|
||||
let user = token_data.claims.user;
|
||||
let is_cof =token_data.claims.is_cof;
|
||||
let is_cof = token_data.claims.is_cof;
|
||||
if token_data.claims.scope == "modify" {
|
||||
tracing::info!("Successful auth {user:?}");
|
||||
Some(User { name: user, is_cof })
|
||||
|
@ -41,7 +41,10 @@ fn check_token(token: &str, jwt: &str) -> Option<User> {
|
|||
None
|
||||
}
|
||||
}
|
||||
Err(err) => {tracing::debug!("Failed decoding token: {err:?}"); None},
|
||||
Err(err) => {
|
||||
tracing::debug!("Failed decoding token: {err:?}");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -69,9 +72,9 @@ pub async fn jwt_middleware_cof(
|
|||
let token = auth.token();
|
||||
if let Some(user) = check_token(token, &state.static_state.jwt_key) {
|
||||
if user.is_cof {
|
||||
request.extensions_mut().insert(user);
|
||||
return Ok(next.run(request).await)
|
||||
request.extensions_mut().insert(user);
|
||||
return Ok(next.run(request).await);
|
||||
};
|
||||
};
|
||||
Err(StatusCode::FORBIDDEN)
|
||||
Err(StatusCode::FORBIDDEN)
|
||||
}
|
||||
|
|
|
@ -1,15 +1,16 @@
|
|||
use crate::authorization::User;
|
||||
use crate::model::{ColorArray, DMXAtom, DMXBeam, DMXBeamChange, DMXColorAtom, DMXColor, DB};
|
||||
use axum::{
|
||||
debug_handler,
|
||||
extract::{Path, State},
|
||||
http::StatusCode,
|
||||
response::{sse::Event, IntoResponse, Sse},
|
||||
Extension, Json,
|
||||
};
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio_stream::StreamExt;
|
||||
use tokio_stream::{self as stream};
|
||||
|
||||
use axum::extract::{Path, State};
|
||||
use axum::http::StatusCode;
|
||||
use axum::response::sse::Event;
|
||||
use axum::response::{IntoResponse, Sse};
|
||||
use axum::{debug_handler, Extension, Json};
|
||||
use tokio_stream::{
|
||||
StreamExt, {self as stream},
|
||||
};
|
||||
|
||||
use crate::authorization::User;
|
||||
use crate::model::{ColorArray, DMXAtom, DMXBeam, DMXBeamChange, DMXColor, DMXColorAtom, DB};
|
||||
|
||||
#[debug_handler]
|
||||
pub async fn healthcheck_handler() -> impl IntoResponse {
|
||||
|
@ -33,7 +34,12 @@ pub async fn batch_edit_value_handler(
|
|||
Json(body): Json<Vec<DMXColorAtom>>,
|
||||
) -> Result<(), StatusCode> {
|
||||
let mut lock = db.mut_state.write().await;
|
||||
if lock.ratelimit_info.get(&user).map(|d| d.elapsed() <= Duration::from_millis(500)).unwrap_or(false){
|
||||
if lock
|
||||
.ratelimit_info
|
||||
.get(&user)
|
||||
.map(|d| d.elapsed() <= Duration::from_millis(500))
|
||||
.unwrap_or(false)
|
||||
{
|
||||
return Err(StatusCode::TOO_MANY_REQUESTS);
|
||||
}
|
||||
for i in &body {
|
||||
|
@ -103,13 +109,11 @@ pub async fn edit_motor_value_handler(
|
|||
let _ = db
|
||||
.static_state
|
||||
.change_channel
|
||||
.send(DMXAtom::Motor(lock.dmx.motor))
|
||||
;
|
||||
.send(DMXAtom::Motor(lock.dmx.motor));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
#[debug_handler]
|
||||
pub async fn sse_handler(State(db): State<DB>) -> impl IntoResponse {
|
||||
let rx = db.static_state.change_channel.subscribe();
|
||||
|
@ -120,12 +124,16 @@ pub async fn sse_handler(State(db): State<DB>) -> impl IntoResponse {
|
|||
data = lock.dmx.colors.clone();
|
||||
motor = lock.dmx.motor.clone();
|
||||
}
|
||||
let init_data = data.into_iter().enumerate().map(|(i, x)| {
|
||||
Ok(DMXAtom::Color(DMXColorAtom {
|
||||
address: i,
|
||||
value: x,
|
||||
}))
|
||||
}).chain(std::iter::once(Ok(DMXAtom::Motor(motor))));
|
||||
let init_data = data
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(i, x)| {
|
||||
Ok(DMXAtom::Color(DMXColorAtom {
|
||||
address: i,
|
||||
value: x,
|
||||
}))
|
||||
})
|
||||
.chain(std::iter::once(Ok(DMXAtom::Motor(motor))));
|
||||
let init = stream::iter(init_data);
|
||||
let stream = init
|
||||
.chain(stream::wrappers::BroadcastStream::new(rx))
|
||||
|
|
|
@ -2,19 +2,24 @@ mod authorization;
|
|||
mod handler;
|
||||
mod model;
|
||||
mod route;
|
||||
mod socket_listener;
|
||||
mod state_recorder;
|
||||
|
||||
use dotenvy;
|
||||
use route::create_router;
|
||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||
use tower_http::trace::TraceLayer;
|
||||
use tracing_subscriber::layer::SubscriberExt;
|
||||
use tracing_subscriber::util::SubscriberInitExt;
|
||||
|
||||
use crate::socket_listener::listen_socket;
|
||||
use crate::state_recorder::record_state;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
tracing_subscriber::registry()
|
||||
.with(
|
||||
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
|
||||
"info,axum::rejection=trace".into()
|
||||
}),
|
||||
tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| "info,axum::rejection=trace".into()),
|
||||
)
|
||||
.with(tracing_subscriber::fmt::layer())
|
||||
.init();
|
||||
|
@ -23,14 +28,17 @@ async fn main() {
|
|||
_ => (),
|
||||
}
|
||||
|
||||
let binding = dotenvy::var("BIND").unwrap_or(String::from("127.0.0.1:9999"));
|
||||
let db = model::make_db();
|
||||
|
||||
tokio::spawn(listen_socket(db.clone()));
|
||||
tokio::spawn(record_state(db.clone()));
|
||||
|
||||
let binding = dotenvy::var("BIND_HTTP").unwrap_or(String::from("127.0.0.1:9999"));
|
||||
let bind = binding.trim();
|
||||
tracing::debug!("Trying to bind at {bind}");
|
||||
let listener = tokio::net::TcpListener::bind(bind).await.unwrap();
|
||||
let app = create_router()
|
||||
.layer(
|
||||
TraceLayer::new_for_http()
|
||||
);
|
||||
|
||||
let app = create_router(db.clone()).layer(TraceLayer::new_for_http());
|
||||
tracing::info!("🚀 Server started successfully");
|
||||
axum::serve(listener, app).await.unwrap();
|
||||
}
|
||||
|
|
|
@ -1,13 +1,15 @@
|
|||
use crate::authorization::User;
|
||||
use dotenvy;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use dotenvy;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::{broadcast, RwLock};
|
||||
|
||||
use crate::authorization::User;
|
||||
|
||||
#[derive(Debug, Default, Deserialize, Serialize, Copy, Clone)]
|
||||
pub struct DMXColor {
|
||||
pub red: u8,
|
||||
|
@ -123,7 +125,7 @@ pub fn make_db() -> DB {
|
|||
.as_ref(),
|
||||
)
|
||||
.expect("an JSON string"),
|
||||
change_channel: broadcast::Sender::new(512),
|
||||
change_channel: broadcast::Sender::new(5 * state_size),
|
||||
save_path,
|
||||
},
|
||||
mut_state,
|
||||
|
|
|
@ -1,93 +1,16 @@
|
|||
use crate::authorization::{ jwt_middleware, jwt_middleware_cof };
|
||||
use crate::handler;
|
||||
use crate::model;
|
||||
use axum::{handler::Handler, middleware};
|
||||
use axum::{routing::get, Router};
|
||||
use serde_json::to_writer;
|
||||
use std::fs::File;
|
||||
use tokio::task;
|
||||
use tokio::time::{sleep, Duration};
|
||||
use axum::handler::Handler;
|
||||
use axum::routing::get;
|
||||
use axum::{middleware, Router};
|
||||
use tower_http::cors::{Any, CorsLayer};
|
||||
|
||||
use axum::extract::State;
|
||||
use serde::Deserialize;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
pub fn create_router() -> Router {
|
||||
let db = model::make_db();
|
||||
use crate::authorization::{jwt_middleware, jwt_middleware_cof};
|
||||
use crate::{handler, model};
|
||||
|
||||
pub fn create_router(db: model::DB) -> Router {
|
||||
let cors = CorsLayer::new()
|
||||
// allow requests from any origin
|
||||
.allow_origin(Any)
|
||||
.allow_headers(Any);
|
||||
let db_to_save = db.clone();
|
||||
task::spawn(async move {
|
||||
loop {
|
||||
sleep(Duration::from_millis(1000)).await;
|
||||
{
|
||||
let save_path = &db_to_save.static_state.save_path;
|
||||
let file = File::create(save_path);
|
||||
match file {
|
||||
Ok(f) => {
|
||||
let db = db_to_save.mut_state.read().await;
|
||||
match to_writer(f, &db.dmx) {
|
||||
Ok(()) => tracing::trace!("Saved data at {save_path}"),
|
||||
Err(e) => {
|
||||
tracing::debug!("Failed to save data: {e:?}");
|
||||
()
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::debug!("Failed to save data: {e:?}");
|
||||
()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let db_listener = db.clone();
|
||||
task::spawn(async move {
|
||||
//TODO: parametrisation
|
||||
tracing::debug!("Trying to bind at 10.10.10.13:1235");
|
||||
let listener = TcpListener::bind("10.10.10.1:1235")
|
||||
.await
|
||||
.expect("Failed to listen for direct connection.");
|
||||
|
||||
loop {
|
||||
match listener.accept().await {
|
||||
Ok((socket, addr)) => {
|
||||
let db_socket = db_listener.clone();
|
||||
let mut buf_reader = BufReader::new(socket);
|
||||
let mut buf = String::new();
|
||||
tracing::debug!("Accepted {addr:?}");
|
||||
task::spawn(async move {
|
||||
loop {
|
||||
buf.clear();
|
||||
let _ = buf_reader.read_line(&mut buf).await;
|
||||
match model::DMXBeamChange::deserialize(
|
||||
&mut serde_json::Deserializer::from_str(&buf),
|
||||
) {
|
||||
Ok(data) => handler::edit_motor_value_handler(
|
||||
State(db_socket.clone()),
|
||||
axum::Json(data),
|
||||
)
|
||||
.await
|
||||
.unwrap_or(()),
|
||||
Err(e) => tracing::error!("Error reading incoming data: {e:?}"),
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
Err(e) => {
|
||||
task::spawn(async move { tracing::error!("Failed to get client: {e:?}") })
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
Router::new()
|
||||
.route("/api/healthcheck", get(handler::healthcheck_handler))
|
||||
|
@ -108,10 +31,9 @@ pub fn create_router() -> Router {
|
|||
)
|
||||
.route(
|
||||
"/api/control-box",
|
||||
get(handler::get_motor_value_handler).post(
|
||||
handler::edit_motor_value_handler
|
||||
.layer(middleware::from_fn_with_state(db.clone(), jwt_middleware_cof)),
|
||||
),
|
||||
get(handler::get_motor_value_handler).post(handler::edit_motor_value_handler.layer(
|
||||
middleware::from_fn_with_state(db.clone(), jwt_middleware_cof),
|
||||
)),
|
||||
)
|
||||
.layer(cors)
|
||||
.with_state(db)
|
||||
|
|
51
backend/src/socket_listener.rs
Normal file
51
backend/src/socket_listener.rs
Normal file
|
@ -0,0 +1,51 @@
|
|||
use axum::extract::State;
|
||||
use serde::Deserialize;
|
||||
use tokio::io::{AsyncBufReadExt, BufReader};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::task;
|
||||
|
||||
use crate::{handler, model};
|
||||
|
||||
pub async fn listen_socket(db: model::DB) {
|
||||
let binding = dotenvy::var("BIND_TCP").unwrap_or(String::from("127.0.0.1:1235"));
|
||||
tracing::debug!("Trying to bind at {binding}");
|
||||
let listener = TcpListener::bind(binding)
|
||||
.await
|
||||
.expect("Failed to listen for direct connection.");
|
||||
|
||||
loop {
|
||||
match listener.accept().await {
|
||||
Ok((socket, addr)) => {
|
||||
let db_socket = db.clone();
|
||||
let mut buf_reader = BufReader::new(socket);
|
||||
let mut buf = String::new();
|
||||
tracing::debug!("Accepted {addr:?}");
|
||||
task::spawn(async move {
|
||||
loop {
|
||||
buf.clear();
|
||||
match buf_reader.read_line(&mut buf).await {
|
||||
Ok(0) => break,
|
||||
Ok(_) => (),
|
||||
Err(e) => tracing::error!("Error while reading tcp stream: {e:?}"),
|
||||
};
|
||||
match model::DMXBeamChange::deserialize(
|
||||
&mut serde_json::Deserializer::from_str(&buf),
|
||||
) {
|
||||
Ok(data) => handler::edit_motor_value_handler(
|
||||
State(db_socket.clone()),
|
||||
axum::Json(data),
|
||||
)
|
||||
.await
|
||||
.unwrap_or(()),
|
||||
Err(e) => tracing::error!("Error reading incoming data: {e:?}"),
|
||||
}
|
||||
}
|
||||
tracing::debug!("Closing tcp connection {addr:?}");
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to get client: {e:?}");
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
32
backend/src/state_recorder.rs
Normal file
32
backend/src/state_recorder.rs
Normal file
|
@ -0,0 +1,32 @@
|
|||
use std::fs::File;
|
||||
|
||||
use serde_json::to_writer;
|
||||
use tokio::time::{sleep, Duration};
|
||||
|
||||
use crate::model;
|
||||
|
||||
pub async fn record_state(db_to_save: model::DB) {
|
||||
loop {
|
||||
sleep(Duration::from_millis(1000)).await;
|
||||
{
|
||||
let save_path = &db_to_save.static_state.save_path;
|
||||
let file = File::create(save_path);
|
||||
match file {
|
||||
Ok(f) => {
|
||||
let db = db_to_save.mut_state.read().await;
|
||||
match to_writer(f, &db.dmx) {
|
||||
Ok(()) => tracing::trace!("Saved data at {save_path}"),
|
||||
Err(e) => {
|
||||
tracing::debug!("Failed to save data: {e:?}");
|
||||
()
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::debug!("Failed to save data: {e:?}");
|
||||
()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -6,6 +6,6 @@ pkgs.mkShell {
|
|||
pkgs.cargo-edit
|
||||
pkgs.rustc
|
||||
pkgs.rust-analyzer
|
||||
pkgs.rustfmt
|
||||
(pkgs.rustfmt.override { asNightly = true; })
|
||||
];
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue