diff --git a/src/client/transport.rs b/src/client/transport.rs index dd79947..454a66e 100644 --- a/src/client/transport.rs +++ b/src/client/transport.rs @@ -1,5 +1,6 @@ //! An IRC transport that wraps an IRC-framed stream to provide a number of features including //! automatic PING replies, automatic sending of PINGs, and message rate-limiting. +use std::collections::VecDeque; use std::sync::{Arc, RwLock, RwLockReadGuard}; use std::time::{Duration, Instant}; @@ -21,7 +22,7 @@ where { inner: Framed, burst_timer: Timer, - rolling_burst_window: Sleep, + rolling_burst_window: VecDeque, burst_window_length: u64, max_burst_messages: u64, current_burst_messages: u64, @@ -39,12 +40,10 @@ where /// Creates a new `IrcTransport` from the given IRC stream. pub fn new(config: &Config, inner: Framed) -> IrcTransport { let timer = tokio_timer::wheel().build(); - let burst = tokio_timer::wheel().build(); - let burst_window = burst.sleep(Duration::from_secs(config.burst_window_length() as u64)); IrcTransport { inner: inner, - burst_timer: burst, - rolling_burst_window: burst_window, + burst_timer: tokio_timer::wheel().build(), + rolling_burst_window: VecDeque::new(), burst_window_length: config.burst_window_length() as u64, max_burst_messages: config.max_messages_in_burst() as u64, current_burst_messages: 0, @@ -78,6 +77,10 @@ where self.poll_complete()?; Ok(()) } + + fn rolling_burst_window_front(&mut self) -> Result, tokio_timer::TimerError> { + self.rolling_burst_window.front_mut().map(|w| w.poll()).unwrap_or(Ok(Async::NotReady)) + } } impl Stream for IrcTransport @@ -148,15 +151,12 @@ where self.close()?; Err(error::ErrorKind::PingTimeout.into()) } else { - // Check if the last rolling window has ended. - match self.rolling_burst_window.poll()? { + // Check if the oldest message in the rolling window is discounted. + match self.rolling_burst_window_front()? { Async::NotReady => (), Async::Ready(()) => { - // Since it has ended, we reset the message count and start a new window. - self.current_burst_messages = 0; - self.rolling_burst_window = self.burst_timer.sleep(Duration::from_secs( - self.burst_window_length - )); + self.current_burst_messages -= 1; + self.rolling_burst_window.pop_front(); } } @@ -171,6 +171,9 @@ where AsyncSink::NotReady(item) => Ok(AsyncSink::NotReady(item)), AsyncSink::Ready => { self.current_burst_messages += 1; + self.rolling_burst_window.push_back(self.burst_timer.sleep(Duration::from_secs( + self.burst_window_length + ))); Ok(AsyncSink::Ready) } }