From def1442e5e3a154103959346aba7bb137786e45a Mon Sep 17 00:00:00 2001 From: John-John Tedro Date: Tue, 29 Dec 2020 09:29:06 +0100 Subject: [PATCH] Upgrade to Tokio 1.0 --- Cargo.toml | 14 +++--- examples/tooter.rs | 5 +- examples/tweeter.rs | 10 ++-- irc-proto/Cargo.toml | 6 +-- src/client/conn.rs | 58 +++++++++++----------- src/client/mod.rs | 2 +- src/client/transport.rs | 105 ++++++++++++++++++++++------------------ 7 files changed, 106 insertions(+), 94 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7e56736..cfdda7a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,8 +48,10 @@ irc-proto = { version = "0.14.0", path = "irc-proto" } log = "0.4.0" parking_lot = "0.11.0" thiserror = "1.0.0" -tokio = { version = "0.3.0", features = ["net", "stream", "time", "sync"] } -tokio-util = { version = "0.4.0", features = ["codec"] } +pin-project = "1.0.2" +tokio = { version = "1.0.0", features = ["net", "time", "sync"] } +tokio-stream = "0.1.0" +tokio-util = { version = "0.6.0", features = ["codec"] } # Feature - Config serde = { version = "1.0.0", optional = true } @@ -59,12 +61,12 @@ serde_yaml = { version = "0.8.0", optional = true } toml = { version = "0.5.0", optional = true } # Feature - Proxy -tokio-socks = { version = "0.3.0", optional = true } +tokio-socks = { version = "0.5.1", optional = true } # Feature - TLS native-tls = { version = "0.2.0", optional = true } -tokio-rustls = { version = "0.20.0", optional = true } -tokio-native-tls = { version = "0.2.0", optional = true } +tokio-rustls = { version = "0.22.0", optional = true } +tokio-native-tls = { version = "0.3.0", optional = true } webpki-roots = { version = "0.20.0", optional = true } @@ -74,7 +76,7 @@ args = "2.0.0" env_logger = "0.7.0" futures = "0.3.0" getopts = "0.2.0" -tokio = { version = "0.3.0", features = ["rt", "rt-multi-thread", "macros", "net", "stream", "time"] } +tokio = { version = "1.0.0", features = ["rt", "rt-multi-thread", "macros", "net", "time"] } [[example]] diff --git a/examples/tooter.rs b/examples/tooter.rs index 8c7c062..64e9fb7 100644 --- a/examples/tooter.rs +++ b/examples/tooter.rs @@ -1,4 +1,3 @@ -use futures::prelude::*; use irc::client::prelude::*; use std::time::Duration; @@ -15,10 +14,10 @@ async fn main() -> irc::error::Result<()> { let client = Client::from_config(config).await?; let sender = client.sender(); - let mut interval = tokio::time::interval(Duration::from_secs(1)).fuse(); + let mut interval = tokio::time::interval(Duration::from_secs(1)); loop { - let _ = interval.select_next_some().await; + let _ = interval.tick().await; sender.send_privmsg("#rust-spam", "AWOOOOOOOOOO")?; } } diff --git a/examples/tweeter.rs b/examples/tweeter.rs index fdbb00a..7bff261 100644 --- a/examples/tweeter.rs +++ b/examples/tweeter.rs @@ -1,6 +1,6 @@ -use futures::prelude::*; use irc::client::prelude::*; use std::time::Duration; +use tokio_stream::StreamExt as _; // NOTE: you can find an asynchronous version of this example with `IrcReactor` in `tooter.rs`. #[tokio::main] @@ -16,14 +16,14 @@ async fn main() -> irc::error::Result<()> { client.identify()?; let mut stream = client.stream()?; - let mut interval = tokio::time::interval(Duration::from_secs(10)).fuse(); + let mut interval = tokio::time::interval(Duration::from_secs(10)); loop { - futures::select! { - m = stream.select_next_some() => { + tokio::select! { + Some(m) = stream.next() => { println!("{}", m?); } - _ = interval.select_next_some() => { + _ = interval.tick() => { client.send_privmsg("#rust-spam", "TWEET TWEET")?; } } diff --git a/irc-proto/Cargo.toml b/irc-proto/Cargo.toml index 3f31e3c..2011fcf 100644 --- a/irc-proto/Cargo.toml +++ b/irc-proto/Cargo.toml @@ -20,6 +20,6 @@ default = ["bytes", "tokio", "tokio-util"] encoding = "0.2.0" thiserror = "1.0.0" -bytes = { version = "0.5.0", optional = true } -tokio = { version = "0.3.0", optional = true } -tokio-util = { version = "0.4.0", features = ["codec"], optional = true } +bytes = { version = "1.0.0", optional = true } +tokio = { version = "1.0.0", optional = true } +tokio-util = { version = "0.6.0", features = ["codec"], optional = true } diff --git a/src/client/conn.rs b/src/client/conn.rs index a23e68d..0a397f4 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -1,5 +1,6 @@ //! A module providing IRC connections for use by `IrcServer`s. use futures_util::{sink::Sink, stream::Stream}; +use pin_project::pin_project; use std::{ fmt, pin::Pin, @@ -53,14 +54,15 @@ use crate::{ }; /// An IRC connection used internally by `IrcServer`. +#[pin_project(project = ConnectionProj)] pub enum Connection { #[doc(hidden)] - Unsecured(Transport), + Unsecured(#[pin] Transport), #[doc(hidden)] #[cfg(any(feature = "tls-native", feature = "tls-rust"))] - Secured(Transport>), + Secured(#[pin] Transport>), #[doc(hidden)] - Mock(Logged), + Mock(#[pin] Logged), } impl fmt::Debug for Connection { @@ -280,12 +282,12 @@ impl Connection { impl Stream for Connection { type Item = error::Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match &mut *self { - Connection::Unsecured(inner) => Pin::new(inner).poll_next(cx), + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.project() { + ConnectionProj::Unsecured(inner) => inner.poll_next(cx), #[cfg(any(feature = "tls-native", feature = "tls-rust"))] - Connection::Secured(inner) => Pin::new(inner).poll_next(cx), - Connection::Mock(inner) => Pin::new(inner).poll_next(cx), + ConnectionProj::Secured(inner) => inner.poll_next(cx), + ConnectionProj::Mock(inner) => inner.poll_next(cx), } } } @@ -293,39 +295,39 @@ impl Stream for Connection { impl Sink for Connection { type Error = error::Error; - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match &mut *self { - Connection::Unsecured(inner) => Pin::new(inner).poll_ready(cx), + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.project() { + ConnectionProj::Unsecured(inner) => inner.poll_ready(cx), #[cfg(any(feature = "tls-native", feature = "tls-rust"))] - Connection::Secured(inner) => Pin::new(inner).poll_ready(cx), - Connection::Mock(inner) => Pin::new(inner).poll_ready(cx), + ConnectionProj::Secured(inner) => inner.poll_ready(cx), + ConnectionProj::Mock(inner) => inner.poll_ready(cx), } } - fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> { - match &mut *self { - Connection::Unsecured(inner) => Pin::new(inner).start_send(item), + fn start_send(self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> { + match self.project() { + ConnectionProj::Unsecured(inner) => inner.start_send(item), #[cfg(any(feature = "tls-native", feature = "tls-rust"))] - Connection::Secured(inner) => Pin::new(inner).start_send(item), - Connection::Mock(inner) => Pin::new(inner).start_send(item), + ConnectionProj::Secured(inner) => inner.start_send(item), + ConnectionProj::Mock(inner) => inner.start_send(item), } } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match &mut *self { - Connection::Unsecured(inner) => Pin::new(inner).poll_flush(cx), + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.project() { + ConnectionProj::Unsecured(inner) => inner.poll_flush(cx), #[cfg(any(feature = "tls-native", feature = "tls-rust"))] - Connection::Secured(inner) => Pin::new(inner).poll_flush(cx), - Connection::Mock(inner) => Pin::new(inner).poll_flush(cx), + ConnectionProj::Secured(inner) => inner.poll_flush(cx), + ConnectionProj::Mock(inner) => inner.poll_flush(cx), } } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match &mut *self { - Connection::Unsecured(inner) => Pin::new(inner).poll_close(cx), + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.project() { + ConnectionProj::Unsecured(inner) => inner.poll_close(cx), #[cfg(any(feature = "tls-native", feature = "tls-rust"))] - Connection::Secured(inner) => Pin::new(inner).poll_close(cx), - Connection::Mock(inner) => Pin::new(inner).poll_close(cx), + ConnectionProj::Secured(inner) => inner.poll_close(cx), + ConnectionProj::Mock(inner) => inner.poll_close(cx), } } } diff --git a/src/client/mod.rs b/src/client/mod.rs index d710b61..ccc6bd5 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -879,7 +879,7 @@ impl Future for Outgoing { } loop { - match this.stream.poll_next_unpin(cx) { + match this.stream.poll_recv(cx) { Poll::Ready(Some(message)) => ready!(this.try_start_send(cx, message))?, Poll::Ready(None) => { ready!(Pin::new(&mut this.sink).poll_flush(cx))?; diff --git a/src/client/transport.rs b/src/client/transport.rs index 5184e03..df39488 100644 --- a/src/client/transport.rs +++ b/src/client/transport.rs @@ -10,6 +10,7 @@ use std::{ use chrono::prelude::*; use futures_util::{future::Future, ready, sink::Sink, stream::Stream}; +use pin_project::pin_project; use tokio::sync::mpsc::UnboundedSender; use tokio::{ io::{AsyncRead, AsyncWrite}, @@ -24,6 +25,7 @@ use crate::{ }; /// Pinger-based futures helper. +#[pin_project] struct Pinger { tx: UnboundedSender, // Whether this pinger pings. @@ -31,8 +33,10 @@ struct Pinger { /// The amount of time to wait before timing out from no ping response. ping_timeout: Duration, /// The instant that the last ping was sent to the server. + #[pin] ping_deadline: Option, /// The interval at which to send pings. + #[pin] ping_interval: Interval, } @@ -52,11 +56,11 @@ impl Pinger { } /// Handle an incoming message. - fn handle_message(&mut self, message: &Message) -> error::Result<()> { + fn handle_message(self: Pin<&mut Self>, message: &Message) -> error::Result<()> { match message.command { Command::Response(Response::RPL_ENDOFMOTD, _) | Command::Response(Response::ERR_NOMOTD, _) => { - self.enabled = true; + *self.project().enabled = true; } // On receiving a `PING` message from the server, we automatically respond with // the appropriate `PONG` message to keep the connection alive for transport. @@ -67,7 +71,7 @@ impl Pinger { // last instant that the pong was received. This will prevent timeout. Command::PONG(_, None) | Command::PONG(_, Some(_)) => { log::trace!("Received PONG"); - self.ping_deadline.take(); + self.project().ping_deadline.set(None); } _ => (), } @@ -76,47 +80,47 @@ impl Pinger { } /// Send a pong. - fn send_pong(&mut self, data: &str) -> error::Result<()> { - self.tx.send(Command::PONG(data.to_owned(), None).into())?; + fn send_pong(self: Pin<&mut Self>, data: &str) -> error::Result<()> { + self.project() + .tx + .send(Command::PONG(data.to_owned(), None).into())?; Ok(()) } /// Sends a ping via the transport. - fn send_ping(&mut self) -> error::Result<()> { + fn send_ping(self: Pin<&mut Self>) -> error::Result<()> { log::trace!("Sending PING"); // Creates new ping data using the local timestamp. let data = format!("{}", Local::now().timestamp()); - self.tx.send(Command::PING(data.clone(), None).into())?; + let mut this = self.project(); + + this.tx.send(Command::PING(data.clone(), None).into())?; + + if this.ping_deadline.is_none() { + let ping_deadline = time::sleep(*this.ping_timeout); + this.ping_deadline.set(Some(ping_deadline)); + } Ok(()) } - - /// Set the ping deadline. - fn set_deadline(&mut self) { - if self.ping_deadline.is_none() { - let ping_deadline = time::sleep(self.ping_timeout); - self.ping_deadline = Some(ping_deadline); - } - } } impl Future for Pinger { type Output = Result<(), error::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if let Some(ping_deadline) = self.as_mut().ping_deadline.as_mut() { - match Pin::new(ping_deadline).poll(cx) { + if let Some(ping_deadline) = self.as_mut().project().ping_deadline.as_pin_mut() { + match ping_deadline.poll(cx) { Poll::Ready(()) => return Poll::Ready(Err(error::Error::PingTimeout)), Poll::Pending => (), } } - if let Poll::Ready(_) = Pin::new(&mut self.as_mut().ping_interval).poll_next(cx) { - if self.enabled { + if let Poll::Ready(_) = self.as_mut().project().ping_interval.poll_tick(cx) { + if *self.as_mut().project().enabled { self.as_mut().send_ping()?; - self.as_mut().set_deadline(); } } @@ -127,15 +131,16 @@ impl Future for Pinger { /// An IRC transport that handles core functionality for the IRC protocol. This is used in the /// implementation of `Connection` and ultimately `IrcServer`, and plays an important role in /// handling connection timeouts, message throttling, and ping response. +#[pin_project] pub struct Transport { /// The inner connection framed with an `IrcCodec`. + #[pin] inner: Framed, /// Helper for handle pinging. + #[pin] pinger: Option, } -impl Unpin for Transport where T: Unpin {} - impl Transport where T: Unpin + AsyncRead + AsyncWrite, @@ -164,21 +169,21 @@ where type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if let Some(pinger) = self.as_mut().pinger.as_mut() { - match Pin::new(pinger).poll(cx) { + if let Some(pinger) = self.as_mut().project().pinger.as_pin_mut() { + match pinger.poll(cx) { Poll::Ready(result) => result?, Poll::Pending => (), } } - let result = ready!(Pin::new(&mut self.as_mut().inner).poll_next(cx)); + let result = ready!(self.as_mut().project().inner.poll_next(cx)); let message = match result { None => return Poll::Ready(None), Some(message) => message?, }; - if let Some(pinger) = self.as_mut().pinger.as_mut() { + if let Some(pinger) = self.as_mut().project().pinger.as_pin_mut() { pinger.handle_message(&message)?; } @@ -192,24 +197,24 @@ where { type Error = error::Error; - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(Pin::new(&mut self.as_mut().inner).poll_ready(cx))?; + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + ready!(self.project().inner.poll_ready(cx))?; Poll::Ready(Ok(())) } - fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> { + fn start_send(self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> { log::trace!("[SEND] {}", item); - Pin::new(&mut self.as_mut().inner).start_send(item)?; + self.project().inner.start_send(item)?; Ok(()) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(Pin::new(&mut self.as_mut().inner).poll_flush(cx))?; + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + ready!(self.project().inner.poll_flush(cx))?; Poll::Ready(Ok(())) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(Pin::new(&mut self.as_mut().inner).poll_close(cx))?; + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + ready!(self.project().inner.poll_close(cx))?; Poll::Ready(Ok(())) } } @@ -235,13 +240,13 @@ impl LogView { /// A logged version of the `Transport` that records all sent and received messages. /// Note: this will introduce some performance overhead by cloning all messages. +#[pin_project] pub struct Logged { + #[pin] inner: Transport, view: LogView, } -impl Unpin for Logged where T: Unpin {} - impl Logged where T: AsyncRead + AsyncWrite, @@ -269,12 +274,14 @@ where { type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match ready!(Pin::new(&mut self.as_mut().inner).poll_next(cx)) { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + + match ready!(this.inner.poll_next(cx)) { Some(msg) => { let msg = msg?; - self.view + this.view .received .write() .map_err(|_| error::Error::PoisonedLog)? @@ -293,18 +300,20 @@ where { type Error = error::Error; - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.as_mut().inner).poll_ready(cx) + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_ready(cx) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.as_mut().inner).poll_close(cx) + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_close(cx) } - fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> { - Pin::new(&mut self.as_mut().inner).start_send(item.clone())?; + fn start_send(self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> { + let this = self.project(); - self.view + this.inner.start_send(item.clone())?; + + this.view .sent .write() .map_err(|_| error::Error::PoisonedLog)? @@ -313,7 +322,7 @@ where Ok(()) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.as_mut().inner).poll_flush(cx) + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_flush(cx) } }