From c7772c047974349553a4e07390db8dcb9e72f17a Mon Sep 17 00:00:00 2001 From: Aaron Weiss Date: Mon, 1 Jan 2018 21:18:33 -0500 Subject: [PATCH] Changed message throttling to use a rolling window. --- src/client/transport.rs | 68 ++++++++++++++++++++++++----------------- src/proto/irc.rs | 1 - 2 files changed, 40 insertions(+), 29 deletions(-) diff --git a/src/client/transport.rs b/src/client/transport.rs index b3bd727..dd79947 100644 --- a/src/client/transport.rs +++ b/src/client/transport.rs @@ -3,12 +3,12 @@ use std::sync::{Arc, RwLock, RwLockReadGuard}; use std::time::{Duration, Instant}; -use futures::{Async, Poll, Sink, StartSend, Stream}; +use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend, Stream}; use chrono::prelude::*; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::Framed; use tokio_timer; -use tokio_timer::Interval; +use tokio_timer::{Interval, Sleep, Timer}; use error; use client::data::Config; @@ -20,9 +20,11 @@ where T: AsyncRead + AsyncWrite, { inner: Framed, - burst_timer: Interval, - max_burst_messages: u32, - current_burst_messages: u32, + burst_timer: Timer, + rolling_burst_window: Sleep, + burst_window_length: u64, + max_burst_messages: u64, + current_burst_messages: u64, ping_timer: Interval, ping_timeout: u64, last_ping_data: String, @@ -37,10 +39,14 @@ where /// Creates a new `IrcTransport` from the given IRC stream. pub fn new(config: &Config, inner: Framed) -> IrcTransport { let timer = tokio_timer::wheel().build(); + let burst = tokio_timer::wheel().build(); + let burst_window = burst.sleep(Duration::from_secs(config.burst_window_length() as u64)); IrcTransport { inner: inner, - burst_timer: timer.interval(Duration::from_secs(config.burst_window_length() as u64)), - max_burst_messages: config.max_messages_in_burst(), + burst_timer: burst, + rolling_burst_window: burst_window, + burst_window_length: config.burst_window_length() as u64, + max_burst_messages: config.max_messages_in_burst() as u64, current_burst_messages: 0, ping_timer: timer.interval(Duration::from_secs(config.ping_time() as u64)), ping_timeout: config.ping_timeout() as u64, @@ -142,7 +148,32 @@ where self.close()?; Err(error::ErrorKind::PingTimeout.into()) } else { - Ok(self.inner.start_send(item)?) + // Check if the last rolling window has ended. + match self.rolling_burst_window.poll()? { + Async::NotReady => (), + Async::Ready(()) => { + // Since it has ended, we reset the message count and start a new window. + self.current_burst_messages = 0; + self.rolling_burst_window = self.burst_timer.sleep(Duration::from_secs( + self.burst_window_length + )); + } + } + + // Throttling if too many messages have been sent recently. + if self.current_burst_messages >= self.max_burst_messages { + // When throttled, we know we need to finish sending what's already queued up. + self.poll_complete()?; + return Ok(AsyncSink::NotReady(item)) + } + + match self.inner.start_send(item)? { + AsyncSink::NotReady(item) => Ok(AsyncSink::NotReady(item)), + AsyncSink::Ready => { + self.current_burst_messages += 1; + Ok(AsyncSink::Ready) + } + } } } @@ -151,26 +182,7 @@ where self.close()?; Err(error::ErrorKind::PingTimeout.into()) } else { - match self.burst_timer.poll()? { - Async::NotReady => (), - Async::Ready(msg) => { - assert!(msg.is_some()); - self.current_burst_messages = 0; - } - } - - // Throttling if too many messages have been sent recently. - if self.current_burst_messages >= self.max_burst_messages { - return Ok(Async::NotReady) - } - - match self.inner.poll_complete()? { - Async::NotReady => Ok(Async::NotReady), - Async::Ready(()) => { - self.current_burst_messages += 1; - Ok(Async::Ready(())) - } - } + Ok(self.inner.poll_complete()?) } } diff --git a/src/proto/irc.rs b/src/proto/irc.rs index a0d51a2..5487ce5 100644 --- a/src/proto/irc.rs +++ b/src/proto/irc.rs @@ -1,5 +1,4 @@ //! Implementation of IRC codec for Tokio. - use bytes::BytesMut; use tokio_io::codec::{Decoder, Encoder};