From 5275e799717ba58101d71b868bd227e61b5776c4 Mon Sep 17 00:00:00 2001 From: Ratys Date: Fri, 8 Jun 2018 22:00:49 +0300 Subject: [PATCH] Rewrote `conn` module to utilize `impl Future` in return position: Changed all involved structs to take `Config`s, rather than borrow them - this is due to `'static` requirement that is bound to crop up somewhere, when spawning a future into a reactor/runtime. Updated examples and docs to reflect the change. --- examples/multiserver.rs | 2 +- examples/reactor.rs | 2 +- examples/reconnector.rs | 2 +- src/client/conn.rs | 67 +++++++++++++++++++++++------------------ src/client/mod.rs | 16 +++++----- src/client/reactor.rs | 21 ++++++------- 6 files changed, 59 insertions(+), 51 deletions(-) diff --git a/examples/multiserver.rs b/examples/multiserver.rs index c88311e..57e98b8 100644 --- a/examples/multiserver.rs +++ b/examples/multiserver.rs @@ -26,7 +26,7 @@ fn main() { for config in configs { // Immediate errors like failure to resolve the server's domain or to establish any connection will // manifest here in the result of prepare_client_and_connect. - let client = reactor.prepare_client_and_connect(&config).unwrap(); + let client = reactor.prepare_client_and_connect(config).unwrap(); client.identify().unwrap(); // Here, we tell the reactor to setup this client for future handling (in run) using the specified // handler function process_msg. diff --git a/examples/reactor.rs b/examples/reactor.rs index 66dea6b..fd201f2 100644 --- a/examples/reactor.rs +++ b/examples/reactor.rs @@ -14,7 +14,7 @@ fn main() { }; let mut reactor = IrcReactor::new().unwrap(); - let client = reactor.prepare_client_and_connect(&config).unwrap(); + let client = reactor.prepare_client_and_connect(config).unwrap(); client.identify().unwrap(); reactor.register_client_with_handler(client, |client, message| { diff --git a/examples/reconnector.rs b/examples/reconnector.rs index f879f48..579f335 100644 --- a/examples/reconnector.rs +++ b/examples/reconnector.rs @@ -26,7 +26,7 @@ fn main() { loop { let res = configs.iter().fold(Ok(()), |acc, config| { acc.and( - reactor.prepare_client_and_connect(config).and_then(|client| { + reactor.prepare_client_and_connect(config.clone()).and_then(|client| { client.identify().and(Ok(client)) }).and_then(|client| { reactor.register_client_with_handler(client, process_msg); diff --git a/src/client/conn.rs b/src/client/conn.rs index ed3fb27..d9d1c12 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -2,10 +2,10 @@ use std::fs::File; use std::fmt; use std::io::Read; -use std::net::SocketAddr; use encoding::EncoderTrap; use encoding::label::encoding_from_whatwg_label; +use futures::future; use futures::{Async, Poll, Future, Sink, StartSend, Stream}; use native_tls::{Certificate, TlsConnector, Pkcs12}; use tokio::net::{ConnectFuture, TcpStream}; @@ -42,57 +42,63 @@ impl fmt::Debug for Connection { } } -/// A convenient type alias representing the `TlsStream` future. -type TlsFuture = Box> + Send>; - /// A future representing an eventual `Connection`. -pub enum ConnectionFuture<'a> { +pub enum ConnectionFuture { #[doc(hidden)] - Unsecured(&'a Config, ConnectFuture), + Unsecured(Config, ConnectFuture), #[doc(hidden)] - Secured(&'a Config, TlsFuture), + Secured(Config, ConnectFuture, TlsConnector), #[doc(hidden)] - Mock(&'a Config), + Mock(Config), } -impl<'a> fmt::Debug for ConnectionFuture<'a> { +impl fmt::Debug for ConnectionFuture { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( f, "{}({:?}, ...)", match *self { ConnectionFuture::Unsecured(_, _) => "ConnectionFuture::Unsecured", - ConnectionFuture::Secured(_, _) => "ConnectionFuture::Secured", + ConnectionFuture::Secured(_, _, _) => "ConnectionFuture::Secured", ConnectionFuture::Mock(_) => "ConnectionFuture::Mock", }, match *self { - ConnectionFuture::Unsecured(cfg, _) | - ConnectionFuture::Secured(cfg, _) | - ConnectionFuture::Mock(cfg) => cfg, + ConnectionFuture::Unsecured(ref cfg, _) | + ConnectionFuture::Secured(ref cfg, _, _) | + ConnectionFuture::Mock(ref cfg) => cfg, } ) } } -impl<'a> Future for ConnectionFuture<'a> { +impl Future for ConnectionFuture { type Item = Connection; type Error = error::IrcError; fn poll(&mut self) -> Poll { match *self { - ConnectionFuture::Unsecured(config, ref mut inner) => { + ConnectionFuture::Unsecured(ref config, ref mut inner) => { let framed = try_ready!(inner.poll()).framed(IrcCodec::new(config.encoding())?); let transport = IrcTransport::new(config, framed); Ok(Async::Ready(Connection::Unsecured(transport))) } - ConnectionFuture::Secured(config, ref mut inner) => { + ConnectionFuture::Secured(ref config, ref mut inner, ref connector) => { + let domain = format!("{}", config.server().expect("should already be tested")); + let mut inner = inner.map_err(|e| { + let res: error::IrcError = e.into(); + res + }).and_then(move |socket| { + connector.connect_async(&domain, socket).map_err( + |e| e.into(), + ) + }); let framed = try_ready!(inner.poll()).framed(IrcCodec::new(config.encoding())?); let transport = IrcTransport::new(config, framed); Ok(Async::Ready(Connection::Secured(transport))) } - ConnectionFuture::Mock(config) => { + ConnectionFuture::Mock(ref config) => { let enc: error::Result<_> = encoding_from_whatwg_label( config.encoding() ).ok_or_else(|| error::IrcError::UnknownCodec { @@ -119,8 +125,13 @@ impl<'a> Future for ConnectionFuture<'a> { } impl Connection { - /// Creates a new `Connection` using the specified `Config` and `Handle`. - pub fn new<'a>(config: &'a Config) -> impl Future { + /// Creates a future yielding a new `Connection` using the specified `Config`. + pub fn new(config: Config) -> impl Future { + future::lazy(move || Connection::new_inner(config)) + .and_then(|connection_future| connection_future) + } + + fn new_inner(config: Config) -> error::Result { if config.use_mock_connection() { Ok(ConnectionFuture::Mock(config)) } else if config.use_ssl() { @@ -145,20 +156,18 @@ impl Connection { info!("Using {} for client certificate authentication.", client_cert_path); } let connector = builder.build()?; - let stream = Box::new(TcpStream::connect(&config.socket_addr()?).map_err(|e| { - let res: error::IrcError = e.into(); - res - }).and_then(move |socket| { - connector.connect_async(&domain, socket).map_err( - |e| e.into(), - ) - })); - Ok(ConnectionFuture::Secured(config, stream)) + let socket_addr = config.socket_addr()?; + Ok(ConnectionFuture::Secured( + config, + TcpStream::connect(&socket_addr), + connector + )) } else { info!("Connecting to {}.", config.server()?); + let socket_addr = config.socket_addr()?; Ok(ConnectionFuture::Unsecured( config, - TcpStream::connect(&config.socket_addr()?), + TcpStream::connect(&socket_addr), )) } } diff --git a/src/client/mod.rs b/src/client/mod.rs index e4ed120..6cb1000 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -754,7 +754,7 @@ impl IrcClient { // will instead panic. let _ = thread::spawn(move || { let mut reactor = Core::new().unwrap(); - let conn = reactor.run(Connection::new(&cfg)).unwrap(); + let conn = reactor.run(Connection::new(cfg)).unwrap(); tx_view.send(conn.log_view()).unwrap(); let (sink, stream) = conn.split(); @@ -806,9 +806,9 @@ impl IrcClient { /// # .. Default::default() /// # }; /// let mut reactor = Core::new().unwrap(); - /// let future = IrcClient::new_future(&config).unwrap(); + /// let future = IrcClient::new_future(config); /// // immediate connection errors (like no internet) will turn up here... - /// let PackedIrcClient(client, future) = reactor.run(future).unwrap(); + /// let (client, future) = reactor.run(future).unwrap(); /// // runtime errors (like disconnections and so forth) will turn up here... /// reactor.run(client.stream().for_each(move |irc_msg| { /// // processing messages works like usual @@ -817,12 +817,12 @@ impl IrcClient { /// # } /// # fn process_msg(server: &IrcClient, message: Message) -> error::Result<()> { Ok(()) } /// ``` - pub fn new_future(config: &Config) -> impl Future< + pub fn new_future(config: Config) -> impl Future< Item = (IrcClient, impl Future + 'static), Error = error::IrcError > { - Connection::new(config) - .and_then(|connection| { + Connection::new(config.clone()) + .and_then(move |connection| { let (tx_outgoing, rx_outgoing) = mpsc::unbounded(); let log_view = connection.log_view(); let (sink, stream) = connection.split(); @@ -831,7 +831,7 @@ impl IrcClient { unreachable!("futures::sync::mpsc::Receiver should never return Err"); }) ).map(|_| ()); - ClientState::new(stream, tx_outgoing, config.clone()).map(|state| { + ClientState::new(stream, tx_outgoing, config).map(|state| { let client = IrcClient { state: Arc::new(state), view: log_view, @@ -866,7 +866,7 @@ impl IrcClient { /// not require this knowledge is available via [`IrcReactors`](./reactor/struct.IrcReactor.html). #[derive(Debug)] pub struct IrcClientFuture<'a> { - conn: ConnectionFuture<'a>, + conn: ConnectionFuture, config: &'a Config, tx_outgoing: Option>, rx_outgoing: Option>, diff --git a/src/client/reactor.rs b/src/client/reactor.rs index 27cd711..582e392 100644 --- a/src/client/reactor.rs +++ b/src/client/reactor.rs @@ -16,7 +16,7 @@ //! fn main() { //! let config = Config::default(); //! let mut reactor = IrcReactor::new().unwrap(); -//! let client = reactor.prepare_client_and_connect(&config).unwrap(); +//! let client = reactor.prepare_client_and_connect(config).unwrap(); //! reactor.register_client_with_handler(client, process_msg); //! reactor.run().unwrap(); //! } @@ -28,7 +28,7 @@ use futures::future; use tokio_core::reactor::{Core, Handle}; use client::data::Config; -use client::{IrcClient, IrcClientFuture, PackedIrcClient, Client}; +use client::{IrcClient, Client}; use error; use proto::Message; @@ -65,11 +65,11 @@ impl IrcReactor { /// # fn main() { /// # let config = Config::default(); /// let future_client = IrcReactor::new().and_then(|mut reactor| { - /// reactor.prepare_client(&config) + /// Ok(reactor.prepare_client(config)) /// }); /// # } /// ``` - pub fn prepare_client<'a>(&mut self, config: &'a Config) -> impl Future< + pub fn prepare_client(&mut self, config: Config) -> impl Future< Item = (IrcClient, impl Future + 'static), Error = error::IrcError > { @@ -87,9 +87,8 @@ impl IrcReactor { /// # fn main() { /// # let config = Config::default(); /// let client = IrcReactor::new().and_then(|mut reactor| { - /// reactor.prepare_client(&config).and_then(|future| { - /// reactor.connect_client(future) - /// }) + /// let future = reactor.prepare_client(config); + /// reactor.connect_client(future) /// }); /// # } /// ``` @@ -116,11 +115,11 @@ impl IrcReactor { /// # fn main() { /// # let config = Config::default(); /// let client = IrcReactor::new().and_then(|mut reactor| { - /// reactor.prepare_client_and_connect(&config) + /// reactor.prepare_client_and_connect(config) /// }); /// # } /// ``` - pub fn prepare_client_and_connect(&mut self, config: &Config) -> error::Result { + pub fn prepare_client_and_connect(&mut self, config: Config) -> error::Result { let client_future = self.prepare_client(config); self.connect_client(client_future) } @@ -140,7 +139,7 @@ impl IrcReactor { /// # fn main() { /// # let config = Config::default(); /// let mut reactor = IrcReactor::new().unwrap(); - /// let client = reactor.prepare_client_and_connect(&config).unwrap(); + /// let client = reactor.prepare_client_and_connect(config).unwrap(); /// reactor.register_client_with_handler(client, |client, msg| { /// // Message processing happens here. /// Ok(()) @@ -186,7 +185,7 @@ impl IrcReactor { /// # fn main() { /// # let config = Config::default(); /// let mut reactor = IrcReactor::new().unwrap(); - /// let client = reactor.prepare_client_and_connect(&config).unwrap(); + /// let client = reactor.prepare_client_and_connect(config).unwrap(); /// reactor.register_client_with_handler(client, process_msg); /// reactor.run().unwrap(); /// # }