From 23112f75829bca40d1cbf0b36c16531c24925ce2 Mon Sep 17 00:00:00 2001 From: Douman Date: Sat, 22 Sep 2018 23:31:14 +0300 Subject: [PATCH] Clean up tokio-core --- Cargo.toml | 2 +- src/client/conn.rs | 13 ++++++------- src/client/mod.rs | 29 +++++++++++++---------------- src/client/reactor.rs | 14 +++++++------- src/lib.rs | 2 +- 5 files changed, 28 insertions(+), 32 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 506b05c..81f28ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,8 +39,8 @@ serde = "1.0" serde_derive = "1.0" serde_json = { version = "1.0", optional = true } serde_yaml = { version = "0.7", optional = true } +tokio = "0.1" tokio-codec = "0.1" -tokio-core = "0.1" tokio-io = "0.1" tokio-mockstream = "1.1" tokio-timer = "0.1" diff --git a/src/client/conn.rs b/src/client/conn.rs index 3c8ea22..0f91714 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -8,8 +8,7 @@ use encoding::label::encoding_from_whatwg_label; use futures::{Async, Poll, Future, Sink, StartSend, Stream}; use native_tls::{Certificate, TlsConnector, Identity}; use tokio_codec::Decoder; -use tokio_core::reactor::Handle; -use tokio_core::net::{TcpStream, TcpStreamNew}; +use tokio::net::{TcpStream, ConnectFuture}; use tokio_mockstream::MockStream; use tokio_tls::{self, TlsStream}; @@ -48,7 +47,7 @@ type TlsFuture = Box /// A future representing an eventual `Connection`. pub enum ConnectionFuture<'a> { #[doc(hidden)] - Unsecured(&'a Config, TcpStreamNew), + Unsecured(&'a Config, ConnectFuture), #[doc(hidden)] Secured(&'a Config, TlsFuture), #[doc(hidden)] @@ -122,8 +121,8 @@ impl<'a> Future for ConnectionFuture<'a> { } impl Connection { - /// Creates a new `Connection` using the specified `Config` and `Handle`. - pub fn new<'a>(config: &'a Config, handle: &Handle) -> error::Result> { + /// Creates a new `Connection` using the specified `Config` + pub fn new<'a>(config: &'a Config) -> error::Result> { if config.use_mock_connection() { Ok(ConnectionFuture::Mock(config)) } else if config.use_ssl() { @@ -148,7 +147,7 @@ impl Connection { info!("Using {} for client certificate authentication.", client_cert_path); } let connector: tokio_tls::TlsConnector = builder.build()?.into(); - let stream = Box::new(TcpStream::connect(&config.socket_addr()?, handle).map_err(|e| { + let stream = Box::new(TcpStream::connect(&config.socket_addr()?).map_err(|e| { let res: error::IrcError = e.into(); res }).and_then(move |socket| { @@ -161,7 +160,7 @@ impl Connection { info!("Connecting to {}.", config.server()?); Ok(ConnectionFuture::Unsecured( config, - TcpStream::connect(&config.socket_addr()?, handle), + TcpStream::connect(&config.socket_addr()?), )) } } diff --git a/src/client/mod.rs b/src/client/mod.rs index 258c94b..e685763 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -57,7 +57,7 @@ use futures::stream::SplitStream; use futures::sync::mpsc; use futures::sync::oneshot; use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender}; -use tokio_core::reactor::{Core, Handle}; +use tokio::runtime::current_thread::Runtime; use error; use client::conn::{Connection, ConnectionFuture}; @@ -118,7 +118,7 @@ pub trait Client { /// Gets the configuration being used with this `Client`. fn config(&self) -> &Config; - /// Sends a [`Command`](../proto/command/enum.Command.html) as this `Client`. This is the + /// Sends a [`Command`](../proto/command/enum.Command.html) as this `Client`. This is the /// core primitive for sending messages to the server. In practice, it's often more pleasant /// (and more idiomatic) to use the functions defined on /// [`ClientExt`](./ext/trait.ClientExt.html). They capture a lot of the more repetitive @@ -694,10 +694,9 @@ impl IrcClient { let cfg = config.clone(); let _ = thread::spawn(move || { - let mut reactor = Core::new().unwrap(); + let mut reactor = Runtime::new().unwrap(); // Setting up internal processing stuffs. - let handle = reactor.handle(); - let conn = reactor.run(Connection::new(&cfg, &handle).unwrap()).unwrap(); + let conn = reactor.block_on(Connection::new(&cfg).unwrap()).unwrap(); tx_view.send(conn.log_view()).unwrap(); let (sink, stream) = conn.split(); @@ -709,7 +708,7 @@ impl IrcClient { // Send the stream half back to the original thread. tx_incoming.send(stream).unwrap(); - reactor.run(outgoing_future).unwrap(); + reactor.block_on(outgoing_future).unwrap(); }); Ok(IrcClient { @@ -733,36 +732,35 @@ impl IrcClient { /// # Example /// ```no_run /// # extern crate irc; - /// # extern crate tokio_core; + /// # extern crate tokio; /// # use std::default::Default; /// # use irc::client::prelude::*; /// # use irc::client::PackedIrcClient; /// # use irc::error; - /// # use tokio_core::reactor::Core; + /// # use tokio::runtime::current_thread::Runtime; /// # fn main() { /// # let config = Config { /// # nickname: Some("example".to_owned()), /// # server: Some("irc.example.com".to_owned()), /// # .. Default::default() /// # }; - /// let mut reactor = Core::new().unwrap(); - /// let future = IrcClient::new_future(reactor.handle(), &config).unwrap(); + /// let mut reactor = Runtime::new().unwrap(); + /// let future = IrcClient::new_future(&config).unwrap(); /// // immediate connection errors (like no internet) will turn up here... - /// let PackedIrcClient(client, future) = reactor.run(future).unwrap(); + /// let PackedIrcClient(client, future) = reactor.block_on(future).unwrap(); /// // runtime errors (like disconnections and so forth) will turn up here... - /// reactor.run(client.stream().for_each(move |irc_msg| { + /// reactor.block_on(client.stream().for_each(move |irc_msg| { /// // processing messages works like usual /// process_msg(&client, irc_msg) /// }).join(future)).unwrap(); /// # } /// # fn process_msg(server: &IrcClient, message: Message) -> error::Result<()> { Ok(()) } /// ``` - pub fn new_future(handle: Handle, config: &Config) -> error::Result { + pub fn new_future(config: &Config) -> error::Result { let (tx_outgoing, rx_outgoing) = mpsc::unbounded(); Ok(IrcClientFuture { - conn: Connection::new(config, &handle)?, - _handle: handle, + conn: Connection::new(config)?, config: config, tx_outgoing: Some(tx_outgoing), rx_outgoing: Some(rx_outgoing), @@ -793,7 +791,6 @@ impl IrcClient { #[derive(Debug)] pub struct IrcClientFuture<'a> { conn: ConnectionFuture<'a>, - _handle: Handle, config: &'a Config, tx_outgoing: Option>, rx_outgoing: Option>, diff --git a/src/client/reactor.rs b/src/client/reactor.rs index 34bf937..7aea153 100644 --- a/src/client/reactor.rs +++ b/src/client/reactor.rs @@ -25,7 +25,7 @@ use futures::{Future, IntoFuture, Stream}; use futures::future; -use tokio_core::reactor::{Core, Handle}; +use tokio::runtime::current_thread as tokio_rt; use client::data::Config; use client::{IrcClient, IrcClientFuture, PackedIrcClient, Client}; @@ -40,7 +40,7 @@ use proto::Message; /// /// For a full example usage, see [`irc::client::reactor`](./index.html). pub struct IrcReactor { - inner: Core, + inner: tokio_rt::Runtime, handlers: Vec>>, } @@ -48,7 +48,7 @@ impl IrcReactor { /// Creates a new reactor. pub fn new() -> error::Result { Ok(IrcReactor { - inner: Core::new()?, + inner: tokio_rt::Runtime::new()?, handlers: Vec::new(), }) } @@ -70,7 +70,7 @@ impl IrcReactor { /// # } /// ``` pub fn prepare_client<'a>(&mut self, config: &'a Config) -> error::Result> { - IrcClient::new_future(self.inner_handle(), config) + IrcClient::new_future(config) } /// Runs an [`IrcClientFuture`](../struct.IrcClientFuture.html), such as one from @@ -91,7 +91,7 @@ impl IrcReactor { /// # } /// ``` pub fn connect_client(&mut self, future: IrcClientFuture) -> error::Result { - self.inner.run(future).map(|PackedIrcClient(client, future)| { + self.inner.block_on(future).map(|PackedIrcClient(client, future)| { self.register_future(future); client }) @@ -159,7 +159,7 @@ impl IrcReactor { /// Returns a handle to the internal event loop. This is a sort of escape hatch that allows you /// to take more control over what runs on the reactor using `tokio`. This can be used for /// sharing this reactor with some elements of other libraries. - pub fn inner_handle(&self) -> Handle { + pub fn inner_handle(&self) -> tokio_rt::Handle { self.inner.handle() } @@ -187,6 +187,6 @@ impl IrcReactor { while let Some(handler) = self.handlers.pop() { handlers.push(handler); } - self.inner.run(future::join_all(handlers).map(|_| ())) + self.inner.block_on(future::join_all(handlers).map(|_| ())) } } diff --git a/src/lib.rs b/src/lib.rs index 1852698..813d0d7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -59,8 +59,8 @@ extern crate serde_derive; extern crate serde_json; #[cfg(feature = "yaml")] extern crate serde_yaml; +extern crate tokio; extern crate tokio_codec; -extern crate tokio_core; extern crate tokio_io; extern crate tokio_mockstream; extern crate tokio_timer;