client-batch #4

Open
rlahfa wants to merge 3 commits from client-batch into develop
7 changed files with 422 additions and 2 deletions

View file

@ -24,9 +24,10 @@ members = [ "./", "irc-proto/" ]
[features] [features]
default = ["ctcp", "tls-native", "channel-lists", "toml_config"] default = ["ctcp", "tls-native", "channel-lists", "batch", "toml_config"]
ctcp = [] ctcp = []
channel-lists = [] channel-lists = []
batch = []
json_config = ["serde", "serde/derive", "serde_derive", "serde_json"] json_config = ["serde", "serde/derive", "serde_derive", "serde_json"]
toml_config = ["serde", "serde/derive", "serde_derive", "toml"] toml_config = ["serde", "serde/derive", "serde_derive", "toml"]
@ -70,6 +71,7 @@ tokio-rustls = { version = "0.24.0", features = ["dangerous_configuration"], opt
rustls-pemfile = { version = "1.0.2", optional = true } rustls-pemfile = { version = "1.0.2", optional = true }
tokio-native-tls = { version = "0.3.1", optional = true } tokio-native-tls = { version = "0.3.1", optional = true }
webpki-roots = { version = "0.23.0", optional = true } webpki-roots = { version = "0.23.0", optional = true }
atomic-counter = "1.0.1"
[dev-dependencies] [dev-dependencies]

View file

@ -5,6 +5,7 @@ use crate::chan::ChannelExt;
use crate::error::MessageParseError; use crate::error::MessageParseError;
use crate::mode::{ChannelMode, Mode, UserMode}; use crate::mode::{ChannelMode, Mode, UserMode};
use crate::response::Response; use crate::response::Response;
use crate::standard_reply::{StandardTypes, StandardCodes};
/// List of all client commands as defined in [RFC 2812](http://tools.ietf.org/html/rfc2812). This /// List of all client commands as defined in [RFC 2812](http://tools.ietf.org/html/rfc2812). This
/// also includes commands from the /// also includes commands from the
@ -200,6 +201,9 @@ pub enum Command {
// Default option. // Default option.
/// An IRC response code with arguments and optional suffix. /// An IRC response code with arguments and optional suffix.
Response(Response, Vec<String>), Response(Response, Vec<String>),
/// https://ircv3.net/specs/extensions/standard-replies
/// [FAIL | WARN | NOTE] <command> <code> [<context>] <description>
StandardResponse(StandardTypes, String, StandardCodes, Vec<String>, String),
/// A raw IRC command unknown to the crate. /// A raw IRC command unknown to the crate.
Raw(String, Vec<String>), Raw(String, Vec<String>),
} }
@ -408,6 +412,10 @@ impl<'a> From<&'a Command> for String {
&format!("{:03}", *resp as u16), &format!("{:03}", *resp as u16),
&a.iter().map(|s| &s[..]).collect::<Vec<_>>(), &a.iter().map(|s| &s[..]).collect::<Vec<_>>(),
), ),
Command::StandardResponse(ref r#type, ref command, ref code, ref args, ref description) => {
match r#type {
}
},
Command::Raw(ref c, ref a) => { Command::Raw(ref c, ref a) => {
stringify(c, &a.iter().map(|s| &s[..]).collect::<Vec<_>>()) stringify(c, &a.iter().map(|s| &s[..]).collect::<Vec<_>>())
} }
@ -958,6 +966,14 @@ impl Command {
} else { } else {
raw(cmd, args) raw(cmd, args)
} }
} else if StandardTypes::is_standard_type(cmd) {
let code = StandardCodes::from_message(args[0], args[1]);
let mut std_args: Vec<String> = args.iter().skip(2).map(|&s| s.to_owned()).collect();
let desc = std_args.pop().ok_or_else(|| MessageParseError::MissingDescriptionInStandardReply)?;
Command::StandardResponse(
StandardTypes::from_str(cmd).map_err(MessageParseError::InvalidStandardReplyType)?,
args[0].to_owned(), code, std_args, desc)
} else if let Ok(resp) = cmd.parse() { } else if let Ok(resp) = cmd.parse() {
Command::Response(resp, args.into_iter().map(|s| s.to_owned()).collect()) Command::Response(resp, args.into_iter().map(|s| s.to_owned()).collect())
} else { } else {
@ -1128,6 +1144,7 @@ mod test {
use super::Command; use super::Command;
use super::Response; use super::Response;
use crate::Message; use crate::Message;
use crate::standard_reply::{StandardTypes, StandardCodes};
#[test] #[test]
fn format_response() { fn format_response() {
@ -1155,4 +1172,20 @@ mod test {
cmd cmd
); );
} }
#[test]
fn parse_standard_reply() {
let msg = "FAIL BOX BOXES_INVALID STACK CLOCKWISE :Given boxes are not supported".parse::<Message>().unwrap();
assert_eq!(
msg.command,
Command::StandardResponse(StandardTypes::Fail, "BOX".to_string(), StandardCodes::Custom("BOXES_INVALID".to_string()), vec!["STACK".to_string(), "CLOCKWISE".to_string()], "Given boxes are not supported".to_string())
);
let msg = "NOTE * OPER_MESSAGE :Registering new accounts and channels has been disabled temporarily while we deal with the spam. Thanks for flying ExampleNet! -dan".parse::<Message>().unwrap();
assert_eq!(
msg.command,
Command::StandardResponse(StandardTypes::Note, "*".to_string(), StandardCodes::Custom("OPER_MESSAGE".to_string()), vec![], "Registering new accounts and channels has been disabled temporarily while we deal with the spam. Thanks for flying ExampleNet! -dan".to_string())
);
}
} }

