Changed message throttling to use a rolling window.
This commit is contained in:
parent
172e55b623
commit
c7772c0479
2 changed files with 40 additions and 29 deletions
|
@ -3,12 +3,12 @@
|
||||||
use std::sync::{Arc, RwLock, RwLockReadGuard};
|
use std::sync::{Arc, RwLock, RwLockReadGuard};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use futures::{Async, Poll, Sink, StartSend, Stream};
|
use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
|
||||||
use chrono::prelude::*;
|
use chrono::prelude::*;
|
||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
use tokio_io::codec::Framed;
|
use tokio_io::codec::Framed;
|
||||||
use tokio_timer;
|
use tokio_timer;
|
||||||
use tokio_timer::Interval;
|
use tokio_timer::{Interval, Sleep, Timer};
|
||||||
|
|
||||||
use error;
|
use error;
|
||||||
use client::data::Config;
|
use client::data::Config;
|
||||||
|
@ -20,9 +20,11 @@ where
|
||||||
T: AsyncRead + AsyncWrite,
|
T: AsyncRead + AsyncWrite,
|
||||||
{
|
{
|
||||||
inner: Framed<T, IrcCodec>,
|
inner: Framed<T, IrcCodec>,
|
||||||
burst_timer: Interval,
|
burst_timer: Timer,
|
||||||
max_burst_messages: u32,
|
rolling_burst_window: Sleep,
|
||||||
current_burst_messages: u32,
|
burst_window_length: u64,
|
||||||
|
max_burst_messages: u64,
|
||||||
|
current_burst_messages: u64,
|
||||||
ping_timer: Interval,
|
ping_timer: Interval,
|
||||||
ping_timeout: u64,
|
ping_timeout: u64,
|
||||||
last_ping_data: String,
|
last_ping_data: String,
|
||||||
|
@ -37,10 +39,14 @@ where
|
||||||
/// Creates a new `IrcTransport` from the given IRC stream.
|
/// Creates a new `IrcTransport` from the given IRC stream.
|
||||||
pub fn new(config: &Config, inner: Framed<T, IrcCodec>) -> IrcTransport<T> {
|
pub fn new(config: &Config, inner: Framed<T, IrcCodec>) -> IrcTransport<T> {
|
||||||
let timer = tokio_timer::wheel().build();
|
let timer = tokio_timer::wheel().build();
|
||||||
|
let burst = tokio_timer::wheel().build();
|
||||||
|
let burst_window = burst.sleep(Duration::from_secs(config.burst_window_length() as u64));
|
||||||
IrcTransport {
|
IrcTransport {
|
||||||
inner: inner,
|
inner: inner,
|
||||||
burst_timer: timer.interval(Duration::from_secs(config.burst_window_length() as u64)),
|
burst_timer: burst,
|
||||||
max_burst_messages: config.max_messages_in_burst(),
|
rolling_burst_window: burst_window,
|
||||||
|
burst_window_length: config.burst_window_length() as u64,
|
||||||
|
max_burst_messages: config.max_messages_in_burst() as u64,
|
||||||
current_burst_messages: 0,
|
current_burst_messages: 0,
|
||||||
ping_timer: timer.interval(Duration::from_secs(config.ping_time() as u64)),
|
ping_timer: timer.interval(Duration::from_secs(config.ping_time() as u64)),
|
||||||
ping_timeout: config.ping_timeout() as u64,
|
ping_timeout: config.ping_timeout() as u64,
|
||||||
|
@ -142,7 +148,32 @@ where
|
||||||
self.close()?;
|
self.close()?;
|
||||||
Err(error::ErrorKind::PingTimeout.into())
|
Err(error::ErrorKind::PingTimeout.into())
|
||||||
} else {
|
} else {
|
||||||
Ok(self.inner.start_send(item)?)
|
// Check if the last rolling window has ended.
|
||||||
|
match self.rolling_burst_window.poll()? {
|
||||||
|
Async::NotReady => (),
|
||||||
|
Async::Ready(()) => {
|
||||||
|
// Since it has ended, we reset the message count and start a new window.
|
||||||
|
self.current_burst_messages = 0;
|
||||||
|
self.rolling_burst_window = self.burst_timer.sleep(Duration::from_secs(
|
||||||
|
self.burst_window_length
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Throttling if too many messages have been sent recently.
|
||||||
|
if self.current_burst_messages >= self.max_burst_messages {
|
||||||
|
// When throttled, we know we need to finish sending what's already queued up.
|
||||||
|
self.poll_complete()?;
|
||||||
|
return Ok(AsyncSink::NotReady(item))
|
||||||
|
}
|
||||||
|
|
||||||
|
match self.inner.start_send(item)? {
|
||||||
|
AsyncSink::NotReady(item) => Ok(AsyncSink::NotReady(item)),
|
||||||
|
AsyncSink::Ready => {
|
||||||
|
self.current_burst_messages += 1;
|
||||||
|
Ok(AsyncSink::Ready)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -151,26 +182,7 @@ where
|
||||||
self.close()?;
|
self.close()?;
|
||||||
Err(error::ErrorKind::PingTimeout.into())
|
Err(error::ErrorKind::PingTimeout.into())
|
||||||
} else {
|
} else {
|
||||||
match self.burst_timer.poll()? {
|
Ok(self.inner.poll_complete()?)
|
||||||
Async::NotReady => (),
|
|
||||||
Async::Ready(msg) => {
|
|
||||||
assert!(msg.is_some());
|
|
||||||
self.current_burst_messages = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Throttling if too many messages have been sent recently.
|
|
||||||
if self.current_burst_messages >= self.max_burst_messages {
|
|
||||||
return Ok(Async::NotReady)
|
|
||||||
}
|
|
||||||
|
|
||||||
match self.inner.poll_complete()? {
|
|
||||||
Async::NotReady => Ok(Async::NotReady),
|
|
||||||
Async::Ready(()) => {
|
|
||||||
self.current_burst_messages += 1;
|
|
||||||
Ok(Async::Ready(()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
//! Implementation of IRC codec for Tokio.
|
//! Implementation of IRC codec for Tokio.
|
||||||
|
|
||||||
use bytes::BytesMut;
|
use bytes::BytesMut;
|
||||||
use tokio_io::codec::{Decoder, Encoder};
|
use tokio_io::codec::{Decoder, Encoder};
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue