From 56e584fe76f788c3430ef1e56a4c795d03946e25 Mon Sep 17 00:00:00 2001 From: John-John Tedro Date: Wed, 2 Dec 2020 08:01:05 +0100 Subject: [PATCH 1/4] Run rustfmt --- src/client/mod.rs | 622 ++++++++++++++++++++-------------------- src/client/transport.rs | 2 +- 2 files changed, 318 insertions(+), 306 deletions(-) diff --git a/src/client/mod.rs b/src/client/mod.rs index d3f9dc8..702faaf 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -95,326 +95,338 @@ pub mod transport; macro_rules! pub_state_base { () => { - /// Changes the modes for the specified target. - pub fn send_mode(&self, target: S, modes: &[Mode]) -> error::Result<()> - where - S: fmt::Display, - T: ModeType, - { - self.send(T::mode(&target.to_string(), modes)) - } - - /// Joins the specified channel or chanlist. - pub fn send_join(&self, chanlist: S) -> error::Result<()> - where - S: fmt::Display, - { - self.send(JOIN(chanlist.to_string(), None, None)) - } - - /// Joins the specified channel or chanlist using the specified key or keylist. - pub fn send_join_with_keys(&self, chanlist: &str, keylist: &str) -> error::Result<()> - where - S1: fmt::Display, - S2: fmt::Display, - { - self.send(JOIN(chanlist.to_string(), Some(keylist.to_string()), None)) - } - - /// Sends a notice to the specified target. - pub fn send_notice(&self, target: S1, message: S2) -> error::Result<()> - where - S1: fmt::Display, - S2: fmt::Display, - { - let message = message.to_string(); - for line in message.split("\r\n") { - self.send(NOTICE(target.to_string(), line.to_string()))? + /// Changes the modes for the specified target. + pub fn send_mode(&self, target: S, modes: &[Mode]) -> error::Result<()> + where + S: fmt::Display, + T: ModeType, + { + self.send(T::mode(&target.to_string(), modes)) } - Ok(()) - } - } + + /// Joins the specified channel or chanlist. + pub fn send_join(&self, chanlist: S) -> error::Result<()> + where + S: fmt::Display, + { + self.send(JOIN(chanlist.to_string(), None, None)) + } + + /// Joins the specified channel or chanlist using the specified key or keylist. + pub fn send_join_with_keys( + &self, + chanlist: &str, + keylist: &str, + ) -> error::Result<()> + where + S1: fmt::Display, + S2: fmt::Display, + { + self.send(JOIN(chanlist.to_string(), Some(keylist.to_string()), None)) + } + + /// Sends a notice to the specified target. + pub fn send_notice(&self, target: S1, message: S2) -> error::Result<()> + where + S1: fmt::Display, + S2: fmt::Display, + { + let message = message.to_string(); + for line in message.split("\r\n") { + self.send(NOTICE(target.to_string(), line.to_string()))? + } + Ok(()) + } + }; } macro_rules! pub_sender_base { () => { - /// Sends a request for a list of server capabilities for a specific IRCv3 version. - pub fn send_cap_ls(&self, version: NegotiationVersion) -> error::Result<()> { - self.send(Command::CAP( - None, - LS, - match version { - NegotiationVersion::V301 => None, - NegotiationVersion::V302 => Some("302".to_owned()), - }, - None, - )) - } - - /// Sends an IRCv3 capabilities request for the specified extensions. - pub fn send_cap_req(&self, extensions: &[Capability]) -> error::Result<()> { - let append = |mut s: String, c| { - s.push_str(c); - s.push(' '); - s - }; - let mut exts = extensions - .iter() - .map(|c| c.as_ref()) - .fold(String::new(), append); - let len = exts.len() - 1; - exts.truncate(len); - self.send(CAP(None, REQ, None, Some(exts))) - } - - /// Sends a SASL AUTHENTICATE message with the specified data. - pub fn send_sasl(&self, data: S) -> error::Result<()> { - self.send(AUTHENTICATE(data.to_string())) - } - - /// Sends a SASL AUTHENTICATE request to use the PLAIN mechanism. - pub fn send_sasl_plain(&self) -> error::Result<()> { - self.send_sasl("PLAIN") - } - - /// Sends a SASL AUTHENTICATE request to use the EXTERNAL mechanism. - pub fn send_sasl_external(&self) -> error::Result<()> { - self.send_sasl("EXTERNAL") - } - - /// Sends a SASL AUTHENTICATE request to abort authentication. - pub fn send_sasl_abort(&self) -> error::Result<()> { - self.send_sasl("*") - } - - /// Sends a PONG with the specified message. - pub fn send_pong(&self, msg: S) -> error::Result<()> - where - S: fmt::Display, - { - self.send(PONG(msg.to_string(), None)) - } - - /// Parts the specified channel or chanlist. - pub fn send_part(&self, chanlist: S) -> error::Result<()> - where - S: fmt::Display, - { - self.send(PART(chanlist.to_string(), None)) - } - - /// Attempts to oper up using the specified username and password. - pub fn send_oper(&self, username: S1, password: S2) -> error::Result<()> - where - S1: fmt::Display, - S2: fmt::Display, - { - self.send(OPER(username.to_string(), password.to_string())) - } - - /// Sends a message to the specified target. If the message contains IRC newlines (`\r\n`), it - /// will automatically be split and sent as multiple separate `PRIVMSG`s to the specified - /// target. If you absolutely must avoid this behavior, you can do - /// `client.send(PRIVMSG(target, message))` directly. - pub fn send_privmsg(&self, target: S1, message: S2) -> error::Result<()> - where - S1: fmt::Display, - S2: fmt::Display, - { - let message = message.to_string(); - for line in message.split("\r\n") { - self.send(PRIVMSG(target.to_string(), line.to_string()))? + /// Sends a request for a list of server capabilities for a specific IRCv3 version. + pub fn send_cap_ls(&self, version: NegotiationVersion) -> error::Result<()> { + self.send(Command::CAP( + None, + LS, + match version { + NegotiationVersion::V301 => None, + NegotiationVersion::V302 => Some("302".to_owned()), + }, + None, + )) } - Ok(()) - } - /// Sets the topic of a channel or requests the current one. - /// If `topic` is an empty string, it won't be included in the message. - pub fn send_topic(&self, channel: S1, topic: S2) -> error::Result<()> - where - S1: fmt::Display, - S2: fmt::Display, - { - let topic = topic.to_string(); - self.send(TOPIC( - channel.to_string(), - if topic.is_empty() { None } else { Some(topic) }, - )) - } - - /// Kills the target with the provided message. - pub fn send_kill(&self, target: S1, message: S2) -> error::Result<()> - where - S1: fmt::Display, - S2: fmt::Display, - { - self.send(KILL(target.to_string(), message.to_string())) - } - - /// Kicks the listed nicknames from the listed channels with a comment. - /// If `message` is an empty string, it won't be included in the message. - pub fn send_kick( - &self, - chanlist: S1, - nicklist: S2, - message: S3, - ) -> error::Result<()> - where - S1: fmt::Display, - S2: fmt::Display, - S3: fmt::Display, - { - let message = message.to_string(); - self.send(KICK( - chanlist.to_string(), - nicklist.to_string(), - if message.is_empty() { - None - } else { - Some(message) - }, - )) - } - - /// Changes the mode of the target by force. - /// If `modeparams` is an empty string, it won't be included in the message. - pub fn send_samode(&self, target: S1, mode: S2, modeparams: S3) -> error::Result<()> - where - S1: fmt::Display, - S2: fmt::Display, - S3: fmt::Display, - { - let modeparams = modeparams.to_string(); - self.send(SAMODE( - target.to_string(), - mode.to_string(), - if modeparams.is_empty() { - None - } else { - Some(modeparams) - }, - )) - } - - /// Forces a user to change from the old nickname to the new nickname. - pub fn send_sanick(&self, old_nick: S1, new_nick: S2) -> error::Result<()> - where - S1: fmt::Display, - S2: fmt::Display, - { - self.send(SANICK(old_nick.to_string(), new_nick.to_string())) - } - - /// Invites a user to the specified channel. - pub fn send_invite(&self, nick: S1, chan: S2) -> error::Result<()> - where - S1: fmt::Display, - S2: fmt::Display, - { - self.send(INVITE(nick.to_string(), chan.to_string())) - } - - /// Quits the server entirely with a message. - /// This defaults to `Powered by Rust.` if none is specified. - pub fn send_quit(&self, msg: S) -> error::Result<()> - where - S: fmt::Display, - { - let msg = msg.to_string(); - self.send(QUIT(Some(if msg.is_empty() { - "Powered by Rust.".to_string() - } else { - msg - }))) - } - - /// Sends a CTCP-escaped message to the specified target. - /// This requires the CTCP feature to be enabled. - #[cfg(feature = "ctcp")] - pub fn send_ctcp(&self, target: S1, msg: S2) -> error::Result<()> - where - S1: fmt::Display, - S2: fmt::Display, - { - let msg = msg.to_string(); - for line in msg.split("\r\n") { - self.send(PRIVMSG(target.to_string(), format!("\u{001}{}\u{001}", line)))? + /// Sends an IRCv3 capabilities request for the specified extensions. + pub fn send_cap_req(&self, extensions: &[Capability]) -> error::Result<()> { + let append = |mut s: String, c| { + s.push_str(c); + s.push(' '); + s + }; + let mut exts = extensions + .iter() + .map(|c| c.as_ref()) + .fold(String::new(), append); + let len = exts.len() - 1; + exts.truncate(len); + self.send(CAP(None, REQ, None, Some(exts))) } - Ok(()) - } - /// Sends an action command to the specified target. - /// This requires the CTCP feature to be enabled. - #[cfg(feature = "ctcp")] - pub fn send_action(&self, target: S1, msg: S2) -> error::Result<()> - where - S1: fmt::Display, - S2: fmt::Display, - { - self.send_ctcp(target, &format!("ACTION {}", msg.to_string())[..]) - } + /// Sends a SASL AUTHENTICATE message with the specified data. + pub fn send_sasl(&self, data: S) -> error::Result<()> { + self.send(AUTHENTICATE(data.to_string())) + } - /// Sends a finger request to the specified target. - /// This requires the CTCP feature to be enabled. - #[cfg(feature = "ctcp")] - pub fn send_finger(&self, target: S) -> error::Result<()> - where - S: fmt::Display, - { - self.send_ctcp(target, "FINGER") - } + /// Sends a SASL AUTHENTICATE request to use the PLAIN mechanism. + pub fn send_sasl_plain(&self) -> error::Result<()> { + self.send_sasl("PLAIN") + } - /// Sends a version request to the specified target. - /// This requires the CTCP feature to be enabled. - #[cfg(feature = "ctcp")] - pub fn send_version(&self, target: S) -> error::Result<()> - where - S: fmt::Display, - { - self.send_ctcp(target, "VERSION") - } + /// Sends a SASL AUTHENTICATE request to use the EXTERNAL mechanism. + pub fn send_sasl_external(&self) -> error::Result<()> { + self.send_sasl("EXTERNAL") + } - /// Sends a source request to the specified target. - /// This requires the CTCP feature to be enabled. - #[cfg(feature = "ctcp")] - pub fn send_source(&self, target: S) -> error::Result<()> - where - S: fmt::Display, - { - self.send_ctcp(target, "SOURCE") - } + /// Sends a SASL AUTHENTICATE request to abort authentication. + pub fn send_sasl_abort(&self) -> error::Result<()> { + self.send_sasl("*") + } - /// Sends a user info request to the specified target. - /// This requires the CTCP feature to be enabled. - #[cfg(feature = "ctcp")] - pub fn send_user_info(&self, target: S) -> error::Result<()> - where - S: fmt::Display, - { - self.send_ctcp(target, "USERINFO") - } + /// Sends a PONG with the specified message. + pub fn send_pong(&self, msg: S) -> error::Result<()> + where + S: fmt::Display, + { + self.send(PONG(msg.to_string(), None)) + } - /// Sends a finger request to the specified target. - /// This requires the CTCP feature to be enabled. - #[cfg(feature = "ctcp")] - pub fn send_ctcp_ping(&self, target: S) -> error::Result<()> - where - S: fmt::Display, - { - let time = Local::now(); - self.send_ctcp(target, &format!("PING {}", time.timestamp())[..]) - } + /// Parts the specified channel or chanlist. + pub fn send_part(&self, chanlist: S) -> error::Result<()> + where + S: fmt::Display, + { + self.send(PART(chanlist.to_string(), None)) + } - /// Sends a time request to the specified target. - /// This requires the CTCP feature to be enabled. - #[cfg(feature = "ctcp")] - pub fn send_time(&self, target: S) -> error::Result<()> - where - S: fmt::Display, - { - self.send_ctcp(target, "TIME") - } - } + /// Attempts to oper up using the specified username and password. + pub fn send_oper(&self, username: S1, password: S2) -> error::Result<()> + where + S1: fmt::Display, + S2: fmt::Display, + { + self.send(OPER(username.to_string(), password.to_string())) + } + + /// Sends a message to the specified target. If the message contains IRC newlines (`\r\n`), it + /// will automatically be split and sent as multiple separate `PRIVMSG`s to the specified + /// target. If you absolutely must avoid this behavior, you can do + /// `client.send(PRIVMSG(target, message))` directly. + pub fn send_privmsg(&self, target: S1, message: S2) -> error::Result<()> + where + S1: fmt::Display, + S2: fmt::Display, + { + let message = message.to_string(); + for line in message.split("\r\n") { + self.send(PRIVMSG(target.to_string(), line.to_string()))? + } + Ok(()) + } + + /// Sets the topic of a channel or requests the current one. + /// If `topic` is an empty string, it won't be included in the message. + pub fn send_topic(&self, channel: S1, topic: S2) -> error::Result<()> + where + S1: fmt::Display, + S2: fmt::Display, + { + let topic = topic.to_string(); + self.send(TOPIC( + channel.to_string(), + if topic.is_empty() { None } else { Some(topic) }, + )) + } + + /// Kills the target with the provided message. + pub fn send_kill(&self, target: S1, message: S2) -> error::Result<()> + where + S1: fmt::Display, + S2: fmt::Display, + { + self.send(KILL(target.to_string(), message.to_string())) + } + + /// Kicks the listed nicknames from the listed channels with a comment. + /// If `message` is an empty string, it won't be included in the message. + pub fn send_kick( + &self, + chanlist: S1, + nicklist: S2, + message: S3, + ) -> error::Result<()> + where + S1: fmt::Display, + S2: fmt::Display, + S3: fmt::Display, + { + let message = message.to_string(); + self.send(KICK( + chanlist.to_string(), + nicklist.to_string(), + if message.is_empty() { + None + } else { + Some(message) + }, + )) + } + + /// Changes the mode of the target by force. + /// If `modeparams` is an empty string, it won't be included in the message. + pub fn send_samode( + &self, + target: S1, + mode: S2, + modeparams: S3, + ) -> error::Result<()> + where + S1: fmt::Display, + S2: fmt::Display, + S3: fmt::Display, + { + let modeparams = modeparams.to_string(); + self.send(SAMODE( + target.to_string(), + mode.to_string(), + if modeparams.is_empty() { + None + } else { + Some(modeparams) + }, + )) + } + + /// Forces a user to change from the old nickname to the new nickname. + pub fn send_sanick(&self, old_nick: S1, new_nick: S2) -> error::Result<()> + where + S1: fmt::Display, + S2: fmt::Display, + { + self.send(SANICK(old_nick.to_string(), new_nick.to_string())) + } + + /// Invites a user to the specified channel. + pub fn send_invite(&self, nick: S1, chan: S2) -> error::Result<()> + where + S1: fmt::Display, + S2: fmt::Display, + { + self.send(INVITE(nick.to_string(), chan.to_string())) + } + + /// Quits the server entirely with a message. + /// This defaults to `Powered by Rust.` if none is specified. + pub fn send_quit(&self, msg: S) -> error::Result<()> + where + S: fmt::Display, + { + let msg = msg.to_string(); + self.send(QUIT(Some(if msg.is_empty() { + "Powered by Rust.".to_string() + } else { + msg + }))) + } + + /// Sends a CTCP-escaped message to the specified target. + /// This requires the CTCP feature to be enabled. + #[cfg(feature = "ctcp")] + pub fn send_ctcp(&self, target: S1, msg: S2) -> error::Result<()> + where + S1: fmt::Display, + S2: fmt::Display, + { + let msg = msg.to_string(); + for line in msg.split("\r\n") { + self.send(PRIVMSG( + target.to_string(), + format!("\u{001}{}\u{001}", line), + ))? + } + Ok(()) + } + + /// Sends an action command to the specified target. + /// This requires the CTCP feature to be enabled. + #[cfg(feature = "ctcp")] + pub fn send_action(&self, target: S1, msg: S2) -> error::Result<()> + where + S1: fmt::Display, + S2: fmt::Display, + { + self.send_ctcp(target, &format!("ACTION {}", msg.to_string())[..]) + } + + /// Sends a finger request to the specified target. + /// This requires the CTCP feature to be enabled. + #[cfg(feature = "ctcp")] + pub fn send_finger(&self, target: S) -> error::Result<()> + where + S: fmt::Display, + { + self.send_ctcp(target, "FINGER") + } + + /// Sends a version request to the specified target. + /// This requires the CTCP feature to be enabled. + #[cfg(feature = "ctcp")] + pub fn send_version(&self, target: S) -> error::Result<()> + where + S: fmt::Display, + { + self.send_ctcp(target, "VERSION") + } + + /// Sends a source request to the specified target. + /// This requires the CTCP feature to be enabled. + #[cfg(feature = "ctcp")] + pub fn send_source(&self, target: S) -> error::Result<()> + where + S: fmt::Display, + { + self.send_ctcp(target, "SOURCE") + } + + /// Sends a user info request to the specified target. + /// This requires the CTCP feature to be enabled. + #[cfg(feature = "ctcp")] + pub fn send_user_info(&self, target: S) -> error::Result<()> + where + S: fmt::Display, + { + self.send_ctcp(target, "USERINFO") + } + + /// Sends a finger request to the specified target. + /// This requires the CTCP feature to be enabled. + #[cfg(feature = "ctcp")] + pub fn send_ctcp_ping(&self, target: S) -> error::Result<()> + where + S: fmt::Display, + { + let time = Local::now(); + self.send_ctcp(target, &format!("PING {}", time.timestamp())[..]) + } + + /// Sends a time request to the specified target. + /// This requires the CTCP feature to be enabled. + #[cfg(feature = "ctcp")] + pub fn send_time(&self, target: S) -> error::Result<()> + where + S: fmt::Display, + { + self.send_ctcp(target, "TIME") + } + }; } /// A stream of `Messages` received from an IRC server via an `Client`. diff --git a/src/client/transport.rs b/src/client/transport.rs index aac776e..1d10ffa 100644 --- a/src/client/transport.rs +++ b/src/client/transport.rs @@ -20,7 +20,7 @@ use tokio_util::codec::Framed; use crate::{ client::data::Config, error, - proto::{Command, Response, IrcCodec, Message}, + proto::{Command, IrcCodec, Message, Response}, }; /// Pinger-based futures helper. From 43c8b1cb63969301f63e50f480dd2138639491a9 Mon Sep 17 00:00:00 2001 From: John-John Tedro Date: Wed, 28 Oct 2020 03:26:31 +0100 Subject: [PATCH 2/4] Bump to Tokio 0.3.0 --- Cargo.toml | 11 ++++++----- irc-proto/Cargo.toml | 4 ++-- src/client/conn.rs | 14 +++++++------- src/client/mock.rs | 10 ++++++---- src/client/transport.rs | 6 +++--- 5 files changed, 24 insertions(+), 21 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 63db804..1adcbb5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,7 @@ yaml = ["yaml_config"] proxy = ["tokio-socks"] -tls-native = ["native-tls", "tokio-tls"] +tls-native = ["native-tls", "tokio-native-tls"] tls-rust = ["tokio-rustls", "webpki-roots"] @@ -52,8 +52,8 @@ log = "0.4.0" parking_lot = "0.11.0" pin-utils = "0.1.0-alpha.4" thiserror = "1.0.0" -tokio = { version = "0.2.0", features = ["macros", "net", "stream", "time"] } -tokio-util = { version = "0.3.0", features = ["codec"] } +tokio = { version = "0.3.0", features = ["net", "stream", "time"] } +tokio-util = { version = "0.4.0", features = ["codec"] } # Feature - Config serde = { version = "1.0.0", optional = true } @@ -67,8 +67,8 @@ tokio-socks = { version = "0.3.0", optional = true } # Feature - TLS native-tls = { version = "0.2.0", optional = true } -tokio-rustls = { version = "0.14.0", optional = true } -tokio-tls = { version = "0.3.0", optional = true } +tokio-rustls = { version = "0.20.0", optional = true } +tokio-native-tls = { version = "0.2.0", optional = true } webpki-roots = { version = "0.20.0", optional = true } @@ -78,6 +78,7 @@ args = "2.0.0" env_logger = "0.7.0" futures = "0.3.0" getopts = "0.2.0" +tokio = { version = "0.3.0", features = ["rt", "rt-multi-thread", "macros", "net", "stream", "time"] } [[example]] diff --git a/irc-proto/Cargo.toml b/irc-proto/Cargo.toml index 069b3bf..3f31e3c 100644 --- a/irc-proto/Cargo.toml +++ b/irc-proto/Cargo.toml @@ -21,5 +21,5 @@ encoding = "0.2.0" thiserror = "1.0.0" bytes = { version = "0.5.0", optional = true } -tokio = { version = "0.2.0", optional = true } -tokio-util = { version = "0.3.0", features = ["codec"], optional = true } +tokio = { version = "0.3.0", optional = true } +tokio-util = { version = "0.4.0", features = ["codec"], optional = true } diff --git a/src/client/conn.rs b/src/client/conn.rs index 716b8f8..c01b872 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -7,7 +7,7 @@ use std::{ task::{Context, Poll}, }; use tokio::net::TcpStream; -use tokio_util::codec::Decoder; +use tokio_util::codec::Framed; #[cfg(feature = "proxy")] use tokio_socks::tcp::Socks5Stream; @@ -22,7 +22,7 @@ use std::{fs::File, io::Read}; use native_tls::{Certificate, Identity, TlsConnector}; #[cfg(feature = "tls-native")] -use tokio_tls::{self, TlsStream}; +use tokio_native_tls::{self, TlsStream}; #[cfg(feature = "tls-rust")] use std::{ @@ -150,7 +150,7 @@ impl Connection { tx: UnboundedSender, ) -> error::Result> { let stream = Self::new_stream(config).await?; - let framed = IrcCodec::new(config.encoding())?.framed(stream); + let framed = Framed::new(stream, IrcCodec::new(config.encoding())?); Ok(Transport::new(&config, framed, tx)) } @@ -184,12 +184,12 @@ impl Connection { ); } - let connector: tokio_tls::TlsConnector = builder.build()?.into(); + let connector: tokio_native_tls::TlsConnector = builder.build()?.into(); let domain = config.server()?; let stream = Self::new_stream(config).await?; let stream = connector.connect(domain, stream).await?; - let framed = IrcCodec::new(config.encoding())?.framed(stream); + let framed = Framed::new(stream, IrcCodec::new(config.encoding())?); Ok(Transport::new(&config, framed, tx)) } @@ -236,7 +236,7 @@ impl Connection { let stream = Self::new_stream(config).await?; let stream = connector.connect(domain, stream).await?; - let framed = IrcCodec::new(config.encoding())?.framed(stream); + let framed = Framed::new(stream, IrcCodec::new(config.encoding())?); Ok(Transport::new(&config, framed, tx)) } @@ -262,7 +262,7 @@ impl Connection { })?; let stream = MockStream::new(&initial); - let framed = IrcCodec::new(config.encoding())?.framed(stream); + let framed = Framed::new(stream, IrcCodec::new(config.encoding())?); Ok(Transport::new(&config, framed, tx)) } diff --git a/src/client/mock.rs b/src/client/mock.rs index e1c3334..eb7e885 100644 --- a/src/client/mock.rs +++ b/src/client/mock.rs @@ -3,7 +3,7 @@ use std::{ pin::Pin, task::{Context, Poll}, }; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; /// A fake stream for testing network applications backed by buffers. #[derive(Clone, Debug)] @@ -41,9 +41,11 @@ impl AsyncRead for MockStream { fn poll_read( mut self: Pin<&mut Self>, _: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - Poll::Ready(self.as_mut().received.read(buf)) + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let n = self.as_mut().received.read(buf.initialize_unfilled())?; + buf.advance(n); + Poll::Ready(Ok(())) } } diff --git a/src/client/transport.rs b/src/client/transport.rs index 1d10ffa..573d566 100644 --- a/src/client/transport.rs +++ b/src/client/transport.rs @@ -13,7 +13,7 @@ use futures_channel::mpsc::UnboundedSender; use futures_util::{future::Future, ready, sink::Sink, stream::Stream}; use tokio::{ io::{AsyncRead, AsyncWrite}, - time::{self, Delay, Interval}, + time::{self, Interval, Sleep}, }; use tokio_util::codec::Framed; @@ -31,7 +31,7 @@ struct Pinger { /// The amount of time to wait before timing out from no ping response. ping_timeout: Duration, /// The instant that the last ping was sent to the server. - ping_deadline: Option, + ping_deadline: Option, /// The interval at which to send pings. ping_interval: Interval, } @@ -98,7 +98,7 @@ impl Pinger { /// Set the ping deadline. fn set_deadline(&mut self) { if self.ping_deadline.is_none() { - let ping_deadline = time::delay_for(self.ping_timeout); + let ping_deadline = time::sleep(self.ping_timeout); self.ping_deadline = Some(ping_deadline); } } From 9437f7264ae0d38f55b667309e71db99d0a19c04 Mon Sep 17 00:00:00 2001 From: John-John Tedro Date: Wed, 28 Oct 2020 03:41:01 +0100 Subject: [PATCH 3/4] Switch to tokio channels since they can now be suitably used --- Cargo.toml | 5 ++--- src/client/conn.rs | 2 +- src/client/mod.rs | 6 +++--- src/client/transport.rs | 8 +++----- src/error.rs | 25 ++++++++----------------- 5 files changed, 17 insertions(+), 29 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1adcbb5..83e63dd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,14 +45,13 @@ bufstream = "0.1.0" bytes = "0.5.0" chrono = "0.4.0" encoding = "0.2.0" -futures-channel = "0.3.0" -futures-util = { version = "0.3.0", features = ["sink"] } +futures-util = { version = "0.3.0", default-features = false, features = ["sink"] } irc-proto = { version = "0.14.0", path = "irc-proto" } log = "0.4.0" parking_lot = "0.11.0" pin-utils = "0.1.0-alpha.4" thiserror = "1.0.0" -tokio = { version = "0.3.0", features = ["net", "stream", "time"] } +tokio = { version = "0.3.0", features = ["net", "stream", "time", "sync"] } tokio-util = { version = "0.4.0", features = ["codec"] } # Feature - Config diff --git a/src/client/conn.rs b/src/client/conn.rs index c01b872..a23e68d 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -1,5 +1,4 @@ //! A module providing IRC connections for use by `IrcServer`s. -use futures_channel::mpsc::UnboundedSender; use futures_util::{sink::Sink, stream::Stream}; use std::{ fmt, @@ -7,6 +6,7 @@ use std::{ task::{Context, Poll}, }; use tokio::net::TcpStream; +use tokio::sync::mpsc::UnboundedSender; use tokio_util::codec::Framed; #[cfg(feature = "proxy")] diff --git a/src/client/mod.rs b/src/client/mod.rs index 702faaf..d710b61 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -49,7 +49,6 @@ #[cfg(feature = "ctcp")] use chrono::prelude::*; -use futures_channel::mpsc::{self, UnboundedReceiver, UnboundedSender}; use futures_util::{ future::{FusedFuture, Future}, ready, @@ -68,6 +67,7 @@ use std::{ sync::Arc, task::{Context, Poll}, }; +use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use crate::{ client::{ @@ -825,7 +825,7 @@ pub struct Sender { impl Sender { /// Send a single message to the unbounded queue. pub fn send>(&self, msg: M) -> error::Result<()> { - Ok(self.tx_outgoing.unbounded_send(msg.into())?) + Ok(self.tx_outgoing.send(msg.into())?) } pub_state_base!(); @@ -932,7 +932,7 @@ impl Client { /// single, shared event loop. It can also be used to take more control over execution and error /// handling. Connection will not occur until the event loop is run. pub async fn from_config(config: Config) -> error::Result { - let (tx_outgoing, rx_outgoing) = mpsc::unbounded(); + let (tx_outgoing, rx_outgoing) = mpsc::unbounded_channel(); let conn = Connection::new(&config, tx_outgoing.clone()).await?; #[cfg(test)] diff --git a/src/client/transport.rs b/src/client/transport.rs index 573d566..5184e03 100644 --- a/src/client/transport.rs +++ b/src/client/transport.rs @@ -9,8 +9,8 @@ use std::{ }; use chrono::prelude::*; -use futures_channel::mpsc::UnboundedSender; use futures_util::{future::Future, ready, sink::Sink, stream::Stream}; +use tokio::sync::mpsc::UnboundedSender; use tokio::{ io::{AsyncRead, AsyncWrite}, time::{self, Interval, Sleep}, @@ -77,8 +77,7 @@ impl Pinger { /// Send a pong. fn send_pong(&mut self, data: &str) -> error::Result<()> { - self.tx - .unbounded_send(Command::PONG(data.to_owned(), None).into())?; + self.tx.send(Command::PONG(data.to_owned(), None).into())?; Ok(()) } @@ -89,8 +88,7 @@ impl Pinger { // Creates new ping data using the local timestamp. let data = format!("{}", Local::now().timestamp()); - self.tx - .unbounded_send(Command::PING(data.clone(), None).into())?; + self.tx.send(Command::PING(data.clone(), None).into())?; Ok(()) } diff --git a/src/error.rs b/src/error.rs index e848849..c338bf1 100644 --- a/src/error.rs +++ b/src/error.rs @@ -3,11 +3,8 @@ use std::io::Error as IoError; use std::sync::mpsc::RecvError; -use futures_channel::{ - mpsc::{SendError, TrySendError}, - oneshot::Canceled, -}; use thiserror::Error; +use tokio::sync::mpsc::error::{SendError, TrySendError}; #[cfg(feature = "tls-rust")] use tokio_rustls::webpki::InvalidDNSNameError; @@ -45,11 +42,11 @@ pub enum Error { /// An internal asynchronous channel closed. #[error("an async channel closed")] - AsyncChannelClosed(#[source] SendError), + AsyncChannelClosed, /// An internal oneshot channel closed. #[error("a oneshot channel closed")] - OneShotCanceled(#[source] Canceled), + OneShotCanceled, /// Error for invalid configurations. #[error("invalid config: {}", path)] @@ -205,20 +202,14 @@ impl From for Error { } } -impl From for Error { - fn from(e: SendError) -> Error { - Error::AsyncChannelClosed(e) +impl From> for Error { + fn from(_: SendError) -> Error { + Error::AsyncChannelClosed } } impl From> for Error { - fn from(e: TrySendError) -> Error { - Error::AsyncChannelClosed(e.into_send_error()) - } -} - -impl From for Error { - fn from(e: Canceled) -> Error { - Error::OneShotCanceled(e) + fn from(_: TrySendError) -> Error { + Error::AsyncChannelClosed } } From 370fb1c0885830e593e78fdc020015c9ee9587eb Mon Sep 17 00:00:00 2001 From: John-John Tedro Date: Wed, 28 Oct 2020 03:47:07 +0100 Subject: [PATCH 4/4] Bump and re-use a few unused deps --- Cargo.toml | 3 --- 1 file changed, 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 83e63dd..7e56736 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,15 +41,12 @@ tls-rust = ["tokio-rustls", "webpki-roots"] [dependencies] -bufstream = "0.1.0" -bytes = "0.5.0" chrono = "0.4.0" encoding = "0.2.0" futures-util = { version = "0.3.0", default-features = false, features = ["sink"] } irc-proto = { version = "0.14.0", path = "irc-proto" } log = "0.4.0" parking_lot = "0.11.0" -pin-utils = "0.1.0-alpha.4" thiserror = "1.0.0" tokio = { version = "0.3.0", features = ["net", "stream", "time", "sync"] } tokio-util = { version = "0.4.0", features = ["codec"] }