diff --git a/Cargo.toml b/Cargo.toml index 8938783..fbd1541 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ serde = "1.0" serde_derive = "1.0" serde_json = { version = "1.0", optional = true } serde_yaml = { version = "0.7", optional = true } +tokio = "0.1" tokio-core = "0.1" tokio-io = "0.1" tokio-mockstream = "1.1" diff --git a/README.md b/README.md index 89a4b7b..9fc2697 100644 --- a/README.md +++ b/README.md @@ -70,7 +70,7 @@ fn main() { }; let mut reactor = IrcReactor::new().unwrap(); - let client = reactor.prepare_client_and_connect(&config).unwrap(); + let client = reactor.prepare_client_and_connect(config).unwrap(); client.identify().unwrap(); reactor.register_client_with_handler(client, |client, message| { diff --git a/examples/multiserver.rs b/examples/multiserver.rs index c88311e..57e98b8 100644 --- a/examples/multiserver.rs +++ b/examples/multiserver.rs @@ -26,7 +26,7 @@ fn main() { for config in configs { // Immediate errors like failure to resolve the server's domain or to establish any connection will // manifest here in the result of prepare_client_and_connect. - let client = reactor.prepare_client_and_connect(&config).unwrap(); + let client = reactor.prepare_client_and_connect(config).unwrap(); client.identify().unwrap(); // Here, we tell the reactor to setup this client for future handling (in run) using the specified // handler function process_msg. diff --git a/examples/reactor.rs b/examples/reactor.rs index 66dea6b..fd201f2 100644 --- a/examples/reactor.rs +++ b/examples/reactor.rs @@ -14,7 +14,7 @@ fn main() { }; let mut reactor = IrcReactor::new().unwrap(); - let client = reactor.prepare_client_and_connect(&config).unwrap(); + let client = reactor.prepare_client_and_connect(config).unwrap(); client.identify().unwrap(); reactor.register_client_with_handler(client, |client, message| { diff --git a/examples/reconnector.rs b/examples/reconnector.rs index f879f48..579f335 100644 --- a/examples/reconnector.rs +++ b/examples/reconnector.rs @@ -26,7 +26,7 @@ fn main() { loop { let res = configs.iter().fold(Ok(()), |acc, config| { acc.and( - reactor.prepare_client_and_connect(config).and_then(|client| { + reactor.prepare_client_and_connect(config.clone()).and_then(|client| { client.identify().and(Ok(client)) }).and_then(|client| { reactor.register_client_with_handler(client, process_msg); diff --git a/src/client/conn.rs b/src/client/conn.rs index e9552d2..d9d1c12 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -5,10 +5,10 @@ use std::io::Read; use encoding::EncoderTrap; use encoding::label::encoding_from_whatwg_label; +use futures::future; use futures::{Async, Poll, Future, Sink, StartSend, Stream}; use native_tls::{Certificate, TlsConnector, Pkcs12}; -use tokio_core::reactor::Handle; -use tokio_core::net::{TcpStream, TcpStreamNew}; +use tokio::net::{ConnectFuture, TcpStream}; use tokio_io::AsyncRead; use tokio_mockstream::MockStream; use tokio_tls::{TlsConnectorExt, TlsStream}; @@ -42,57 +42,63 @@ impl fmt::Debug for Connection { } } -/// A convenient type alias representing the `TlsStream` future. -type TlsFuture = Box> + Send>; - /// A future representing an eventual `Connection`. -pub enum ConnectionFuture<'a> { +pub enum ConnectionFuture { #[doc(hidden)] - Unsecured(&'a Config, TcpStreamNew), + Unsecured(Config, ConnectFuture), #[doc(hidden)] - Secured(&'a Config, TlsFuture), + Secured(Config, ConnectFuture, TlsConnector), #[doc(hidden)] - Mock(&'a Config), + Mock(Config), } -impl<'a> fmt::Debug for ConnectionFuture<'a> { +impl fmt::Debug for ConnectionFuture { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( f, "{}({:?}, ...)", match *self { ConnectionFuture::Unsecured(_, _) => "ConnectionFuture::Unsecured", - ConnectionFuture::Secured(_, _) => "ConnectionFuture::Secured", + ConnectionFuture::Secured(_, _, _) => "ConnectionFuture::Secured", ConnectionFuture::Mock(_) => "ConnectionFuture::Mock", }, match *self { - ConnectionFuture::Unsecured(cfg, _) | - ConnectionFuture::Secured(cfg, _) | - ConnectionFuture::Mock(cfg) => cfg, + ConnectionFuture::Unsecured(ref cfg, _) | + ConnectionFuture::Secured(ref cfg, _, _) | + ConnectionFuture::Mock(ref cfg) => cfg, } ) } } -impl<'a> Future for ConnectionFuture<'a> { +impl Future for ConnectionFuture { type Item = Connection; type Error = error::IrcError; fn poll(&mut self) -> Poll { match *self { - ConnectionFuture::Unsecured(config, ref mut inner) => { + ConnectionFuture::Unsecured(ref config, ref mut inner) => { let framed = try_ready!(inner.poll()).framed(IrcCodec::new(config.encoding())?); let transport = IrcTransport::new(config, framed); Ok(Async::Ready(Connection::Unsecured(transport))) } - ConnectionFuture::Secured(config, ref mut inner) => { + ConnectionFuture::Secured(ref config, ref mut inner, ref connector) => { + let domain = format!("{}", config.server().expect("should already be tested")); + let mut inner = inner.map_err(|e| { + let res: error::IrcError = e.into(); + res + }).and_then(move |socket| { + connector.connect_async(&domain, socket).map_err( + |e| e.into(), + ) + }); let framed = try_ready!(inner.poll()).framed(IrcCodec::new(config.encoding())?); let transport = IrcTransport::new(config, framed); Ok(Async::Ready(Connection::Secured(transport))) } - ConnectionFuture::Mock(config) => { + ConnectionFuture::Mock(ref config) => { let enc: error::Result<_> = encoding_from_whatwg_label( config.encoding() ).ok_or_else(|| error::IrcError::UnknownCodec { @@ -119,8 +125,13 @@ impl<'a> Future for ConnectionFuture<'a> { } impl Connection { - /// Creates a new `Connection` using the specified `Config` and `Handle`. - pub fn new<'a>(config: &'a Config, handle: &Handle) -> error::Result> { + /// Creates a future yielding a new `Connection` using the specified `Config`. + pub fn new(config: Config) -> impl Future { + future::lazy(move || Connection::new_inner(config)) + .and_then(|connection_future| connection_future) + } + + fn new_inner(config: Config) -> error::Result { if config.use_mock_connection() { Ok(ConnectionFuture::Mock(config)) } else if config.use_ssl() { @@ -145,20 +156,18 @@ impl Connection { info!("Using {} for client certificate authentication.", client_cert_path); } let connector = builder.build()?; - let stream = Box::new(TcpStream::connect(&config.socket_addr()?, handle).map_err(|e| { - let res: error::IrcError = e.into(); - res - }).and_then(move |socket| { - connector.connect_async(&domain, socket).map_err( - |e| e.into(), - ) - })); - Ok(ConnectionFuture::Secured(config, stream)) + let socket_addr = config.socket_addr()?; + Ok(ConnectionFuture::Secured( + config, + TcpStream::connect(&socket_addr), + connector + )) } else { info!("Connecting to {}.", config.server()?); + let socket_addr = config.socket_addr()?; Ok(ConnectionFuture::Unsecured( config, - TcpStream::connect(&config.socket_addr()?, handle), + TcpStream::connect(&socket_addr), )) } } diff --git a/src/client/mod.rs b/src/client/mod.rs index 2a92d62..a85302e 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -58,11 +58,11 @@ use futures::{Async, Poll, Future, Sink, Stream}; use futures::stream::SplitStream; use futures::sync::mpsc; use futures::sync::oneshot; -use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender}; -use tokio_core::reactor::{Core, Handle}; +use futures::sync::mpsc::UnboundedSender; +use tokio_core::reactor::Core; use error; -use client::conn::{Connection, ConnectionFuture}; +use client::conn::Connection; use client::data::{Config, User}; use client::ext::ClientExt; use client::transport::LogView; @@ -754,8 +754,7 @@ impl IrcClient { // will instead panic. let _ = thread::spawn(move || { let mut reactor = Core::new().unwrap(); - let handle = reactor.handle(); - let conn = reactor.run(Connection::new(&cfg, &handle).unwrap()).unwrap(); + let conn = reactor.run(Connection::new(cfg)).unwrap(); tx_view.send(conn.log_view()).unwrap(); let (sink, stream) = conn.split(); @@ -779,9 +778,9 @@ impl IrcClient { }) } - /// Creates a `Future` of an `IrcClient` from the specified configuration and on the event loop - /// corresponding to the given handle. This can be used to set up a number of `IrcClients` on a - /// single, shared event loop. It can also be used to take more control over execution and error + /// Creates a `Future` of an `IrcClient` from the specified configuration. + /// This can be used to set up a number of `IrcClients` on a single, + /// shared event loop. It can also be used to take more control over execution and error /// handling. Connection will not occur until the event loop is run. /// /// Proper usage requires familiarity with `tokio` and `futures`. You can find more information @@ -797,7 +796,6 @@ impl IrcClient { /// # extern crate tokio_core; /// # use std::default::Default; /// # use irc::client::prelude::*; - /// # use irc::client::PackedIrcClient; /// # use irc::error; /// # use tokio_core::reactor::Core; /// # fn main() { @@ -807,9 +805,9 @@ impl IrcClient { /// # .. Default::default() /// # }; /// let mut reactor = Core::new().unwrap(); - /// let future = IrcClient::new_future(reactor.handle(), &config).unwrap(); + /// let future = IrcClient::new_future(config); /// // immediate connection errors (like no internet) will turn up here... - /// let PackedIrcClient(client, future) = reactor.run(future).unwrap(); + /// let (client, future) = reactor.run(future).unwrap(); /// // runtime errors (like disconnections and so forth) will turn up here... /// reactor.run(client.stream().for_each(move |irc_msg| { /// // processing messages works like usual @@ -818,15 +816,28 @@ impl IrcClient { /// # } /// # fn process_msg(server: &IrcClient, message: Message) -> error::Result<()> { Ok(()) } /// ``` - pub fn new_future(handle: Handle, config: &Config) -> error::Result { - let (tx_outgoing, rx_outgoing) = mpsc::unbounded(); - - Ok(IrcClientFuture { - conn: Connection::new(config, &handle)?, - _handle: handle, config, - tx_outgoing: Some(tx_outgoing), - rx_outgoing: Some(rx_outgoing), - }) + pub fn new_future(config: Config) -> impl Future< + Item = (IrcClient, impl Future + 'static), + Error = error::IrcError + > { + Connection::new(config.clone()) + .and_then(move |connection| { + let (tx_outgoing, rx_outgoing) = mpsc::unbounded(); + let log_view = connection.log_view(); + let (sink, stream) = connection.split(); + let outgoing_future = sink.send_all( + rx_outgoing.map_err::(|()| { + unreachable!("futures::sync::mpsc::Receiver should never return Err"); + }) + ).map(|_| ()); + ClientState::new(stream, tx_outgoing, config).map(|state| { + let client = IrcClient { + state: Arc::new(state), + view: log_view, + }; + (client, outgoing_future) + }) + }) } /// Gets the current nickname in use. This may be the primary username set in the configuration, @@ -843,59 +854,6 @@ impl IrcClient { } } -/// A future representing the eventual creation of an `IrcClient`. This future returns a -/// `PackedIrcClient` which includes the actual `IrcClient` being created and a future that drives -/// the sending of messages for the client. -/// -/// Interaction with this future relies on the `futures` API, but is only expected for more advanced -/// use cases. To learn more, you can view the documentation for the -/// [`futures`](https://docs.rs/futures/) crate, or the tutorials for -/// [`tokio`](https://tokio.rs/docs/getting-started/futures/). An easy to use abstraction that does -/// not require this knowledge is available via [`IrcReactors`](./reactor/struct.IrcReactor.html). -#[derive(Debug)] -pub struct IrcClientFuture<'a> { - conn: ConnectionFuture<'a>, - _handle: Handle, - config: &'a Config, - tx_outgoing: Option>, - rx_outgoing: Option>, -} - -impl<'a> Future for IrcClientFuture<'a> { - type Item = PackedIrcClient; - type Error = error::IrcError; - - fn poll(&mut self) -> Poll { - let conn = try_ready!(self.conn.poll()); - - let view = conn.log_view(); - let (sink, stream) = conn.split(); - - // Forward every message from the outgoing channel to the sink. - let outgoing_future = sink.send_all( - self.rx_outgoing.take().unwrap().map_err::(|()| { - unreachable!("futures::sync::mpsc::Receiver should never return Err"); - }) - ).map(|_| ()); - - let client = IrcClient { - state: Arc::new(ClientState::new( - stream, self.tx_outgoing.take().unwrap(), self.config.clone() - )?), view, - }; - Ok(Async::Ready(PackedIrcClient(client, Box::new(outgoing_future)))) - } -} - -/// An `IrcClient` packaged with a future that drives its message sending. In order for the client -/// to actually work properly, this future _must_ be running. Without it, messages cannot be sent to -/// the server. -/// -/// This type should only be used by advanced users who are familiar with the implementation of this -/// crate. An easy to use abstraction that does not require this knowledge is available via -/// [`IrcReactors`](./reactor/struct.IrcReactor.html). -pub struct PackedIrcClient(pub IrcClient, pub Box>); - #[cfg(test)] mod test { use std::collections::HashMap; diff --git a/src/client/reactor.rs b/src/client/reactor.rs index 012864b..582e392 100644 --- a/src/client/reactor.rs +++ b/src/client/reactor.rs @@ -16,7 +16,7 @@ //! fn main() { //! let config = Config::default(); //! let mut reactor = IrcReactor::new().unwrap(); -//! let client = reactor.prepare_client_and_connect(&config).unwrap(); +//! let client = reactor.prepare_client_and_connect(config).unwrap(); //! reactor.register_client_with_handler(client, process_msg); //! reactor.run().unwrap(); //! } @@ -28,7 +28,7 @@ use futures::future; use tokio_core::reactor::{Core, Handle}; use client::data::Config; -use client::{IrcClient, IrcClientFuture, PackedIrcClient, Client}; +use client::{IrcClient, Client}; use error; use proto::Message; @@ -65,12 +65,15 @@ impl IrcReactor { /// # fn main() { /// # let config = Config::default(); /// let future_client = IrcReactor::new().and_then(|mut reactor| { - /// reactor.prepare_client(&config) + /// Ok(reactor.prepare_client(config)) /// }); /// # } /// ``` - pub fn prepare_client<'a>(&mut self, config: &'a Config) -> error::Result> { - IrcClient::new_future(self.inner_handle(), config) + pub fn prepare_client(&mut self, config: Config) -> impl Future< + Item = (IrcClient, impl Future + 'static), + Error = error::IrcError + > { + IrcClient::new_future(config) } /// Runs an [`IrcClientFuture`](../struct.IrcClientFuture.html), such as one from @@ -84,14 +87,17 @@ impl IrcReactor { /// # fn main() { /// # let config = Config::default(); /// let client = IrcReactor::new().and_then(|mut reactor| { - /// reactor.prepare_client(&config).and_then(|future| { - /// reactor.connect_client(future) - /// }) + /// let future = reactor.prepare_client(config); + /// reactor.connect_client(future) /// }); /// # } /// ``` - pub fn connect_client(&mut self, future: IrcClientFuture) -> error::Result { - self.inner.run(future).map(|PackedIrcClient(client, future)| { + pub fn connect_client(&mut self, future: F) -> error::Result + where + F: Future, + G: Future + 'static, + { + self.inner.run(future).map(|(client, future)| { self.register_future(future); client }) @@ -109,12 +115,13 @@ impl IrcReactor { /// # fn main() { /// # let config = Config::default(); /// let client = IrcReactor::new().and_then(|mut reactor| { - /// reactor.prepare_client_and_connect(&config) + /// reactor.prepare_client_and_connect(config) /// }); /// # } /// ``` - pub fn prepare_client_and_connect(&mut self, config: &Config) -> error::Result { - self.prepare_client(config).and_then(|future| self.connect_client(future)) + pub fn prepare_client_and_connect(&mut self, config: Config) -> error::Result { + let client_future = self.prepare_client(config); + self.connect_client(client_future) } /// Registers the given client with the specified message handler. The reactor will store this @@ -132,7 +139,7 @@ impl IrcReactor { /// # fn main() { /// # let config = Config::default(); /// let mut reactor = IrcReactor::new().unwrap(); - /// let client = reactor.prepare_client_and_connect(&config).unwrap(); + /// let client = reactor.prepare_client_and_connect(config).unwrap(); /// reactor.register_client_with_handler(client, |client, msg| { /// // Message processing happens here. /// Ok(()) @@ -178,7 +185,7 @@ impl IrcReactor { /// # fn main() { /// # let config = Config::default(); /// let mut reactor = IrcReactor::new().unwrap(); - /// let client = reactor.prepare_client_and_connect(&config).unwrap(); + /// let client = reactor.prepare_client_and_connect(config).unwrap(); /// reactor.register_client_with_handler(client, process_msg); /// reactor.run().unwrap(); /// # } diff --git a/src/lib.rs b/src/lib.rs index 27c66f0..2fdd974 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -59,6 +59,7 @@ extern crate serde_derive; extern crate serde_json; #[cfg(feature = "yaml")] extern crate serde_yaml; +extern crate tokio; extern crate tokio_core; extern crate tokio_io; extern crate tokio_mockstream;