From e220e90c58ff14fd545edf088e791ed9ed4cdfaf Mon Sep 17 00:00:00 2001 From: Aaron Weiss Date: Tue, 15 Aug 2017 14:00:32 -0400 Subject: [PATCH] Added the ability to create a new IrcServer without spawning a thread. --- examples/nothreads.rs | 58 ++++++++++++++++++++++++++++++++++++++++ src/client/server/mod.rs | 57 ++++++++++++++++++++++++++++++++++++--- 2 files changed, 112 insertions(+), 3 deletions(-) create mode 100644 examples/nothreads.rs diff --git a/examples/nothreads.rs b/examples/nothreads.rs new file mode 100644 index 0000000..9185186 --- /dev/null +++ b/examples/nothreads.rs @@ -0,0 +1,58 @@ +extern crate futures; +extern crate irc; +extern crate tokio_core; + +use std::default::Default; +use futures::future; +use irc::error; +use irc::client::prelude::*; +use tokio_core::reactor::Core; + +fn main() { + let cfg1 = Config { + nickname: Some("pickles".to_owned()), + server: Some("irc.fyrechat.net".to_owned()), + channels: Some(vec!["#irc-crate".to_owned()]), + ..Default::default() + }; + + let cfg2 = Config { + nickname: Some("pickles".to_owned()), + server: Some("irc.pdgn.co".to_owned()), + channels: Some(vec!["#irc-crate".to_owned()]), + use_ssl: Some(true), + ..Default::default() + }; + + let configs = vec![cfg1, cfg2]; + + // Create an event loop to run the multiple connections on. + let mut reactor = Core::new().unwrap(); + let processor_handle = reactor.handle(); + + for config in configs { + let handle = reactor.handle(); + let server = reactor.run(IrcServer::new_future(handle, &config).unwrap()).unwrap(); + server.identify().unwrap(); + + processor_handle.spawn(server.stream().for_each(move |message| { + process_msg(&server, message) + }).map_err(|e| Err(e).unwrap())) + } + + // You might instead want to join all the futures and run them directly. + reactor.run(future::empty::<(), ()>()).unwrap(); +} + +fn process_msg(server: &IrcServer, message: Message) -> error::Result<()> { + print!("{}", message); + match message.command { + Command::PRIVMSG(ref target, ref msg) => { + if msg.contains("pickles") { + server.send_privmsg(target, "Hi!")?; + } + } + _ => (), + } + Ok(()) +} diff --git a/src/client/server/mod.rs b/src/client/server/mod.rs index d9355fc..0532b32 100644 --- a/src/client/server/mod.rs +++ b/src/client/server/mod.rs @@ -12,11 +12,11 @@ use futures::{Async, Poll, Future, Sink, Stream}; use futures::stream::SplitStream; use futures::sync::mpsc; use futures::sync::oneshot; -use futures::sync::mpsc::UnboundedSender; -use tokio_core::reactor::Core; +use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use tokio_core::reactor::{Core, Handle}; use error; -use client::conn::Connection; +use client::conn::{Connection, ConnectionFuture}; use client::data::{Config, User}; use client::server::utils::ServerExt; use client::transport::LogView; @@ -566,6 +566,21 @@ impl IrcServer { }) } + /// Creates a new IRC server connection from the specified configration and on event loop + /// corresponding to the given handle. This can be used to set up a number of IrcServers on a + /// single, shared event loop. Connection will not occur until the event loop is run. + pub fn new_future<'a>(handle: Handle, config: &'a Config) -> error::Result> { + let (tx_outgoing, rx_outgoing) = mpsc::unbounded(); + + Ok(IrcServerFuture { + conn: Connection::new(&config, &handle)?, + handle: handle, + config: config, + tx_outgoing: Some(tx_outgoing), + rx_outgoing: Some(rx_outgoing), + }) + } + /// Gets the current nickname in use. pub fn current_nickname(&self) -> &str { self.state.current_nickname() @@ -578,6 +593,42 @@ impl IrcServer { } } +/// A future representing the eventual creation of an `IrcServer`. +pub struct IrcServerFuture<'a> { + conn: ConnectionFuture<'a>, + handle: Handle, + config: &'a Config, + tx_outgoing: Option>, + rx_outgoing: Option>, +} + +impl<'a> Future for IrcServerFuture<'a> { + type Item = IrcServer; + type Error = error::Error; + + fn poll(&mut self) -> Poll { + let conn = try_ready!(self.conn.poll()); + + let view = conn.log_view(); + let (sink, stream) = conn.split(); + + let outgoing_future = sink.send_all(self.rx_outgoing.take().unwrap().map_err(|()| { + let res: error::Error = error::ErrorKind::ChannelError.into(); + res + })).map(|_| ()).map_err(|e| panic!(e)); + + self.handle.spawn(outgoing_future); + + Ok(Async::Ready(IrcServer { + state: Arc::new(ServerState::new( + stream, self.tx_outgoing.take().unwrap(), self.config.clone() + )), + view: view, + })) + } +} + + #[cfg(test)] mod test { use std::collections::HashMap;