From bbc6b0244d8e678e9156921340242e098cdfe3ca Mon Sep 17 00:00:00 2001 From: Aaron Weiss Date: Mon, 8 Jan 2018 21:52:56 -0500 Subject: [PATCH] Added an experimental reactor API to hide tokio. --- examples/nothreads.rs | 24 +++++--------------- src/client/mod.rs | 2 ++ src/client/reactor.rs | 52 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 60 insertions(+), 18 deletions(-) create mode 100644 src/client/reactor.rs diff --git a/examples/nothreads.rs b/examples/nothreads.rs index 9185186..fbec2e1 100644 --- a/examples/nothreads.rs +++ b/examples/nothreads.rs @@ -1,12 +1,8 @@ -extern crate futures; extern crate irc; -extern crate tokio_core; use std::default::Default; -use futures::future; use irc::error; use irc::client::prelude::*; -use tokio_core::reactor::Core; fn main() { let cfg1 = Config { @@ -17,31 +13,23 @@ fn main() { }; let cfg2 = Config { - nickname: Some("pickles".to_owned()), - server: Some("irc.pdgn.co".to_owned()), + nickname: Some("bananas".to_owned()), + server: Some("irc.fyrechat.net".to_owned()), channels: Some(vec!["#irc-crate".to_owned()]), - use_ssl: Some(true), ..Default::default() }; let configs = vec![cfg1, cfg2]; - // Create an event loop to run the multiple connections on. - let mut reactor = Core::new().unwrap(); - let processor_handle = reactor.handle(); + let mut reactor = IrcReactor::new().unwrap(); for config in configs { - let handle = reactor.handle(); - let server = reactor.run(IrcServer::new_future(handle, &config).unwrap()).unwrap(); + let server = reactor.prepare_server_and_connect(&config).unwrap(); server.identify().unwrap(); - - processor_handle.spawn(server.stream().for_each(move |message| { - process_msg(&server, message) - }).map_err(|e| Err(e).unwrap())) + reactor.register_server_with_handler(server, process_msg); } - // You might instead want to join all the futures and run them directly. - reactor.run(future::empty::<(), ()>()).unwrap(); + reactor.run().unwrap(); } fn process_msg(server: &IrcServer, message: Message) -> error::Result<()> { diff --git a/src/client/mod.rs b/src/client/mod.rs index 45085e7..4ab8f92 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -2,6 +2,7 @@ pub mod conn; pub mod data; +pub mod reactor; pub mod server; pub mod transport; @@ -29,6 +30,7 @@ pub mod prelude { //! as well as in the parsed form of received mode commands. pub use client::data::Config; + pub use client::reactor::IrcReactor; pub use client::server::{EachIncomingExt, IrcServer, Server}; pub use client::server::utils::ServerExt; pub use proto::{Capability, ChannelExt, Command, Message, NegotiationVersion, Response}; diff --git a/src/client/reactor.rs b/src/client/reactor.rs new file mode 100644 index 0000000..3817cc2 --- /dev/null +++ b/src/client/reactor.rs @@ -0,0 +1,52 @@ +//! A system for creating and managing multiple connections to IRC servers. + +use client::data::Config; +use client::server::{IrcServer, IrcServerFuture, Server}; +use error; +use proto::Message; + +use futures::{Future, Stream}; +use futures::future; +use tokio_core::reactor::Core; + +pub struct IrcReactor { + inner: Core, + handlers: Vec>>, +} + +impl IrcReactor { + pub fn new() -> error::Result { + Ok(IrcReactor { + inner: Core::new()?, + handlers: Vec::new(), + }) + } + + pub fn prepare_server<'a>(&mut self, config: &'a Config) -> error::Result> { + IrcServer::new_future(self.inner.handle(), config) + } + + pub fn connect_server(&mut self, future: IrcServerFuture) -> error::Result { + self.inner.run(future) + } + + pub fn prepare_server_and_connect(&mut self, config: &Config) -> error::Result { + self.prepare_server(config).and_then(|future| self.connect_server(future)) + } + + pub fn register_server_with_handler( + &mut self, server: IrcServer, handler: F + ) where F: Fn(&IrcServer, Message) -> error::Result<()> + 'static { + self.handlers.push(Box::new(server.stream().for_each(move |message| { + handler(&server, message) + }))); + } + + pub fn run(&mut self) -> error::Result<()> { + let mut handlers = Vec::new(); + while let Some(handler) = self.handlers.pop() { + handlers.push(handler); + } + self.inner.run(future::join_all(handlers).map(|_| ())) + } +}