Merge pull request #220 from udoprog/develop

Bump to Tokio 0.3.0 and clean up a few dependencies
This commit is contained in:
Aaron Weiss 2020-12-03 19:07:00 -05:00 committed by GitHub
commit d77696b389
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 358 additions and 358 deletions

View file

@ -36,24 +36,20 @@ yaml = ["yaml_config"]
proxy = ["tokio-socks"] proxy = ["tokio-socks"]
tls-native = ["native-tls", "tokio-tls"] tls-native = ["native-tls", "tokio-native-tls"]
tls-rust = ["tokio-rustls", "webpki-roots"] tls-rust = ["tokio-rustls", "webpki-roots"]
[dependencies] [dependencies]
bufstream = "0.1.0"
bytes = "0.5.0"
chrono = "0.4.0" chrono = "0.4.0"
encoding = "0.2.0" encoding = "0.2.0"
futures-channel = "0.3.0" futures-util = { version = "0.3.0", default-features = false, features = ["sink"] }
futures-util = { version = "0.3.0", features = ["sink"] }
irc-proto = { version = "0.14.0", path = "irc-proto" } irc-proto = { version = "0.14.0", path = "irc-proto" }
log = "0.4.0" log = "0.4.0"
parking_lot = "0.11.0" parking_lot = "0.11.0"
pin-utils = "0.1.0-alpha.4"
thiserror = "1.0.0" thiserror = "1.0.0"
tokio = { version = "0.2.0", features = ["macros", "net", "stream", "time"] } tokio = { version = "0.3.0", features = ["net", "stream", "time", "sync"] }
tokio-util = { version = "0.3.0", features = ["codec"] } tokio-util = { version = "0.4.0", features = ["codec"] }
# Feature - Config # Feature - Config
serde = { version = "1.0.0", optional = true } serde = { version = "1.0.0", optional = true }
@ -67,8 +63,8 @@ tokio-socks = { version = "0.3.0", optional = true }
# Feature - TLS # Feature - TLS
native-tls = { version = "0.2.0", optional = true } native-tls = { version = "0.2.0", optional = true }
tokio-rustls = { version = "0.14.0", optional = true } tokio-rustls = { version = "0.20.0", optional = true }
tokio-tls = { version = "0.3.0", optional = true } tokio-native-tls = { version = "0.2.0", optional = true }
webpki-roots = { version = "0.20.0", optional = true } webpki-roots = { version = "0.20.0", optional = true }
@ -78,6 +74,7 @@ args = "2.0.0"
env_logger = "0.7.0" env_logger = "0.7.0"
futures = "0.3.0" futures = "0.3.0"
getopts = "0.2.0" getopts = "0.2.0"
tokio = { version = "0.3.0", features = ["rt", "rt-multi-thread", "macros", "net", "stream", "time"] }
[[example]] [[example]]

View file

@ -21,5 +21,5 @@ encoding = "0.2.0"
thiserror = "1.0.0" thiserror = "1.0.0"
bytes = { version = "0.5.0", optional = true } bytes = { version = "0.5.0", optional = true }
tokio = { version = "0.2.0", optional = true } tokio = { version = "0.3.0", optional = true }
tokio-util = { version = "0.3.0", features = ["codec"], optional = true } tokio-util = { version = "0.4.0", features = ["codec"], optional = true }

View file

