Changed throttling window to be properly rolling.

This commit is contained in:
Aaron Weiss 2018-01-01 21:47:59 -05:00
parent 16292fcc2c
commit 70ff2ec992
No known key found for this signature in database
GPG key ID: 047D32DF25DC22EF

View file

@ -1,5 +1,6 @@
//! An IRC transport that wraps an IRC-framed stream to provide a number of features including //! An IRC transport that wraps an IRC-framed stream to provide a number of features including
//! automatic PING replies, automatic sending of PINGs, and message rate-limiting. //! automatic PING replies, automatic sending of PINGs, and message rate-limiting.
use std::collections::VecDeque;
use std::sync::{Arc, RwLock, RwLockReadGuard}; use std::sync::{Arc, RwLock, RwLockReadGuard};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -21,7 +22,7 @@ where
{ {
inner: Framed<T, IrcCodec>, inner: Framed<T, IrcCodec>,
burst_timer: Timer, burst_timer: Timer,
rolling_burst_window: Sleep, rolling_burst_window: VecDeque<Sleep>,
burst_window_length: u64, burst_window_length: u64,
max_burst_messages: u64, max_burst_messages: u64,
current_burst_messages: u64, current_burst_messages: u64,
@ -39,12 +40,10 @@ where
/// Creates a new `IrcTransport` from the given IRC stream. /// Creates a new `IrcTransport` from the given IRC stream.
pub fn new(config: &Config, inner: Framed<T, IrcCodec>) -> IrcTransport<T> { pub fn new(config: &Config, inner: Framed<T, IrcCodec>) -> IrcTransport<T> {
let timer = tokio_timer::wheel().build(); let timer = tokio_timer::wheel().build();
let burst = tokio_timer::wheel().build();
let burst_window = burst.sleep(Duration::from_secs(config.burst_window_length() as u64));
IrcTransport { IrcTransport {
inner: inner, inner: inner,
burst_timer: burst, burst_timer: tokio_timer::wheel().build(),
rolling_burst_window: burst_window, rolling_burst_window: VecDeque::new(),
burst_window_length: config.burst_window_length() as u64, burst_window_length: config.burst_window_length() as u64,
max_burst_messages: config.max_messages_in_burst() as u64, max_burst_messages: config.max_messages_in_burst() as u64,
current_burst_messages: 0, current_burst_messages: 0,
@ -78,6 +77,10 @@ where
self.poll_complete()?; self.poll_complete()?;
Ok(()) Ok(())
} }
fn rolling_burst_window_front(&mut self) -> Result<Async<()>, tokio_timer::TimerError> {
self.rolling_burst_window.front_mut().map(|w| w.poll()).unwrap_or(Ok(Async::NotReady))
}
} }
impl<T> Stream for IrcTransport<T> impl<T> Stream for IrcTransport<T>
@ -148,15 +151,12 @@ where
self.close()?; self.close()?;
Err(error::ErrorKind::PingTimeout.into()) Err(error::ErrorKind::PingTimeout.into())
} else { } else {
// Check if the last rolling window has ended. // Check if the oldest message in the rolling window is discounted.
match self.rolling_burst_window.poll()? { match self.rolling_burst_window_front()? {
Async::NotReady => (), Async::NotReady => (),
Async::Ready(()) => { Async::Ready(()) => {
// Since it has ended, we reset the message count and start a new window. self.current_burst_messages -= 1;
self.current_burst_messages = 0; self.rolling_burst_window.pop_front();
self.rolling_burst_window = self.burst_timer.sleep(Duration::from_secs(
self.burst_window_length
));
} }
} }
@ -171,6 +171,9 @@ where
AsyncSink::NotReady(item) => Ok(AsyncSink::NotReady(item)), AsyncSink::NotReady(item) => Ok(AsyncSink::NotReady(item)),
AsyncSink::Ready => { AsyncSink::Ready => {
self.current_burst_messages += 1; self.current_burst_messages += 1;
self.rolling_burst_window.push_back(self.burst_timer.sleep(Duration::from_secs(
self.burst_window_length
)));
Ok(AsyncSink::Ready) Ok(AsyncSink::Ready)
} }
} }