feat(ircv3): add simple internal batch processing

The BATCH feature is a form of server-suggested message processing delay.

To implement this, we record all inflight batches, handle batch start/end
in handle_batch, and collect all pending messages at the entry point of
the message handler as long as they carry a known batch id.
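
For reference, a batch on the wire looks roughly like this (example adapted
from the IRCv3 batch specification; the reference tag and hostnames are
illustrative):

:irc.host BATCH +yXNAbvnRHTRBv netsplit irc.hub other.host
@batch=yXNAbvnRHTRBv :aji!a@a QUIT :irc.hub other.host
@batch=yXNAbvnRHTRBv :nenolod!a@a QUIT :irc.hub other.host
:irc.host BATCH -yXNAbvnRHTRBv

Messages carrying the @batch tag are collected until the closing BATCH line,
then replayed through the normal handler.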

Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
Authored by Raito Bezarius on 2024-10-11 12:29:26 +02:00, committed by Ryan Lahfa
parent 1389b438e4
commit b1fc58ac42
3 changed files with 73 additions and 1 deletion

@@ -24,9 +24,10 @@ members = [ "./", "irc-proto/" ]
[features]
default = ["ctcp", "tls-native", "channel-lists", "toml_config"]
default = ["ctcp", "tls-native", "channel-lists", "batch", "toml_config"]
ctcp = []
channel-lists = []
batch = []
json_config = ["serde", "serde/derive", "serde_derive", "serde_json"]
toml_config = ["serde", "serde/derive", "serde_derive", "toml"]
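
For downstream consumers the feature is now part of the default set; a project
that disables default features would opt back in along these lines (the crate
version and exact feature selection are illustrative):

# Hypothetical downstream Cargo.toml entry; adjust version and features as needed.
[dependencies]
irc = { version = "0.15", default-features = false, features = ["ctcp", "tls-native", "channel-lists", "batch", "toml_config"] }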

@@ -76,6 +76,7 @@ use crate::{
},
error,
proto::{
BatchSubCommand,
mode::ModeType,
CapSubCommand::{END, LS, REQ},
Capability, ChannelMode, Command,
@@ -498,6 +499,8 @@ struct ClientState {
config: Config,
/// A thread-safe map of channels to the list of users in them.
chanlists: RwLock<HashMap<String, Vec<User>>>,
/// A thread-safe map of in-progress batches to their delayed messages.
inflight_batches: RwLock<HashMap<String, Vec<Message>>>,
/// A thread-safe index to track the current alternative nickname being used.
alt_nick_index: RwLock<usize>,
/// Default ghost sequence to send if one is required but none is configured.
@@ -509,6 +512,7 @@ impl ClientState {
ClientState {
sender,
config,
inflight_batches: RwLock::new(HashMap::new()),
chanlists: RwLock::new(HashMap::new()),
alt_nick_index: RwLock::new(0),
default_ghost_sequence: vec![String::from("GHOST")],
@@ -553,6 +557,30 @@ impl ClientState {
/// Handles received messages internally for basic client functionality.
fn handle_message(&self, msg: &Message) -> error::Result<()> {
log::trace!("[RECV] {}", msg.to_string());
if let Some(ref tags) = msg.tags {
    if let Some(batch_tag) = tags.iter().find(|tag| tag.0 == "batch") {
        match batch_tag.1 {
            Some(ref batch_id) => {
                let inflight_batches = self.inflight_batches.read();
                // TODO: check if we negotiated the batch capability as well.
                if inflight_batches.contains_key(batch_id) {
                    // Release the read lock, we are upgrading to a write lock.
                    drop(inflight_batches);
                    let mut inflight_batches = self.inflight_batches.write();
                    // Processing of this message is delayed until the batch is finished.
                    inflight_batches
                        .entry(batch_id.clone())
                        .and_modify(|messages| messages.push(msg.clone()));
                    return Ok(());
                }
            }
            None => {
                // TODO: return an invalid message error.
            }
        }
    }
}
match msg.command {
JOIN(ref chan, _, _) => self.handle_join(msg.source_nickname().unwrap_or(""), chan),
PART(ref chan, _) => self.handle_part(msg.source_nickname().unwrap_or(""), chan),
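
The early return above hinges on dropping the read guard before taking the
write guard. A minimal standalone sketch of that check-then-upgrade pattern,
using std::sync::RwLock rather than whatever lock type the crate itself uses:

use std::collections::HashMap;
use std::sync::RwLock;

// Sketch only: buffer `line` if `id` refers to a batch that is currently in flight.
// The read guard is a temporary dropped at the end of the first statement, so
// taking the write lock afterwards does not block on our own read lock.
fn buffer_if_inflight(
    batches: &RwLock<HashMap<String, Vec<String>>>,
    id: &str,
    line: &str,
) -> bool {
    let known = batches.read().unwrap().contains_key(id);
    if known {
        let mut batches = batches.write().unwrap();
        if let Some(messages) = batches.get_mut(id) {
            messages.push(line.to_string());
        }
        // Buffered (or dropped if the batch vanished in between): delay processing.
        return true;
    }
    // Unknown batch id: process the message immediately.
    false
}

Between the read and the write another task could remove the batch; both the
sketch and the handler above tolerate that by only appending when the entry
still exists, in which case the message is dropped rather than processed.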
@@ -579,6 +607,7 @@ impl ClientState {
}
}
}
Command::BATCH(ref reference_tag, ref sub, ref params) => self.handle_batch(reference_tag, sub.as_ref(), params.as_ref())?,
Command::Response(Response::RPL_NAMREPLY, ref args) => self.handle_namreply(args),
Command::Response(Response::RPL_ENDOFMOTD, _)
| Command::Response(Response::ERR_NOMOTD, _) => {
@@ -672,6 +701,40 @@ impl ClientState {
}
}
#[cfg(not(feature = "batch"))]
fn handle_batch(&self, _: &str, _: Option<&BatchSubCommand>, _: Option<&Vec<String>>) -> error::Result<()> {
    Ok(())
}
#[cfg(feature = "batch")]
fn handle_batch(&self, reference_tag: &str, sub: Option<&BatchSubCommand>, params: Option<&Vec<String>>) -> error::Result<()> {
    // TODO: increase type safety here.
    let is_start = reference_tag.starts_with('+');
    let mut inflight_batches = self.inflight_batches.write();
    // TODO: handle nested batches better.
    // TODO: handling subcommands such as netsplit and netjoin could be done by having extra
    // handlers so end users are warned of an incoming netsplit or netjoin batch.
    // If this is chathistory, a handle_chathistory could also be designed.
    // Guard against an empty reference tag instead of panicking on the slice.
    let identifier = reference_tag.get(1..).unwrap_or("").to_string();
    if is_start {
        if inflight_batches.contains_key(&identifier) {
            return Err(error::Error::BatchAlreadyExists(identifier));
        }
        // Create a new pending batch.
        inflight_batches.insert(identifier, Vec::new());
    } else {
        // Remove the pending batch and take ownership of its delayed messages.
        let pending_messages = inflight_batches
            .remove(&identifier)
            .ok_or(error::Error::BatchDisappearedBeforeEndOfBatchProcessed)?;
        // Release the write lock before replaying: handle_message takes the lock
        // again when it inspects batch tags and would otherwise deadlock.
        drop(inflight_batches);
        // Replay all delayed messages now.
        for message in pending_messages {
            self.handle_message(&message)?;
        }
    }
    Ok(())
}
#[cfg(not(feature = "channel-lists"))]
fn handle_join(&self, _: &str, _: &str) {}
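
A hypothetical end-to-end sketch of the lifecycle, not part of this diff: it
assumes a test inside the crate with some ClientState value named state, and
that irc-proto's Message parser recognizes BATCH. The reference tag, nick and
hostnames are made up.

let start: Message = ":server BATCH +abc netsplit a.example b.example\r\n".parse().unwrap();
let inner: Message = "@batch=abc :nick!user@host QUIT :a.example b.example\r\n".parse().unwrap();
let end: Message = ":server BATCH -abc\r\n".parse().unwrap();
state.handle_message(&start).unwrap(); // BATCH +abc: registers the pending batch
state.handle_message(&inner).unwrap(); // @batch=abc: buffered, not processed yet
state.handle_message(&end).unwrap();   // BATCH -abc: removes the entry and replays the QUIT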

View file

@@ -123,6 +123,14 @@ pub enum Error {
/// Stream has already been configured.
#[error("stream has already been configured")]
StreamAlreadyConfigured,
/// An end-of-batch message was received for a batch that had already been removed from the inflight list.
#[error("batch disappeared before end of batch was processed")]
BatchDisappearedBeforeEndOfBatchProcessed,
/// A new batch was started with the same reference tag as an existing inflight batch.
#[error("batch {} already exists but a new batch request with the same tag was sent", .0)]
BatchAlreadyExists(String),
}
/// Errors that occur with configurations.
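
Code that surfaces these errors could match on the new variants; a hypothetical
sketch, inside a function returning error::Result<()>, with illustrative log
messages:

// `result` stands for an error::Result<()> produced by the batch-aware handler above.
if let Err(err) = result {
    match err {
        error::Error::BatchAlreadyExists(tag) => {
            log::warn!("server reopened batch {} before closing it", tag);
        }
        error::Error::BatchDisappearedBeforeEndOfBatchProcessed => {
            log::warn!("received an end for a batch that was never started");
        }
        other => return Err(other),
    }
}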