Upgrade to Tokio 1.0
This commit is contained in:
parent
73dbbedc75
commit
def1442e5e
7 changed files with 106 additions and 94 deletions
14
Cargo.toml
14
Cargo.toml
|
@ -48,8 +48,10 @@ irc-proto = { version = "0.14.0", path = "irc-proto" }
|
||||||
log = "0.4.0"
|
log = "0.4.0"
|
||||||
parking_lot = "0.11.0"
|
parking_lot = "0.11.0"
|
||||||
thiserror = "1.0.0"
|
thiserror = "1.0.0"
|
||||||
tokio = { version = "0.3.0", features = ["net", "stream", "time", "sync"] }
|
pin-project = "1.0.2"
|
||||||
tokio-util = { version = "0.4.0", features = ["codec"] }
|
tokio = { version = "1.0.0", features = ["net", "time", "sync"] }
|
||||||
|
tokio-stream = "0.1.0"
|
||||||
|
tokio-util = { version = "0.6.0", features = ["codec"] }
|
||||||
|
|
||||||
# Feature - Config
|
# Feature - Config
|
||||||
serde = { version = "1.0.0", optional = true }
|
serde = { version = "1.0.0", optional = true }
|
||||||
|
@ -59,12 +61,12 @@ serde_yaml = { version = "0.8.0", optional = true }
|
||||||
toml = { version = "0.5.0", optional = true }
|
toml = { version = "0.5.0", optional = true }
|
||||||
|
|
||||||
# Feature - Proxy
|
# Feature - Proxy
|
||||||
tokio-socks = { version = "0.3.0", optional = true }
|
tokio-socks = { version = "0.5.1", optional = true }
|
||||||
|
|
||||||
# Feature - TLS
|
# Feature - TLS
|
||||||
native-tls = { version = "0.2.0", optional = true }
|
native-tls = { version = "0.2.0", optional = true }
|
||||||
tokio-rustls = { version = "0.20.0", optional = true }
|
tokio-rustls = { version = "0.22.0", optional = true }
|
||||||
tokio-native-tls = { version = "0.2.0", optional = true }
|
tokio-native-tls = { version = "0.3.0", optional = true }
|
||||||
webpki-roots = { version = "0.20.0", optional = true }
|
webpki-roots = { version = "0.20.0", optional = true }
|
||||||
|
|
||||||
|
|
||||||
|
@ -74,7 +76,7 @@ args = "2.0.0"
|
||||||
env_logger = "0.7.0"
|
env_logger = "0.7.0"
|
||||||
futures = "0.3.0"
|
futures = "0.3.0"
|
||||||
getopts = "0.2.0"
|
getopts = "0.2.0"
|
||||||
tokio = { version = "0.3.0", features = ["rt", "rt-multi-thread", "macros", "net", "stream", "time"] }
|
tokio = { version = "1.0.0", features = ["rt", "rt-multi-thread", "macros", "net", "time"] }
|
||||||
|
|
||||||
|
|
||||||
[[example]]
|
[[example]]
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
use futures::prelude::*;
|
|
||||||
use irc::client::prelude::*;
|
use irc::client::prelude::*;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
@ -15,10 +14,10 @@ async fn main() -> irc::error::Result<()> {
|
||||||
let client = Client::from_config(config).await?;
|
let client = Client::from_config(config).await?;
|
||||||
let sender = client.sender();
|
let sender = client.sender();
|
||||||
|
|
||||||
let mut interval = tokio::time::interval(Duration::from_secs(1)).fuse();
|
let mut interval = tokio::time::interval(Duration::from_secs(1));
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let _ = interval.select_next_some().await;
|
let _ = interval.tick().await;
|
||||||
sender.send_privmsg("#rust-spam", "AWOOOOOOOOOO")?;
|
sender.send_privmsg("#rust-spam", "AWOOOOOOOOOO")?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
use futures::prelude::*;
|
|
||||||
use irc::client::prelude::*;
|
use irc::client::prelude::*;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
use tokio_stream::StreamExt as _;
|
||||||
|
|
||||||
// NOTE: you can find an asynchronous version of this example with `IrcReactor` in `tooter.rs`.
|
// NOTE: you can find an asynchronous version of this example with `IrcReactor` in `tooter.rs`.
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
|
@ -16,14 +16,14 @@ async fn main() -> irc::error::Result<()> {
|
||||||
client.identify()?;
|
client.identify()?;
|
||||||
|
|
||||||
let mut stream = client.stream()?;
|
let mut stream = client.stream()?;
|
||||||
let mut interval = tokio::time::interval(Duration::from_secs(10)).fuse();
|
let mut interval = tokio::time::interval(Duration::from_secs(10));
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
futures::select! {
|
tokio::select! {
|
||||||
m = stream.select_next_some() => {
|
Some(m) = stream.next() => {
|
||||||
println!("{}", m?);
|
println!("{}", m?);
|
||||||
}
|
}
|
||||||
_ = interval.select_next_some() => {
|
_ = interval.tick() => {
|
||||||
client.send_privmsg("#rust-spam", "TWEET TWEET")?;
|
client.send_privmsg("#rust-spam", "TWEET TWEET")?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,6 @@ default = ["bytes", "tokio", "tokio-util"]
|
||||||
encoding = "0.2.0"
|
encoding = "0.2.0"
|
||||||
thiserror = "1.0.0"
|
thiserror = "1.0.0"
|
||||||
|
|
||||||
bytes = { version = "0.5.0", optional = true }
|
bytes = { version = "1.0.0", optional = true }
|
||||||
tokio = { version = "0.3.0", optional = true }
|
tokio = { version = "1.0.0", optional = true }
|
||||||
tokio-util = { version = "0.4.0", features = ["codec"], optional = true }
|
tokio-util = { version = "0.6.0", features = ["codec"], optional = true }
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
//! A module providing IRC connections for use by `IrcServer`s.
|
//! A module providing IRC connections for use by `IrcServer`s.
|
||||||
use futures_util::{sink::Sink, stream::Stream};
|
use futures_util::{sink::Sink, stream::Stream};
|
||||||
|
use pin_project::pin_project;
|
||||||
use std::{
|
use std::{
|
||||||
fmt,
|
fmt,
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
|
@ -53,14 +54,15 @@ use crate::{
|
||||||
};
|
};
|
||||||
|
|
||||||
/// An IRC connection used internally by `IrcServer`.
|
/// An IRC connection used internally by `IrcServer`.
|
||||||
|
#[pin_project(project = ConnectionProj)]
|
||||||
pub enum Connection {
|
pub enum Connection {
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
Unsecured(Transport<TcpStream>),
|
Unsecured(#[pin] Transport<TcpStream>),
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
#[cfg(any(feature = "tls-native", feature = "tls-rust"))]
|
#[cfg(any(feature = "tls-native", feature = "tls-rust"))]
|
||||||
Secured(Transport<TlsStream<TcpStream>>),
|
Secured(#[pin] Transport<TlsStream<TcpStream>>),
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
Mock(Logged<MockStream>),
|
Mock(#[pin] Logged<MockStream>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Debug for Connection {
|
impl fmt::Debug for Connection {
|
||||||
|
@ -280,12 +282,12 @@ impl Connection {
|
||||||
impl Stream for Connection {
|
impl Stream for Connection {
|
||||||
type Item = error::Result<Message>;
|
type Item = error::Result<Message>;
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
match &mut *self {
|
match self.project() {
|
||||||
Connection::Unsecured(inner) => Pin::new(inner).poll_next(cx),
|
ConnectionProj::Unsecured(inner) => inner.poll_next(cx),
|
||||||
#[cfg(any(feature = "tls-native", feature = "tls-rust"))]
|
#[cfg(any(feature = "tls-native", feature = "tls-rust"))]
|
||||||
Connection::Secured(inner) => Pin::new(inner).poll_next(cx),
|
ConnectionProj::Secured(inner) => inner.poll_next(cx),
|
||||||
Connection::Mock(inner) => Pin::new(inner).poll_next(cx),
|
ConnectionProj::Mock(inner) => inner.poll_next(cx),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -293,39 +295,39 @@ impl Stream for Connection {
|
||||||
impl Sink<Message> for Connection {
|
impl Sink<Message> for Connection {
|
||||||
type Error = error::Error;
|
type Error = error::Error;
|
||||||
|
|
||||||
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
match &mut *self {
|
match self.project() {
|
||||||
Connection::Unsecured(inner) => Pin::new(inner).poll_ready(cx),
|
ConnectionProj::Unsecured(inner) => inner.poll_ready(cx),
|
||||||
#[cfg(any(feature = "tls-native", feature = "tls-rust"))]
|
#[cfg(any(feature = "tls-native", feature = "tls-rust"))]
|
||||||
Connection::Secured(inner) => Pin::new(inner).poll_ready(cx),
|
ConnectionProj::Secured(inner) => inner.poll_ready(cx),
|
||||||
Connection::Mock(inner) => Pin::new(inner).poll_ready(cx),
|
ConnectionProj::Mock(inner) => inner.poll_ready(cx),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
|
fn start_send(self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
|
||||||
match &mut *self {
|
match self.project() {
|
||||||
Connection::Unsecured(inner) => Pin::new(inner).start_send(item),
|
ConnectionProj::Unsecured(inner) => inner.start_send(item),
|
||||||
#[cfg(any(feature = "tls-native", feature = "tls-rust"))]
|
#[cfg(any(feature = "tls-native", feature = "tls-rust"))]
|
||||||
Connection::Secured(inner) => Pin::new(inner).start_send(item),
|
ConnectionProj::Secured(inner) => inner.start_send(item),
|
||||||
Connection::Mock(inner) => Pin::new(inner).start_send(item),
|
ConnectionProj::Mock(inner) => inner.start_send(item),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
match &mut *self {
|
match self.project() {
|
||||||
Connection::Unsecured(inner) => Pin::new(inner).poll_flush(cx),
|
ConnectionProj::Unsecured(inner) => inner.poll_flush(cx),
|
||||||
#[cfg(any(feature = "tls-native", feature = "tls-rust"))]
|
#[cfg(any(feature = "tls-native", feature = "tls-rust"))]
|
||||||
Connection::Secured(inner) => Pin::new(inner).poll_flush(cx),
|
ConnectionProj::Secured(inner) => inner.poll_flush(cx),
|
||||||
Connection::Mock(inner) => Pin::new(inner).poll_flush(cx),
|
ConnectionProj::Mock(inner) => inner.poll_flush(cx),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
match &mut *self {
|
match self.project() {
|
||||||
Connection::Unsecured(inner) => Pin::new(inner).poll_close(cx),
|
ConnectionProj::Unsecured(inner) => inner.poll_close(cx),
|
||||||
#[cfg(any(feature = "tls-native", feature = "tls-rust"))]
|
#[cfg(any(feature = "tls-native", feature = "tls-rust"))]
|
||||||
Connection::Secured(inner) => Pin::new(inner).poll_close(cx),
|
ConnectionProj::Secured(inner) => inner.poll_close(cx),
|
||||||
Connection::Mock(inner) => Pin::new(inner).poll_close(cx),
|
ConnectionProj::Mock(inner) => inner.poll_close(cx),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -879,7 +879,7 @@ impl Future for Outgoing {
|
||||||
}
|
}
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match this.stream.poll_next_unpin(cx) {
|
match this.stream.poll_recv(cx) {
|
||||||
Poll::Ready(Some(message)) => ready!(this.try_start_send(cx, message))?,
|
Poll::Ready(Some(message)) => ready!(this.try_start_send(cx, message))?,
|
||||||
Poll::Ready(None) => {
|
Poll::Ready(None) => {
|
||||||
ready!(Pin::new(&mut this.sink).poll_flush(cx))?;
|
ready!(Pin::new(&mut this.sink).poll_flush(cx))?;
|
||||||
|
|
|
@ -10,6 +10,7 @@ use std::{
|
||||||
|
|
||||||
use chrono::prelude::*;
|
use chrono::prelude::*;
|
||||||
use futures_util::{future::Future, ready, sink::Sink, stream::Stream};
|
use futures_util::{future::Future, ready, sink::Sink, stream::Stream};
|
||||||
|
use pin_project::pin_project;
|
||||||
use tokio::sync::mpsc::UnboundedSender;
|
use tokio::sync::mpsc::UnboundedSender;
|
||||||
use tokio::{
|
use tokio::{
|
||||||
io::{AsyncRead, AsyncWrite},
|
io::{AsyncRead, AsyncWrite},
|
||||||
|
@ -24,6 +25,7 @@ use crate::{
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Pinger-based futures helper.
|
/// Pinger-based futures helper.
|
||||||
|
#[pin_project]
|
||||||
struct Pinger {
|
struct Pinger {
|
||||||
tx: UnboundedSender<Message>,
|
tx: UnboundedSender<Message>,
|
||||||
// Whether this pinger pings.
|
// Whether this pinger pings.
|
||||||
|
@ -31,8 +33,10 @@ struct Pinger {
|
||||||
/// The amount of time to wait before timing out from no ping response.
|
/// The amount of time to wait before timing out from no ping response.
|
||||||
ping_timeout: Duration,
|
ping_timeout: Duration,
|
||||||
/// The instant that the last ping was sent to the server.
|
/// The instant that the last ping was sent to the server.
|
||||||
|
#[pin]
|
||||||
ping_deadline: Option<Sleep>,
|
ping_deadline: Option<Sleep>,
|
||||||
/// The interval at which to send pings.
|
/// The interval at which to send pings.
|
||||||
|
#[pin]
|
||||||
ping_interval: Interval,
|
ping_interval: Interval,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -52,11 +56,11 @@ impl Pinger {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handle an incoming message.
|
/// Handle an incoming message.
|
||||||
fn handle_message(&mut self, message: &Message) -> error::Result<()> {
|
fn handle_message(self: Pin<&mut Self>, message: &Message) -> error::Result<()> {
|
||||||
match message.command {
|
match message.command {
|
||||||
Command::Response(Response::RPL_ENDOFMOTD, _)
|
Command::Response(Response::RPL_ENDOFMOTD, _)
|
||||||
| Command::Response(Response::ERR_NOMOTD, _) => {
|
| Command::Response(Response::ERR_NOMOTD, _) => {
|
||||||
self.enabled = true;
|
*self.project().enabled = true;
|
||||||
}
|
}
|
||||||
// On receiving a `PING` message from the server, we automatically respond with
|
// On receiving a `PING` message from the server, we automatically respond with
|
||||||
// the appropriate `PONG` message to keep the connection alive for transport.
|
// the appropriate `PONG` message to keep the connection alive for transport.
|
||||||
|
@ -67,7 +71,7 @@ impl Pinger {
|
||||||
// last instant that the pong was received. This will prevent timeout.
|
// last instant that the pong was received. This will prevent timeout.
|
||||||
Command::PONG(_, None) | Command::PONG(_, Some(_)) => {
|
Command::PONG(_, None) | Command::PONG(_, Some(_)) => {
|
||||||
log::trace!("Received PONG");
|
log::trace!("Received PONG");
|
||||||
self.ping_deadline.take();
|
self.project().ping_deadline.set(None);
|
||||||
}
|
}
|
||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
|
@ -76,47 +80,47 @@ impl Pinger {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send a pong.
|
/// Send a pong.
|
||||||
fn send_pong(&mut self, data: &str) -> error::Result<()> {
|
fn send_pong(self: Pin<&mut Self>, data: &str) -> error::Result<()> {
|
||||||
self.tx.send(Command::PONG(data.to_owned(), None).into())?;
|
self.project()
|
||||||
|
.tx
|
||||||
|
.send(Command::PONG(data.to_owned(), None).into())?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sends a ping via the transport.
|
/// Sends a ping via the transport.
|
||||||
fn send_ping(&mut self) -> error::Result<()> {
|
fn send_ping(self: Pin<&mut Self>) -> error::Result<()> {
|
||||||
log::trace!("Sending PING");
|
log::trace!("Sending PING");
|
||||||
|
|
||||||
// Creates new ping data using the local timestamp.
|
// Creates new ping data using the local timestamp.
|
||||||
let data = format!("{}", Local::now().timestamp());
|
let data = format!("{}", Local::now().timestamp());
|
||||||
|
|
||||||
self.tx.send(Command::PING(data.clone(), None).into())?;
|
let mut this = self.project();
|
||||||
|
|
||||||
|
this.tx.send(Command::PING(data.clone(), None).into())?;
|
||||||
|
|
||||||
|
if this.ping_deadline.is_none() {
|
||||||
|
let ping_deadline = time::sleep(*this.ping_timeout);
|
||||||
|
this.ping_deadline.set(Some(ping_deadline));
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Set the ping deadline.
|
|
||||||
fn set_deadline(&mut self) {
|
|
||||||
if self.ping_deadline.is_none() {
|
|
||||||
let ping_deadline = time::sleep(self.ping_timeout);
|
|
||||||
self.ping_deadline = Some(ping_deadline);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Future for Pinger {
|
impl Future for Pinger {
|
||||||
type Output = Result<(), error::Error>;
|
type Output = Result<(), error::Error>;
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
if let Some(ping_deadline) = self.as_mut().ping_deadline.as_mut() {
|
if let Some(ping_deadline) = self.as_mut().project().ping_deadline.as_pin_mut() {
|
||||||
match Pin::new(ping_deadline).poll(cx) {
|
match ping_deadline.poll(cx) {
|
||||||
Poll::Ready(()) => return Poll::Ready(Err(error::Error::PingTimeout)),
|
Poll::Ready(()) => return Poll::Ready(Err(error::Error::PingTimeout)),
|
||||||
Poll::Pending => (),
|
Poll::Pending => (),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Poll::Ready(_) = Pin::new(&mut self.as_mut().ping_interval).poll_next(cx) {
|
if let Poll::Ready(_) = self.as_mut().project().ping_interval.poll_tick(cx) {
|
||||||
if self.enabled {
|
if *self.as_mut().project().enabled {
|
||||||
self.as_mut().send_ping()?;
|
self.as_mut().send_ping()?;
|
||||||
self.as_mut().set_deadline();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -127,15 +131,16 @@ impl Future for Pinger {
|
||||||
/// An IRC transport that handles core functionality for the IRC protocol. This is used in the
|
/// An IRC transport that handles core functionality for the IRC protocol. This is used in the
|
||||||
/// implementation of `Connection` and ultimately `IrcServer`, and plays an important role in
|
/// implementation of `Connection` and ultimately `IrcServer`, and plays an important role in
|
||||||
/// handling connection timeouts, message throttling, and ping response.
|
/// handling connection timeouts, message throttling, and ping response.
|
||||||
|
#[pin_project]
|
||||||
pub struct Transport<T> {
|
pub struct Transport<T> {
|
||||||
/// The inner connection framed with an `IrcCodec`.
|
/// The inner connection framed with an `IrcCodec`.
|
||||||
|
#[pin]
|
||||||
inner: Framed<T, IrcCodec>,
|
inner: Framed<T, IrcCodec>,
|
||||||
/// Helper for handle pinging.
|
/// Helper for handle pinging.
|
||||||
|
#[pin]
|
||||||
pinger: Option<Pinger>,
|
pinger: Option<Pinger>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Unpin for Transport<T> where T: Unpin {}
|
|
||||||
|
|
||||||
impl<T> Transport<T>
|
impl<T> Transport<T>
|
||||||
where
|
where
|
||||||
T: Unpin + AsyncRead + AsyncWrite,
|
T: Unpin + AsyncRead + AsyncWrite,
|
||||||
|
@ -164,21 +169,21 @@ where
|
||||||
type Item = Result<Message, error::Error>;
|
type Item = Result<Message, error::Error>;
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
if let Some(pinger) = self.as_mut().pinger.as_mut() {
|
if let Some(pinger) = self.as_mut().project().pinger.as_pin_mut() {
|
||||||
match Pin::new(pinger).poll(cx) {
|
match pinger.poll(cx) {
|
||||||
Poll::Ready(result) => result?,
|
Poll::Ready(result) => result?,
|
||||||
Poll::Pending => (),
|
Poll::Pending => (),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let result = ready!(Pin::new(&mut self.as_mut().inner).poll_next(cx));
|
let result = ready!(self.as_mut().project().inner.poll_next(cx));
|
||||||
|
|
||||||
let message = match result {
|
let message = match result {
|
||||||
None => return Poll::Ready(None),
|
None => return Poll::Ready(None),
|
||||||
Some(message) => message?,
|
Some(message) => message?,
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some(pinger) = self.as_mut().pinger.as_mut() {
|
if let Some(pinger) = self.as_mut().project().pinger.as_pin_mut() {
|
||||||
pinger.handle_message(&message)?;
|
pinger.handle_message(&message)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -192,24 +197,24 @@ where
|
||||||
{
|
{
|
||||||
type Error = error::Error;
|
type Error = error::Error;
|
||||||
|
|
||||||
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
ready!(Pin::new(&mut self.as_mut().inner).poll_ready(cx))?;
|
ready!(self.project().inner.poll_ready(cx))?;
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
|
fn start_send(self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
|
||||||
log::trace!("[SEND] {}", item);
|
log::trace!("[SEND] {}", item);
|
||||||
Pin::new(&mut self.as_mut().inner).start_send(item)?;
|
self.project().inner.start_send(item)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
ready!(Pin::new(&mut self.as_mut().inner).poll_flush(cx))?;
|
ready!(self.project().inner.poll_flush(cx))?;
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
ready!(Pin::new(&mut self.as_mut().inner).poll_close(cx))?;
|
ready!(self.project().inner.poll_close(cx))?;
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -235,13 +240,13 @@ impl LogView {
|
||||||
|
|
||||||
/// A logged version of the `Transport` that records all sent and received messages.
|
/// A logged version of the `Transport` that records all sent and received messages.
|
||||||
/// Note: this will introduce some performance overhead by cloning all messages.
|
/// Note: this will introduce some performance overhead by cloning all messages.
|
||||||
|
#[pin_project]
|
||||||
pub struct Logged<T> {
|
pub struct Logged<T> {
|
||||||
|
#[pin]
|
||||||
inner: Transport<T>,
|
inner: Transport<T>,
|
||||||
view: LogView,
|
view: LogView,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Unpin for Logged<T> where T: Unpin {}
|
|
||||||
|
|
||||||
impl<T> Logged<T>
|
impl<T> Logged<T>
|
||||||
where
|
where
|
||||||
T: AsyncRead + AsyncWrite,
|
T: AsyncRead + AsyncWrite,
|
||||||
|
@ -269,12 +274,14 @@ where
|
||||||
{
|
{
|
||||||
type Item = Result<Message, error::Error>;
|
type Item = Result<Message, error::Error>;
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
match ready!(Pin::new(&mut self.as_mut().inner).poll_next(cx)) {
|
let this = self.project();
|
||||||
|
|
||||||
|
match ready!(this.inner.poll_next(cx)) {
|
||||||
Some(msg) => {
|
Some(msg) => {
|
||||||
let msg = msg?;
|
let msg = msg?;
|
||||||
|
|
||||||
self.view
|
this.view
|
||||||
.received
|
.received
|
||||||
.write()
|
.write()
|
||||||
.map_err(|_| error::Error::PoisonedLog)?
|
.map_err(|_| error::Error::PoisonedLog)?
|
||||||
|
@ -293,18 +300,20 @@ where
|
||||||
{
|
{
|
||||||
type Error = error::Error;
|
type Error = error::Error;
|
||||||
|
|
||||||
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
Pin::new(&mut self.as_mut().inner).poll_ready(cx)
|
self.project().inner.poll_ready(cx)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
Pin::new(&mut self.as_mut().inner).poll_close(cx)
|
self.project().inner.poll_close(cx)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
|
fn start_send(self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
|
||||||
Pin::new(&mut self.as_mut().inner).start_send(item.clone())?;
|
let this = self.project();
|
||||||
|
|
||||||
self.view
|
this.inner.start_send(item.clone())?;
|
||||||
|
|
||||||
|
this.view
|
||||||
.sent
|
.sent
|
||||||
.write()
|
.write()
|
||||||
.map_err(|_| error::Error::PoisonedLog)?
|
.map_err(|_| error::Error::PoisonedLog)?
|
||||||
|
@ -313,7 +322,7 @@ where
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
Pin::new(&mut self.as_mut().inner).poll_flush(cx)
|
self.project().inner.poll_flush(cx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue