Compare commits
3 commits
381ecddfee
...
30748d7ba3
Author | SHA1 | Date | |
---|---|---|---|
|
30748d7ba3 | ||
|
e85fd07719 | ||
|
c42b1572de |
7 changed files with 422 additions and 2 deletions
|
@ -24,9 +24,10 @@ members = [ "./", "irc-proto/" ]
|
|||
|
||||
|
||||
[features]
|
||||
default = ["ctcp", "tls-native", "channel-lists", "toml_config"]
|
||||
default = ["ctcp", "tls-native", "channel-lists", "batch", "toml_config"]
|
||||
ctcp = []
|
||||
channel-lists = []
|
||||
batch = []
|
||||
|
||||
json_config = ["serde", "serde/derive", "serde_derive", "serde_json"]
|
||||
toml_config = ["serde", "serde/derive", "serde_derive", "toml"]
|
||||
|
@ -70,6 +71,7 @@ tokio-rustls = { version = "0.24.0", features = ["dangerous_configuration"], opt
|
|||
rustls-pemfile = { version = "1.0.2", optional = true }
|
||||
tokio-native-tls = { version = "0.3.1", optional = true }
|
||||
webpki-roots = { version = "0.23.0", optional = true }
|
||||
atomic-counter = "1.0.1"
|
||||
|
||||
|
||||
[dev-dependencies]
|
||||
|
|
|
@ -5,6 +5,7 @@ use crate::chan::ChannelExt;
|
|||
use crate::error::MessageParseError;
|
||||
use crate::mode::{ChannelMode, Mode, UserMode};
|
||||
use crate::response::Response;
|
||||
use crate::standard_reply::{StandardTypes, StandardCodes};
|
||||
|
||||
/// List of all client commands as defined in [RFC 2812](http://tools.ietf.org/html/rfc2812). This
|
||||
/// also includes commands from the
|
||||
|
@ -200,6 +201,9 @@ pub enum Command {
|
|||
// Default option.
|
||||
/// An IRC response code with arguments and optional suffix.
|
||||
Response(Response, Vec<String>),
|
||||
/// https://ircv3.net/specs/extensions/standard-replies
|
||||
/// [FAIL | WARN | NOTE] <command> <code> [<context>] <description>
|
||||
StandardResponse(StandardTypes, String, StandardCodes, Vec<String>, String),
|
||||
/// A raw IRC command unknown to the crate.
|
||||
Raw(String, Vec<String>),
|
||||
}
|
||||
|
@ -408,6 +412,10 @@ impl<'a> From<&'a Command> for String {
|
|||
&format!("{:03}", *resp as u16),
|
||||
&a.iter().map(|s| &s[..]).collect::<Vec<_>>(),
|
||||
),
|
||||
Command::StandardResponse(ref r#type, ref command, ref code, ref args, ref description) => {
|
||||
match r#type {
|
||||
}
|
||||
},
|
||||
Command::Raw(ref c, ref a) => {
|
||||
stringify(c, &a.iter().map(|s| &s[..]).collect::<Vec<_>>())
|
||||
}
|
||||
|
@ -958,6 +966,14 @@ impl Command {
|
|||
} else {
|
||||
raw(cmd, args)
|
||||
}
|
||||
} else if StandardTypes::is_standard_type(cmd) {
|
||||
let code = StandardCodes::from_message(args[0], args[1]);
|
||||
let mut std_args: Vec<String> = args.iter().skip(2).map(|&s| s.to_owned()).collect();
|
||||
let desc = std_args.pop().ok_or_else(|| MessageParseError::MissingDescriptionInStandardReply)?;
|
||||
|
||||
Command::StandardResponse(
|
||||
StandardTypes::from_str(cmd).map_err(MessageParseError::InvalidStandardReplyType)?,
|
||||
args[0].to_owned(), code, std_args, desc)
|
||||
} else if let Ok(resp) = cmd.parse() {
|
||||
Command::Response(resp, args.into_iter().map(|s| s.to_owned()).collect())
|
||||
} else {
|
||||
|
@ -1128,6 +1144,7 @@ mod test {
|
|||
use super::Command;
|
||||
use super::Response;
|
||||
use crate::Message;
|
||||
use crate::standard_reply::{StandardTypes, StandardCodes};
|
||||
|
||||
#[test]
|
||||
fn format_response() {
|
||||
|
@ -1155,4 +1172,20 @@ mod test {
|
|||
cmd
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_standard_reply() {
|
||||
let msg = "FAIL BOX BOXES_INVALID STACK CLOCKWISE :Given boxes are not supported".parse::<Message>().unwrap();
|
||||
|
||||
assert_eq!(
|
||||
msg.command,
|
||||
Command::StandardResponse(StandardTypes::Fail, "BOX".to_string(), StandardCodes::Custom("BOXES_INVALID".to_string()), vec!["STACK".to_string(), "CLOCKWISE".to_string()], "Given boxes are not supported".to_string())
|
||||
);
|
||||
|
||||
let msg = "NOTE * OPER_MESSAGE :Registering new accounts and channels has been disabled temporarily while we deal with the spam. Thanks for flying ExampleNet! -dan".parse::<Message>().unwrap();
|
||||
assert_eq!(
|
||||
msg.command,
|
||||
Command::StandardResponse(StandardTypes::Note, "*".to_string(), StandardCodes::Custom("OPER_MESSAGE".to_string()), vec![], "Registering new accounts and channels has been disabled temporarily while we deal with the spam. Thanks for flying ExampleNet! -dan".to_string())
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -58,6 +58,14 @@ pub enum MessageParseError {
|
|||
/// The invalid subcommand.
|
||||
sub: String,
|
||||
},
|
||||
|
||||
/// The standard reply missed a description
|
||||
#[error("missing description in standard reply")]
|
||||
MissingDescriptionInStandardReply,
|
||||
|
||||
/// Invalid standard reply type
|
||||
#[error("invalid standard reply type: {}", .0)]
|
||||
InvalidStandardReplyType(&'static str)
|
||||
}
|
||||
|
||||
/// Errors that occur while parsing mode strings.
|
||||
|
|
|
@ -15,6 +15,7 @@ pub mod message;
|
|||
pub mod mode;
|
||||
pub mod prefix;
|
||||
pub mod response;
|
||||
pub mod standard_reply;
|
||||
|
||||
pub use self::caps::{Capability, NegotiationVersion};
|
||||
pub use self::chan::ChannelExt;
|
||||
|
|
286
irc-proto/src/standard_reply.rs
Normal file
286
irc-proto/src/standard_reply.rs
Normal file
|
@ -0,0 +1,286 @@
|
|||
use std::str::FromStr;
|
||||
|
||||
/// Support for https://ircv3.net/specs/extensions/standard-replies
|
||||
/// Implements the list of reply codes in the IRCv3 registry: https://ircv3.net/registry
|
||||
|
||||
trait FromCode {
|
||||
fn from_code(code: &str) -> Option<Self> where Self: Sized;
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum MultilineCodes {
|
||||
MaxBytes,
|
||||
MaxLines,
|
||||
InvalidTarget,
|
||||
Invalid
|
||||
}
|
||||
|
||||
impl FromCode for MultilineCodes {
|
||||
fn from_code(code: &str) -> Option<Self> {
|
||||
Some(match code {
|
||||
"MULTILINE_MAX_BYTES" => Self::MaxBytes,
|
||||
"MULTILINE_MAX_LINES" => Self::MaxLines,
|
||||
"MULTILINE_INVALID_TARGET" => Self::InvalidTarget,
|
||||
"MULTILINE_INVALID" => Self::Invalid,
|
||||
_ => return None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum ChatHistoryCodes {
|
||||
InvalidParams,
|
||||
InvalidTarget,
|
||||
MessageError,
|
||||
NeedMoreParams,
|
||||
UnknownCommand
|
||||
}
|
||||
|
||||
impl FromCode for ChatHistoryCodes {
|
||||
fn from_code(code: &str) -> Option<Self> {
|
||||
Some(match code {
|
||||
"INVALID_PARAMS" => Self::InvalidParams,
|
||||
"INVALID_TARGET" => Self::InvalidTarget,
|
||||
"MESSAGE_ERROR" => Self::MessageError,
|
||||
"NEED_MORE_PARAMS" => Self::NeedMoreParams,
|
||||
"UNKNOWN_COMMAND" => Self::UnknownCommand,
|
||||
_ => return None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum JoinCodes {
|
||||
ChannelRenamed
|
||||
}
|
||||
|
||||
impl FromCode for JoinCodes {
|
||||
fn from_code(code: &str) -> Option<Self> {
|
||||
Some(match code {
|
||||
"CHANNEL_RENAMED" => Self::ChannelRenamed,
|
||||
_ => return None
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum NickCodes {
|
||||
Reserved
|
||||
}
|
||||
|
||||
impl FromCode for NickCodes {
|
||||
fn from_code(code: &str) -> Option<Self> {
|
||||
Some(match code {
|
||||
"NICKNAME_RESERVED" => Self::Reserved,
|
||||
_ => return None
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum RedactCodes {
|
||||
InvalidTarget,
|
||||
Forbidden,
|
||||
WindowExpired,
|
||||
UnknownMsgid,
|
||||
}
|
||||
|
||||
impl FromCode for RedactCodes {
|
||||
fn from_code(code: &str) -> Option<Self> {
|
||||
Some(match code {
|
||||
"INVALID_TARGET" => Self::InvalidTarget,
|
||||
"REACT_FORBIDDEN" => Self::Forbidden,
|
||||
"REACT_WINDOW_EXPIRED" => Self::WindowExpired,
|
||||
"UNKNOWN_MSGID" => Self::UnknownMsgid,
|
||||
_ => return None
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum RegisterCodes {
|
||||
AccountExists,
|
||||
AccountNameMustBeNick,
|
||||
AlreadyAuthenticated,
|
||||
BadAccountName,
|
||||
CompleteConnectionRequired,
|
||||
InvalidEmail,
|
||||
NeedNick,
|
||||
TemporarilyUnavailable,
|
||||
UnacceptableEmail,
|
||||
UnacceptablePassword,
|
||||
WeakPassword
|
||||
}
|
||||
|
||||
impl FromCode for RegisterCodes {
|
||||
fn from_code(code: &str) -> Option<Self> {
|
||||
Some(match code {
|
||||
"ACCOUNT_EXISTS" => Self::AccountExists,
|
||||
"ACCOUNT_NAME_MUST_BE_NICK" => Self::AccountNameMustBeNick,
|
||||
"ALREADY_AUTHENTICATED" => Self::AlreadyAuthenticated,
|
||||
"BAD_ACCOUNT_NAME" => Self::BadAccountName,
|
||||
"COMPLETE_CONNECTION_REQUIRED" => Self::CompleteConnectionRequired,
|
||||
"INVALID_EMAIL" => Self::InvalidEmail,
|
||||
"NEED_NICK" => Self::NeedNick,
|
||||
"TEMPORARILY_UNAVAILABLE" => Self::TemporarilyUnavailable,
|
||||
"UNACCEPTABLE_EMAIL" => Self::UnacceptableEmail,
|
||||
"UNACCEPTABLE_PASSWORD" => Self::UnacceptablePassword,
|
||||
"WEAK_PASSWORD" => Self::WeakPassword,
|
||||
_ => return None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum RenameCodes {
|
||||
ChannelNameInUse,
|
||||
CannotRename
|
||||
}
|
||||
|
||||
impl FromCode for RenameCodes {
|
||||
fn from_code(code: &str) -> Option<Self> {
|
||||
Some(match code {
|
||||
"CHANNEL_NAME_IN_USE" => Self::ChannelNameInUse,
|
||||
"CANNOT_RENAME" => Self::CannotRename,
|
||||
_ => return None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum SetNameCodes {
|
||||
CannotChangeRealname,
|
||||
InvalidRealname
|
||||
}
|
||||
|
||||
impl FromCode for SetNameCodes {
|
||||
fn from_code(code: &str) -> Option<Self> {
|
||||
Some(match code {
|
||||
"CANNOT_CHANGE_REALNAME" => Self::CannotChangeRealname,
|
||||
"INVALID_REALNAME" => Self::InvalidRealname,
|
||||
_ => return None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum VerifyCodes {
|
||||
AlreadyAuthenticated,
|
||||
InvalidCode,
|
||||
CompleteConnectionRequired,
|
||||
TemporarilyUnavailable
|
||||
}
|
||||
|
||||
impl FromCode for VerifyCodes {
|
||||
fn from_code(code: &str) -> Option<Self> {
|
||||
Some(match code {
|
||||
"ALREADY_AUTHENTICATED" => Self::AlreadyAuthenticated,
|
||||
"INVALID_CODE" => Self::InvalidCode,
|
||||
"COMPLETE_CONNECTION_REQUIRED" => Self::CompleteConnectionRequired,
|
||||
"TEMPORARILY_UNAVAILABLE" => Self::TemporarilyUnavailable,
|
||||
_ => return None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum StandardCodes {
|
||||
AccountRequired,
|
||||
InvalidUtf8,
|
||||
|
||||
Multiline(MultilineCodes),
|
||||
ChatHistory(ChatHistoryCodes),
|
||||
Join(JoinCodes),
|
||||
Nick(NickCodes),
|
||||
Redact(RedactCodes),
|
||||
Register(RegisterCodes),
|
||||
Rename(RenameCodes),
|
||||
SetName(SetNameCodes),
|
||||
Verify(VerifyCodes),
|
||||
|
||||
Custom(String)
|
||||
}
|
||||
|
||||
impl StandardCodes {
|
||||
fn known_from_message(command: &str, code: &str) -> Option<Self> {
|
||||
Some(match command {
|
||||
"BATCH" => Self::Multiline(MultilineCodes::from_code(code)?),
|
||||
"CHATHISTORY" => Self::ChatHistory(ChatHistoryCodes::from_code(code)?),
|
||||
"JOIN" => Self::Join(JoinCodes::from_code(code)?),
|
||||
"NICK" => Self::Nick(NickCodes::from_code(code)?),
|
||||
"REDACT" => Self::Redact(RedactCodes::from_code(code)?),
|
||||
"REGISTER" => Self::Register(RegisterCodes::from_code(code)?),
|
||||
"RENAME" => Self::Rename(RenameCodes::from_code(code)?),
|
||||
"SETNAME" => Self::SetName(SetNameCodes::from_code(code)?),
|
||||
"VERIFY" => Self::Verify(VerifyCodes::from_code(code)?),
|
||||
_ => {
|
||||
match code {
|
||||
"ACCOUNT_REQUIRED" => Self::AccountRequired,
|
||||
"INVALID_UTF8" => Self::InvalidUtf8,
|
||||
_ => Self::Custom(code.to_string())
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn from_message(command: &str, code: &str) -> Self {
|
||||
Self::known_from_message(command, code).unwrap_or_else(|| Self::Custom(code.to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum StandardTypes {
|
||||
Fail,
|
||||
Warn,
|
||||
Note
|
||||
}
|
||||
|
||||
impl StandardTypes {
|
||||
pub fn is_standard_type(s: &str) -> bool {
|
||||
s.eq_ignore_ascii_case("FAIL") || s.eq_ignore_ascii_case("WARN") || s.eq_ignore_ascii_case("NOTE")
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for StandardTypes {
|
||||
type Err = &'static str;
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
match s.to_ascii_uppercase().as_str() {
|
||||
"FAIL" => Ok(Self::Fail),
|
||||
"WARN" => Ok(Self::Warn),
|
||||
"NOTE" => Ok(Self::Note),
|
||||
_ => Err("Unexpected standard response type, neither fail, warn or note.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::StandardCodes;
|
||||
|
||||
#[test]
|
||||
fn parse_spec_example1() {
|
||||
let (command, code) = ("ACC", "REG_INVALID_CALLBACK");
|
||||
assert_eq!(StandardCodes::Custom("REG_INVALID_CALLBACK".to_string()), StandardCodes::from_message(command, code));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_spec_example2() {
|
||||
let (command, code) = ("BOX", "BOXES_INVALID");
|
||||
assert_eq!(StandardCodes::Custom("BOXES_INVALID".to_string()), StandardCodes::from_message(command, code));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_spec_example3() {
|
||||
let (command, code) = ("*", "ACCOUNT_REQUIRED_TO_CONNECT");
|
||||
assert_eq!(StandardCodes::Custom("ACCOUNT_REQUIRED_TO_CONNECT".to_string()), StandardCodes::from_message(command, code));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_batch_example() {
|
||||
let (command, code) = ("BATCH", "MULTILINE_MAX_BYTES");
|
||||
assert_eq!(StandardCodes::Multiline(crate::standard_reply::MultilineCodes::MaxBytes), StandardCodes::from_message(command, code));
|
||||
}
|
||||
}
|
|
@ -47,6 +47,7 @@
|
|||
//! # }
|
||||
//! ```
|
||||
|
||||
use atomic_counter::{AtomicCounter, RelaxedCounter};
|
||||
#[cfg(feature = "ctcp")]
|
||||
use chrono::prelude::*;
|
||||
use futures_util::{
|
||||
|
@ -76,12 +77,13 @@ use crate::{
|
|||
},
|
||||
error,
|
||||
proto::{
|
||||
BatchSubCommand,
|
||||
mode::ModeType,
|
||||
CapSubCommand::{END, LS, REQ},
|
||||
Capability, ChannelMode, Command,
|
||||
Command::{
|
||||
ChannelMODE, AUTHENTICATE, CAP, INVITE, JOIN, KICK, KILL, NICK, NICKSERV, NOTICE, OPER,
|
||||
PART, PASS, PONG, PRIVMSG, QUIT, SAMODE, SANICK, TOPIC, USER,
|
||||
PART, PASS, PONG, PRIVMSG, QUIT, SAMODE, SANICK, TOPIC, USER, BATCH
|
||||
},
|
||||
Message, Mode, NegotiationVersion, Response,
|
||||
},
|
||||
|
@ -496,8 +498,12 @@ struct ClientState {
|
|||
sender: Sender,
|
||||
/// The configuration used with this connection.
|
||||
config: Config,
|
||||
/// A thread-safe counter for batch IDs
|
||||
batch_id: RelaxedCounter,
|
||||
/// A thread-safe map of channels to the list of users in them.
|
||||
chanlists: RwLock<HashMap<String, Vec<User>>>,
|
||||
/// A thread-safe map of in-progress batch
|
||||
inflight_batches: RwLock<HashMap<String, Vec<Message>>>,
|
||||
/// A thread-safe index to track the current alternative nickname being used.
|
||||
alt_nick_index: RwLock<usize>,
|
||||
/// Default ghost sequence to send if one is required but none is configured.
|
||||
|
@ -509,6 +515,8 @@ impl ClientState {
|
|||
ClientState {
|
||||
sender,
|
||||
config,
|
||||
batch_id: RelaxedCounter::new(0),
|
||||
inflight_batches: RwLock::new(HashMap::new()),
|
||||
chanlists: RwLock::new(HashMap::new()),
|
||||
alt_nick_index: RwLock::new(0),
|
||||
default_ghost_sequence: vec![String::from("GHOST")],
|
||||
|
@ -553,6 +561,30 @@ impl ClientState {
|
|||
/// Handles received messages internally for basic client functionality.
|
||||
fn handle_message(&self, msg: &Message) -> error::Result<()> {
|
||||
log::trace!("[RECV] {}", msg.to_string());
|
||||
|
||||
if let Some(tags) = msg.tags {
|
||||
if let Some(batch_tag) = tags.into_iter().find(|tag| tag.0 == "batch") {
|
||||
match batch_tag.1 {
|
||||
Some(batch_id) => {
|
||||
let inflight_batches = self.inflight_batches.read();
|
||||
// TODO: check if we negotiated batch as well.
|
||||
if !inflight_batches.contains_key(&batch_id) {
|
||||
} else {
|
||||
// Release the read lock, we are upgrading to a write lock.
|
||||
drop(inflight_batches);
|
||||
let mut inflight_batches = self.inflight_batches.write();
|
||||
// This message processing is delayed until the batch is finished.
|
||||
inflight_batches.entry(batch_id).and_modify(|messages| messages.push(msg.clone()));
|
||||
return Ok(());
|
||||
}
|
||||
},
|
||||
None => {
|
||||
// TODO: Return an invalid message error.
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
match msg.command {
|
||||
JOIN(ref chan, _, _) => self.handle_join(msg.source_nickname().unwrap_or(""), chan),
|
||||
PART(ref chan, _) => self.handle_part(msg.source_nickname().unwrap_or(""), chan),
|
||||
|
@ -579,6 +611,7 @@ impl ClientState {
|
|||
}
|
||||
}
|
||||
}
|
||||
Command::BATCH(ref reference_tag, ref sub, ref params) => self.handle_batch(reference_tag, sub.as_ref(), params.as_ref())?,
|
||||
Command::Response(Response::RPL_NAMREPLY, ref args) => self.handle_namreply(args),
|
||||
Command::Response(Response::RPL_ENDOFMOTD, _)
|
||||
| Command::Response(Response::ERR_NOMOTD, _) => {
|
||||
|
@ -672,6 +705,40 @@ impl ClientState {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "batch"))]
|
||||
fn handle_batch(&self, _: &str, _: Option<&BatchSubCommand>, _: Option<&Vec<String>>) -> error::Result<()> {}
|
||||
|
||||
#[cfg(feature = "batch")]
|
||||
fn handle_batch(&self, reference_tag: &str, sub: Option<&BatchSubCommand>, params: Option<&Vec<String>>) -> error::Result<()> {
|
||||
// TODO: increase type safety here.
|
||||
let is_start = reference_tag.chars().nth(0).unwrap() == '+';
|
||||
let mut inflight_batches = self.inflight_batches.write();
|
||||
|
||||
// TODO: handle nested batches better.
|
||||
// TODO: handling sub commands such as netsplit and netjoin could be done by having extra
|
||||
// handlers for end users to warn them of an incoming netsplit or netjoin batch.
|
||||
// If this is chathistory, handle_chathistory could also be designed.
|
||||
|
||||
let identifier = reference_tag[1..].to_string();
|
||||
if is_start {
|
||||
if inflight_batches.contains_key(&identifier) {
|
||||
return Err(error::Error::BatchAlreadyExists(identifier));
|
||||
}
|
||||
|
||||
// Create a new pending batch.
|
||||
inflight_batches.insert(identifier, Vec::new());
|
||||
} else {
|
||||
// Remove the pending batches and replay all the messages.
|
||||
let pending_messages = inflight_batches.remove(&reference_tag[1..].to_string()).ok_or(error::Error::BatchDisappearedBeforeEndOfBatchProcessed)?;
|
||||
// Replay all delayed messages now.
|
||||
for message in pending_messages {
|
||||
self.handle_message(&message)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "channel-lists"))]
|
||||
fn handle_join(&self, _: &str, _: &str) {}
|
||||
|
||||
|
@ -807,6 +874,21 @@ impl ClientState {
|
|||
}
|
||||
|
||||
pub_state_base!();
|
||||
|
||||
/// Sends a client batch with an iterator as the body.
|
||||
pub fn send_client_batch<I: Iterator<Item=Message>>(&mut self, msg_iterator: I) -> error::Result<()> {
|
||||
let batch_id = format!("{}", self.batch_id.get());
|
||||
self.send(BATCH(batch_id.clone(), None, None))?;
|
||||
// Attach to all message that batch ID.
|
||||
msg_iterator.for_each(|msg| {
|
||||
self.send(msg);
|
||||
});
|
||||
// Close the batch.
|
||||
self.send(BATCH(batch_id, None, None))?;
|
||||
self.batch_id.inc();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Thread-safe sender that can be used with the client.
|
||||
|
|
|
@ -123,6 +123,14 @@ pub enum Error {
|
|||
/// Stream has already been configured.
|
||||
#[error("stream has already been configured")]
|
||||
StreamAlreadyConfigured,
|
||||
|
||||
/// A end of batch message was sent after a pending batch was removed from the inflight list
|
||||
#[error("batch disappeared before end of batch was processed")]
|
||||
BatchDisappearedBeforeEndOfBatchProcessed,
|
||||
|
||||
/// A new batch was started with the same reference tag
|
||||
#[error("batch {} already exists but a new batch request with the same tag was sent", .0)]
|
||||
BatchAlreadyExists(String),
|
||||
}
|
||||
|
||||
/// Errors that occur with configurations.
|
||||
|
|
Loading…
Reference in a new issue