Switch to tokio channels since they can now be suitably used
This commit is contained in:
parent
43c8b1cb63
commit
9437f7264a
5 changed files with 17 additions and 29 deletions
|
@ -45,14 +45,13 @@ bufstream = "0.1.0"
|
||||||
bytes = "0.5.0"
|
bytes = "0.5.0"
|
||||||
chrono = "0.4.0"
|
chrono = "0.4.0"
|
||||||
encoding = "0.2.0"
|
encoding = "0.2.0"
|
||||||
futures-channel = "0.3.0"
|
futures-util = { version = "0.3.0", default-features = false, features = ["sink"] }
|
||||||
futures-util = { version = "0.3.0", features = ["sink"] }
|
|
||||||
irc-proto = { version = "0.14.0", path = "irc-proto" }
|
irc-proto = { version = "0.14.0", path = "irc-proto" }
|
||||||
log = "0.4.0"
|
log = "0.4.0"
|
||||||
parking_lot = "0.11.0"
|
parking_lot = "0.11.0"
|
||||||
pin-utils = "0.1.0-alpha.4"
|
pin-utils = "0.1.0-alpha.4"
|
||||||
thiserror = "1.0.0"
|
thiserror = "1.0.0"
|
||||||
tokio = { version = "0.3.0", features = ["net", "stream", "time"] }
|
tokio = { version = "0.3.0", features = ["net", "stream", "time", "sync"] }
|
||||||
tokio-util = { version = "0.4.0", features = ["codec"] }
|
tokio-util = { version = "0.4.0", features = ["codec"] }
|
||||||
|
|
||||||
# Feature - Config
|
# Feature - Config
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
//! A module providing IRC connections for use by `IrcServer`s.
|
//! A module providing IRC connections for use by `IrcServer`s.
|
||||||
use futures_channel::mpsc::UnboundedSender;
|
|
||||||
use futures_util::{sink::Sink, stream::Stream};
|
use futures_util::{sink::Sink, stream::Stream};
|
||||||
use std::{
|
use std::{
|
||||||
fmt,
|
fmt,
|
||||||
|
@ -7,6 +6,7 @@ use std::{
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
};
|
};
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
|
use tokio::sync::mpsc::UnboundedSender;
|
||||||
use tokio_util::codec::Framed;
|
use tokio_util::codec::Framed;
|
||||||
|
|
||||||
#[cfg(feature = "proxy")]
|
#[cfg(feature = "proxy")]
|
||||||
|
|
|
@ -49,7 +49,6 @@
|
||||||
|
|
||||||
#[cfg(feature = "ctcp")]
|
#[cfg(feature = "ctcp")]
|
||||||
use chrono::prelude::*;
|
use chrono::prelude::*;
|
||||||
use futures_channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
|
|
||||||
use futures_util::{
|
use futures_util::{
|
||||||
future::{FusedFuture, Future},
|
future::{FusedFuture, Future},
|
||||||
ready,
|
ready,
|
||||||
|
@ -68,6 +67,7 @@ use std::{
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
};
|
};
|
||||||
|
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
client::{
|
client::{
|
||||||
|
@ -825,7 +825,7 @@ pub struct Sender {
|
||||||
impl Sender {
|
impl Sender {
|
||||||
/// Send a single message to the unbounded queue.
|
/// Send a single message to the unbounded queue.
|
||||||
pub fn send<M: Into<Message>>(&self, msg: M) -> error::Result<()> {
|
pub fn send<M: Into<Message>>(&self, msg: M) -> error::Result<()> {
|
||||||
Ok(self.tx_outgoing.unbounded_send(msg.into())?)
|
Ok(self.tx_outgoing.send(msg.into())?)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub_state_base!();
|
pub_state_base!();
|
||||||
|
@ -932,7 +932,7 @@ impl Client {
|
||||||
/// single, shared event loop. It can also be used to take more control over execution and error
|
/// single, shared event loop. It can also be used to take more control over execution and error
|
||||||
/// handling. Connection will not occur until the event loop is run.
|
/// handling. Connection will not occur until the event loop is run.
|
||||||
pub async fn from_config(config: Config) -> error::Result<Client> {
|
pub async fn from_config(config: Config) -> error::Result<Client> {
|
||||||
let (tx_outgoing, rx_outgoing) = mpsc::unbounded();
|
let (tx_outgoing, rx_outgoing) = mpsc::unbounded_channel();
|
||||||
let conn = Connection::new(&config, tx_outgoing.clone()).await?;
|
let conn = Connection::new(&config, tx_outgoing.clone()).await?;
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
|
@ -9,8 +9,8 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use chrono::prelude::*;
|
use chrono::prelude::*;
|
||||||
use futures_channel::mpsc::UnboundedSender;
|
|
||||||
use futures_util::{future::Future, ready, sink::Sink, stream::Stream};
|
use futures_util::{future::Future, ready, sink::Sink, stream::Stream};
|
||||||
|
use tokio::sync::mpsc::UnboundedSender;
|
||||||
use tokio::{
|
use tokio::{
|
||||||
io::{AsyncRead, AsyncWrite},
|
io::{AsyncRead, AsyncWrite},
|
||||||
time::{self, Interval, Sleep},
|
time::{self, Interval, Sleep},
|
||||||
|
@ -77,8 +77,7 @@ impl Pinger {
|
||||||
|
|
||||||
/// Send a pong.
|
/// Send a pong.
|
||||||
fn send_pong(&mut self, data: &str) -> error::Result<()> {
|
fn send_pong(&mut self, data: &str) -> error::Result<()> {
|
||||||
self.tx
|
self.tx.send(Command::PONG(data.to_owned(), None).into())?;
|
||||||
.unbounded_send(Command::PONG(data.to_owned(), None).into())?;
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -89,8 +88,7 @@ impl Pinger {
|
||||||
// Creates new ping data using the local timestamp.
|
// Creates new ping data using the local timestamp.
|
||||||
let data = format!("{}", Local::now().timestamp());
|
let data = format!("{}", Local::now().timestamp());
|
||||||
|
|
||||||
self.tx
|
self.tx.send(Command::PING(data.clone(), None).into())?;
|
||||||
.unbounded_send(Command::PING(data.clone(), None).into())?;
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
25
src/error.rs
25
src/error.rs
|
@ -3,11 +3,8 @@
|
||||||
use std::io::Error as IoError;
|
use std::io::Error as IoError;
|
||||||
use std::sync::mpsc::RecvError;
|
use std::sync::mpsc::RecvError;
|
||||||
|
|
||||||
use futures_channel::{
|
|
||||||
mpsc::{SendError, TrySendError},
|
|
||||||
oneshot::Canceled,
|
|
||||||
};
|
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
use tokio::sync::mpsc::error::{SendError, TrySendError};
|
||||||
|
|
||||||
#[cfg(feature = "tls-rust")]
|
#[cfg(feature = "tls-rust")]
|
||||||
use tokio_rustls::webpki::InvalidDNSNameError;
|
use tokio_rustls::webpki::InvalidDNSNameError;
|
||||||
|
@ -45,11 +42,11 @@ pub enum Error {
|
||||||
|
|
||||||
/// An internal asynchronous channel closed.
|
/// An internal asynchronous channel closed.
|
||||||
#[error("an async channel closed")]
|
#[error("an async channel closed")]
|
||||||
AsyncChannelClosed(#[source] SendError),
|
AsyncChannelClosed,
|
||||||
|
|
||||||
/// An internal oneshot channel closed.
|
/// An internal oneshot channel closed.
|
||||||
#[error("a oneshot channel closed")]
|
#[error("a oneshot channel closed")]
|
||||||
OneShotCanceled(#[source] Canceled),
|
OneShotCanceled,
|
||||||
|
|
||||||
/// Error for invalid configurations.
|
/// Error for invalid configurations.
|
||||||
#[error("invalid config: {}", path)]
|
#[error("invalid config: {}", path)]
|
||||||
|
@ -205,20 +202,14 @@ impl From<RecvError> for Error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<SendError> for Error {
|
impl<T> From<SendError<T>> for Error {
|
||||||
fn from(e: SendError) -> Error {
|
fn from(_: SendError<T>) -> Error {
|
||||||
Error::AsyncChannelClosed(e)
|
Error::AsyncChannelClosed
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> From<TrySendError<T>> for Error {
|
impl<T> From<TrySendError<T>> for Error {
|
||||||
fn from(e: TrySendError<T>) -> Error {
|
fn from(_: TrySendError<T>) -> Error {
|
||||||
Error::AsyncChannelClosed(e.into_send_error())
|
Error::AsyncChannelClosed
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<Canceled> for Error {
|
|
||||||
fn from(e: Canceled) -> Error {
|
|
||||||
Error::OneShotCanceled(e)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue