Merge pull request #226 from udoprog/tokio1

Upgrade to Tokio 1.0
This commit is contained in:
Aaron Weiss 2021-01-03 14:11:13 -05:00 committed by GitHub
commit 0872bf5823
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 106 additions and 94 deletions

View file

@ -48,8 +48,10 @@ irc-proto = { version = "0.14.0", path = "irc-proto" }
log = "0.4.0" log = "0.4.0"
parking_lot = "0.11.0" parking_lot = "0.11.0"
thiserror = "1.0.0" thiserror = "1.0.0"
tokio = { version = "0.3.0", features = ["net", "stream", "time", "sync"] } pin-project = "1.0.2"
tokio-util = { version = "0.4.0", features = ["codec"] } tokio = { version = "1.0.0", features = ["net", "time", "sync"] }
tokio-stream = "0.1.0"
tokio-util = { version = "0.6.0", features = ["codec"] }
# Feature - Config # Feature - Config
serde = { version = "1.0.0", optional = true } serde = { version = "1.0.0", optional = true }
@ -59,12 +61,12 @@ serde_yaml = { version = "0.8.0", optional = true }
toml = { version = "0.5.0", optional = true } toml = { version = "0.5.0", optional = true }
# Feature - Proxy # Feature - Proxy
tokio-socks = { version = "0.3.0", optional = true } tokio-socks = { version = "0.5.1", optional = true }
# Feature - TLS # Feature - TLS
native-tls = { version = "0.2.0", optional = true } native-tls = { version = "0.2.0", optional = true }
tokio-rustls = { version = "0.20.0", optional = true } tokio-rustls = { version = "0.22.0", optional = true }
tokio-native-tls = { version = "0.2.0", optional = true } tokio-native-tls = { version = "0.3.0", optional = true }
webpki-roots = { version = "0.20.0", optional = true } webpki-roots = { version = "0.20.0", optional = true }
@ -74,7 +76,7 @@ args = "2.0.0"
env_logger = "0.7.0" env_logger = "0.7.0"
futures = "0.3.0" futures = "0.3.0"
getopts = "0.2.0" getopts = "0.2.0"
tokio = { version = "0.3.0", features = ["rt", "rt-multi-thread", "macros", "net", "stream", "time"] } tokio = { version = "1.0.0", features = ["rt", "rt-multi-thread", "macros", "net", "time"] }
[[example]] [[example]]

View file

@ -1,4 +1,3 @@
use futures::prelude::*;
use irc::client::prelude::*; use irc::client::prelude::*;
use std::time::Duration; use std::time::Duration;
@ -15,10 +14,10 @@ async fn main() -> irc::error::Result<()> {
let client = Client::from_config(config).await?; let client = Client::from_config(config).await?;
let sender = client.sender(); let sender = client.sender();
let mut interval = tokio::time::interval(Duration::from_secs(1)).fuse(); let mut interval = tokio::time::interval(Duration::from_secs(1));
loop { loop {
let _ = interval.select_next_some().await; let _ = interval.tick().await;
sender.send_privmsg("#rust-spam", "AWOOOOOOOOOO")?; sender.send_privmsg("#rust-spam", "AWOOOOOOOOOO")?;
} }
} }

View file

