Added a first real cut at async.

This commit is contained in:
Aaron Weiss 2017-06-21 13:18:22 -04:00
parent cc1aa5717e
commit 7d3e923de8
No known key found for this signature in database
GPG key ID: 0237035D9BF03AE2
6 changed files with 301 additions and 1 deletions

39
examples/async.rs Normal file
View file

@ -0,0 +1,39 @@
extern crate futures;
extern crate irc;
use std::default::Default;
use std::thread;
use futures::{Future, Stream};
use irc::client::async::IrcServer;
use irc::client::data::Config;
use irc::proto::{CapSubCommand, Command};
fn main() {
let config = Config {
nickname: Some("pickles".to_owned()),
alt_nicks: Some(vec!["bananas".to_owned(), "apples".to_owned()]),
server: Some("chat.freenode.net".to_owned()),
channels: Some(vec!["##yulli".to_owned()]),
..Default::default()
};
let mut server = IrcServer::new(config).unwrap();
thread::sleep_ms(100);
server.send(Command::CAP(None, CapSubCommand::END, None, None)).unwrap();
server.send(Command::NICK("aatxebot".to_owned())).unwrap();
server.send(Command::USER("aatxebot".to_owned(), "0".to_owned(), "aatxebot".to_owned())).unwrap();
thread::sleep_ms(100);
server.send(Command::JOIN("##yulli".to_owned(), None, None)).unwrap();
server.recv().for_each(|msg| {
print!("{}", msg);
match msg.command {
Command::PRIVMSG(ref target, ref msg) => {
if msg.contains("pickles") {
server.send(Command::PRIVMSG(target.to_owned(), "Hi!".to_owned())).unwrap();
}
}
_ => (),
}
Ok(())
}).wait().unwrap();
}

189
src/client/async.rs Normal file
View file

@ -0,0 +1,189 @@
use std::fmt;
use std::thread;
use std::thread::JoinHandle;
use error;
use client::data::Config;
use client::transport::IrcTransport;
use proto::{IrcCodec, Message};
use futures::future;
use futures::{Async, Poll, Future, Sink, StartSend, Stream};
use futures::stream::SplitStream;
use futures::sync::mpsc;
use futures::sync::oneshot;
use futures::sync::mpsc::UnboundedSender;
use native_tls::TlsConnector;
use tokio_core::reactor::{Core, Handle};
use tokio_core::net::{TcpStream, TcpStreamNew};
use tokio_io::AsyncRead;
use tokio_tls::{TlsConnectorExt, TlsStream};
pub enum Connection {
Unsecured(IrcTransport<TcpStream>),
Secured(IrcTransport<TlsStream<TcpStream>>),
}
impl fmt::Debug for Connection {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "IrcConnection")
}
}
type TlsFuture = Box<Future<Error=error::Error, Item=TlsStream<TcpStream>> + Send>;
pub enum ConnectionFuture<'a> {
Unsecured(&'a Config, TcpStreamNew),
Secured(&'a Config, TlsFuture),
}
impl<'a> Future for ConnectionFuture<'a> {
type Item = Connection;
type Error = error::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self {
&mut ConnectionFuture::Unsecured(ref config, ref mut inner) => {
let framed = try_ready!(inner.poll()).framed(IrcCodec::new(config.encoding())?);
let transport = IrcTransport::new(config, framed);
Ok(Async::Ready(Connection::Unsecured(transport)))
}
&mut ConnectionFuture::Secured(ref config, ref mut inner) => {
let framed = try_ready!(inner.poll()).framed(IrcCodec::new(config.encoding())?);
let transport = IrcTransport::new(config, framed);
Ok(Async::Ready(Connection::Secured(transport)))
}
}
}
}
impl Connection {
pub fn new<'a>(config: &'a Config, handle: &Handle) -> error::Result<ConnectionFuture<'a>> {
if config.use_ssl() {
let domain = format!("{}:{}", config.server(), config.port());
let connector = TlsConnector::builder()?.build()?;
let stream = TcpStream::connect(&config.socket_addr(), handle).map_err(|e| {
let res: error::Error = e.into();
res
}).and_then(move |socket| {
connector.connect_async(&domain, socket).map_err(|e| e.into())
}).boxed();
Ok(ConnectionFuture::Secured(config, stream))
} else {
Ok(ConnectionFuture::Unsecured(config, TcpStream::connect(&config.socket_addr(), handle)))
}
}
}
impl Stream for Connection {
type Item = Message;
type Error = error::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self {
&mut Connection::Unsecured(ref mut inner) => inner.poll(),
&mut Connection::Secured(ref mut inner) => inner.poll(),
}
}
}
impl Sink for Connection {
type SinkItem = Message;
type SinkError = error::Error;
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
match self {
&mut Connection::Unsecured(ref mut inner) => inner.start_send(item),
&mut Connection::Secured(ref mut inner) => inner.start_send(item),
}
}
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
match self {
&mut Connection::Unsecured(ref mut inner) => inner.poll_complete(),
&mut Connection::Secured(ref mut inner) => inner.poll_complete(),
}
}
}
pub struct IrcServer {
config: Config,
handle: JoinHandle<()>,
incoming: Option<SplitStream<Connection>>,
outgoing: UnboundedSender<Message>,
}
impl IrcServer {
pub fn new(config: Config) -> error::Result<IrcServer> {
// Setting up a remote reactor running forever.
let (tx_outgoing, rx_outgoing) = mpsc::unbounded();
let (tx_incoming, rx_incoming) = oneshot::channel();
let cfg = config.clone();
let handle = thread::spawn(move || {
let mut reactor = Core::new().unwrap();
// Setting up internal processing stuffs.
let handle = reactor.handle();
let (sink, stream) = reactor.run(Connection::new(&cfg, &handle).unwrap()).unwrap().split();
let outgoing_future = sink.send_all(rx_outgoing.map_err(|_| {
let res: error::Error = error::ErrorKind::ChannelError.into();
res
}));
handle.spawn(outgoing_future.map(|_| ()).map_err(|_| ()));
// let incoming_future = tx_incoming.sink_map_err(|e| {
// let res: error::Error = e.into();
// res
// }).send_all(stream);
// // let incoming_future = stream.forward(tx_incoming);
// handle.spawn(incoming_future.map(|_| ()).map_err(|_| ()));
tx_incoming.send(stream).unwrap();
reactor.run(future::empty::<(), ()>()).unwrap();
});
Ok(IrcServer {
config: config,
handle: handle,
incoming: Some(rx_incoming.wait()?),
outgoing: tx_outgoing,
})
}
pub fn send<M: Into<Message>>(&self, msg: M) -> error::Result<()> {
(&self.outgoing).send(msg.into())?;
Ok(())
}
pub fn recv(&mut self) -> SplitStream<Connection> {
self.incoming.take().unwrap()
}
pub fn join(self) -> () {
self.handle.join().unwrap()
}
}
impl Stream for IrcServer {
type Item = Message;
type Error = error::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.incoming.as_mut().unwrap().poll().map_err(|_| error::ErrorKind::ChannelError.into())
}
}
impl Sink for IrcServer {
type SinkItem = Message;
type SinkError = error::Error;
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
Ok(self.outgoing.start_send(item)?)
}
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
Ok(self.outgoing.poll_complete()?)
}
}

