From 7d3e923de8158a5013c4045921abf6347a8ccb58 Mon Sep 17 00:00:00 2001 From: Aaron Weiss Date: Wed, 21 Jun 2017 13:18:22 -0400 Subject: [PATCH] Added a first real cut at async. --- examples/async.rs | 39 +++++++++ src/client/async.rs | 189 ++++++++++++++++++++++++++++++++++++++++ src/client/mod.rs | 2 + src/client/transport.rs | 61 +++++++++++++ src/error.rs | 9 ++ src/proto/mod.rs | 2 +- 6 files changed, 301 insertions(+), 1 deletion(-) create mode 100644 examples/async.rs create mode 100644 src/client/async.rs create mode 100644 src/client/transport.rs diff --git a/examples/async.rs b/examples/async.rs new file mode 100644 index 0000000..9db90d2 --- /dev/null +++ b/examples/async.rs @@ -0,0 +1,39 @@ +extern crate futures; +extern crate irc; + +use std::default::Default; +use std::thread; +use futures::{Future, Stream}; +use irc::client::async::IrcServer; +use irc::client::data::Config; +use irc::proto::{CapSubCommand, Command}; + +fn main() { + let config = Config { + nickname: Some("pickles".to_owned()), + alt_nicks: Some(vec!["bananas".to_owned(), "apples".to_owned()]), + server: Some("chat.freenode.net".to_owned()), + channels: Some(vec!["##yulli".to_owned()]), + ..Default::default() + }; + + let mut server = IrcServer::new(config).unwrap(); + thread::sleep_ms(100); + server.send(Command::CAP(None, CapSubCommand::END, None, None)).unwrap(); + server.send(Command::NICK("aatxebot".to_owned())).unwrap(); + server.send(Command::USER("aatxebot".to_owned(), "0".to_owned(), "aatxebot".to_owned())).unwrap(); + thread::sleep_ms(100); + server.send(Command::JOIN("##yulli".to_owned(), None, None)).unwrap(); + server.recv().for_each(|msg| { + print!("{}", msg); + match msg.command { + Command::PRIVMSG(ref target, ref msg) => { + if msg.contains("pickles") { + server.send(Command::PRIVMSG(target.to_owned(), "Hi!".to_owned())).unwrap(); + } + } + _ => (), + } + Ok(()) + }).wait().unwrap(); +} diff --git a/src/client/async.rs b/src/client/async.rs new file mode 100644 index 0000000..6b236c0 --- /dev/null +++ b/src/client/async.rs @@ -0,0 +1,189 @@ +use std::fmt; +use std::thread; +use std::thread::JoinHandle; +use error; +use client::data::Config; +use client::transport::IrcTransport; +use proto::{IrcCodec, Message}; +use futures::future; +use futures::{Async, Poll, Future, Sink, StartSend, Stream}; +use futures::stream::SplitStream; +use futures::sync::mpsc; +use futures::sync::oneshot; +use futures::sync::mpsc::UnboundedSender; +use native_tls::TlsConnector; +use tokio_core::reactor::{Core, Handle}; +use tokio_core::net::{TcpStream, TcpStreamNew}; +use tokio_io::AsyncRead; +use tokio_tls::{TlsConnectorExt, TlsStream}; + +pub enum Connection { + Unsecured(IrcTransport), + Secured(IrcTransport>), +} + +impl fmt::Debug for Connection { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "IrcConnection") + } +} + +type TlsFuture = Box> + Send>; + +pub enum ConnectionFuture<'a> { + Unsecured(&'a Config, TcpStreamNew), + Secured(&'a Config, TlsFuture), +} + +impl<'a> Future for ConnectionFuture<'a> { + type Item = Connection; + type Error = error::Error; + + fn poll(&mut self) -> Poll { + match self { + &mut ConnectionFuture::Unsecured(ref config, ref mut inner) => { + let framed = try_ready!(inner.poll()).framed(IrcCodec::new(config.encoding())?); + let transport = IrcTransport::new(config, framed); + + Ok(Async::Ready(Connection::Unsecured(transport))) + } + &mut ConnectionFuture::Secured(ref config, ref mut inner) => { + let framed = try_ready!(inner.poll()).framed(IrcCodec::new(config.encoding())?); + let transport = IrcTransport::new(config, framed); + + Ok(Async::Ready(Connection::Secured(transport))) + } + } + } +} + +impl Connection { + pub fn new<'a>(config: &'a Config, handle: &Handle) -> error::Result> { + if config.use_ssl() { + let domain = format!("{}:{}", config.server(), config.port()); + let connector = TlsConnector::builder()?.build()?; + let stream = TcpStream::connect(&config.socket_addr(), handle).map_err(|e| { + let res: error::Error = e.into(); + res + }).and_then(move |socket| { + connector.connect_async(&domain, socket).map_err(|e| e.into()) + }).boxed(); + Ok(ConnectionFuture::Secured(config, stream)) + } else { + Ok(ConnectionFuture::Unsecured(config, TcpStream::connect(&config.socket_addr(), handle))) + } + } +} + +impl Stream for Connection { + type Item = Message; + type Error = error::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + match self { + &mut Connection::Unsecured(ref mut inner) => inner.poll(), + &mut Connection::Secured(ref mut inner) => inner.poll(), + } + } +} + +impl Sink for Connection { + type SinkItem = Message; + type SinkError = error::Error; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend { + match self { + &mut Connection::Unsecured(ref mut inner) => inner.start_send(item), + &mut Connection::Secured(ref mut inner) => inner.start_send(item), + } + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + match self { + &mut Connection::Unsecured(ref mut inner) => inner.poll_complete(), + &mut Connection::Secured(ref mut inner) => inner.poll_complete(), + } + } +} + +pub struct IrcServer { + config: Config, + handle: JoinHandle<()>, + incoming: Option>, + outgoing: UnboundedSender, +} + +impl IrcServer { + pub fn new(config: Config) -> error::Result { + // Setting up a remote reactor running forever. + let (tx_outgoing, rx_outgoing) = mpsc::unbounded(); + let (tx_incoming, rx_incoming) = oneshot::channel(); + + let cfg = config.clone(); + let handle = thread::spawn(move || { + let mut reactor = Core::new().unwrap(); + + // Setting up internal processing stuffs. + let handle = reactor.handle(); + let (sink, stream) = reactor.run(Connection::new(&cfg, &handle).unwrap()).unwrap().split(); + + let outgoing_future = sink.send_all(rx_outgoing.map_err(|_| { + let res: error::Error = error::ErrorKind::ChannelError.into(); + res + })); + handle.spawn(outgoing_future.map(|_| ()).map_err(|_| ())); + + // let incoming_future = tx_incoming.sink_map_err(|e| { + // let res: error::Error = e.into(); + // res + // }).send_all(stream); + // // let incoming_future = stream.forward(tx_incoming); + // handle.spawn(incoming_future.map(|_| ()).map_err(|_| ())); + tx_incoming.send(stream).unwrap(); + + reactor.run(future::empty::<(), ()>()).unwrap(); + }); + + Ok(IrcServer { + config: config, + handle: handle, + incoming: Some(rx_incoming.wait()?), + outgoing: tx_outgoing, + }) + } + + pub fn send>(&self, msg: M) -> error::Result<()> { + (&self.outgoing).send(msg.into())?; + Ok(()) + } + + pub fn recv(&mut self) -> SplitStream { + self.incoming.take().unwrap() + } + + pub fn join(self) -> () { + self.handle.join().unwrap() + } +} + +impl Stream for IrcServer { + type Item = Message; + type Error = error::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + self.incoming.as_mut().unwrap().poll().map_err(|_| error::ErrorKind::ChannelError.into()) + } +} + +impl Sink for IrcServer { + type SinkItem = Message; + type SinkError = error::Error; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend { + Ok(self.outgoing.start_send(item)?) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + Ok(self.outgoing.poll_complete()?) + } +} diff --git a/src/client/mod.rs b/src/client/mod.rs index 021e682..7227d96 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,8 +1,10 @@ //! A simple, thread-safe IRC client library. +pub mod async; pub mod conn; pub mod data; pub mod server; +pub mod transport; pub mod prelude { //! A client-side IRC prelude, re-exporting all the necessary basics. diff --git a/src/client/transport.rs b/src/client/transport.rs new file mode 100644 index 0000000..8ce6ff6 --- /dev/null +++ b/src/client/transport.rs @@ -0,0 +1,61 @@ +use std::io; +use std::time::Instant; +use error; +use client::data::Config; +use proto::{Command, IrcCodec, Message}; +use futures::{Async, Poll, Sink, StartSend, Stream}; +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_io::codec::Framed; + +pub struct IrcTransport where T: AsyncRead + AsyncWrite { + inner: Framed, + ping_timeout: u64, + last_ping: Instant, +} + +impl IrcTransport where T: AsyncRead + AsyncWrite { + pub fn new(config: &Config, inner: Framed) -> IrcTransport { + IrcTransport { + inner: inner, + ping_timeout: config.ping_time() as u64, + last_ping: Instant::now(), + } + } +} + +impl Stream for IrcTransport where T: AsyncRead + AsyncWrite { + type Item = Message; + type Error = error::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + if self.last_ping.elapsed().as_secs() >= self.ping_timeout { + self.close()?; + Err(io::Error::new(io::ErrorKind::ConnectionReset, "Ping timed out.").into()) + } else { + loop { + match try_ready!(self.inner.poll()) { + Some(Message { command: Command::PING(ref data, _), .. }) => { + self.last_ping = Instant::now(); + let result = self.start_send(Command::PONG(data.to_owned(), None).into())?; + assert!(result.is_ready()); + self.poll_complete()?; + } + message => return Ok(Async::Ready(message)) + } + } + } + } +} + +impl Sink for IrcTransport where T: AsyncRead + AsyncWrite { + type SinkItem = Message; + type SinkError = error::Error; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend { + Ok(self.inner.start_send(item)?) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + Ok(self.inner.poll_complete()?) + } +} diff --git a/src/error.rs b/src/error.rs index 49bf747..85e886a 100644 --- a/src/error.rs +++ b/src/error.rs @@ -6,6 +6,9 @@ error_chain! { foreign_links { Io(::std::io::Error); Tls(::native_tls::Error); + Recv(::std::sync::mpsc::RecvError); + SendMessage(::futures::sync::mpsc::SendError<::proto::Message>); + OneShotCancelled(::futures::sync::oneshot::Canceled); } errors { @@ -26,5 +29,11 @@ error_chain! { description("Failed to parse an IRC subcommand.") display("Failed to parse an IRC subcommand.") } + + /// An error occurred on one of the internal channels of the `IrcServer`. + ChannelError { + description("An error occured on one of the IrcServer's internal channels.") + display("An error occured on one of the IrcServer's internal channels.") + } } } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 0ade093..27a5085 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -8,7 +8,7 @@ pub mod message; pub mod response; pub use self::caps::{Capability, NegotiationVersion}; -pub use self::command::{BatchSubCommand, Command}; +pub use self::command::{BatchSubCommand, CapSubCommand, Command}; pub use self::irc::IrcCodec; pub use self::message::Message; pub use self::response::Response;