From 97c9efb151b8e0d7aa9b065f84ec10a10d11308d Mon Sep 17 00:00:00 2001 From: Raito Bezarius Date: Fri, 11 Oct 2024 12:29:26 +0200 Subject: [PATCH] feat(ircv3): add simple internal batch processing The BATCH feature is a form of a server-suggested message processing delays. To implement this, we can record all inflight batches, handle start/end in handle_batch and collect all pending messages in the entrypoint of the message handler as long as they have a correct batch id. Signed-off-by: Raito Bezarius --- Cargo.toml | 3 ++- src/client/mod.rs | 63 +++++++++++++++++++++++++++++++++++++++++++++++ src/error.rs | 8 ++++++ 3 files changed, 73 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 6d20f66..7bb7a5a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,9 +24,10 @@ members = [ "./", "irc-proto/" ] [features] -default = ["ctcp", "tls-native", "channel-lists", "toml_config"] +default = ["ctcp", "tls-native", "channel-lists", "batch", "toml_config"] ctcp = [] channel-lists = [] +batch = [] json_config = ["serde", "serde/derive", "serde_derive", "serde_json"] toml_config = ["serde", "serde/derive", "serde_derive", "toml"] diff --git a/src/client/mod.rs b/src/client/mod.rs index 0fffaee..74c06bc 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -76,6 +76,7 @@ use crate::{ }, error, proto::{ + BatchSubCommand, mode::ModeType, CapSubCommand::{END, LS, REQ}, Capability, ChannelMode, Command, @@ -498,6 +499,8 @@ struct ClientState { config: Config, /// A thread-safe map of channels to the list of users in them. chanlists: RwLock>>, + /// A thread-safe map of in-progress batch + inflight_batches: RwLock>>, /// A thread-safe index to track the current alternative nickname being used. alt_nick_index: RwLock, /// Default ghost sequence to send if one is required but none is configured. @@ -509,6 +512,7 @@ impl ClientState { ClientState { sender, config, + inflight_batches: RwLock::new(HashMap::new()), chanlists: RwLock::new(HashMap::new()), alt_nick_index: RwLock::new(0), default_ghost_sequence: vec![String::from("GHOST")], @@ -553,6 +557,30 @@ impl ClientState { /// Handles received messages internally for basic client functionality. fn handle_message(&self, msg: &Message) -> error::Result<()> { log::trace!("[RECV] {}", msg.to_string()); + + if let Some(tags) = msg.tags { + if let Some(batch_tag) = tags.into_iter().find(|tag| tag.0 == "batch") { + match batch_tag.1 { + Some(batch_id) => { + let inflight_batches = self.inflight_batches.read(); + // TODO: check if we negotiated batch as well. + if !inflight_batches.contains_key(&batch_id) { + } else { + // Release the read lock, we are upgrading to a write lock. + drop(inflight_batches); + let mut inflight_batches = self.inflight_batches.write(); + // This message processing is delayed until the batch is finished. + inflight_batches.entry(batch_id).and_modify(|messages| messages.push(msg.clone())); + return Ok(()); + } + }, + None => { + // TODO: Return an invalid message error. + } + } + } + } + match msg.command { JOIN(ref chan, _, _) => self.handle_join(msg.source_nickname().unwrap_or(""), chan), PART(ref chan, _) => self.handle_part(msg.source_nickname().unwrap_or(""), chan), @@ -579,6 +607,7 @@ impl ClientState { } } } + Command::BATCH(ref reference_tag, ref sub, ref params) => self.handle_batch(reference_tag, sub.as_ref(), params.as_ref())?, Command::Response(Response::RPL_NAMREPLY, ref args) => self.handle_namreply(args), Command::Response(Response::RPL_ENDOFMOTD, _) | Command::Response(Response::ERR_NOMOTD, _) => { @@ -672,6 +701,40 @@ impl ClientState { } } + #[cfg(not(feature = "batch"))] + fn handle_batch(&self, _: &str, _: Option<&BatchSubCommand>, _: Option<&Vec>) -> error::Result<()> {} + + #[cfg(feature = "batch")] + fn handle_batch(&self, reference_tag: &str, sub: Option<&BatchSubCommand>, params: Option<&Vec>) -> error::Result<()> { + // TODO: increase type safety here. + let is_start = reference_tag.chars().nth(0).unwrap() == '+'; + let mut inflight_batches = self.inflight_batches.write(); + + // TODO: handle nested batches better. + // TODO: handling sub commands such as netsplit and netjoin could be done by having extra + // handlers for end users to warn them of an incoming netsplit or netjoin batch. + // If this is chathistory, handle_chathistory could also be designed. + + let identifier = reference_tag[1..].to_string(); + if is_start { + if inflight_batches.contains_key(&identifier) { + return Err(error::Error::BatchAlreadyExists(identifier)); + } + + // Create a new pending batch. + inflight_batches.insert(identifier, Vec::new()); + } else { + // Remove the pending batches and replay all the messages. + let pending_messages = inflight_batches.remove(&reference_tag[1..].to_string()).ok_or(error::Error::BatchDisappearedBeforeEndOfBatchProcessed)?; + // Replay all delayed messages now. + for message in pending_messages { + self.handle_message(&message)?; + } + } + + Ok(()) + } + #[cfg(not(feature = "channel-lists"))] fn handle_join(&self, _: &str, _: &str) {} diff --git a/src/error.rs b/src/error.rs index 391340d..5ff356b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -123,6 +123,14 @@ pub enum Error { /// Stream has already been configured. #[error("stream has already been configured")] StreamAlreadyConfigured, + + /// A end of batch message was sent after a pending batch was removed from the inflight list + #[error("batch disappeared before end of batch was processed")] + BatchDisappearedBeforeEndOfBatchProcessed, + + /// A new batch was started with the same reference tag + #[error("batch {} already exists but a new batch request with the same tag was sent", .0)] + BatchAlreadyExists(String), } /// Errors that occur with configurations.