View file

@ -1,8 +1,10 @@
//! A simple, thread-safe IRC client library.
pub mod async;
pub mod conn;
pub mod data;
pub mod server;
pub mod transport;
pub mod prelude {
//! A client-side IRC prelude, re-exporting all the necessary basics.

61
src/client/transport.rs Normal file
View file

@ -0,0 +1,61 @@
use std::io;
use std::time::Instant;
use error;
use client::data::Config;
use proto::{Command, IrcCodec, Message};
use futures::{Async, Poll, Sink, StartSend, Stream};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::Framed;
pub struct IrcTransport<T> where T: AsyncRead + AsyncWrite {
inner: Framed<T, IrcCodec>,
ping_timeout: u64,
last_ping: Instant,
}
impl<T> IrcTransport<T> where T: AsyncRead + AsyncWrite {
pub fn new(config: &Config, inner: Framed<T, IrcCodec>) -> IrcTransport<T> {
IrcTransport {
inner: inner,
ping_timeout: config.ping_time() as u64,
last_ping: Instant::now(),
}
}
}
impl<T> Stream for IrcTransport<T> where T: AsyncRead + AsyncWrite {
type Item = Message;
type Error = error::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if self.last_ping.elapsed().as_secs() >= self.ping_timeout {
self.close()?;
Err(io::Error::new(io::ErrorKind::ConnectionReset, "Ping timed out.").into())
} else {
loop {
match try_ready!(self.inner.poll()) {
Some(Message { command: Command::PING(ref data, _), .. }) => {
self.last_ping = Instant::now();
let result = self.start_send(Command::PONG(data.to_owned(), None).into())?;
assert!(result.is_ready());
self.poll_complete()?;
}
message => return Ok(Async::Ready(message))
}
}
}
}
}
impl<T> Sink for IrcTransport<T> where T: AsyncRead + AsyncWrite {
type SinkItem = Message;
type SinkError = error::Error;
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
Ok(self.inner.start_send(item)?)
}
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
Ok(self.inner.poll_complete()?)
}
}

View file

@ -6,6 +6,9 @@ error_chain! {
foreign_links {
Io(::std::io::Error);
Tls(::native_tls::Error);
Recv(::std::sync::mpsc::RecvError);
SendMessage(::futures::sync::mpsc::SendError<::proto::Message>);
OneShotCancelled(::futures::sync::oneshot::Canceled);
}
errors {
@ -26,5 +29,11 @@ error_chain! {
description("Failed to parse an IRC subcommand.")
display("Failed to parse an IRC subcommand.")
}
/// An error occurred on one of the internal channels of the `IrcServer`.
ChannelError {
description("An error occured on one of the IrcServer's internal channels.")
display("An error occured on one of the IrcServer's internal channels.")
}
}
}

View file

@ -8,7 +8,7 @@ pub mod message;
pub mod response;
pub use self::caps::{Capability, NegotiationVersion};
pub use self::command::{BatchSubCommand, Command};
pub use self::command::{BatchSubCommand, CapSubCommand, Command};
pub use self::irc::IrcCodec;
pub use self::message::Message;
pub use self::response::Response;