fix(backend): tcp error on closing + fmt
This commit is contained in:
parent
84aaef1900
commit
700d967ff3
7 changed files with 111 additions and 95 deletions
|
@ -1,14 +1,14 @@
|
|||
use crate::model::DB;
|
||||
use axum::{
|
||||
extract::{Request, State},
|
||||
http::StatusCode,
|
||||
middleware::Next,
|
||||
response::Response,
|
||||
};
|
||||
use axum_extra::{headers, typed_header::TypedHeader};
|
||||
use axum::extract::{Request, State};
|
||||
use axum::http::StatusCode;
|
||||
use axum::middleware::Next;
|
||||
use axum::response::Response;
|
||||
use axum_extra::headers;
|
||||
use axum_extra::typed_header::TypedHeader;
|
||||
use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::model::DB;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct Claims {
|
||||
sub: String,
|
||||
|
@ -32,7 +32,7 @@ fn check_token(token: &str, jwt: &str) -> Option<User> {
|
|||
match decoded_token {
|
||||
Ok(token_data) => {
|
||||
let user = token_data.claims.user;
|
||||
let is_cof =token_data.claims.is_cof;
|
||||
let is_cof = token_data.claims.is_cof;
|
||||
if token_data.claims.scope == "modify" {
|
||||
tracing::info!("Successful auth {user:?}");
|
||||
Some(User { name: user, is_cof })
|
||||
|
@ -41,7 +41,10 @@ fn check_token(token: &str, jwt: &str) -> Option<User> {
|
|||
None
|
||||
}
|
||||
}
|
||||
Err(err) => {tracing::debug!("Failed decoding token: {err:?}"); None},
|
||||
Err(err) => {
|
||||
tracing::debug!("Failed decoding token: {err:?}");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -69,9 +72,9 @@ pub async fn jwt_middleware_cof(
|
|||
let token = auth.token();
|
||||
if let Some(user) = check_token(token, &state.static_state.jwt_key) {
|
||||
if user.is_cof {
|
||||
request.extensions_mut().insert(user);
|
||||
return Ok(next.run(request).await)
|
||||
request.extensions_mut().insert(user);
|
||||
return Ok(next.run(request).await);
|
||||
};
|
||||
};
|
||||
Err(StatusCode::FORBIDDEN)
|
||||
Err(StatusCode::FORBIDDEN)
|
||||
}
|
||||
|
|
|
@ -1,15 +1,16 @@
|
|||
use crate::authorization::User;
|
||||
use crate::model::{ColorArray, DMXAtom, DMXBeam, DMXBeamChange, DMXColorAtom, DMXColor, DB};
|
||||
use axum::{
|
||||
debug_handler,
|
||||
extract::{Path, State},
|
||||
http::StatusCode,
|
||||
response::{sse::Event, IntoResponse, Sse},
|
||||
Extension, Json,
|
||||
};
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio_stream::StreamExt;
|
||||
use tokio_stream::{self as stream};
|
||||
|
||||
use axum::extract::{Path, State};
|
||||
use axum::http::StatusCode;
|
||||
use axum::response::sse::Event;
|
||||
use axum::response::{IntoResponse, Sse};
|
||||
use axum::{debug_handler, Extension, Json};
|
||||
use tokio_stream::{
|
||||
StreamExt, {self as stream},
|
||||
};
|
||||
|
||||
use crate::authorization::User;
|
||||
use crate::model::{ColorArray, DMXAtom, DMXBeam, DMXBeamChange, DMXColor, DMXColorAtom, DB};
|
||||
|
||||
#[debug_handler]
|
||||
pub async fn healthcheck_handler() -> impl IntoResponse {
|
||||
|
@ -33,7 +34,12 @@ pub async fn batch_edit_value_handler(
|
|||
Json(body): Json<Vec<DMXColorAtom>>,
|
||||
) -> Result<(), StatusCode> {
|
||||
let mut lock = db.mut_state.write().await;
|
||||
if lock.ratelimit_info.get(&user).map(|d| d.elapsed() <= Duration::from_millis(500)).unwrap_or(false){
|
||||
if lock
|
||||
.ratelimit_info
|
||||
.get(&user)
|
||||
.map(|d| d.elapsed() <= Duration::from_millis(500))
|
||||
.unwrap_or(false)
|
||||
{
|
||||
return Err(StatusCode::TOO_MANY_REQUESTS);
|
||||
}
|
||||
for i in &body {
|
||||
|
@ -103,13 +109,11 @@ pub async fn edit_motor_value_handler(
|
|||
let _ = db
|
||||
.static_state
|
||||
.change_channel
|
||||
.send(DMXAtom::Motor(lock.dmx.motor))
|
||||
;
|
||||
.send(DMXAtom::Motor(lock.dmx.motor));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
#[debug_handler]
|
||||
pub async fn sse_handler(State(db): State<DB>) -> impl IntoResponse {
|
||||
let rx = db.static_state.change_channel.subscribe();
|
||||
|
@ -120,12 +124,16 @@ pub async fn sse_handler(State(db): State<DB>) -> impl IntoResponse {
|
|||
data = lock.dmx.colors.clone();
|
||||
motor = lock.dmx.motor.clone();
|
||||
}
|
||||
let init_data = data.into_iter().enumerate().map(|(i, x)| {
|
||||
Ok(DMXAtom::Color(DMXColorAtom {
|
||||
address: i,
|
||||
value: x,
|
||||
}))
|
||||
}).chain(std::iter::once(Ok(DMXAtom::Motor(motor))));
|
||||
let init_data = data
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(i, x)| {
|
||||
Ok(DMXAtom::Color(DMXColorAtom {
|
||||
address: i,
|
||||
value: x,
|
||||
}))
|
||||
})
|
||||
.chain(std::iter::once(Ok(DMXAtom::Motor(motor))));
|
||||
let init = stream::iter(init_data);
|
||||
let stream = init
|
||||
.chain(stream::wrappers::BroadcastStream::new(rx))
|
||||
|
|
|
@ -2,23 +2,24 @@ mod authorization;
|
|||
mod handler;
|
||||
mod model;
|
||||
mod route;
|
||||
mod state_recorder;
|
||||
mod socket_listener;
|
||||
mod state_recorder;
|
||||
|
||||
use dotenvy;
|
||||
use crate::state_recorder::record_state;
|
||||
use crate::socket_listener::listen_socket;
|
||||
use route::create_router;
|
||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||
use tower_http::trace::TraceLayer;
|
||||
use tracing_subscriber::layer::SubscriberExt;
|
||||
use tracing_subscriber::util::SubscriberInitExt;
|
||||
|
||||
use crate::socket_listener::listen_socket;
|
||||
use crate::state_recorder::record_state;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
tracing_subscriber::registry()
|
||||
.with(
|
||||
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
|
||||
"info,axum::rejection=trace".into()
|
||||
}),
|
||||
tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| "info,axum::rejection=trace".into()),
|
||||
)
|
||||
.with(tracing_subscriber::fmt::layer())
|
||||
.init();
|
||||
|
@ -37,11 +38,7 @@ async fn main() {
|
|||
tracing::debug!("Trying to bind at {bind}");
|
||||
let listener = tokio::net::TcpListener::bind(bind).await.unwrap();
|
||||
|
||||
|
||||
let app = create_router(db.clone())
|
||||
.layer(
|
||||
TraceLayer::new_for_http()
|
||||
);
|
||||
let app = create_router(db.clone()).layer(TraceLayer::new_for_http());
|
||||
tracing::info!("🚀 Server started successfully");
|
||||
axum::serve(listener, app).await.unwrap();
|
||||
}
|
||||
|
|
|
@ -1,13 +1,15 @@
|
|||
use crate::authorization::User;
|
||||
use dotenvy;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use dotenvy;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::{broadcast, RwLock};
|
||||
|
||||
use crate::authorization::User;
|
||||
|
||||
#[derive(Debug, Default, Deserialize, Serialize, Copy, Clone)]
|
||||
pub struct DMXColor {
|
||||
pub red: u8,
|
||||
|
@ -123,7 +125,7 @@ pub fn make_db() -> DB {
|
|||
.as_ref(),
|
||||
)
|
||||
.expect("an JSON string"),
|
||||
change_channel: broadcast::Sender::new(5*state_size),
|
||||
change_channel: broadcast::Sender::new(5 * state_size),
|
||||
save_path,
|
||||
},
|
||||
mut_state,
|
||||
|
|
|
@ -1,10 +1,11 @@
|
|||
use crate::authorization::{jwt_middleware, jwt_middleware_cof};
|
||||
use crate::handler;
|
||||
use crate::model;
|
||||
use axum::{handler::Handler, middleware};
|
||||
use axum::{routing::get, Router};
|
||||
use axum::handler::Handler;
|
||||
use axum::routing::get;
|
||||
use axum::{middleware, Router};
|
||||
use tower_http::cors::{Any, CorsLayer};
|
||||
|
||||
use crate::authorization::{jwt_middleware, jwt_middleware_cof};
|
||||
use crate::{handler, model};
|
||||
|
||||
pub fn create_router(db: model::DB) -> Router {
|
||||
let cors = CorsLayer::new()
|
||||
// allow requests from any origin
|
||||
|
|
|
@ -1,47 +1,51 @@
|
|||
use tokio::task;
|
||||
|
||||
use axum::extract::State;
|
||||
use serde::Deserialize;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::io::{AsyncBufReadExt, BufReader};
|
||||
use tokio::net::TcpListener;
|
||||
use crate::{handler, model };
|
||||
use tokio::task;
|
||||
|
||||
use crate::{handler, model};
|
||||
|
||||
pub async fn listen_socket(db: model::DB) {
|
||||
let binding = dotenvy::var("BIND_TCP").unwrap_or(String::from("127.0.0.1:1235"));
|
||||
tracing::debug!("Trying to bind at {binding}");
|
||||
let listener = TcpListener::bind(binding)
|
||||
.await
|
||||
.expect("Failed to listen for direct connection.");
|
||||
let binding = dotenvy::var("BIND_TCP").unwrap_or(String::from("127.0.0.1:1235"));
|
||||
tracing::debug!("Trying to bind at {binding}");
|
||||
let listener = TcpListener::bind(binding)
|
||||
.await
|
||||
.expect("Failed to listen for direct connection.");
|
||||
|
||||
loop {
|
||||
match listener.accept().await {
|
||||
Ok((socket, addr)) => {
|
||||
let db_socket = db.clone();
|
||||
let mut buf_reader = BufReader::new(socket);
|
||||
let mut buf = String::new();
|
||||
tracing::debug!("Accepted {addr:?}");
|
||||
task::spawn(async move {
|
||||
loop {
|
||||
buf.clear();
|
||||
let _ = buf_reader.read_line(&mut buf).await;
|
||||
match model::DMXBeamChange::deserialize(
|
||||
&mut serde_json::Deserializer::from_str(&buf),
|
||||
) {
|
||||
Ok(data) => handler::edit_motor_value_handler(
|
||||
State(db_socket.clone()),
|
||||
axum::Json(data),
|
||||
)
|
||||
.await
|
||||
.unwrap_or(()),
|
||||
Err(e) => tracing::error!("Error reading incoming data: {e:?}"),
|
||||
}
|
||||
loop {
|
||||
match listener.accept().await {
|
||||
Ok((socket, addr)) => {
|
||||
let db_socket = db.clone();
|
||||
let mut buf_reader = BufReader::new(socket);
|
||||
let mut buf = String::new();
|
||||
tracing::debug!("Accepted {addr:?}");
|
||||
task::spawn(async move {
|
||||
loop {
|
||||
buf.clear();
|
||||
match buf_reader.read_line(&mut buf).await {
|
||||
Ok(0) => break,
|
||||
Ok(_) => (),
|
||||
Err(e) => tracing::error!("Error while reading tcp stream: {e:?}"),
|
||||
};
|
||||
match model::DMXBeamChange::deserialize(
|
||||
&mut serde_json::Deserializer::from_str(&buf),
|
||||
) {
|
||||
Ok(data) => handler::edit_motor_value_handler(
|
||||
State(db_socket.clone()),
|
||||
axum::Json(data),
|
||||
)
|
||||
.await
|
||||
.unwrap_or(()),
|
||||
Err(e) => tracing::error!("Error reading incoming data: {e:?}"),
|
||||
}
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to get client: {e:?}");
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
tracing::debug!("Closing tcp connection {addr:?}");
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to get client: {e:?}");
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
use std::fs::File;
|
||||
|
||||
use serde_json::to_writer;
|
||||
use std::fs::File;
|
||||
use tokio::time::{sleep, Duration};
|
||||
|
||||
use crate::model;
|
||||
|
||||
pub async fn record_state(db_to_save: model::DB) {
|
||||
|
|
Loading…
Reference in a new issue