diff --git a/src/client/transport.rs b/src/client/transport.rs index 1c42986..2b14456 100644 --- a/src/client/transport.rs +++ b/src/client/transport.rs @@ -1,5 +1,6 @@ //! An IRC transport that wraps an IRC-framed stream to provide a number of features including -//! automatic PING replies, automatic sending of PINGs, and message rate-limiting. +//! automatic PING replies, automatic sending of PINGs, and message rate-limiting. This can be used +//! as the basis for implementing a more full IRC client. use std::collections::VecDeque; use std::sync::{Arc, RwLock, RwLockReadGuard}; use std::time::{Duration, Instant}; @@ -15,21 +16,34 @@ use error; use client::data::Config; use proto::{Command, IrcCodec, Message}; -/// An IRC transport that handles core functionality. +/// An IRC transport that handles core functionality for the IRC protocol. This is used in the +/// implementation of `Connection` and ultimately `IrcServer`, and plays an important role in +/// handling connection timeouts, message throttling, and ping response. pub struct IrcTransport where T: AsyncRead + AsyncWrite, { + /// The inner connection framed with an `IrcCodec`. inner: Framed, + /// A timer used in computing windows for message throttling. burst_timer: Timer, + /// A queue of tasks used to implement message throttling. rolling_burst_window: VecDeque, + /// The amount of time that each window for throttling should last (in seconds). burst_window_length: u64, + /// The maximum number of messages that can be sent in each window. max_burst_messages: u64, + /// The number of messages sent in the current window. current_burst_messages: u64, + /// A timer used to determine when to send the next ping messages to the server. ping_timer: Interval, + /// The amount of time to wait before timing out from no ping response. ping_timeout: u64, + /// The last data sent with a ping. last_ping_data: String, + /// The instant that the last ping was sent to the server. last_ping_sent: Instant, + /// The instant that the last pong was received from the server. last_pong_received: Instant, } @@ -41,7 +55,7 @@ where pub fn new(config: &Config, inner: Framed) -> IrcTransport { let timer = tokio_timer::wheel().build(); IrcTransport { - inner: inner, + inner, burst_timer: tokio_timer::wheel().build(), rolling_burst_window: VecDeque::new(), burst_window_length: u64::from(config.burst_window_length()), @@ -60,6 +74,7 @@ where self.inner } + /// Determines whether or not the transport has hit the ping timeout. fn ping_timed_out(&self) -> bool { if self.last_pong_received < self.last_ping_sent { self.last_ping_sent.elapsed().as_secs() >= self.ping_timeout @@ -68,18 +83,24 @@ where } } + /// Sends a ping via the transport. fn send_ping(&mut self) -> error::Result<()> { + // Creates new ping data using the local timestamp. let last_ping_data = format!("{}", Local::now().timestamp()); let data = last_ping_data.clone(); let result = self.start_send(Command::PING(data, None).into())?; if let AsyncSink::Ready = result { self.poll_complete()?; + // If we succeeded in sending the ping, we will update when the last ping was sent, and + // the data that was sent with it. self.last_ping_sent = Instant::now(); self.last_ping_data = last_ping_data; } Ok(()) } + /// Polls the most recent burst window from the queue, always returning `NotReady` if none are + /// left for whatever reason. fn rolling_burst_window_front(&mut self) -> Result, tokio_timer::TimerError> { self.rolling_burst_window.front_mut().map(|w| w.poll()).unwrap_or(Ok(Async::NotReady)) } @@ -93,42 +114,61 @@ where type Error = error::IrcError; fn poll(&mut self) -> Poll, Self::Error> { + // If the ping timeout has been reached, we close the connection out and return an error. if self.ping_timed_out() { self.close()?; return Err(error::IrcError::PingTimeout) } + // We poll both streams before doing any work because it is important to ensure that the + // task is correctly woken up when they are ready. let timer_poll = self.ping_timer.poll()?; let inner_poll = self.inner.poll()?; match (inner_poll, timer_poll) { + // If neither the stream nor the ping timer are ready, the transport is not ready. (Async::NotReady, Async::NotReady) => Ok(Async::NotReady), + + // If there's nothing available yet from the stream, but the ping timer is ready, we + // simply send a ping and indicate that the transport has nothing to yield yet. (Async::NotReady, Async::Ready(msg)) => { assert!(msg.is_some()); self.send_ping()?; Ok(Async::NotReady) } + + // If the stream yields `None`, the connection has been terminated. Thus, we don't need + // to worry about checking the ping timer, and can instead indicate that the transport + // has been terminated. (Async::Ready(None), _) => Ok(Async::Ready(None)), + + // If we have a new message available from the stream, we'll need to do some work, and + // then yield the message. (Async::Ready(Some(msg)), _) => { + // If the ping timer has returned, it is time to send another `PING` message! if let Async::Ready(msg) = timer_poll { assert!(msg.is_some()); self.send_ping()?; } match msg.command { - // Automatically respond to PINGs from the server. + // On receiving a `PING` message from the server, we automatically respond with + // the appropriate `PONG` message to keep the connection alive for transport. Command::PING(ref data, _) => { let result = self.start_send(Command::PONG(data.to_owned(), None).into())?; assert!(result.is_ready()); self.poll_complete()?; } - // Check PONG responses from the server. + + // Check `PONG` responses from the server. If it matches, we will update the + // last instant that the pong was received. This will prevent timeout. Command::PONG(ref data, None) | Command::PONG(_, Some(ref data)) => { if self.last_ping_data == data[..] { self.last_pong_received = Instant::now(); } } + _ => (), } @@ -146,50 +186,52 @@ where type SinkError = error::IrcError; fn start_send(&mut self, item: Self::SinkItem) -> StartSend { + // If the ping timeout has been reached, we close the connection out and return an error. if self.ping_timed_out() { self.close()?; - Err(error::IrcError::PingTimeout) - } else { - // Check if the oldest message in the rolling window is discounted. - if let Async::Ready(()) = self.rolling_burst_window_front()? { - self.current_burst_messages -= 1; - self.rolling_burst_window.pop_front(); - } + return Err(error::IrcError::PingTimeout) + } - // Throttling if too many messages have been sent recently. - if self.current_burst_messages >= self.max_burst_messages { - // When throttled, we know we need to finish sending what's already queued up. - self.poll_complete()?; - return Ok(AsyncSink::NotReady(item)) - } + // Check if the oldest message in the rolling window is discounted. + if let Async::Ready(()) = self.rolling_burst_window_front()? { + self.current_burst_messages -= 1; + self.rolling_burst_window.pop_front(); + } - match self.inner.start_send(item)? { - AsyncSink::NotReady(item) => Ok(AsyncSink::NotReady(item)), - AsyncSink::Ready => { - self.current_burst_messages += 1; - self.rolling_burst_window.push_back(self.burst_timer.sleep(Duration::from_secs( - self.burst_window_length - ))); - Ok(AsyncSink::Ready) - } + // Throttling if too many messages have been sent recently. + if self.current_burst_messages >= self.max_burst_messages { + // When throttled, we know we need to finish sending what's already queued up. + self.poll_complete()?; + return Ok(AsyncSink::NotReady(item)) + } + + match self.inner.start_send(item)? { + AsyncSink::NotReady(item) => Ok(AsyncSink::NotReady(item)), + AsyncSink::Ready => { + self.current_burst_messages += 1; + self.rolling_burst_window.push_back(self.burst_timer.sleep(Duration::from_secs( + self.burst_window_length + ))); + Ok(AsyncSink::Ready) } } } fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + // If the ping timeout has been reached, we close the connection out and return an error. if self.ping_timed_out() { self.close()?; - Err(error::IrcError::PingTimeout) - } else { - // If it's time to send a ping, we should do it! This is necessary to ensure that the - // sink half will close even if the stream half closed without a ping timeout. - if let Async::Ready(msg) = self.ping_timer.poll()? { - assert!(msg.is_some()); - self.send_ping()?; - } - - Ok(self.inner.poll_complete()?) + return Err(error::IrcError::PingTimeout) } + + // If it's time to send a ping, we should do it! This is necessary to ensure that the + // sink half will close even if the stream half closed without a ping timeout. + if let Async::Ready(msg) = self.ping_timer.poll()? { + assert!(msg.is_some()); + self.send_ping()?; + } + + Ok(self.inner.poll_complete()?) } fn close(&mut self) -> Poll<(), Self::SinkError> { @@ -218,22 +260,16 @@ impl LogView { /// A logged version of the `IrcTransport` that records all sent and received messages. /// Note: this will introduce some performance overhead by cloning all messages. -pub struct Logged -where - T: AsyncRead + AsyncWrite, -{ +pub struct Logged where T: AsyncRead + AsyncWrite { inner: IrcTransport, view: LogView, } -impl Logged -where - T: AsyncRead + AsyncWrite, -{ +impl Logged where T: AsyncRead + AsyncWrite { /// Wraps the given `IrcTransport` in logging. pub fn wrap(inner: IrcTransport) -> Logged { Logged { - inner: inner, + inner, view: LogView { sent: Arc::new(RwLock::new(vec![])), received: Arc::new(RwLock::new(vec![])),