@ -1,6 +1,6 @@
use futures::prelude::*;
use irc::client::prelude::*; use irc::client::prelude::*;
use std::time::Duration; use std::time::Duration;
use tokio_stream::StreamExt as _;
// NOTE: you can find an asynchronous version of this example with `IrcReactor` in `tooter.rs`. // NOTE: you can find an asynchronous version of this example with `IrcReactor` in `tooter.rs`.
#[tokio::main] #[tokio::main]
@ -16,14 +16,14 @@ async fn main() -> irc::error::Result<()> {
client.identify()?; client.identify()?;
let mut stream = client.stream()?; let mut stream = client.stream()?;
let mut interval = tokio::time::interval(Duration::from_secs(10)).fuse(); let mut interval = tokio::time::interval(Duration::from_secs(10));
loop { loop {
futures::select! { tokio::select! {
m = stream.select_next_some() => { Some(m) = stream.next() => {
println!("{}", m?); println!("{}", m?);
} }
_ = interval.select_next_some() => { _ = interval.tick() => {
client.send_privmsg("#rust-spam", "TWEET TWEET")?; client.send_privmsg("#rust-spam", "TWEET TWEET")?;
} }
} }

View file

@ -20,6 +20,6 @@ default = ["bytes", "tokio", "tokio-util"]
encoding = "0.2.0" encoding = "0.2.0"
thiserror = "1.0.0" thiserror = "1.0.0"
bytes = { version = "0.5.0", optional = true } bytes = { version = "1.0.0", optional = true }
tokio = { version = "0.3.0", optional = true } tokio = { version = "1.0.0", optional = true }
tokio-util = { version = "0.4.0", features = ["codec"], optional = true } tokio-util = { version = "0.6.0", features = ["codec"], optional = true }

View file

@ -1,5 +1,6 @@
//! A module providing IRC connections for use by `IrcServer`s. //! A module providing IRC connections for use by `IrcServer`s.
use futures_util::{sink::Sink, stream::Stream}; use futures_util::{sink::Sink, stream::Stream};
use pin_project::pin_project;
use std::{ use std::{
fmt, fmt,
pin::Pin, pin::Pin,
@ -53,14 +54,15 @@ use crate::{
}; };
/// An IRC connection used internally by `IrcServer`. /// An IRC connection used internally by `IrcServer`.
#[pin_project(project = ConnectionProj)]
pub enum Connection { pub enum Connection {
#[doc(hidden)] #[doc(hidden)]
Unsecured(Transport<TcpStream>), Unsecured(#[pin] Transport<TcpStream>),
#[doc(hidden)] #[doc(hidden)]
#[cfg(any(feature = "tls-native", feature = "tls-rust"))] #[cfg(any(feature = "tls-native", feature = "tls-rust"))]
Secured(Transport<TlsStream<TcpStream>>), Secured(#[pin] Transport<TlsStream<TcpStream>>),
#[doc(hidden)] #[doc(hidden)]
Mock(Logged<MockStream>), Mock(#[pin] Logged<MockStream>),
} }
impl fmt::Debug for Connection { impl fmt::Debug for Connection {
@ -280,12 +282,12 @@ impl Connection {
impl Stream for Connection { impl Stream for Connection {
type Item = error::Result<Message>; type Item = error::Result<Message>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match &mut *self { match self.project() {
Connection::Unsecured(inner) => Pin::new(inner).poll_next(cx), ConnectionProj::Unsecured(inner) => inner.poll_next(cx),
#[cfg(any(feature = "tls-native", feature = "tls-rust"))] #[cfg(any(feature = "tls-native", feature = "tls-rust"))]
Connection::Secured(inner) => Pin::new(inner).poll_next(cx), ConnectionProj::Secured(inner) => inner.poll_next(cx),
Connection::Mock(inner) => Pin::new(inner).poll_next(cx), ConnectionProj::Mock(inner) => inner.poll_next(cx),
} }
} }
} }
@ -293,39 +295,39 @@ impl Stream for Connection {
impl Sink<Message> for Connection { impl Sink<Message> for Connection {
type Error = error::Error; type Error = error::Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match &mut *self { match self.project() {
Connection::Unsecured(inner) => Pin::new(inner).poll_ready(cx), ConnectionProj::Unsecured(inner) => inner.poll_ready(cx),
#[cfg(any(feature = "tls-native", feature = "tls-rust"))] #[cfg(any(feature = "tls-native", feature = "tls-rust"))]
Connection::Secured(inner) => Pin::new(inner).poll_ready(cx), ConnectionProj::Secured(inner) => inner.poll_ready(cx),
Connection::Mock(inner) => Pin::new(inner).poll_ready(cx), ConnectionProj::Mock(inner) => inner.poll_ready(cx),
} }
} }
fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> { fn start_send(self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
match &mut *self { match self.project() {
Connection::Unsecured(inner) => Pin::new(inner).start_send(item), ConnectionProj::Unsecured(inner) => inner.start_send(item),
#[cfg(any(feature = "tls-native", feature = "tls-rust"))] #[cfg(any(feature = "tls-native", feature = "tls-rust"))]
Connection::Secured(inner) => Pin::new(inner).start_send(item), ConnectionProj::Secured(inner) => inner.start_send(item),
Connection::Mock(inner) => Pin::new(inner).start_send(item), ConnectionProj::Mock(inner) => inner.start_send(item),
} }
} }
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match &mut *self { match self.project() {
Connection::Unsecured(inner) => Pin::new(inner).poll_flush(cx), ConnectionProj::Unsecured(inner) => inner.poll_flush(cx),
#[cfg(any(feature = "tls-native", feature = "tls-rust"))] #[cfg(any(feature = "tls-native", feature = "tls-rust"))]
Connection::Secured(inner) => Pin::new(inner).poll_flush(cx), ConnectionProj::Secured(inner) => inner.poll_flush(cx),
Connection::Mock(inner) => Pin::new(inner).poll_flush(cx), ConnectionProj::Mock(inner) => inner.poll_flush(cx),
} }
} }
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match &mut *self { match self.project() {
Connection::Unsecured(inner) => Pin::new(inner).poll_close(cx), ConnectionProj::Unsecured(inner) => inner.poll_close(cx),
#[cfg(any(feature = "tls-native", feature = "tls-rust"))] #[cfg(any(feature = "tls-native", feature = "tls-rust"))]
Connection::Secured(inner) => Pin::new(inner).poll_close(cx), ConnectionProj::Secured(inner) => inner.poll_close(cx),
Connection::Mock(inner) => Pin::new(inner).poll_close(cx), ConnectionProj::Mock(inner) => inner.poll_close(cx),
} }
} }
} }

View file

@ -879,7 +879,7 @@ impl Future for Outgoing {
} }
loop { loop {
match this.stream.poll_next_unpin(cx) { match this.stream.poll_recv(cx) {
Poll::Ready(Some(message)) => ready!(this.try_start_send(cx, message))?, Poll::Ready(Some(message)) => ready!(this.try_start_send(cx, message))?,
Poll::Ready(None) => { Poll::Ready(None) => {
ready!(Pin::new(&mut this.sink).poll_flush(cx))?; ready!(Pin::new(&mut this.sink).poll_flush(cx))?;

View file

@ -10,6 +10,7 @@ use std::{
use chrono::prelude::*; use chrono::prelude::*;
use futures_util::{future::Future, ready, sink::Sink, stream::Stream}; use futures_util::{future::Future, ready, sink::Sink, stream::Stream};
use pin_project::pin_project;
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
use tokio::{ use tokio::{
io::{AsyncRead, AsyncWrite}, io::{AsyncRead, AsyncWrite},
@ -24,6 +25,7 @@ use crate::{
}; };
/// Pinger-based futures helper. /// Pinger-based futures helper.
#[pin_project]
struct Pinger { struct Pinger {
tx: UnboundedSender<Message>, tx: UnboundedSender<Message>,
// Whether this pinger pings. // Whether this pinger pings.
@ -31,8 +33,10 @@ struct Pinger {
/// The amount of time to wait before timing out from no ping response. /// The amount of time to wait before timing out from no ping response.
ping_timeout: Duration, ping_timeout: Duration,
/// The instant that the last ping was sent to the server. /// The instant that the last ping was sent to the server.
#[pin]
ping_deadline: Option<Sleep>, ping_deadline: Option<Sleep>,
/// The interval at which to send pings. /// The interval at which to send pings.
#[pin]
ping_interval: Interval, ping_interval: Interval,
} }
@ -52,11 +56,11 @@ impl Pinger {
} }
/// Handle an incoming message. /// Handle an incoming message.
fn handle_message(&mut self, message: &Message) -> error::Result<()> { fn handle_message(self: Pin<&mut Self>, message: &Message) -> error::Result<()> {
match message.command { match message.command {
Command::Response(Response::RPL_ENDOFMOTD, _) Command::Response(Response::RPL_ENDOFMOTD, _)
| Command::Response(Response::ERR_NOMOTD, _) => { | Command::Response(Response::ERR_NOMOTD, _) => {
self.enabled = true; *self.project().enabled = true;
} }
// On receiving a `PING` message from the server, we automatically respond with // On receiving a `PING` message from the server, we automatically respond with
// the appropriate `PONG` message to keep the connection alive for transport. // the appropriate `PONG` message to keep the connection alive for transport.
@ -67,7 +71,7 @@ impl Pinger {
// last instant that the pong was received. This will prevent timeout. // last instant that the pong was received. This will prevent timeout.
Command::PONG(_, None) | Command::PONG(_, Some(_)) => { Command::PONG(_, None) | Command::PONG(_, Some(_)) => {
log::trace!("Received PONG"); log::trace!("Received PONG");
self.ping_deadline.take(); self.project().ping_deadline.set(None);
} }
_ => (), _ => (),
} }
@ -76,47 +80,47 @@ impl Pinger {
} }
/// Send a pong. /// Send a pong.
fn send_pong(&mut self, data: &str) -> error::Result<()> { fn send_pong(self: Pin<&mut Self>, data: &str) -> error::Result<()> {
self.tx.send(Command::PONG(data.to_owned(), None).into())?; self.project()
.tx
.send(Command::PONG(data.to_owned(), None).into())?;
Ok(()) Ok(())
} }
/// Sends a ping via the transport. /// Sends a ping via the transport.
fn send_ping(&mut self) -> error::Result<()> { fn send_ping(self: Pin<&mut Self>) -> error::Result<()> {
log::trace!("Sending PING"); log::trace!("Sending PING");
// Creates new ping data using the local timestamp. // Creates new ping data using the local timestamp.
let data = format!("{}", Local::now().timestamp()); let data = format!("{}", Local::now().timestamp());
self.tx.send(Command::PING(data.clone(), None).into())?; let mut this = self.project();
this.tx.send(Command::PING(data.clone(), None).into())?;
if this.ping_deadline.is_none() {
let ping_deadline = time::sleep(*this.ping_timeout);
this.ping_deadline.set(Some(ping_deadline));
}
Ok(()) Ok(())
} }
/// Set the ping deadline.
fn set_deadline(&mut self) {
if self.ping_deadline.is_none() {
let ping_deadline = time::sleep(self.ping_timeout);
self.ping_deadline = Some(ping_deadline);
}
}
} }
impl Future for Pinger { impl Future for Pinger {
type Output = Result<(), error::Error>; type Output = Result<(), error::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Some(ping_deadline) = self.as_mut().ping_deadline.as_mut() { if let Some(ping_deadline) = self.as_mut().project().ping_deadline.as_pin_mut() {
match Pin::new(ping_deadline).poll(cx) { match ping_deadline.poll(cx) {
Poll::Ready(()) => return Poll::Ready(Err(error::Error::PingTimeout)), Poll::Ready(()) => return Poll::Ready(Err(error::Error::PingTimeout)),
Poll::Pending => (), Poll::Pending => (),
} }
} }
if let Poll::Ready(_) = Pin::new(&mut self.as_mut().ping_interval).poll_next(cx) { if let Poll::Ready(_) = self.as_mut().project().ping_interval.poll_tick(cx) {
if self.enabled { if *self.as_mut().project().enabled {
self.as_mut().send_ping()?; self.as_mut().send_ping()?;
self.as_mut().set_deadline();
} }
} }
@ -127,15 +131,16 @@ impl Future for Pinger {
/// An IRC transport that handles core functionality for the IRC protocol. This is used in the /// An IRC transport that handles core functionality for the IRC protocol. This is used in the
/// implementation of `Connection` and ultimately `IrcServer`, and plays an important role in /// implementation of `Connection` and ultimately `IrcServer`, and plays an important role in
/// handling connection timeouts, message throttling, and ping response. /// handling connection timeouts, message throttling, and ping response.
#[pin_project]
pub struct Transport<T> { pub struct Transport<T> {
/// The inner connection framed with an `IrcCodec`. /// The inner connection framed with an `IrcCodec`.
#[pin]
inner: Framed<T, IrcCodec>, inner: Framed<T, IrcCodec>,
/// Helper for handle pinging. /// Helper for handle pinging.
#[pin]
pinger: Option<Pinger>, pinger: Option<Pinger>,
} }
impl<T> Unpin for Transport<T> where T: Unpin {}
impl<T> Transport<T> impl<T> Transport<T>
where where
T: Unpin + AsyncRead + AsyncWrite, T: Unpin + AsyncRead + AsyncWrite,
@ -164,21 +169,21 @@ where
type Item = Result<Message, error::Error>; type Item = Result<Message, error::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(pinger) = self.as_mut().pinger.as_mut() { if let Some(pinger) = self.as_mut().project().pinger.as_pin_mut() {
match Pin::new(pinger).poll(cx) { match pinger.poll(cx) {
Poll::Ready(result) => result?, Poll::Ready(result) => result?,
Poll::Pending => (), Poll::Pending => (),
} }
} }
let result = ready!(Pin::new(&mut self.as_mut().inner).poll_next(cx)); let result = ready!(self.as_mut().project().inner.poll_next(cx));
let message = match result { let message = match result {
None => return Poll::Ready(None), None => return Poll::Ready(None),
Some(message) => message?, Some(message) => message?,
}; };
if let Some(pinger) = self.as_mut().pinger.as_mut() { if let Some(pinger) = self.as_mut().project().pinger.as_pin_mut() {
pinger.handle_message(&message)?; pinger.handle_message(&message)?;
} }
@ -192,24 +197,24 @@ where
{ {
type Error = error::Error; type Error = error::Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
ready!(Pin::new(&mut self.as_mut().inner).poll_ready(cx))?; ready!(self.project().inner.poll_ready(cx))?;
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }
fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> { fn start_send(self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
log::trace!("[SEND] {}", item); log::trace!("[SEND] {}", item);
Pin::new(&mut self.as_mut().inner).start_send(item)?; self.project().inner.start_send(item)?;
Ok(()) Ok(())
} }
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
ready!(Pin::new(&mut self.as_mut().inner).poll_flush(cx))?; ready!(self.project().inner.poll_flush(cx))?;
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
ready!(Pin::new(&mut self.as_mut().inner).poll_close(cx))?; ready!(self.project().inner.poll_close(cx))?;
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }
} }
@ -235,13 +240,13 @@ impl LogView {
/// A logged version of the `Transport` that records all sent and received messages. /// A logged version of the `Transport` that records all sent and received messages.
/// Note: this will introduce some performance overhead by cloning all messages. /// Note: this will introduce some performance overhead by cloning all messages.
#[pin_project]
pub struct Logged<T> { pub struct Logged<T> {
#[pin]
inner: Transport<T>, inner: Transport<T>,
view: LogView, view: LogView,
} }
impl<T> Unpin for Logged<T> where T: Unpin {}
impl<T> Logged<T> impl<T> Logged<T>
where where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
@ -269,12 +274,14 @@ where
{ {
type Item = Result<Message, error::Error>; type Item = Result<Message, error::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match ready!(Pin::new(&mut self.as_mut().inner).poll_next(cx)) { let this = self.project();
match ready!(this.inner.poll_next(cx)) {
Some(msg) => { Some(msg) => {
let msg = msg?; let msg = msg?;
self.view this.view
.received .received
.write() .write()
.map_err(|_| error::Error::PoisonedLog)? .map_err(|_| error::Error::PoisonedLog)?
@ -293,18 +300,20 @@ where
{ {
type Error = error::Error; type Error = error::Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.as_mut().inner).poll_ready(cx) self.project().inner.poll_ready(cx)
} }
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.as_mut().inner).poll_close(cx) self.project().inner.poll_close(cx)
} }
fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> { fn start_send(self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
Pin::new(&mut self.as_mut().inner).start_send(item.clone())?; let this = self.project();
self.view this.inner.start_send(item.clone())?;
this.view
.sent .sent
.write() .write()
.map_err(|_| error::Error::PoisonedLog)? .map_err(|_| error::Error::PoisonedLog)?
@ -313,7 +322,7 @@ where
Ok(()) Ok(())
} }
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.as_mut().inner).poll_flush(cx) self.project().inner.poll_flush(cx)
} }
} }