Added support for message throttling.

This commit is contained in:
Aaron Weiss 2017-06-28 23:07:02 -07:00
parent ccefda229b
commit a0f43cb80b
No known key found for this signature in database
GPG key ID: 0237035D9BF03AE2
3 changed files with 52 additions and 4 deletions

View file

@ -92,6 +92,8 @@ for obvious reasons). That being said, here's an example of a complete configura
"source": "https://github.com/aatxe/irc", "source": "https://github.com/aatxe/irc",
"ping_time": 180, "ping_time": 180,
"ping_timeout": 10, "ping_timeout": 10,
"burst_window_length": 8,
"max_messages_in_burst": 15,
"should_ghost": false, "should_ghost": false,
"ghost_sequence": [], "ghost_sequence": [],
"options": { "options": {

View file

@ -56,6 +56,11 @@ pub struct Config {
pub ping_time: Option<u32>, pub ping_time: Option<u32>,
/// The amount of time in seconds for a client to reconnect due to no ping response. /// The amount of time in seconds for a client to reconnect due to no ping response.
pub ping_timeout: Option<u32>, pub ping_timeout: Option<u32>,
/// The amount of time in seconds to consider a window for burst messages.
pub burst_window_length: Option<u32>,
/// The maximum number of messages that can be sent in a burst window before they'll be delayed.
/// Messages are automatically delayed until the start of the next window.
pub max_messages_in_burst: Option<u32>,
/// Whether the client should use NickServ GHOST to reclaim its primary nickname if it is in /// Whether the client should use NickServ GHOST to reclaim its primary nickname if it is in
/// use. This has no effect if `nick_password` is not set. /// use. This has no effect if `nick_password` is not set.
pub should_ghost: Option<bool>, pub should_ghost: Option<bool>,
@ -248,6 +253,19 @@ impl Config {
self.ping_timeout.as_ref().cloned().unwrap_or(10) self.ping_timeout.as_ref().cloned().unwrap_or(10)
} }
/// The amount of time in seconds to consider a window for burst messages.
/// This defaults to 8 seconds when not specified.
pub fn burst_window_length(&self) -> u32 {
self.burst_window_length.as_ref().cloned().unwrap_or(8)
}
/// The maximum number of messages that can be sent in a burst window before they'll be delayed.
/// Messages are automatically delayed until the start of the next window.
/// This defaults to 15 messages when not specified.
pub fn max_messages_in_burst(&self) -> u32 {
self.max_messages_in_burst.as_ref().cloned().unwrap_or(15)
}
/// Gets whether or not to attempt nickname reclamation using NickServ GHOST. /// Gets whether or not to attempt nickname reclamation using NickServ GHOST.
/// This defaults to false when not specified. /// This defaults to false when not specified.
pub fn should_ghost(&self) -> bool { pub fn should_ghost(&self) -> bool {
@ -317,6 +335,8 @@ mod test {
source: None, source: None,
ping_time: None, ping_time: None,
ping_timeout: None, ping_timeout: None,
burst_window_length: None,
max_messages_in_burst: None,
should_ghost: None, should_ghost: None,
ghost_sequence: None, ghost_sequence: None,
options: Some(HashMap::new()), options: Some(HashMap::new()),
@ -349,6 +369,8 @@ mod test {
source: None, source: None,
ping_time: None, ping_time: None,
ping_timeout: None, ping_timeout: None,
burst_window_length: None,
max_messages_in_burst: None,
should_ghost: None, should_ghost: None,
ghost_sequence: None, ghost_sequence: None,
options: Some(HashMap::new()), options: Some(HashMap::new()),

View file

@ -19,6 +19,9 @@ where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
{ {
inner: Framed<T, IrcCodec>, inner: Framed<T, IrcCodec>,
burst_timer: Interval,
max_burst_messages: u32,
current_burst_messages: u32,
ping_timer: Interval, ping_timer: Interval,
ping_timeout: u64, ping_timeout: u64,
last_ping_data: String, last_ping_data: String,
@ -32,11 +35,13 @@ where
{ {
/// Creates a new `IrcTransport` from the given IRC stream. /// Creates a new `IrcTransport` from the given IRC stream.
pub fn new(config: &Config, inner: Framed<T, IrcCodec>) -> IrcTransport<T> { pub fn new(config: &Config, inner: Framed<T, IrcCodec>) -> IrcTransport<T> {
let timer = tokio_timer::wheel().build();
IrcTransport { IrcTransport {
inner: inner, inner: inner,
ping_timer: tokio_timer::wheel().build().interval( burst_timer: timer.interval(Duration::from_secs(config.burst_window_length() as u64)),
Duration::from_secs(config.ping_time() as u64) max_burst_messages: config.max_messages_in_burst(),
), current_burst_messages: 0,
ping_timer: timer.interval(Duration::from_secs(config.ping_time() as u64)),
ping_timeout: config.ping_timeout() as u64, ping_timeout: config.ping_timeout() as u64,
last_ping_data: String::new(), last_ping_data: String::new(),
last_ping_sent: Instant::now(), last_ping_sent: Instant::now(),
@ -145,7 +150,26 @@ where
self.close()?; self.close()?;
Err(error::ErrorKind::PingTimeout.into()) Err(error::ErrorKind::PingTimeout.into())
} else { } else {
Ok(self.inner.poll_complete()?) match self.burst_timer.poll()? {
Async::NotReady => (),
Async::Ready(msg) => {
assert!(msg.is_some());
self.current_burst_messages = 0;
}
}
// Throttling if too many messages have been sent recently.
if self.current_burst_messages >= self.max_burst_messages {
return Ok(Async::NotReady)
}
match self.inner.poll_complete()? {
Async::NotReady => Ok(Async::NotReady),
Async::Ready(()) => {
self.current_burst_messages += 1;
Ok(Async::Ready(()))
}
}
} }
} }