Merge branch 'irc-reactor' into develop

This commit is contained in:
Aaron Weiss 2018-01-27 21:19:38 +01:00
commit 8782f66de4
No known key found for this signature in database
GPG key ID: 047D32DF25DC22EF
6 changed files with 300 additions and 103 deletions

View file

@ -1,12 +1,8 @@
extern crate futures;
extern crate irc;
extern crate tokio_core;
use std::default::Default;
use futures::future;
use irc::error;
use irc::client::prelude::*;
use tokio_core::reactor::Core;
fn main() {
let cfg1 = Config {
@ -17,31 +13,28 @@ fn main() {
};
let cfg2 = Config {
nickname: Some("pickles".to_owned()),
server: Some("irc.pdgn.co".to_owned()),
nickname: Some("bananas".to_owned()),
server: Some("irc.fyrechat.net".to_owned()),
channels: Some(vec!["#irc-crate".to_owned()]),
use_ssl: Some(true),
..Default::default()
};
let configs = vec![cfg1, cfg2];
// Create an event loop to run the multiple connections on.
let mut reactor = Core::new().unwrap();
let processor_handle = reactor.handle();
let mut reactor = IrcReactor::new().unwrap();
for config in configs {
let handle = reactor.handle();
let server = reactor.run(IrcServer::new_future(handle, &config).unwrap()).unwrap();
// Immediate errors like failure to resolve the server's name or to establish any connection will
// manifest here in the result of prepare_server_and_connect.
let server = reactor.prepare_server_and_connect(&config).unwrap();
server.identify().unwrap();
processor_handle.spawn(server.stream().for_each(move |message| {
process_msg(&server, message)
}).map_err(|e| Err(e).unwrap()))
// Here, we tell the reactor to setup this server for future handling (in run) using the specified
// handler function process_msg.
reactor.register_server_with_handler(server, process_msg);
}
// You might instead want to join all the futures and run them directly.
reactor.run(future::empty::<(), ()>()).unwrap();
// Runtime errors like a dropped connection will manifest here in the result of run.
reactor.run().unwrap();
}
fn process_msg(server: &IrcServer, message: Message) -> error::Result<()> {

View file

@ -1,62 +0,0 @@
extern crate futures;
extern crate irc;
extern crate tokio_core;
use std::default::Default;
use futures::future;
use irc::error;
use irc::client::prelude::*;
use tokio_core::reactor::Core;
fn main() {
let cfg1 = Config {
nickname: Some("pickles1".to_owned()),
server: Some("irc.fyrechat.net".to_owned()),
channels: Some(vec!["#irc-crate".to_owned()]),
..Default::default()
};
let cfg2 = Config {
nickname: Some("pickles2".to_owned()),
server: Some("irc.fyrechat.net".to_owned()),
channels: Some(vec!["#irc-crate".to_owned()]),
..Default::default()
};
let configs = vec![cfg1, cfg2];
let (futures, mut reactor) = configs.iter().fold(
(vec![], Core::new().unwrap()),
|(mut acc, mut reactor), config| {
let handle = reactor.handle();
// First, we run the future representing the connection to the server.
// After this is complete, we have connected and can send and receive messages.
let server = reactor.run(IrcServer::new_future(handle, config).unwrap()).unwrap();
server.identify().unwrap();
// Add the future for processing messages from the current server to the accumulator.
acc.push(server.stream().for_each(move |message| {
process_msg(&server, message)
}));
// We then thread through the updated accumulator and the reactor.
(acc, reactor)
}
);
// Here, we join on all of the futures representing the message handling for each server.
reactor.run(future::join_all(futures)).unwrap();
}
fn process_msg(server: &IrcServer, message: Message) -> error::Result<()> {
print!("{}", message);
match message.command {
Command::PRIVMSG(ref target, ref msg) => {
if msg.contains("pickles") {
server.send_privmsg(target, "Hi!")?;
}
}
_ => (),
}
Ok(())
}

60
examples/reconnector.rs Normal file
View file

@ -0,0 +1,60 @@
extern crate irc;
use std::default::Default;
use irc::error;
use irc::client::prelude::*;
fn main() {
let cfg1 = Config {
nickname: Some("pickles".to_owned()),
server: Some("irc.fyrechat.net".to_owned()),
channels: Some(vec!["#irc-crate".to_owned()]),
..Default::default()
};
let cfg2 = Config {
nickname: Some("bananas".to_owned()),
server: Some("irc.fyrechat.net".to_owned()),
channels: Some(vec!["#irc-crate".to_owned()]),
..Default::default()
};
let configs = vec![cfg1, cfg2];
let mut reactor = IrcReactor::new().unwrap();
loop {
let res = configs.iter().fold(Ok(()), |acc, config| {
acc.and(
reactor.prepare_server_and_connect(config).and_then(|server| {
server.identify().and(Ok(server))
}).and_then(|server| {
reactor.register_server_with_handler(server, process_msg);
Ok(())
})
)
}).and_then(|()| reactor.run());
match res {
// The connections ended normally (for example, they sent a QUIT message to the server).
Ok(_) => break,
// Something went wrong! We'll print the error, and restart the connections.
Err(e) => eprintln!("{}", e),
}
}
}
fn process_msg(server: &IrcServer, message: Message) -> error::Result<()> {
print!("{}", message);
match message.command {
Command::PRIVMSG(ref target, ref msg) => {
if msg.contains("pickles") {
server.send_privmsg(target, "Hi!")?;
} else if msg.contains("quit") {
server.send_quit("bye")?;
}
}
_ => (),
}
Ok(())
}

View file

@ -2,6 +2,7 @@
pub mod conn;
pub mod data;
pub mod reactor;
pub mod server;
pub mod transport;
@ -16,7 +17,8 @@ pub mod prelude {
//! method to send `Commands` because it makes it easy to see the whole set of possible
//! interactions with a server. The `ServerExt` trait addresses this deficiency by defining a
//! number of methods that provide a more clear and succinct interface for sending various
//! common IRC commands to the server.
//! common IRC commands to the server. An `IrcReactor` can be used to create and manage multiple
//! `IrcServers` with more fine-grained control over error management.
//!
//! The various `proto` types capture details of the IRC protocol that are used throughout the
//! client API. `Message`, `Command`, and `Response` are used to send and receive messages along
@ -29,6 +31,7 @@ pub mod prelude {
//! as well as in the parsed form of received mode commands.
pub use client::data::Config;
pub use client::reactor::IrcReactor;
pub use client::server::{EachIncomingExt, IrcServer, Server};
pub use client::server::utils::ServerExt;
pub use proto::{Capability, ChannelExt, Command, Message, NegotiationVersion, Response};

190
src/client/reactor.rs Normal file
View file

@ -0,0 +1,190 @@
//! A system for creating and managing IRC server connections.
//!
//! This API provides the ability to create and manage multiple IRC servers that can run on the same
//! thread through the use of a shared event loop. It can also be used to encapsulate the dependency
//! on `tokio` and `futures` in the use of `IrcServer::new_future`. This means that knowledge of
//! those libraries should be unnecessary for the average user. Nevertheless, this API also provides
//! some escape hatches that let advanced users take further advantage of these dependencies.
//!
//! # Example
//! ```no_run
//! # extern crate irc;
//! # use std::default::Default;
//! use irc::client::prelude::*;
//! use irc::error;
//!
//! fn main() {
//! let config = Config::default();
//! let mut reactor = IrcReactor::new().unwrap();
//! let server = reactor.prepare_server_and_connect(&config).unwrap();
//! reactor.register_server_with_handler(server, process_msg);
//! reactor.run().unwrap();
//! }
//! # fn process_msg(server: &IrcServer, message: Message) -> error::Result<()> { Ok(()) }
//! ```
use futures::{Future, IntoFuture, Stream};
use futures::future;
use tokio_core::reactor::{Core, Handle};
use client::data::Config;
use client::server::{IrcServer, IrcServerFuture, PackedIrcServer, Server};
use error;
use proto::Message;
/// A thin wrapper over an event loop.
///
/// An IRC reactor is used to create new connections to IRC servers and to drive the management of
/// all connected servers as the application runs. It can be used to run multiple servers on the
/// same thread, as well as to get better control over error management in an IRC client.
///
/// For a full example usage, see [irc::client::reactor](./index.html).
pub struct IrcReactor {
inner: Core,
handlers: Vec<Box<Future<Item = (), Error = error::Error>>>,
}
impl IrcReactor {
/// Creates a new reactor.
pub fn new() -> error::Result<IrcReactor> {
Ok(IrcReactor {
inner: Core::new()?,
handlers: Vec::new(),
})
}
/// Creates a representation of an IRC server that has not yet attempted to connect. In
/// particular, this representation is as a Future that when run will produce a connected
/// [IrcServer](./server/struct.IrcServer.html).
///
/// # Example
/// ```no_run
/// # extern crate irc;
/// # use std::default::Default;
/// # use irc::client::prelude::*;
/// # fn main() {
/// # let config = Config::default();
/// let future_server = IrcReactor::new().and_then(|mut reactor| {
/// reactor.prepare_server(&config)
/// });
/// # }
/// ```
pub fn prepare_server<'a>(&mut self, config: &'a Config) -> error::Result<IrcServerFuture<'a>> {
IrcServer::new_future(self.inner_handle(), config)
}
/// Runs an [IrcServerFuture](./server/struct.IrcServerFuture.html), such as one from
/// `prepare_server` to completion, yielding an [IrcServer](./server/struct.IrcServer.html).
///
/// # Example
/// ```no_run
/// # extern crate irc;
/// # use std::default::Default;
/// # use irc::client::prelude::*;
/// # fn main() {
/// # let config = Config::default();
/// let server = IrcReactor::new().and_then(|mut reactor| {
/// reactor.prepare_server(&config).and_then(|future| {
/// reactor.connect_server(future)
/// })
/// });
/// # }
/// ```
pub fn connect_server(&mut self, future: IrcServerFuture) -> error::Result<IrcServer> {
self.inner.run(future).map(|PackedIrcServer(server, future)| {
self.register_future(future);
server
})
}
/// Creates a new IRC server from the specified configuration, connecting immediately. This is
/// guaranteed to be the composition of prepare_server and connect_server.
///
/// # Example
/// ```no_run
/// # extern crate irc;
/// # use std::default::Default;
/// # use irc::client::prelude::*;
/// # fn main() {
/// # let config = Config::default();
/// let server = IrcReactor::new().and_then(|mut reactor| {
/// reactor.prepare_server_and_connect(&config)
/// });
/// # }
/// ```
pub fn prepare_server_and_connect(&mut self, config: &Config) -> error::Result<IrcServer> {
self.prepare_server(config).and_then(|future| self.connect_server(future))
}
/// Registers the given server with the specified message handler. The reactor will store this
/// setup until the next call to run, where it will be used to process new messages over the
/// connection indefinitely (or until failure). As registration is consumed by `run`, subsequent
/// calls to run will require new registration.
///
/// # Example
/// ```no_run
/// # extern crate irc;
/// # use std::default::Default;
/// # use irc::client::prelude::*;
/// # fn main() {
/// # let config = Config::default();
/// let mut reactor = IrcReactor::new().unwrap();
/// let server = reactor.prepare_server_and_connect(&config).unwrap();
/// reactor.register_server_with_handler(server, |server, msg| {
/// // Message processing happens here.
/// Ok(())
/// })
/// # }
/// ```
pub fn register_server_with_handler<F, U>(
&mut self, server: IrcServer, handler: F
) where F: Fn(&IrcServer, Message) -> U + 'static,
U: IntoFuture<Item = (), Error = error::Error> + 'static {
self.handlers.push(Box::new(server.stream().for_each(move |message| {
handler(&server, message)
})));
}
/// Registers an arbitrary future with this reactor. This is a sort of escape hatch that allows
/// you to take more control over what runs on the reactor without requiring you to bring in
/// additional knowledge about `tokio`. It is suspected that `register_server_with_handler` will
/// be sufficient for most use cases.
pub fn register_future<F>(
&mut self, future: F
) where F: IntoFuture<Item = (), Error = error::Error> + 'static {
self.handlers.push(Box::new(future.into_future()))
}
/// Returns a handle to the internal event loop. This is a sort of escape hatch that allows you
/// to take more control over what runs on the reactor using `tokio`. This can be used for
/// sharing this reactor with some elements of other libraries.
pub fn inner_handle(&self) -> Handle {
self.inner.handle()
}
/// Consumes all registered handlers and futures, and runs them. When using
/// `register_server_with_handler`, this will block indefinitely (until failure occurs) as it
/// will simply continue to process new, incoming messages for each server that was registered.
///
/// # Example
/// ```no_run
/// # extern crate irc;
/// # use std::default::Default;
/// # use irc::client::prelude::*;
/// # use irc::error;
/// # fn main() {
/// # let config = Config::default();
/// let mut reactor = IrcReactor::new().unwrap();
/// let server = reactor.prepare_server_and_connect(&config).unwrap();
/// reactor.register_server_with_handler(server, process_msg)
/// # }
/// # fn process_msg(server: &IrcServer, message: Message) -> error::Result<()> { Ok(()) }
/// ```
pub fn run(&mut self) -> error::Result<()> {
let mut handlers = Vec::new();
while let Some(handler) = self.handlers.pop() {
handlers.push(handler);
}
self.inner.run(future::join_all(handlers).map(|_| ()))
}
}

