Rewrote conn
module to utilize impl Future
in return position:
Changed all involved structs to take `Config`s, rather than borrow them - this is due to `'static` requirement that is bound to crop up somewhere, when spawning a future into a reactor/runtime. Updated examples and docs to reflect the change.
This commit is contained in:
parent
bfb8bf5a1b
commit
5275e79971
6 changed files with 59 additions and 51 deletions
|
@ -26,7 +26,7 @@ fn main() {
|
|||
for config in configs {
|
||||
// Immediate errors like failure to resolve the server's domain or to establish any connection will
|
||||
// manifest here in the result of prepare_client_and_connect.
|
||||
let client = reactor.prepare_client_and_connect(&config).unwrap();
|
||||
let client = reactor.prepare_client_and_connect(config).unwrap();
|
||||
client.identify().unwrap();
|
||||
// Here, we tell the reactor to setup this client for future handling (in run) using the specified
|
||||
// handler function process_msg.
|
||||
|
|
|
@ -14,7 +14,7 @@ fn main() {
|
|||
};
|
||||
|
||||
let mut reactor = IrcReactor::new().unwrap();
|
||||
let client = reactor.prepare_client_and_connect(&config).unwrap();
|
||||
let client = reactor.prepare_client_and_connect(config).unwrap();
|
||||
client.identify().unwrap();
|
||||
|
||||
reactor.register_client_with_handler(client, |client, message| {
|
||||
|
|
|
@ -26,7 +26,7 @@ fn main() {
|
|||
loop {
|
||||
let res = configs.iter().fold(Ok(()), |acc, config| {
|
||||
acc.and(
|
||||
reactor.prepare_client_and_connect(config).and_then(|client| {
|
||||
reactor.prepare_client_and_connect(config.clone()).and_then(|client| {
|
||||
client.identify().and(Ok(client))
|
||||
}).and_then(|client| {
|
||||
reactor.register_client_with_handler(client, process_msg);
|
||||
|
|
|
@ -2,10 +2,10 @@
|
|||
use std::fs::File;
|
||||
use std::fmt;
|
||||
use std::io::Read;
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use encoding::EncoderTrap;
|
||||
use encoding::label::encoding_from_whatwg_label;
|
||||
use futures::future;
|
||||
use futures::{Async, Poll, Future, Sink, StartSend, Stream};
|
||||
use native_tls::{Certificate, TlsConnector, Pkcs12};
|
||||
use tokio::net::{ConnectFuture, TcpStream};
|
||||
|
@ -42,57 +42,63 @@ impl fmt::Debug for Connection {
|
|||
}
|
||||
}
|
||||
|
||||
/// A convenient type alias representing the `TlsStream` future.
|
||||
type TlsFuture = Box<Future<Error = error::IrcError, Item = TlsStream<TcpStream>> + Send>;
|
||||
|
||||
/// A future representing an eventual `Connection`.
|
||||
pub enum ConnectionFuture<'a> {
|
||||
pub enum ConnectionFuture {
|
||||
#[doc(hidden)]
|
||||
Unsecured(&'a Config, ConnectFuture),
|
||||
Unsecured(Config, ConnectFuture),
|
||||
#[doc(hidden)]
|
||||
Secured(&'a Config, TlsFuture),
|
||||
Secured(Config, ConnectFuture, TlsConnector),
|
||||
#[doc(hidden)]
|
||||
Mock(&'a Config),
|
||||
Mock(Config),
|
||||
}
|
||||
|
||||
impl<'a> fmt::Debug for ConnectionFuture<'a> {
|
||||
impl fmt::Debug for ConnectionFuture {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"{}({:?}, ...)",
|
||||
match *self {
|
||||
ConnectionFuture::Unsecured(_, _) => "ConnectionFuture::Unsecured",
|
||||
ConnectionFuture::Secured(_, _) => "ConnectionFuture::Secured",
|
||||
ConnectionFuture::Secured(_, _, _) => "ConnectionFuture::Secured",
|
||||
ConnectionFuture::Mock(_) => "ConnectionFuture::Mock",
|
||||
},
|
||||
match *self {
|
||||
ConnectionFuture::Unsecured(cfg, _) |
|
||||
ConnectionFuture::Secured(cfg, _) |
|
||||
ConnectionFuture::Mock(cfg) => cfg,
|
||||
ConnectionFuture::Unsecured(ref cfg, _) |
|
||||
ConnectionFuture::Secured(ref cfg, _, _) |
|
||||
ConnectionFuture::Mock(ref cfg) => cfg,
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Future for ConnectionFuture<'a> {
|
||||
impl Future for ConnectionFuture {
|
||||
type Item = Connection;
|
||||
type Error = error::IrcError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
match *self {
|
||||
ConnectionFuture::Unsecured(config, ref mut inner) => {
|
||||
ConnectionFuture::Unsecured(ref config, ref mut inner) => {
|
||||
let framed = try_ready!(inner.poll()).framed(IrcCodec::new(config.encoding())?);
|
||||
let transport = IrcTransport::new(config, framed);
|
||||
|
||||
Ok(Async::Ready(Connection::Unsecured(transport)))
|
||||
}
|
||||
ConnectionFuture::Secured(config, ref mut inner) => {
|
||||
ConnectionFuture::Secured(ref config, ref mut inner, ref connector) => {
|
||||
let domain = format!("{}", config.server().expect("should already be tested"));
|
||||
let mut inner = inner.map_err(|e| {
|
||||
let res: error::IrcError = e.into();
|
||||
res
|
||||
}).and_then(move |socket| {
|
||||
connector.connect_async(&domain, socket).map_err(
|
||||
|e| e.into(),
|
||||
)
|
||||
});
|
||||
let framed = try_ready!(inner.poll()).framed(IrcCodec::new(config.encoding())?);
|
||||
let transport = IrcTransport::new(config, framed);
|
||||
|
||||
Ok(Async::Ready(Connection::Secured(transport)))
|
||||
}
|
||||
ConnectionFuture::Mock(config) => {
|
||||
ConnectionFuture::Mock(ref config) => {
|
||||
let enc: error::Result<_> = encoding_from_whatwg_label(
|
||||
config.encoding()
|
||||
).ok_or_else(|| error::IrcError::UnknownCodec {
|
||||
|
@ -119,8 +125,13 @@ impl<'a> Future for ConnectionFuture<'a> {
|
|||
}
|
||||
|
||||
impl Connection {
|
||||
/// Creates a new `Connection` using the specified `Config` and `Handle`.
|
||||
pub fn new<'a>(config: &'a Config) -> impl Future<Item = Connection, Error = error::IrcError> {
|
||||
/// Creates a future yielding a new `Connection` using the specified `Config`.
|
||||
pub fn new(config: Config) -> impl Future<Item = Connection, Error = error::IrcError> {
|
||||
future::lazy(move || Connection::new_inner(config))
|
||||
.and_then(|connection_future| connection_future)
|
||||
}
|
||||
|
||||
fn new_inner(config: Config) -> error::Result<ConnectionFuture> {
|
||||
if config.use_mock_connection() {
|
||||
Ok(ConnectionFuture::Mock(config))
|
||||
} else if config.use_ssl() {
|
||||
|
@ -145,20 +156,18 @@ impl Connection {
|
|||
info!("Using {} for client certificate authentication.", client_cert_path);
|
||||
}
|
||||
let connector = builder.build()?;
|
||||
let stream = Box::new(TcpStream::connect(&config.socket_addr()?).map_err(|e| {
|
||||
let res: error::IrcError = e.into();
|
||||
res
|
||||
}).and_then(move |socket| {
|
||||
connector.connect_async(&domain, socket).map_err(
|
||||
|e| e.into(),
|
||||
)
|
||||
}));
|
||||
Ok(ConnectionFuture::Secured(config, stream))
|
||||
let socket_addr = config.socket_addr()?;
|
||||
Ok(ConnectionFuture::Secured(
|
||||
config,
|
||||
TcpStream::connect(&socket_addr),
|
||||
connector
|
||||
))
|
||||
} else {
|
||||
info!("Connecting to {}.", config.server()?);
|
||||
let socket_addr = config.socket_addr()?;
|
||||
Ok(ConnectionFuture::Unsecured(
|
||||
config,
|
||||
TcpStream::connect(&config.socket_addr()?),
|
||||
TcpStream::connect(&socket_addr),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -754,7 +754,7 @@ impl IrcClient {
|
|||
// will instead panic.
|
||||
let _ = thread::spawn(move || {
|
||||
let mut reactor = Core::new().unwrap();
|
||||
let conn = reactor.run(Connection::new(&cfg)).unwrap();
|
||||
let conn = reactor.run(Connection::new(cfg)).unwrap();
|
||||
|
||||
tx_view.send(conn.log_view()).unwrap();
|
||||
let (sink, stream) = conn.split();
|
||||
|
@ -806,9 +806,9 @@ impl IrcClient {
|
|||
/// # .. Default::default()
|
||||
/// # };
|
||||
/// let mut reactor = Core::new().unwrap();
|
||||
/// let future = IrcClient::new_future(&config).unwrap();
|
||||
/// let future = IrcClient::new_future(config);
|
||||
/// // immediate connection errors (like no internet) will turn up here...
|
||||
/// let PackedIrcClient(client, future) = reactor.run(future).unwrap();
|
||||
/// let (client, future) = reactor.run(future).unwrap();
|
||||
/// // runtime errors (like disconnections and so forth) will turn up here...
|
||||
/// reactor.run(client.stream().for_each(move |irc_msg| {
|
||||
/// // processing messages works like usual
|
||||
|
@ -817,12 +817,12 @@ impl IrcClient {
|
|||
/// # }
|
||||
/// # fn process_msg(server: &IrcClient, message: Message) -> error::Result<()> { Ok(()) }
|
||||
/// ```
|
||||
pub fn new_future(config: &Config) -> impl Future<
|
||||
pub fn new_future(config: Config) -> impl Future<
|
||||
Item = (IrcClient, impl Future<Item = (), Error = error::IrcError> + 'static),
|
||||
Error = error::IrcError
|
||||
> {
|
||||
Connection::new(config)
|
||||
.and_then(|connection| {
|
||||
Connection::new(config.clone())
|
||||
.and_then(move |connection| {
|
||||
let (tx_outgoing, rx_outgoing) = mpsc::unbounded();
|
||||
let log_view = connection.log_view();
|
||||
let (sink, stream) = connection.split();
|
||||
|
@ -831,7 +831,7 @@ impl IrcClient {
|
|||
unreachable!("futures::sync::mpsc::Receiver should never return Err");
|
||||
})
|
||||
).map(|_| ());
|
||||
ClientState::new(stream, tx_outgoing, config.clone()).map(|state| {
|
||||
ClientState::new(stream, tx_outgoing, config).map(|state| {
|
||||
let client = IrcClient {
|
||||
state: Arc::new(state),
|
||||
view: log_view,
|
||||
|
@ -866,7 +866,7 @@ impl IrcClient {
|
|||
/// not require this knowledge is available via [`IrcReactors`](./reactor/struct.IrcReactor.html).
|
||||
#[derive(Debug)]
|
||||
pub struct IrcClientFuture<'a> {
|
||||
conn: ConnectionFuture<'a>,
|
||||
conn: ConnectionFuture,
|
||||
config: &'a Config,
|
||||
tx_outgoing: Option<UnboundedSender<Message>>,
|
||||
rx_outgoing: Option<UnboundedReceiver<Message>>,
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
//! fn main() {
|
||||
//! let config = Config::default();
|
||||
//! let mut reactor = IrcReactor::new().unwrap();
|
||||
//! let client = reactor.prepare_client_and_connect(&config).unwrap();
|
||||
//! let client = reactor.prepare_client_and_connect(config).unwrap();
|
||||
//! reactor.register_client_with_handler(client, process_msg);
|
||||
//! reactor.run().unwrap();
|
||||
//! }
|
||||
|
@ -28,7 +28,7 @@ use futures::future;
|
|||
use tokio_core::reactor::{Core, Handle};
|
||||
|
||||
use client::data::Config;
|
||||
use client::{IrcClient, IrcClientFuture, PackedIrcClient, Client};
|
||||
use client::{IrcClient, Client};
|
||||
use error;
|
||||
use proto::Message;
|
||||
|
||||
|
@ -65,11 +65,11 @@ impl IrcReactor {
|
|||
/// # fn main() {
|
||||
/// # let config = Config::default();
|
||||
/// let future_client = IrcReactor::new().and_then(|mut reactor| {
|
||||
/// reactor.prepare_client(&config)
|
||||
/// Ok(reactor.prepare_client(config))
|
||||
/// });
|
||||
/// # }
|
||||
/// ```
|
||||
pub fn prepare_client<'a>(&mut self, config: &'a Config) -> impl Future<
|
||||
pub fn prepare_client(&mut self, config: Config) -> impl Future<
|
||||
Item = (IrcClient, impl Future<Item = (), Error = error::IrcError> + 'static),
|
||||
Error = error::IrcError
|
||||
> {
|
||||
|
@ -87,9 +87,8 @@ impl IrcReactor {
|
|||
/// # fn main() {
|
||||
/// # let config = Config::default();
|
||||
/// let client = IrcReactor::new().and_then(|mut reactor| {
|
||||
/// reactor.prepare_client(&config).and_then(|future| {
|
||||
/// reactor.connect_client(future)
|
||||
/// })
|
||||
/// let future = reactor.prepare_client(config);
|
||||
/// reactor.connect_client(future)
|
||||
/// });
|
||||
/// # }
|
||||
/// ```
|
||||
|
@ -116,11 +115,11 @@ impl IrcReactor {
|
|||
/// # fn main() {
|
||||
/// # let config = Config::default();
|
||||
/// let client = IrcReactor::new().and_then(|mut reactor| {
|
||||
/// reactor.prepare_client_and_connect(&config)
|
||||
/// reactor.prepare_client_and_connect(config)
|
||||
/// });
|
||||
/// # }
|
||||
/// ```
|
||||
pub fn prepare_client_and_connect(&mut self, config: &Config) -> error::Result<IrcClient> {
|
||||
pub fn prepare_client_and_connect(&mut self, config: Config) -> error::Result<IrcClient> {
|
||||
let client_future = self.prepare_client(config);
|
||||
self.connect_client(client_future)
|
||||
}
|
||||
|
@ -140,7 +139,7 @@ impl IrcReactor {
|
|||
/// # fn main() {
|
||||
/// # let config = Config::default();
|
||||
/// let mut reactor = IrcReactor::new().unwrap();
|
||||
/// let client = reactor.prepare_client_and_connect(&config).unwrap();
|
||||
/// let client = reactor.prepare_client_and_connect(config).unwrap();
|
||||
/// reactor.register_client_with_handler(client, |client, msg| {
|
||||
/// // Message processing happens here.
|
||||
/// Ok(())
|
||||
|
@ -186,7 +185,7 @@ impl IrcReactor {
|
|||
/// # fn main() {
|
||||
/// # let config = Config::default();
|
||||
/// let mut reactor = IrcReactor::new().unwrap();
|
||||
/// let client = reactor.prepare_client_and_connect(&config).unwrap();
|
||||
/// let client = reactor.prepare_client_and_connect(config).unwrap();
|
||||
/// reactor.register_client_with_handler(client, process_msg);
|
||||
/// reactor.run().unwrap();
|
||||
/// # }
|
||||
|
|
Loading…
Reference in a new issue