Implemented ping sending inside of the transport.

This commit is contained in:
Aaron Weiss 2017-06-27 18:21:11 -07:00
parent 07614fb26f
commit 4d3f4c556a
No known key found for this signature in database
GPG key ID: 0237035D9BF03AE2
4 changed files with 76 additions and 14 deletions

View file

@ -29,4 +29,5 @@ serde_json = "1.0"
tokio-core = "0.1" tokio-core = "0.1"
tokio-io = "0.1" tokio-io = "0.1"
tokio-mockstream = "1.1" tokio-mockstream = "1.1"
tokio-timer = "0.1"
tokio-tls = "0.1" tokio-tls = "0.1"

View file

@ -1,10 +1,13 @@
//! An IRC transport that wraps an IRC-framed stream to provide automatic PING replies. //! An IRC transport that wraps an IRC-framed stream to provide automatic PING replies.
use std::sync::{Arc, RwLock, RwLockReadGuard}; use std::sync::{Arc, RwLock, RwLockReadGuard};
use std::time::Instant; use std::time::{Duration, Instant};
use futures::{Async, Poll, Sink, StartSend, Stream}; use futures::{Async, Poll, Sink, StartSend, Stream};
use time;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::Framed; use tokio_io::codec::Framed;
use tokio_timer;
use tokio_timer::Interval;
use error; use error;
use client::data::Config; use client::data::Config;
@ -16,8 +19,11 @@ where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
{ {
inner: Framed<T, IrcCodec>, inner: Framed<T, IrcCodec>,
ping_timer: Interval,
ping_timeout: u64, ping_timeout: u64,
last_ping: Instant, last_ping_data: String,
last_ping_sent: Instant,
last_pong_received: Instant,
} }
impl<T> IrcTransport<T> impl<T> IrcTransport<T>
@ -28,8 +34,13 @@ where
pub fn new(config: &Config, inner: Framed<T, IrcCodec>) -> IrcTransport<T> { pub fn new(config: &Config, inner: Framed<T, IrcCodec>) -> IrcTransport<T> {
IrcTransport { IrcTransport {
inner: inner, inner: inner,
ping_timeout: config.ping_time() as u64, ping_timer: tokio_timer::wheel().build().interval(
last_ping: Instant::now(), Duration::from_secs(config.ping_time() as u64)
),
ping_timeout: config.ping_timeout() as u64,
last_ping_data: String::new(),
last_ping_sent: Instant::now(),
last_pong_received: Instant::now(),
} }
} }
@ -37,6 +48,24 @@ where
pub fn into_inner(self) -> Framed<T, IrcCodec> { pub fn into_inner(self) -> Framed<T, IrcCodec> {
self.inner self.inner
} }
fn ping_timed_out(&self) -> bool {
if self.last_pong_received < self.last_ping_sent {
self.last_ping_sent.elapsed().as_secs() >= self.ping_timeout
} else {
false
}
}
fn send_ping(&mut self) -> error::Result<()> {
self.last_ping_sent = Instant::now();
self.last_ping_data = format!("{}", time::now().to_timespec().sec);
let data = self.last_ping_data.clone();
let result = self.start_send(Command::PING(data, None).into())?;
assert!(result.is_ready());
self.poll_complete()?;
Ok(())
}
} }
impl<T> Stream for IrcTransport<T> impl<T> Stream for IrcTransport<T>
@ -47,20 +76,49 @@ where
type Error = error::Error; type Error = error::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if self.last_ping.elapsed().as_secs() >= self.ping_timeout { if self.ping_timed_out() {
self.close()?; self.close()?;
Err(error::ErrorKind::PingTimeout.into()) return Err(error::ErrorKind::PingTimeout.into())
} else { }
loop {
match try_ready!(self.inner.poll()) { let timer_poll = self.ping_timer.poll()?;
Some(Message { command: Command::PING(ref data, _), .. }) => { let inner_poll = self.inner.poll()?;
self.last_ping = Instant::now();
match (inner_poll, timer_poll) {
(Async::NotReady, Async::NotReady) => Ok(Async::NotReady),
(Async::NotReady, Async::Ready(msg)) => {
assert!(msg.is_some());
self.send_ping()?;
Ok(Async::NotReady)
}
(Async::Ready(None), _) => Ok(Async::Ready(None)),
(Async::Ready(Some(msg)), _) => {
match timer_poll {
Async::Ready(msg) => {
assert!(msg.is_some());
self.send_ping()?;
}
Async::NotReady => (),
}
match msg.command {
// Automatically respond to PINGs from the server.
Command::PING(ref data, _) => {
let result = self.start_send(Command::PONG(data.to_owned(), None).into())?; let result = self.start_send(Command::PONG(data.to_owned(), None).into())?;
assert!(result.is_ready()); assert!(result.is_ready());
self.poll_complete()?; self.poll_complete()?;
} }
message => return Ok(Async::Ready(message)), // Check PONG responses from the server.
Command::PONG(ref data, None) |
Command::PONG(_, Some(ref data)) => {
if self.last_ping_data == &data[..] {
self.last_pong_received = Instant::now();
}
}
_ => (),
} }
Ok(Async::Ready(Some(msg)))
} }
} }
} }
@ -74,7 +132,7 @@ where
type SinkError = error::Error; type SinkError = error::Error;
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
if self.last_ping.elapsed().as_secs() >= self.ping_timeout { if self.ping_timed_out() {
self.close()?; self.close()?;
Err(error::ErrorKind::PingTimeout.into()) Err(error::ErrorKind::PingTimeout.into())
} else { } else {
@ -83,7 +141,7 @@ where
} }
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
if self.last_ping.elapsed().as_secs() >= self.ping_timeout { if self.ping_timed_out() {
self.close()?; self.close()?;
Err(error::ErrorKind::PingTimeout.into()) Err(error::ErrorKind::PingTimeout.into())
} else { } else {

View file

@ -9,6 +9,7 @@ error_chain! {
Recv(::std::sync::mpsc::RecvError); Recv(::std::sync::mpsc::RecvError);
SendMessage(::futures::sync::mpsc::SendError<::proto::Message>); SendMessage(::futures::sync::mpsc::SendError<::proto::Message>);
OneShotCancelled(::futures::sync::oneshot::Canceled); OneShotCancelled(::futures::sync::oneshot::Canceled);
Timer(::tokio_timer::TimerError);
} }
errors { errors {

View file

@ -1,6 +1,7 @@
//! A simple, thread-safe, and async-friendly IRC library. //! A simple, thread-safe, and async-friendly IRC library.
#![warn(missing_docs)] #![warn(missing_docs)]
#![recursion_limit="128"]
extern crate bufstream; extern crate bufstream;
extern crate bytes; extern crate bytes;
@ -18,6 +19,7 @@ extern crate time;
extern crate tokio_core; extern crate tokio_core;
extern crate tokio_io; extern crate tokio_io;
extern crate tokio_mockstream; extern crate tokio_mockstream;
extern crate tokio_timer;
extern crate tokio_tls; extern crate tokio_tls;
pub mod client; pub mod client;