View file

@ -17,7 +17,7 @@
//! use irc::client::prelude::{IrcServer, ServerExt};
//!
//! # fn main() {
//! let server = IrcServer::new("config.toml").unwrap();
//! let server = IrcServer::new("config.toml").unwrap();
//! // identify comes from `ServerExt`
//! server.identify().unwrap();
//! # }
@ -85,7 +85,7 @@ pub mod utils;
/// use irc::client::prelude::EachIncomingExt;
///
/// # fn main() {
/// # let server = IrcServer::new("config.toml").unwrap();
/// # let server = IrcServer::new("config.toml").unwrap();
/// # server.identify().unwrap();
/// server.stream().for_each_incoming(|irc_msg| {
/// match irc_msg.command {
@ -128,7 +128,7 @@ pub trait Server {
/// # extern crate irc;
/// # use irc::client::prelude::*;
/// # fn main() {
/// # let server = IrcServer::new("config.toml").unwrap();
/// # let server = IrcServer::new("config.toml").unwrap();
/// server.send(Command::NICK("example".to_owned())).unwrap();
/// server.send(Command::USER("user".to_owned(), "0".to_owned(), "name".to_owned())).unwrap();
/// # }
@ -153,7 +153,7 @@ pub trait Server {
/// # extern crate irc;
/// # use irc::client::prelude::{IrcServer, ServerExt, Server, Command};
/// # fn main() {
/// # let server = IrcServer::new("config.toml").unwrap();
/// # let server = IrcServer::new("config.toml").unwrap();
/// # server.identify().unwrap();
/// server.for_each_incoming(|irc_msg| {
/// match irc_msg.command {
@ -189,7 +189,7 @@ pub trait Server {
/// use irc::proto::caps::Capability;
///
/// # fn main() {
/// # let server = IrcServer::new("config.toml").unwrap();
/// # let server = IrcServer::new("config.toml").unwrap();
/// server.send_cap_req(&[Capability::MultiPrefix]).unwrap();
/// server.identify().unwrap();
/// # }
@ -599,6 +599,8 @@ impl ServerState {
/// server after connection. Cloning an `IrcServer` is relatively cheap, as it's equivalent to
/// cloning a single `Arc`. This may be useful for setting up multiple threads with access to one
/// connection.
///
/// For a full example usage, see [irc::client::server](./index.html).
#[derive(Clone, Debug)]
pub struct IrcServer {
/// The internal, thread-safe server state.
@ -672,7 +674,7 @@ impl IrcServer {
/// # extern crate irc;
/// # use irc::client::prelude::*;
/// # fn main() {
/// let server = IrcServer::new("config.toml").unwrap();
/// let server = IrcServer::new("config.toml").unwrap();
/// # }
/// ```
pub fn new<P: AsRef<Path>>(config: P) -> error::Result<IrcServer> {
@ -683,7 +685,8 @@ impl IrcServer {
/// immediately. Due to current design limitations, error handling here is somewhat limited. In
/// particular, failed connections will cause the program to panic because the connection
/// attempt is made on a freshly created thread. If you need to avoid this behavior and handle
/// errors more gracefully, it is recommended that you use `IrcServer::new_future` instead.
/// errors more gracefully, it is recommended that you use an
/// [IrcReactor](../reactor/struct.IrcReactor.html) instead.
///
/// # Example
/// ```no_run
@ -696,7 +699,7 @@ impl IrcServer {
/// server: Some("irc.example.com".to_owned()),
/// .. Default::default()
/// };
/// let server = IrcServer::from_config(config).unwrap();
/// let server = IrcServer::from_config(config).unwrap();
/// # }
/// ```
pub fn from_config(config: Config) -> error::Result<IrcServer> {
@ -743,7 +746,9 @@ impl IrcServer {
/// Proper usage requires familiarity with `tokio` and `futures`. You can find more information
/// in the crate documentation for [tokio-core](http://docs.rs/tokio-core) or
/// [futures](http://docs.rs/futures). Additionally, you can find detailed tutorials on using
/// both libraries on the [tokio website](https://tokio.rs/docs/getting-started/tokio/).
/// both libraries on the [tokio website](https://tokio.rs/docs/getting-started/tokio/). An easy
/// to use abstraction that does not require this knowledge is available via
/// [IrcReactors](../reactor/struct.IrcReactor.html).
///
/// # Example
/// ```no_run
@ -751,6 +756,7 @@ impl IrcServer {
/// # extern crate tokio_core;
/// # use std::default::Default;
/// # use irc::client::prelude::*;
/// # use irc::client::server::PackedIrcServer;
/// # use irc::error;
/// # use tokio_core::reactor::Core;
/// # fn main() {
@ -760,14 +766,14 @@ impl IrcServer {
/// # .. Default::default()
/// # };
/// let mut reactor = Core::new().unwrap();
/// let future = IrcServer::new_future(reactor.handle(), &config).unwrap();
/// let future = IrcServer::new_future(reactor.handle(), &config).unwrap();
/// // immediate connection errors (like no internet) will turn up here...
/// let server = reactor.run(future).unwrap();
/// let PackedIrcServer(server, future) = reactor.run(future).unwrap();
/// // runtime errors (like disconnections and so forth) will turn up here...
/// reactor.run(server.stream().for_each(move |irc_msg| {
/// // processing messages works like usual
/// process_msg(&server, irc_msg)
/// })).unwrap();
/// }).join(future)).unwrap();
/// # }
/// # fn process_msg(server: &IrcServer, message: Message) -> error::Result<()> { Ok(()) }
/// ```
@ -776,7 +782,7 @@ impl IrcServer {
Ok(IrcServerFuture {
conn: Connection::new(config, &handle)?,
handle: handle,
_handle: handle,
config: config,
tx_outgoing: Some(tx_outgoing),
rx_outgoing: Some(rx_outgoing),
@ -802,18 +808,19 @@ impl IrcServer {
/// Interaction with this future relies on the `futures` API, but is only expected for more advanced
/// use cases. To learn more, you can view the documentation for the
/// [futures](https://docs.rs/futures/) crate, or the tutorials for
/// [tokio](https://tokio.rs/docs/getting-started/futures/).
#[derive(Debug)]
/// [tokio](https://tokio.rs/docs/getting-started/futures/). An easy to use abstraction that does
/// not require this knowledge is available via [IrcReactors](../reactor/struct.IrcReactor.html).
#[derive(Debug)]
pub struct IrcServerFuture<'a> {
conn: ConnectionFuture<'a>,
handle: Handle,
_handle: Handle,
config: &'a Config,
tx_outgoing: Option<UnboundedSender<Message>>,
rx_outgoing: Option<UnboundedReceiver<Message>>,
}
impl<'a> Future for IrcServerFuture<'a> {
type Item = IrcServer;
type Item = PackedIrcServer;
type Error = error::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@ -825,19 +832,25 @@ impl<'a> Future for IrcServerFuture<'a> {
let outgoing_future = sink.send_all(self.rx_outgoing.take().unwrap().map_err(|()| {
let res: error::Error = error::ErrorKind::ChannelError.into();
res
})).map(|_| ()).map_err(|e| panic!(e));
})).map(|_| ());
self.handle.spawn(outgoing_future);
Ok(Async::Ready(IrcServer {
let server = IrcServer {
state: Arc::new(ServerState::new(
stream, self.tx_outgoing.take().unwrap(), self.config.clone()
)),
view: view,
}))
};
Ok(Async::Ready(PackedIrcServer(server, Box::new(outgoing_future))))
}
}
/// An `IrcServer` packaged with a future that drives its message sending. In order for the server
/// to actually work properly, this future _must_ be running.
///
/// This type should only be used by advanced users who are familiar with the implementation of this
/// crate. An easy to use abstraction that does not require this knowledge is available via
/// [IrcReactors](../reactor/struct.IrcReactor.html).
pub struct PackedIrcServer(pub IrcServer, pub Box<Future<Item = (), Error = error::Error>>);
#[cfg(test)]
mod test {