View file

@ -58,6 +58,14 @@ pub enum MessageParseError {
/// The invalid subcommand. /// The invalid subcommand.
sub: String, sub: String,
}, },
/// The standard reply missed a description
#[error("missing description in standard reply")]
MissingDescriptionInStandardReply,
/// Invalid standard reply type
#[error("invalid standard reply type: {}", .0)]
InvalidStandardReplyType(&'static str)
} }
/// Errors that occur while parsing mode strings. /// Errors that occur while parsing mode strings.

View file

@ -15,6 +15,7 @@ pub mod message;
pub mod mode; pub mod mode;
pub mod prefix; pub mod prefix;
pub mod response; pub mod response;
pub mod standard_reply;
pub use self::caps::{Capability, NegotiationVersion}; pub use self::caps::{Capability, NegotiationVersion};
pub use self::chan::ChannelExt; pub use self::chan::ChannelExt;

View file

@ -0,0 +1,286 @@
use std::str::FromStr;
/// Support for https://ircv3.net/specs/extensions/standard-replies
/// Implements the list of reply codes in the IRCv3 registry: https://ircv3.net/registry
trait FromCode {
fn from_code(code: &str) -> Option<Self> where Self: Sized;
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MultilineCodes {
MaxBytes,
MaxLines,
InvalidTarget,
Invalid
}
impl FromCode for MultilineCodes {
fn from_code(code: &str) -> Option<Self> {
Some(match code {
"MULTILINE_MAX_BYTES" => Self::MaxBytes,
"MULTILINE_MAX_LINES" => Self::MaxLines,
"MULTILINE_INVALID_TARGET" => Self::InvalidTarget,
"MULTILINE_INVALID" => Self::Invalid,
_ => return None,
})
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChatHistoryCodes {
InvalidParams,
InvalidTarget,
MessageError,
NeedMoreParams,
UnknownCommand
}
impl FromCode for ChatHistoryCodes {
fn from_code(code: &str) -> Option<Self> {
Some(match code {
"INVALID_PARAMS" => Self::InvalidParams,
"INVALID_TARGET" => Self::InvalidTarget,
"MESSAGE_ERROR" => Self::MessageError,
"NEED_MORE_PARAMS" => Self::NeedMoreParams,
"UNKNOWN_COMMAND" => Self::UnknownCommand,
_ => return None,
})
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinCodes {
ChannelRenamed
}
impl FromCode for JoinCodes {
fn from_code(code: &str) -> Option<Self> {
Some(match code {
"CHANNEL_RENAMED" => Self::ChannelRenamed,
_ => return None
})
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NickCodes {
Reserved
}
impl FromCode for NickCodes {
fn from_code(code: &str) -> Option<Self> {
Some(match code {
"NICKNAME_RESERVED" => Self::Reserved,
_ => return None
})
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RedactCodes {
InvalidTarget,
Forbidden,
WindowExpired,
UnknownMsgid,
}
impl FromCode for RedactCodes {
fn from_code(code: &str) -> Option<Self> {
Some(match code {
"INVALID_TARGET" => Self::InvalidTarget,
"REACT_FORBIDDEN" => Self::Forbidden,
"REACT_WINDOW_EXPIRED" => Self::WindowExpired,
"UNKNOWN_MSGID" => Self::UnknownMsgid,
_ => return None
})
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegisterCodes {
AccountExists,
AccountNameMustBeNick,
AlreadyAuthenticated,
BadAccountName,
CompleteConnectionRequired,
InvalidEmail,
NeedNick,
TemporarilyUnavailable,
UnacceptableEmail,
UnacceptablePassword,
WeakPassword
}
impl FromCode for RegisterCodes {
fn from_code(code: &str) -> Option<Self> {
Some(match code {
"ACCOUNT_EXISTS" => Self::AccountExists,
"ACCOUNT_NAME_MUST_BE_NICK" => Self::AccountNameMustBeNick,
"ALREADY_AUTHENTICATED" => Self::AlreadyAuthenticated,
"BAD_ACCOUNT_NAME" => Self::BadAccountName,
"COMPLETE_CONNECTION_REQUIRED" => Self::CompleteConnectionRequired,
"INVALID_EMAIL" => Self::InvalidEmail,
"NEED_NICK" => Self::NeedNick,
"TEMPORARILY_UNAVAILABLE" => Self::TemporarilyUnavailable,
"UNACCEPTABLE_EMAIL" => Self::UnacceptableEmail,
"UNACCEPTABLE_PASSWORD" => Self::UnacceptablePassword,
"WEAK_PASSWORD" => Self::WeakPassword,
_ => return None,
})
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RenameCodes {
ChannelNameInUse,
CannotRename
}
impl FromCode for RenameCodes {
fn from_code(code: &str) -> Option<Self> {
Some(match code {
"CHANNEL_NAME_IN_USE" => Self::ChannelNameInUse,
"CANNOT_RENAME" => Self::CannotRename,
_ => return None,
})
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SetNameCodes {
CannotChangeRealname,
InvalidRealname
}
impl FromCode for SetNameCodes {
fn from_code(code: &str) -> Option<Self> {
Some(match code {
"CANNOT_CHANGE_REALNAME" => Self::CannotChangeRealname,
"INVALID_REALNAME" => Self::InvalidRealname,
_ => return None,
})
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VerifyCodes {
AlreadyAuthenticated,
InvalidCode,
CompleteConnectionRequired,
TemporarilyUnavailable
}
impl FromCode for VerifyCodes {
fn from_code(code: &str) -> Option<Self> {
Some(match code {
"ALREADY_AUTHENTICATED" => Self::AlreadyAuthenticated,
"INVALID_CODE" => Self::InvalidCode,
"COMPLETE_CONNECTION_REQUIRED" => Self::CompleteConnectionRequired,
"TEMPORARILY_UNAVAILABLE" => Self::TemporarilyUnavailable,
_ => return None,
})
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StandardCodes {
AccountRequired,
InvalidUtf8,
Multiline(MultilineCodes),
ChatHistory(ChatHistoryCodes),
Join(JoinCodes),
Nick(NickCodes),
Redact(RedactCodes),
Register(RegisterCodes),
Rename(RenameCodes),
SetName(SetNameCodes),
Verify(VerifyCodes),
Custom(String)
}
impl StandardCodes {
fn known_from_message(command: &str, code: &str) -> Option<Self> {
Some(match command {
"BATCH" => Self::Multiline(MultilineCodes::from_code(code)?),
"CHATHISTORY" => Self::ChatHistory(ChatHistoryCodes::from_code(code)?),
"JOIN" => Self::Join(JoinCodes::from_code(code)?),
"NICK" => Self::Nick(NickCodes::from_code(code)?),
"REDACT" => Self::Redact(RedactCodes::from_code(code)?),
"REGISTER" => Self::Register(RegisterCodes::from_code(code)?),
"RENAME" => Self::Rename(RenameCodes::from_code(code)?),
"SETNAME" => Self::SetName(SetNameCodes::from_code(code)?),
"VERIFY" => Self::Verify(VerifyCodes::from_code(code)?),
_ => {
match code {
"ACCOUNT_REQUIRED" => Self::AccountRequired,
"INVALID_UTF8" => Self::InvalidUtf8,
_ => Self::Custom(code.to_string())
}
}
})
}
pub fn from_message(command: &str, code: &str) -> Self {
Self::known_from_message(command, code).unwrap_or_else(|| Self::Custom(code.to_string()))
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StandardTypes {
Fail,
Warn,
Note
}
impl StandardTypes {
pub fn is_standard_type(s: &str) -> bool {
s.eq_ignore_ascii_case("FAIL") || s.eq_ignore_ascii_case("WARN") || s.eq_ignore_ascii_case("NOTE")
}
}
impl FromStr for StandardTypes {
type Err = &'static str;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_ascii_uppercase().as_str() {
"FAIL" => Ok(Self::Fail),
"WARN" => Ok(Self::Warn),
"NOTE" => Ok(Self::Note),
_ => Err("Unexpected standard response type, neither fail, warn or note.")
}
}
}
#[cfg(test)]
mod test {
use super::StandardCodes;
#[test]
fn parse_spec_example1() {
let (command, code) = ("ACC", "REG_INVALID_CALLBACK");
assert_eq!(StandardCodes::Custom("REG_INVALID_CALLBACK".to_string()), StandardCodes::from_message(command, code));
}
#[test]
fn parse_spec_example2() {
let (command, code) = ("BOX", "BOXES_INVALID");
assert_eq!(StandardCodes::Custom("BOXES_INVALID".to_string()), StandardCodes::from_message(command, code));
}
#[test]
fn parse_spec_example3() {
let (command, code) = ("*", "ACCOUNT_REQUIRED_TO_CONNECT");
assert_eq!(StandardCodes::Custom("ACCOUNT_REQUIRED_TO_CONNECT".to_string()), StandardCodes::from_message(command, code));
}
#[test]
fn parse_batch_example() {
let (command, code) = ("BATCH", "MULTILINE_MAX_BYTES");
assert_eq!(StandardCodes::Multiline(crate::standard_reply::MultilineCodes::MaxBytes), StandardCodes::from_message(command, code));
}
}

View file

@ -47,6 +47,7 @@
//! # } //! # }
//! ``` //! ```
use atomic_counter::{AtomicCounter, RelaxedCounter};
#[cfg(feature = "ctcp")] #[cfg(feature = "ctcp")]
use chrono::prelude::*; use chrono::prelude::*;
use futures_util::{ use futures_util::{
@ -76,12 +77,13 @@ use crate::{
}, },
error, error,
proto::{ proto::{
BatchSubCommand,
mode::ModeType, mode::ModeType,
CapSubCommand::{END, LS, REQ}, CapSubCommand::{END, LS, REQ},
Capability, ChannelMode, Command, Capability, ChannelMode, Command,
Command::{ Command::{
ChannelMODE, AUTHENTICATE, CAP, INVITE, JOIN, KICK, KILL, NICK, NICKSERV, NOTICE, OPER, ChannelMODE, AUTHENTICATE, CAP, INVITE, JOIN, KICK, KILL, NICK, NICKSERV, NOTICE, OPER,
PART, PASS, PONG, PRIVMSG, QUIT, SAMODE, SANICK, TOPIC, USER, PART, PASS, PONG, PRIVMSG, QUIT, SAMODE, SANICK, TOPIC, USER, BATCH
}, },
Message, Mode, NegotiationVersion, Response, Message, Mode, NegotiationVersion, Response,
}, },
@ -496,8 +498,12 @@ struct ClientState {
sender: Sender, sender: Sender,
/// The configuration used with this connection. /// The configuration used with this connection.
config: Config, config: Config,
/// A thread-safe counter for batch IDs
batch_id: RelaxedCounter,
/// A thread-safe map of channels to the list of users in them. /// A thread-safe map of channels to the list of users in them.
chanlists: RwLock<HashMap<String, Vec<User>>>, chanlists: RwLock<HashMap<String, Vec<User>>>,
/// A thread-safe map of in-progress batch
inflight_batches: RwLock<HashMap<String, Vec<Message>>>,
/// A thread-safe index to track the current alternative nickname being used. /// A thread-safe index to track the current alternative nickname being used.
alt_nick_index: RwLock<usize>, alt_nick_index: RwLock<usize>,
/// Default ghost sequence to send if one is required but none is configured. /// Default ghost sequence to send if one is required but none is configured.
@ -509,6 +515,8 @@ impl ClientState {
ClientState { ClientState {
sender, sender,
config, config,
batch_id: RelaxedCounter::new(0),
inflight_batches: RwLock::new(HashMap::new()),
chanlists: RwLock::new(HashMap::new()), chanlists: RwLock::new(HashMap::new()),
alt_nick_index: RwLock::new(0), alt_nick_index: RwLock::new(0),
default_ghost_sequence: vec![String::from("GHOST")], default_ghost_sequence: vec![String::from("GHOST")],
@ -553,6 +561,30 @@ impl ClientState {
/// Handles received messages internally for basic client functionality. /// Handles received messages internally for basic client functionality.
fn handle_message(&self, msg: &Message) -> error::Result<()> { fn handle_message(&self, msg: &Message) -> error::Result<()> {
log::trace!("[RECV] {}", msg.to_string()); log::trace!("[RECV] {}", msg.to_string());
if let Some(tags) = msg.tags {
if let Some(batch_tag) = tags.into_iter().find(|tag| tag.0 == "batch") {
match batch_tag.1 {
Some(batch_id) => {
let inflight_batches = self.inflight_batches.read();
// TODO: check if we negotiated batch as well.
if !inflight_batches.contains_key(&batch_id) {
} else {
// Release the read lock, we are upgrading to a write lock.
drop(inflight_batches);
let mut inflight_batches = self.inflight_batches.write();
// This message processing is delayed until the batch is finished.
inflight_batches.entry(batch_id).and_modify(|messages| messages.push(msg.clone()));
return Ok(());
}
},
None => {
// TODO: Return an invalid message error.
}
}
}
}
match msg.command { match msg.command {
JOIN(ref chan, _, _) => self.handle_join(msg.source_nickname().unwrap_or(""), chan), JOIN(ref chan, _, _) => self.handle_join(msg.source_nickname().unwrap_or(""), chan),
PART(ref chan, _) => self.handle_part(msg.source_nickname().unwrap_or(""), chan), PART(ref chan, _) => self.handle_part(msg.source_nickname().unwrap_or(""), chan),
@ -579,6 +611,7 @@ impl ClientState {
} }
} }
} }
Command::BATCH(ref reference_tag, ref sub, ref params) => self.handle_batch(reference_tag, sub.as_ref(), params.as_ref())?,
Command::Response(Response::RPL_NAMREPLY, ref args) => self.handle_namreply(args), Command::Response(Response::RPL_NAMREPLY, ref args) => self.handle_namreply(args),
Command::Response(Response::RPL_ENDOFMOTD, _) Command::Response(Response::RPL_ENDOFMOTD, _)
| Command::Response(Response::ERR_NOMOTD, _) => { | Command::Response(Response::ERR_NOMOTD, _) => {
@ -672,6 +705,40 @@ impl ClientState {
} }
} }
#[cfg(not(feature = "batch"))]
fn handle_batch(&self, _: &str, _: Option<&BatchSubCommand>, _: Option<&Vec<String>>) -> error::Result<()> {}
#[cfg(feature = "batch")]
fn handle_batch(&self, reference_tag: &str, sub: Option<&BatchSubCommand>, params: Option<&Vec<String>>) -> error::Result<()> {
// TODO: increase type safety here.
let is_start = reference_tag.chars().nth(0).unwrap() == '+';
let mut inflight_batches = self.inflight_batches.write();
// TODO: handle nested batches better.
// TODO: handling sub commands such as netsplit and netjoin could be done by having extra
// handlers for end users to warn them of an incoming netsplit or netjoin batch.
// If this is chathistory, handle_chathistory could also be designed.
let identifier = reference_tag[1..].to_string();
if is_start {
if inflight_batches.contains_key(&identifier) {
return Err(error::Error::BatchAlreadyExists(identifier));
}
// Create a new pending batch.
inflight_batches.insert(identifier, Vec::new());
} else {
// Remove the pending batches and replay all the messages.
let pending_messages = inflight_batches.remove(&reference_tag[1..].to_string()).ok_or(error::Error::BatchDisappearedBeforeEndOfBatchProcessed)?;
// Replay all delayed messages now.
for message in pending_messages {
self.handle_message(&message)?;
}
}
Ok(())
}
#[cfg(not(feature = "channel-lists"))] #[cfg(not(feature = "channel-lists"))]
fn handle_join(&self, _: &str, _: &str) {} fn handle_join(&self, _: &str, _: &str) {}
@ -807,6 +874,21 @@ impl ClientState {
} }
pub_state_base!(); pub_state_base!();
/// Sends a client batch with an iterator as the body.
pub fn send_client_batch<I: Iterator<Item=Message>>(&mut self, msg_iterator: I) -> error::Result<()> {
let batch_id = format!("{}", self.batch_id.get());
self.send(BATCH(batch_id.clone(), None, None))?;
// Attach to all message that batch ID.
msg_iterator.for_each(|msg| {
self.send(msg);
});
// Close the batch.
self.send(BATCH(batch_id, None, None))?;
self.batch_id.inc();
Ok(())
}
} }
/// Thread-safe sender that can be used with the client. /// Thread-safe sender that can be used with the client.

View file

@ -123,6 +123,14 @@ pub enum Error {
/// Stream has already been configured. /// Stream has already been configured.
#[error("stream has already been configured")] #[error("stream has already been configured")]
StreamAlreadyConfigured, StreamAlreadyConfigured,
/// A end of batch message was sent after a pending batch was removed from the inflight list
#[error("batch disappeared before end of batch was processed")]
BatchDisappearedBeforeEndOfBatchProcessed,
/// A new batch was started with the same reference tag
#[error("batch {} already exists but a new batch request with the same tag was sent", .0)]
BatchAlreadyExists(String),
} }
/// Errors that occur with configurations. /// Errors that occur with configurations.