Clean up tokio-core

This commit is contained in:
Douman 2018-09-22 23:31:14 +03:00
parent f109c10475
commit 23112f7582
5 changed files with 28 additions and 32 deletions

View file

@ -39,8 +39,8 @@ serde = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
serde_json = { version = "1.0", optional = true } serde_json = { version = "1.0", optional = true }
serde_yaml = { version = "0.7", optional = true } serde_yaml = { version = "0.7", optional = true }
tokio = "0.1"
tokio-codec = "0.1" tokio-codec = "0.1"
tokio-core = "0.1"
tokio-io = "0.1" tokio-io = "0.1"
tokio-mockstream = "1.1" tokio-mockstream = "1.1"
tokio-timer = "0.1" tokio-timer = "0.1"

View file

@ -8,8 +8,7 @@ use encoding::label::encoding_from_whatwg_label;
use futures::{Async, Poll, Future, Sink, StartSend, Stream}; use futures::{Async, Poll, Future, Sink, StartSend, Stream};
use native_tls::{Certificate, TlsConnector, Identity}; use native_tls::{Certificate, TlsConnector, Identity};
use tokio_codec::Decoder; use tokio_codec::Decoder;
use tokio_core::reactor::Handle; use tokio::net::{TcpStream, ConnectFuture};
use tokio_core::net::{TcpStream, TcpStreamNew};
use tokio_mockstream::MockStream; use tokio_mockstream::MockStream;
use tokio_tls::{self, TlsStream}; use tokio_tls::{self, TlsStream};
@ -48,7 +47,7 @@ type TlsFuture = Box<Future<Error = error::IrcError, Item = TlsStream<TcpStream>
/// A future representing an eventual `Connection`. /// A future representing an eventual `Connection`.
pub enum ConnectionFuture<'a> { pub enum ConnectionFuture<'a> {
#[doc(hidden)] #[doc(hidden)]
Unsecured(&'a Config, TcpStreamNew), Unsecured(&'a Config, ConnectFuture),
#[doc(hidden)] #[doc(hidden)]
Secured(&'a Config, TlsFuture), Secured(&'a Config, TlsFuture),
#[doc(hidden)] #[doc(hidden)]
@ -122,8 +121,8 @@ impl<'a> Future for ConnectionFuture<'a> {
} }
impl Connection { impl Connection {
/// Creates a new `Connection` using the specified `Config` and `Handle`. /// Creates a new `Connection` using the specified `Config`
pub fn new<'a>(config: &'a Config, handle: &Handle) -> error::Result<ConnectionFuture<'a>> { pub fn new<'a>(config: &'a Config) -> error::Result<ConnectionFuture<'a>> {
if config.use_mock_connection() { if config.use_mock_connection() {
Ok(ConnectionFuture::Mock(config)) Ok(ConnectionFuture::Mock(config))
} else if config.use_ssl() { } else if config.use_ssl() {
@ -148,7 +147,7 @@ impl Connection {
info!("Using {} for client certificate authentication.", client_cert_path); info!("Using {} for client certificate authentication.", client_cert_path);
} }
let connector: tokio_tls::TlsConnector = builder.build()?.into(); let connector: tokio_tls::TlsConnector = builder.build()?.into();
let stream = Box::new(TcpStream::connect(&config.socket_addr()?, handle).map_err(|e| { let stream = Box::new(TcpStream::connect(&config.socket_addr()?).map_err(|e| {
let res: error::IrcError = e.into(); let res: error::IrcError = e.into();
res res
}).and_then(move |socket| { }).and_then(move |socket| {
@ -161,7 +160,7 @@ impl Connection {
info!("Connecting to {}.", config.server()?); info!("Connecting to {}.", config.server()?);
Ok(ConnectionFuture::Unsecured( Ok(ConnectionFuture::Unsecured(
config, config,
TcpStream::connect(&config.socket_addr()?, handle), TcpStream::connect(&config.socket_addr()?),
)) ))
} }
} }

View file

@ -57,7 +57,7 @@ use futures::stream::SplitStream;
use futures::sync::mpsc; use futures::sync::mpsc;
use futures::sync::oneshot; use futures::sync::oneshot;
use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio_core::reactor::{Core, Handle}; use tokio::runtime::current_thread::Runtime;
use error; use error;
use client::conn::{Connection, ConnectionFuture}; use client::conn::{Connection, ConnectionFuture};
@ -118,7 +118,7 @@ pub trait Client {
/// Gets the configuration being used with this `Client`. /// Gets the configuration being used with this `Client`.
fn config(&self) -> &Config; fn config(&self) -> &Config;
/// Sends a [`Command`](../proto/command/enum.Command.html) as this `Client`. This is the /// Sends a [`Command`](../proto/command/enum.Command.html) as this `Client`. This is the
/// core primitive for sending messages to the server. In practice, it's often more pleasant /// core primitive for sending messages to the server. In practice, it's often more pleasant
/// (and more idiomatic) to use the functions defined on /// (and more idiomatic) to use the functions defined on
/// [`ClientExt`](./ext/trait.ClientExt.html). They capture a lot of the more repetitive /// [`ClientExt`](./ext/trait.ClientExt.html). They capture a lot of the more repetitive
@ -694,10 +694,9 @@ impl IrcClient {
let cfg = config.clone(); let cfg = config.clone();
let _ = thread::spawn(move || { let _ = thread::spawn(move || {
let mut reactor = Core::new().unwrap(); let mut reactor = Runtime::new().unwrap();
// Setting up internal processing stuffs. // Setting up internal processing stuffs.
let handle = reactor.handle(); let conn = reactor.block_on(Connection::new(&cfg).unwrap()).unwrap();
let conn = reactor.run(Connection::new(&cfg, &handle).unwrap()).unwrap();
tx_view.send(conn.log_view()).unwrap(); tx_view.send(conn.log_view()).unwrap();
let (sink, stream) = conn.split(); let (sink, stream) = conn.split();
@ -709,7 +708,7 @@ impl IrcClient {
// Send the stream half back to the original thread. // Send the stream half back to the original thread.
tx_incoming.send(stream).unwrap(); tx_incoming.send(stream).unwrap();
reactor.run(outgoing_future).unwrap(); reactor.block_on(outgoing_future).unwrap();
}); });
Ok(IrcClient { Ok(IrcClient {
@ -733,36 +732,35 @@ impl IrcClient {
/// # Example /// # Example
/// ```no_run /// ```no_run
/// # extern crate irc; /// # extern crate irc;
/// # extern crate tokio_core; /// # extern crate tokio;
/// # use std::default::Default; /// # use std::default::Default;
/// # use irc::client::prelude::*; /// # use irc::client::prelude::*;
/// # use irc::client::PackedIrcClient; /// # use irc::client::PackedIrcClient;
/// # use irc::error; /// # use irc::error;
/// # use tokio_core::reactor::Core; /// # use tokio::runtime::current_thread::Runtime;
/// # fn main() { /// # fn main() {
/// # let config = Config { /// # let config = Config {
/// # nickname: Some("example".to_owned()), /// # nickname: Some("example".to_owned()),
/// # server: Some("irc.example.com".to_owned()), /// # server: Some("irc.example.com".to_owned()),
/// # .. Default::default() /// # .. Default::default()
/// # }; /// # };
/// let mut reactor = Core::new().unwrap(); /// let mut reactor = Runtime::new().unwrap();
/// let future = IrcClient::new_future(reactor.handle(), &config).unwrap(); /// let future = IrcClient::new_future(&config).unwrap();
/// // immediate connection errors (like no internet) will turn up here... /// // immediate connection errors (like no internet) will turn up here...
/// let PackedIrcClient(client, future) = reactor.run(future).unwrap(); /// let PackedIrcClient(client, future) = reactor.block_on(future).unwrap();
/// // runtime errors (like disconnections and so forth) will turn up here... /// // runtime errors (like disconnections and so forth) will turn up here...
/// reactor.run(client.stream().for_each(move |irc_msg| { /// reactor.block_on(client.stream().for_each(move |irc_msg| {
/// // processing messages works like usual /// // processing messages works like usual
/// process_msg(&client, irc_msg) /// process_msg(&client, irc_msg)
/// }).join(future)).unwrap(); /// }).join(future)).unwrap();
/// # } /// # }
/// # fn process_msg(server: &IrcClient, message: Message) -> error::Result<()> { Ok(()) } /// # fn process_msg(server: &IrcClient, message: Message) -> error::Result<()> { Ok(()) }
/// ``` /// ```
pub fn new_future(handle: Handle, config: &Config) -> error::Result<IrcClientFuture> { pub fn new_future(config: &Config) -> error::Result<IrcClientFuture> {
let (tx_outgoing, rx_outgoing) = mpsc::unbounded(); let (tx_outgoing, rx_outgoing) = mpsc::unbounded();
Ok(IrcClientFuture { Ok(IrcClientFuture {
conn: Connection::new(config, &handle)?, conn: Connection::new(config)?,
_handle: handle,
config: config, config: config,
tx_outgoing: Some(tx_outgoing), tx_outgoing: Some(tx_outgoing),
rx_outgoing: Some(rx_outgoing), rx_outgoing: Some(rx_outgoing),
@ -793,7 +791,6 @@ impl IrcClient {
#[derive(Debug)] #[derive(Debug)]
pub struct IrcClientFuture<'a> { pub struct IrcClientFuture<'a> {
conn: ConnectionFuture<'a>, conn: ConnectionFuture<'a>,
_handle: Handle,
config: &'a Config, config: &'a Config,
tx_outgoing: Option<UnboundedSender<Message>>, tx_outgoing: Option<UnboundedSender<Message>>,
rx_outgoing: Option<UnboundedReceiver<Message>>, rx_outgoing: Option<UnboundedReceiver<Message>>,

View file

@ -25,7 +25,7 @@
use futures::{Future, IntoFuture, Stream}; use futures::{Future, IntoFuture, Stream};
use futures::future; use futures::future;
use tokio_core::reactor::{Core, Handle}; use tokio::runtime::current_thread as tokio_rt;
use client::data::Config; use client::data::Config;
use client::{IrcClient, IrcClientFuture, PackedIrcClient, Client}; use client::{IrcClient, IrcClientFuture, PackedIrcClient, Client};
@ -40,7 +40,7 @@ use proto::Message;
/// ///
/// For a full example usage, see [`irc::client::reactor`](./index.html). /// For a full example usage, see [`irc::client::reactor`](./index.html).
pub struct IrcReactor { pub struct IrcReactor {
inner: Core, inner: tokio_rt::Runtime,
handlers: Vec<Box<Future<Item = (), Error = error::IrcError>>>, handlers: Vec<Box<Future<Item = (), Error = error::IrcError>>>,
} }
@ -48,7 +48,7 @@ impl IrcReactor {
/// Creates a new reactor. /// Creates a new reactor.
pub fn new() -> error::Result<IrcReactor> { pub fn new() -> error::Result<IrcReactor> {
Ok(IrcReactor { Ok(IrcReactor {
inner: Core::new()?, inner: tokio_rt::Runtime::new()?,
handlers: Vec::new(), handlers: Vec::new(),
}) })
} }
@ -70,7 +70,7 @@ impl IrcReactor {
/// # } /// # }
/// ``` /// ```
pub fn prepare_client<'a>(&mut self, config: &'a Config) -> error::Result<IrcClientFuture<'a>> { pub fn prepare_client<'a>(&mut self, config: &'a Config) -> error::Result<IrcClientFuture<'a>> {
IrcClient::new_future(self.inner_handle(), config) IrcClient::new_future(config)
} }
/// Runs an [`IrcClientFuture`](../struct.IrcClientFuture.html), such as one from /// Runs an [`IrcClientFuture`](../struct.IrcClientFuture.html), such as one from
@ -91,7 +91,7 @@ impl IrcReactor {
/// # } /// # }
/// ``` /// ```
pub fn connect_client(&mut self, future: IrcClientFuture) -> error::Result<IrcClient> { pub fn connect_client(&mut self, future: IrcClientFuture) -> error::Result<IrcClient> {
self.inner.run(future).map(|PackedIrcClient(client, future)| { self.inner.block_on(future).map(|PackedIrcClient(client, future)| {
self.register_future(future); self.register_future(future);
client client
}) })
@ -159,7 +159,7 @@ impl IrcReactor {
/// Returns a handle to the internal event loop. This is a sort of escape hatch that allows you /// Returns a handle to the internal event loop. This is a sort of escape hatch that allows you
/// to take more control over what runs on the reactor using `tokio`. This can be used for /// to take more control over what runs on the reactor using `tokio`. This can be used for
/// sharing this reactor with some elements of other libraries. /// sharing this reactor with some elements of other libraries.
pub fn inner_handle(&self) -> Handle { pub fn inner_handle(&self) -> tokio_rt::Handle {
self.inner.handle() self.inner.handle()
} }
@ -187,6 +187,6 @@ impl IrcReactor {
while let Some(handler) = self.handlers.pop() { while let Some(handler) = self.handlers.pop() {
handlers.push(handler); handlers.push(handler);
} }
self.inner.run(future::join_all(handlers).map(|_| ())) self.inner.block_on(future::join_all(handlers).map(|_| ()))
} }
} }

View file

@ -59,8 +59,8 @@ extern crate serde_derive;
extern crate serde_json; extern crate serde_json;
#[cfg(feature = "yaml")] #[cfg(feature = "yaml")]
extern crate serde_yaml; extern crate serde_yaml;
extern crate tokio;
extern crate tokio_codec; extern crate tokio_codec;
extern crate tokio_core;
extern crate tokio_io; extern crate tokio_io;
extern crate tokio_mockstream; extern crate tokio_mockstream;
extern crate tokio_timer; extern crate tokio_timer;