@ -1,5 +1,4 @@
//! A module providing IRC connections for use by `IrcServer`s. //! A module providing IRC connections for use by `IrcServer`s.
use futures_channel::mpsc::UnboundedSender;
use futures_util::{sink::Sink, stream::Stream}; use futures_util::{sink::Sink, stream::Stream};
use std::{ use std::{
fmt, fmt,
@ -7,7 +6,8 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
}; };
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio_util::codec::Decoder; use tokio::sync::mpsc::UnboundedSender;
use tokio_util::codec::Framed;
#[cfg(feature = "proxy")] #[cfg(feature = "proxy")]
use tokio_socks::tcp::Socks5Stream; use tokio_socks::tcp::Socks5Stream;
@ -22,7 +22,7 @@ use std::{fs::File, io::Read};
use native_tls::{Certificate, Identity, TlsConnector}; use native_tls::{Certificate, Identity, TlsConnector};
#[cfg(feature = "tls-native")] #[cfg(feature = "tls-native")]
use tokio_tls::{self, TlsStream}; use tokio_native_tls::{self, TlsStream};
#[cfg(feature = "tls-rust")] #[cfg(feature = "tls-rust")]
use std::{ use std::{
@ -150,7 +150,7 @@ impl Connection {
tx: UnboundedSender<Message>, tx: UnboundedSender<Message>,
) -> error::Result<Transport<TcpStream>> { ) -> error::Result<Transport<TcpStream>> {
let stream = Self::new_stream(config).await?; let stream = Self::new_stream(config).await?;
let framed = IrcCodec::new(config.encoding())?.framed(stream); let framed = Framed::new(stream, IrcCodec::new(config.encoding())?);
Ok(Transport::new(&config, framed, tx)) Ok(Transport::new(&config, framed, tx))
} }
@ -184,12 +184,12 @@ impl Connection {
); );
} }
let connector: tokio_tls::TlsConnector = builder.build()?.into(); let connector: tokio_native_tls::TlsConnector = builder.build()?.into();
let domain = config.server()?; let domain = config.server()?;
let stream = Self::new_stream(config).await?; let stream = Self::new_stream(config).await?;
let stream = connector.connect(domain, stream).await?; let stream = connector.connect(domain, stream).await?;
let framed = IrcCodec::new(config.encoding())?.framed(stream); let framed = Framed::new(stream, IrcCodec::new(config.encoding())?);
Ok(Transport::new(&config, framed, tx)) Ok(Transport::new(&config, framed, tx))
} }
@ -236,7 +236,7 @@ impl Connection {
let stream = Self::new_stream(config).await?; let stream = Self::new_stream(config).await?;
let stream = connector.connect(domain, stream).await?; let stream = connector.connect(domain, stream).await?;
let framed = IrcCodec::new(config.encoding())?.framed(stream); let framed = Framed::new(stream, IrcCodec::new(config.encoding())?);
Ok(Transport::new(&config, framed, tx)) Ok(Transport::new(&config, framed, tx))
} }
@ -262,7 +262,7 @@ impl Connection {
})?; })?;
let stream = MockStream::new(&initial); let stream = MockStream::new(&initial);
let framed = IrcCodec::new(config.encoding())?.framed(stream); let framed = Framed::new(stream, IrcCodec::new(config.encoding())?);
Ok(Transport::new(&config, framed, tx)) Ok(Transport::new(&config, framed, tx))
} }

View file

@ -3,7 +3,7 @@ use std::{
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
/// A fake stream for testing network applications backed by buffers. /// A fake stream for testing network applications backed by buffers.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -41,9 +41,11 @@ impl AsyncRead for MockStream {
fn poll_read( fn poll_read(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
_: &mut Context<'_>, _: &mut Context<'_>,
buf: &mut [u8], buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<()>> {
Poll::Ready(self.as_mut().received.read(buf)) let n = self.as_mut().received.read(buf.initialize_unfilled())?;
buf.advance(n);
Poll::Ready(Ok(()))
} }
} }

View file

@ -49,7 +49,6 @@
#[cfg(feature = "ctcp")] #[cfg(feature = "ctcp")]
use chrono::prelude::*; use chrono::prelude::*;
use futures_channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
use futures_util::{ use futures_util::{
future::{FusedFuture, Future}, future::{FusedFuture, Future},
ready, ready,
@ -68,6 +67,7 @@ use std::{
sync::Arc, sync::Arc,
task::{Context, Poll}, task::{Context, Poll},
}; };
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use crate::{ use crate::{
client::{ client::{
@ -113,7 +113,11 @@ macro_rules! pub_state_base {
} }
/// Joins the specified channel or chanlist using the specified key or keylist. /// Joins the specified channel or chanlist using the specified key or keylist.
pub fn send_join_with_keys<S1, S2>(&self, chanlist: &str, keylist: &str) -> error::Result<()> pub fn send_join_with_keys<S1, S2>(
&self,
chanlist: &str,
keylist: &str,
) -> error::Result<()>
where where
S1: fmt::Display, S1: fmt::Display,
S2: fmt::Display, S2: fmt::Display,
@ -133,7 +137,7 @@ macro_rules! pub_state_base {
} }
Ok(()) Ok(())
} }
} };
} }
macro_rules! pub_sender_base { macro_rules! pub_sender_base {
@ -278,7 +282,12 @@ macro_rules! pub_sender_base {
/// Changes the mode of the target by force. /// Changes the mode of the target by force.
/// If `modeparams` is an empty string, it won't be included in the message. /// If `modeparams` is an empty string, it won't be included in the message.
pub fn send_samode<S1, S2, S3>(&self, target: S1, mode: S2, modeparams: S3) -> error::Result<()> pub fn send_samode<S1, S2, S3>(
&self,
target: S1,
mode: S2,
modeparams: S3,
) -> error::Result<()>
where where
S1: fmt::Display, S1: fmt::Display,
S2: fmt::Display, S2: fmt::Display,
@ -338,7 +347,10 @@ macro_rules! pub_sender_base {
{ {
let msg = msg.to_string(); let msg = msg.to_string();
for line in msg.split("\r\n") { for line in msg.split("\r\n") {
self.send(PRIVMSG(target.to_string(), format!("\u{001}{}\u{001}", line)))? self.send(PRIVMSG(
target.to_string(),
format!("\u{001}{}\u{001}", line),
))?
} }
Ok(()) Ok(())
} }
@ -414,7 +426,7 @@ macro_rules! pub_sender_base {
{ {
self.send_ctcp(target, "TIME") self.send_ctcp(target, "TIME")
} }
} };
} }
/// A stream of `Messages` received from an IRC server via an `Client`. /// A stream of `Messages` received from an IRC server via an `Client`.
@ -813,7 +825,7 @@ pub struct Sender {
impl Sender { impl Sender {
/// Send a single message to the unbounded queue. /// Send a single message to the unbounded queue.
pub fn send<M: Into<Message>>(&self, msg: M) -> error::Result<()> { pub fn send<M: Into<Message>>(&self, msg: M) -> error::Result<()> {
Ok(self.tx_outgoing.unbounded_send(msg.into())?) Ok(self.tx_outgoing.send(msg.into())?)
} }
pub_state_base!(); pub_state_base!();
@ -920,7 +932,7 @@ impl Client {
/// single, shared event loop. It can also be used to take more control over execution and error /// single, shared event loop. It can also be used to take more control over execution and error
/// handling. Connection will not occur until the event loop is run. /// handling. Connection will not occur until the event loop is run.
pub async fn from_config(config: Config) -> error::Result<Client> { pub async fn from_config(config: Config) -> error::Result<Client> {
let (tx_outgoing, rx_outgoing) = mpsc::unbounded(); let (tx_outgoing, rx_outgoing) = mpsc::unbounded_channel();
let conn = Connection::new(&config, tx_outgoing.clone()).await?; let conn = Connection::new(&config, tx_outgoing.clone()).await?;
#[cfg(test)] #[cfg(test)]

View file

@ -9,18 +9,18 @@ use std::{
}; };
use chrono::prelude::*; use chrono::prelude::*;
use futures_channel::mpsc::UnboundedSender;
use futures_util::{future::Future, ready, sink::Sink, stream::Stream}; use futures_util::{future::Future, ready, sink::Sink, stream::Stream};
use tokio::sync::mpsc::UnboundedSender;
use tokio::{ use tokio::{
io::{AsyncRead, AsyncWrite}, io::{AsyncRead, AsyncWrite},
time::{self, Delay, Interval}, time::{self, Interval, Sleep},
}; };
use tokio_util::codec::Framed; use tokio_util::codec::Framed;
use crate::{ use crate::{
client::data::Config, client::data::Config,
error, error,
proto::{Command, Response, IrcCodec, Message}, proto::{Command, IrcCodec, Message, Response},
}; };
/// Pinger-based futures helper. /// Pinger-based futures helper.
@ -31,7 +31,7 @@ struct Pinger {
/// The amount of time to wait before timing out from no ping response. /// The amount of time to wait before timing out from no ping response.
ping_timeout: Duration, ping_timeout: Duration,
/// The instant that the last ping was sent to the server. /// The instant that the last ping was sent to the server.
ping_deadline: Option<Delay>, ping_deadline: Option<Sleep>,
/// The interval at which to send pings. /// The interval at which to send pings.
ping_interval: Interval, ping_interval: Interval,
} }
@ -77,8 +77,7 @@ impl Pinger {
/// Send a pong. /// Send a pong.
fn send_pong(&mut self, data: &str) -> error::Result<()> { fn send_pong(&mut self, data: &str) -> error::Result<()> {
self.tx self.tx.send(Command::PONG(data.to_owned(), None).into())?;
.unbounded_send(Command::PONG(data.to_owned(), None).into())?;
Ok(()) Ok(())
} }
@ -89,8 +88,7 @@ impl Pinger {
// Creates new ping data using the local timestamp. // Creates new ping data using the local timestamp.
let data = format!("{}", Local::now().timestamp()); let data = format!("{}", Local::now().timestamp());
self.tx self.tx.send(Command::PING(data.clone(), None).into())?;
.unbounded_send(Command::PING(data.clone(), None).into())?;
Ok(()) Ok(())
} }
@ -98,7 +96,7 @@ impl Pinger {
/// Set the ping deadline. /// Set the ping deadline.
fn set_deadline(&mut self) { fn set_deadline(&mut self) {
if self.ping_deadline.is_none() { if self.ping_deadline.is_none() {
let ping_deadline = time::delay_for(self.ping_timeout); let ping_deadline = time::sleep(self.ping_timeout);
self.ping_deadline = Some(ping_deadline); self.ping_deadline = Some(ping_deadline);
} }
} }

View file

@ -3,11 +3,8 @@
use std::io::Error as IoError; use std::io::Error as IoError;
use std::sync::mpsc::RecvError; use std::sync::mpsc::RecvError;
use futures_channel::{
mpsc::{SendError, TrySendError},
oneshot::Canceled,
};
use thiserror::Error; use thiserror::Error;
use tokio::sync::mpsc::error::{SendError, TrySendError};
#[cfg(feature = "tls-rust")] #[cfg(feature = "tls-rust")]
use tokio_rustls::webpki::InvalidDNSNameError; use tokio_rustls::webpki::InvalidDNSNameError;
@ -45,11 +42,11 @@ pub enum Error {
/// An internal asynchronous channel closed. /// An internal asynchronous channel closed.
#[error("an async channel closed")] #[error("an async channel closed")]
AsyncChannelClosed(#[source] SendError), AsyncChannelClosed,
/// An internal oneshot channel closed. /// An internal oneshot channel closed.
#[error("a oneshot channel closed")] #[error("a oneshot channel closed")]
OneShotCanceled(#[source] Canceled), OneShotCanceled,
/// Error for invalid configurations. /// Error for invalid configurations.
#[error("invalid config: {}", path)] #[error("invalid config: {}", path)]
@ -205,20 +202,14 @@ impl From<RecvError> for Error {
} }
} }
impl From<SendError> for Error { impl<T> From<SendError<T>> for Error {
fn from(e: SendError) -> Error { fn from(_: SendError<T>) -> Error {
Error::AsyncChannelClosed(e) Error::AsyncChannelClosed
} }
} }
impl<T> From<TrySendError<T>> for Error { impl<T> From<TrySendError<T>> for Error {
fn from(e: TrySendError<T>) -> Error { fn from(_: TrySendError<T>) -> Error {
Error::AsyncChannelClosed(e.into_send_error()) Error::AsyncChannelClosed
}
}
impl From<Canceled> for Error {
fn from(e: Canceled) -> Error {
Error::OneShotCanceled(e)
} }
} }