From a0f43cb80b5e92f96a68cc751c583fac55c865fc Mon Sep 17 00:00:00 2001 From: Aaron Weiss Date: Wed, 28 Jun 2017 23:07:02 -0700 Subject: [PATCH] Added support for message throttling. --- README.md | 2 ++ src/client/data/config.rs | 22 ++++++++++++++++++++++ src/client/transport.rs | 32 ++++++++++++++++++++++++++++---- 3 files changed, 52 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 9047261..699433c 100644 --- a/README.md +++ b/README.md @@ -92,6 +92,8 @@ for obvious reasons). That being said, here's an example of a complete configura "source": "https://github.com/aatxe/irc", "ping_time": 180, "ping_timeout": 10, + "burst_window_length": 8, + "max_messages_in_burst": 15, "should_ghost": false, "ghost_sequence": [], "options": { diff --git a/src/client/data/config.rs b/src/client/data/config.rs index b09cd7c..65656ec 100644 --- a/src/client/data/config.rs +++ b/src/client/data/config.rs @@ -56,6 +56,11 @@ pub struct Config { pub ping_time: Option, /// The amount of time in seconds for a client to reconnect due to no ping response. pub ping_timeout: Option, + /// The amount of time in seconds to consider a window for burst messages. + pub burst_window_length: Option, + /// The maximum number of messages that can be sent in a burst window before they'll be delayed. + /// Messages are automatically delayed until the start of the next window. + pub max_messages_in_burst: Option, /// Whether the client should use NickServ GHOST to reclaim its primary nickname if it is in /// use. This has no effect if `nick_password` is not set. pub should_ghost: Option, @@ -248,6 +253,19 @@ impl Config { self.ping_timeout.as_ref().cloned().unwrap_or(10) } + /// The amount of time in seconds to consider a window for burst messages. + /// This defaults to 8 seconds when not specified. + pub fn burst_window_length(&self) -> u32 { + self.burst_window_length.as_ref().cloned().unwrap_or(8) + } + + /// The maximum number of messages that can be sent in a burst window before they'll be delayed. + /// Messages are automatically delayed until the start of the next window. + /// This defaults to 15 messages when not specified. + pub fn max_messages_in_burst(&self) -> u32 { + self.max_messages_in_burst.as_ref().cloned().unwrap_or(15) + } + /// Gets whether or not to attempt nickname reclamation using NickServ GHOST. /// This defaults to false when not specified. pub fn should_ghost(&self) -> bool { @@ -317,6 +335,8 @@ mod test { source: None, ping_time: None, ping_timeout: None, + burst_window_length: None, + max_messages_in_burst: None, should_ghost: None, ghost_sequence: None, options: Some(HashMap::new()), @@ -349,6 +369,8 @@ mod test { source: None, ping_time: None, ping_timeout: None, + burst_window_length: None, + max_messages_in_burst: None, should_ghost: None, ghost_sequence: None, options: Some(HashMap::new()), diff --git a/src/client/transport.rs b/src/client/transport.rs index d324385..bf2906e 100644 --- a/src/client/transport.rs +++ b/src/client/transport.rs @@ -19,6 +19,9 @@ where T: AsyncRead + AsyncWrite, { inner: Framed, + burst_timer: Interval, + max_burst_messages: u32, + current_burst_messages: u32, ping_timer: Interval, ping_timeout: u64, last_ping_data: String, @@ -32,11 +35,13 @@ where { /// Creates a new `IrcTransport` from the given IRC stream. pub fn new(config: &Config, inner: Framed) -> IrcTransport { + let timer = tokio_timer::wheel().build(); IrcTransport { inner: inner, - ping_timer: tokio_timer::wheel().build().interval( - Duration::from_secs(config.ping_time() as u64) - ), + burst_timer: timer.interval(Duration::from_secs(config.burst_window_length() as u64)), + max_burst_messages: config.max_messages_in_burst(), + current_burst_messages: 0, + ping_timer: timer.interval(Duration::from_secs(config.ping_time() as u64)), ping_timeout: config.ping_timeout() as u64, last_ping_data: String::new(), last_ping_sent: Instant::now(), @@ -145,7 +150,26 @@ where self.close()?; Err(error::ErrorKind::PingTimeout.into()) } else { - Ok(self.inner.poll_complete()?) + match self.burst_timer.poll()? { + Async::NotReady => (), + Async::Ready(msg) => { + assert!(msg.is_some()); + self.current_burst_messages = 0; + } + } + + // Throttling if too many messages have been sent recently. + if self.current_burst_messages >= self.max_burst_messages { + return Ok(Async::NotReady) + } + + match self.inner.poll_complete()? { + Async::NotReady => Ok(Async::NotReady), + Async::Ready(()) => { + self.current_burst_messages += 1; + Ok(Async::Ready(())) + } + } } }