diff --git a/Cargo.toml b/Cargo.toml index 472bce1..d5f7226 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,4 +29,5 @@ serde_json = "1.0" tokio-core = "0.1" tokio-io = "0.1" tokio-mockstream = "1.1" +tokio-timer = "0.1" tokio-tls = "0.1" diff --git a/src/client/transport.rs b/src/client/transport.rs index 7d7cdcc..8eba452 100644 --- a/src/client/transport.rs +++ b/src/client/transport.rs @@ -1,10 +1,13 @@ //! An IRC transport that wraps an IRC-framed stream to provide automatic PING replies. use std::sync::{Arc, RwLock, RwLockReadGuard}; -use std::time::Instant; +use std::time::{Duration, Instant}; use futures::{Async, Poll, Sink, StartSend, Stream}; +use time; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::Framed; +use tokio_timer; +use tokio_timer::Interval; use error; use client::data::Config; @@ -16,8 +19,11 @@ where T: AsyncRead + AsyncWrite, { inner: Framed, + ping_timer: Interval, ping_timeout: u64, - last_ping: Instant, + last_ping_data: String, + last_ping_sent: Instant, + last_pong_received: Instant, } impl IrcTransport @@ -28,8 +34,13 @@ where pub fn new(config: &Config, inner: Framed) -> IrcTransport { IrcTransport { inner: inner, - ping_timeout: config.ping_time() as u64, - last_ping: Instant::now(), + ping_timer: tokio_timer::wheel().build().interval( + Duration::from_secs(config.ping_time() as u64) + ), + ping_timeout: config.ping_timeout() as u64, + last_ping_data: String::new(), + last_ping_sent: Instant::now(), + last_pong_received: Instant::now(), } } @@ -37,6 +48,24 @@ where pub fn into_inner(self) -> Framed { self.inner } + + fn ping_timed_out(&self) -> bool { + if self.last_pong_received < self.last_ping_sent { + self.last_ping_sent.elapsed().as_secs() >= self.ping_timeout + } else { + false + } + } + + fn send_ping(&mut self) -> error::Result<()> { + self.last_ping_sent = Instant::now(); + self.last_ping_data = format!("{}", time::now().to_timespec().sec); + let data = self.last_ping_data.clone(); + let result = self.start_send(Command::PING(data, None).into())?; + assert!(result.is_ready()); + self.poll_complete()?; + Ok(()) + } } impl Stream for IrcTransport @@ -47,20 +76,49 @@ where type Error = error::Error; fn poll(&mut self) -> Poll, Self::Error> { - if self.last_ping.elapsed().as_secs() >= self.ping_timeout { + if self.ping_timed_out() { self.close()?; - Err(error::ErrorKind::PingTimeout.into()) - } else { - loop { - match try_ready!(self.inner.poll()) { - Some(Message { command: Command::PING(ref data, _), .. }) => { - self.last_ping = Instant::now(); + return Err(error::ErrorKind::PingTimeout.into()) + } + + let timer_poll = self.ping_timer.poll()?; + let inner_poll = self.inner.poll()?; + + match (inner_poll, timer_poll) { + (Async::NotReady, Async::NotReady) => Ok(Async::NotReady), + (Async::NotReady, Async::Ready(msg)) => { + assert!(msg.is_some()); + self.send_ping()?; + Ok(Async::NotReady) + } + (Async::Ready(None), _) => Ok(Async::Ready(None)), + (Async::Ready(Some(msg)), _) => { + match timer_poll { + Async::Ready(msg) => { + assert!(msg.is_some()); + self.send_ping()?; + } + Async::NotReady => (), + } + + match msg.command { + // Automatically respond to PINGs from the server. + Command::PING(ref data, _) => { let result = self.start_send(Command::PONG(data.to_owned(), None).into())?; assert!(result.is_ready()); self.poll_complete()?; } - message => return Ok(Async::Ready(message)), + // Check PONG responses from the server. + Command::PONG(ref data, None) | + Command::PONG(_, Some(ref data)) => { + if self.last_ping_data == &data[..] { + self.last_pong_received = Instant::now(); + } + } + _ => (), } + + Ok(Async::Ready(Some(msg))) } } } @@ -74,7 +132,7 @@ where type SinkError = error::Error; fn start_send(&mut self, item: Self::SinkItem) -> StartSend { - if self.last_ping.elapsed().as_secs() >= self.ping_timeout { + if self.ping_timed_out() { self.close()?; Err(error::ErrorKind::PingTimeout.into()) } else { @@ -83,7 +141,7 @@ where } fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - if self.last_ping.elapsed().as_secs() >= self.ping_timeout { + if self.ping_timed_out() { self.close()?; Err(error::ErrorKind::PingTimeout.into()) } else { diff --git a/src/error.rs b/src/error.rs index 7f74cbe..e68a99f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -9,6 +9,7 @@ error_chain! { Recv(::std::sync::mpsc::RecvError); SendMessage(::futures::sync::mpsc::SendError<::proto::Message>); OneShotCancelled(::futures::sync::oneshot::Canceled); + Timer(::tokio_timer::TimerError); } errors { diff --git a/src/lib.rs b/src/lib.rs index 949031a..59aa436 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ //! A simple, thread-safe, and async-friendly IRC library. #![warn(missing_docs)] +#![recursion_limit="128"] extern crate bufstream; extern crate bytes; @@ -18,6 +19,7 @@ extern crate time; extern crate tokio_core; extern crate tokio_io; extern crate tokio_mockstream; +extern crate tokio_timer; extern crate tokio_tls; pub mod client;