diff --git a/src/client/async.rs b/src/client/async.rs deleted file mode 100644 index a5b6766..0000000 --- a/src/client/async.rs +++ /dev/null @@ -1,107 +0,0 @@ -use std::fmt; -use std::thread; -use std::thread::JoinHandle; -use error; -use client::data::Config; -use client::transport::IrcTransport; -use proto::{IrcCodec, Message}; -use futures::future; -use futures::{Async, Poll, Future, Sink, StartSend, Stream}; -use futures::stream::SplitStream; -use futures::sync::mpsc; -use futures::sync::oneshot; -use futures::sync::mpsc::UnboundedSender; -use native_tls::TlsConnector; -use tokio_core::reactor::{Core, Handle}; -use tokio_core::net::{TcpStream, TcpStreamNew}; -use tokio_io::AsyncRead; -use tokio_tls::{TlsConnectorExt, TlsStream}; - -pub enum Connection { - Unsecured(IrcTransport), - Secured(IrcTransport>), -} - -impl fmt::Debug for Connection { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "IrcConnection") - } -} - -type TlsFuture = Box> + Send>; - -pub enum ConnectionFuture<'a> { - Unsecured(&'a Config, TcpStreamNew), - Secured(&'a Config, TlsFuture), -} - -impl<'a> Future for ConnectionFuture<'a> { - type Item = Connection; - type Error = error::Error; - - fn poll(&mut self) -> Poll { - match self { - &mut ConnectionFuture::Unsecured(ref config, ref mut inner) => { - let framed = try_ready!(inner.poll()).framed(IrcCodec::new(config.encoding())?); - let transport = IrcTransport::new(config, framed); - - Ok(Async::Ready(Connection::Unsecured(transport))) - } - &mut ConnectionFuture::Secured(ref config, ref mut inner) => { - let framed = try_ready!(inner.poll()).framed(IrcCodec::new(config.encoding())?); - let transport = IrcTransport::new(config, framed); - - Ok(Async::Ready(Connection::Secured(transport))) - } - } - } -} - -impl Connection { - pub fn new<'a>(config: &'a Config, handle: &Handle) -> error::Result> { - if config.use_ssl() { - let domain = format!("{}:{}", config.server(), config.port()); - let connector = TlsConnector::builder()?.build()?; - let stream = TcpStream::connect(&config.socket_addr(), handle).map_err(|e| { - let res: error::Error = e.into(); - res - }).and_then(move |socket| { - connector.connect_async(&domain, socket).map_err(|e| e.into()) - }).boxed(); - Ok(ConnectionFuture::Secured(config, stream)) - } else { - Ok(ConnectionFuture::Unsecured(config, TcpStream::connect(&config.socket_addr(), handle))) - } - } -} - -impl Stream for Connection { - type Item = Message; - type Error = error::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - match self { - &mut Connection::Unsecured(ref mut inner) => inner.poll(), - &mut Connection::Secured(ref mut inner) => inner.poll(), - } - } -} - -impl Sink for Connection { - type SinkItem = Message; - type SinkError = error::Error; - - fn start_send(&mut self, item: Self::SinkItem) -> StartSend { - match self { - &mut Connection::Unsecured(ref mut inner) => inner.start_send(item), - &mut Connection::Secured(ref mut inner) => inner.start_send(item), - } - } - - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - match self { - &mut Connection::Unsecured(ref mut inner) => inner.poll_complete(), - &mut Connection::Secured(ref mut inner) => inner.poll_complete(), - } - } -} diff --git a/src/client/conn.rs b/src/client/conn.rs index ac38a81..a5b6766 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -1,329 +1,107 @@ -//! Thread-safe connections on `IrcStreams`. -use std::io; -use std::io::prelude::*; -use std::io::Cursor; -use std::net::TcpStream; -use std::sync::Mutex; -use error::Result; -use bufstream::BufStream; -use encoding::DecoderTrap; -use encoding::label::encoding_from_whatwg_label; +use std::fmt; +use std::thread; +use std::thread::JoinHandle; +use error; +use client::data::Config; +use client::transport::IrcTransport; +use proto::{IrcCodec, Message}; +use futures::future; +use futures::{Async, Poll, Future, Sink, StartSend, Stream}; +use futures::stream::SplitStream; +use futures::sync::mpsc; +use futures::sync::oneshot; +use futures::sync::mpsc::UnboundedSender; +use native_tls::TlsConnector; +use tokio_core::reactor::{Core, Handle}; +use tokio_core::net::{TcpStream, TcpStreamNew}; +use tokio_io::AsyncRead; +use tokio_tls::{TlsConnectorExt, TlsStream}; -/// A connection. -pub trait Connection { - /// Sends a message over this connection. - fn send(&self, msg: &str, encoding: &str) -> Result<()>; - - /// Receives a single line from this connection. - fn recv(&self, encoding: &str) -> Result; - - /// Gets the full record of all sent messages if the Connection records this. - /// This is intended for use in writing tests. - fn written(&self, encoding: &str) -> Option; - - /// Re-establishes this connection, disconnecting from the existing case if necessary. - fn reconnect(&self) -> Result<()>; +pub enum Connection { + Unsecured(IrcTransport), + Secured(IrcTransport>), } -/// Useful internal type definitions. -type NetBufStream = BufStream; - -/// A thread-safe connection over a buffered `NetStream`. -pub struct NetConnection { - host: Mutex, - port: Mutex, - stream: Mutex, +impl fmt::Debug for Connection { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "IrcConnection") + } } -impl NetConnection { - fn new(host: &str, port: u16, stream: NetBufStream) -> NetConnection { - NetConnection { - host: Mutex::new(host.to_owned()), - port: Mutex::new(port), - stream: Mutex::new(stream), +type TlsFuture = Box> + Send>; + +pub enum ConnectionFuture<'a> { + Unsecured(&'a Config, TcpStreamNew), + Secured(&'a Config, TlsFuture), +} + +impl<'a> Future for ConnectionFuture<'a> { + type Item = Connection; + type Error = error::Error; + + fn poll(&mut self) -> Poll { + match self { + &mut ConnectionFuture::Unsecured(ref config, ref mut inner) => { + let framed = try_ready!(inner.poll()).framed(IrcCodec::new(config.encoding())?); + let transport = IrcTransport::new(config, framed); + + Ok(Async::Ready(Connection::Unsecured(transport))) + } + &mut ConnectionFuture::Secured(ref config, ref mut inner) => { + let framed = try_ready!(inner.poll()).framed(IrcCodec::new(config.encoding())?); + let transport = IrcTransport::new(config, framed); + + Ok(Async::Ready(Connection::Secured(transport))) + } } } - - /// Creates a thread-safe TCP connection to the specified server. - pub fn connect(host: &str, port: u16) -> Result { - let stream = try!(NetConnection::connect_internal(host, port)); - Ok(NetConnection::new(host, port, stream)) - } - - /// connects to the specified server and returns a reader-writer pair. - fn connect_internal(host: &str, port: u16) -> Result { - let socket = try!(TcpStream::connect((host, port)).into()); - Ok(BufStream::new(NetStream::Unsecured(socket))) - } - - /// Creates a thread-safe TCP connection to the specified server over SSL. - /// If the library is compiled without SSL support, this method panics. - pub fn connect_ssl(host: &str, port: u16) -> Result { - let stream = try!(NetConnection::connect_ssl_internal(host, port)); - Ok(NetConnection::new(host, port, stream)) - } - - /// Panics because SSL support is not compiled in. - fn connect_ssl_internal(host: &str, port: u16) -> Result { - panic!("Cannot connect to {}:{} over SSL without compiling with SSL support.", host, port) - } } -impl Connection for NetConnection { - fn send(&self, msg: &str, encoding: &str) -> Result<()> { - imp::send(&self.stream, msg, encoding) - } - - fn recv(&self, encoding: &str) -> Result { - imp::recv(&self.stream, encoding) - } - - fn written(&self, _: &str) -> Option { - None - } - - fn reconnect(&self) -> Result<()> { - let use_ssl = match *self.stream.lock().unwrap().get_ref() { - NetStream::Unsecured(_) => false, - }; - let host = self.host.lock().unwrap(); - let port = self.port.lock().unwrap(); - let stream = if use_ssl { - try!(NetConnection::connect_ssl_internal(&host, *port)) +impl Connection { + pub fn new<'a>(config: &'a Config, handle: &Handle) -> error::Result> { + if config.use_ssl() { + let domain = format!("{}:{}", config.server(), config.port()); + let connector = TlsConnector::builder()?.build()?; + let stream = TcpStream::connect(&config.socket_addr(), handle).map_err(|e| { + let res: error::Error = e.into(); + res + }).and_then(move |socket| { + connector.connect_async(&domain, socket).map_err(|e| e.into()) + }).boxed(); + Ok(ConnectionFuture::Secured(config, stream)) } else { - try!(NetConnection::connect_internal(&host, *port)) - }; - *self.stream.lock().unwrap() = stream; - Ok(()) - } -} - -/// A mock connection for testing purposes. -pub struct MockConnection { - reader: Mutex>>, - writer: Mutex>, -} - -impl MockConnection { - /// Creates a new mock connection with an empty read buffer. - pub fn empty() -> MockConnection { - MockConnection::from_byte_vec(Vec::new()) - } - - /// Creates a new mock connection with the specified string in the read buffer. - pub fn new(input: &str) -> MockConnection { - MockConnection::from_byte_vec(input.as_bytes().to_vec()) - } - - /// Creates a new mock connection with the specified bytes in the read buffer. - pub fn from_byte_vec(input: Vec) -> MockConnection { - MockConnection { - reader: Mutex::new(Cursor::new(input)), - writer: Mutex::new(Vec::new()), + Ok(ConnectionFuture::Unsecured(config, TcpStream::connect(&config.socket_addr(), handle))) } } } -impl Connection for MockConnection { - fn send(&self, msg: &str, encoding: &str) -> Result<()> { - imp::send(&self.writer, msg, encoding) - } +impl Stream for Connection { + type Item = Message; + type Error = error::Error; - fn recv(&self, encoding: &str) -> Result { - imp::recv(&self.reader, encoding) - } - - fn written(&self, encoding: &str) -> Option { - encoding_from_whatwg_label(encoding).and_then(|enc| { - enc.decode(&self.writer.lock().unwrap(), DecoderTrap::Replace) - .ok() - }) - } - - fn reconnect(&self) -> Result<()> { - Ok(()) - } -} - -mod imp { - use std::io::{Error, ErrorKind}; - use std::sync::Mutex; - use encoding::{DecoderTrap, EncoderTrap}; - use encoding::label::encoding_from_whatwg_label; - use error::Result; - use client::data::kinds::{IrcRead, IrcWrite}; - - pub fn send(writer: &Mutex, msg: &str, encoding: &str) -> Result<()> { - let encoding = match encoding_from_whatwg_label(encoding) { - Some(enc) => enc, - None => { - return Err(Error::new( - ErrorKind::InvalidInput, - &format!("Failed to find encoder. ({})", encoding)[..], - ).into()) - } - }; - let data = match encoding.encode(msg, EncoderTrap::Replace) { - Ok(data) => data, - Err(data) => { - return Err(Error::new( - ErrorKind::InvalidInput, - &format!( - "Failed to encode {} as {}.", - data, - encoding.name() - ) - [..], - ).into()) - } - }; - let mut writer = writer.lock().unwrap(); - try!(writer.write_all(&data)); - writer.flush().map_err(|e| e.into()) - } - - pub fn recv(reader: &Mutex, encoding: &str) -> Result { - let encoding = match encoding_from_whatwg_label(encoding) { - Some(enc) => enc, - None => { - return Err(Error::new( - ErrorKind::InvalidInput, - &format!("Failed to find decoder. ({})", encoding)[..], - ).into()) - } - }; - let mut buf = Vec::new(); - reader - .lock() - .unwrap() - .read_until(b'\n', &mut buf) - .and_then(|_| match encoding.decode(&buf, DecoderTrap::Replace) { - _ if buf.is_empty() => Err(Error::new(ErrorKind::Other, "EOF")), - Ok(data) => Ok(data), - Err(data) => Err(Error::new( - ErrorKind::InvalidInput, - &format!( - "Failed to decode {} as {}.", - data, - encoding.name() - ) - [..], - )), - }) - .map_err(|e| e.into()) - } - -} - -/// An abstraction over different networked streams. -pub enum NetStream { - /// An unsecured TcpStream. - Unsecured(TcpStream), -} - -impl Read for NetStream { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - match *self { - NetStream::Unsecured(ref mut stream) => stream.read(buf), - #[cfg(feature = "ssl")] - NetStream::Ssl(ref mut stream) => stream.read(buf), + fn poll(&mut self) -> Poll, Self::Error> { + match self { + &mut Connection::Unsecured(ref mut inner) => inner.poll(), + &mut Connection::Secured(ref mut inner) => inner.poll(), } } } -impl Write for NetStream { - fn write(&mut self, buf: &[u8]) -> io::Result { - match *self { - NetStream::Unsecured(ref mut stream) => stream.write(buf), +impl Sink for Connection { + type SinkItem = Message; + type SinkError = error::Error; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend { + match self { + &mut Connection::Unsecured(ref mut inner) => inner.start_send(item), + &mut Connection::Secured(ref mut inner) => inner.start_send(item), } } - fn flush(&mut self) -> io::Result<()> { - match *self { - NetStream::Unsecured(ref mut stream) => stream.flush(), + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + match self { + &mut Connection::Unsecured(ref mut inner) => inner.poll_complete(), + &mut Connection::Secured(ref mut inner) => inner.poll_complete(), } } } - -#[cfg(test)] -mod test { - use super::{Connection, MockConnection}; - use error::Result; - use client::data::Message; - use client::data::Command::PRIVMSG; - - fn send_to>(conn: &C, msg: M, encoding: &str) -> Result<()> { - conn.send(&msg.into().to_string(), encoding) - } - - #[test] - fn send_utf8() { - let conn = MockConnection::empty(); - assert!( - send_to( - &conn, - PRIVMSG("test".to_owned(), "€ŠšŽžŒœŸ".to_owned()), - "UTF-8", - ).is_ok() - ); - let data = conn.written("UTF-8").unwrap(); - assert_eq!(&data[..], "PRIVMSG test :€ŠšŽžŒœŸ\r\n"); - } - - #[test] - fn send_utf8_str() { - let exp = "PRIVMSG test :€ŠšŽžŒœŸ\r\n"; - let conn = MockConnection::empty(); - assert!(send_to(&conn, exp, "UTF-8").is_ok()); - let data = conn.written("UTF-8").unwrap(); - assert_eq!(&data[..], exp); - } - - #[test] - fn send_iso885915() { - let conn = MockConnection::empty(); - assert!( - send_to( - &conn, - PRIVMSG("test".to_owned(), "€ŠšŽžŒœŸ".to_owned()), - "l9", - ).is_ok() - ); - let data = conn.written("l9").unwrap(); - assert_eq!(&data[..], "PRIVMSG test :€ŠšŽžŒœŸ\r\n"); - } - - #[test] - fn send_iso885915_str() { - let exp = "PRIVMSG test :€ŠšŽžŒœŸ\r\n"; - let conn = MockConnection::empty(); - assert!(send_to(&conn, exp, "l9").is_ok()); - let data = conn.written("l9").unwrap(); - assert_eq!(&data[..], exp); - } - - #[test] - fn recv_utf8() { - let conn = MockConnection::new("PRIVMSG test :Testing!\r\n"); - assert_eq!( - &conn.recv("UTF-8").unwrap()[..], - "PRIVMSG test :Testing!\r\n" - ); - } - - #[test] - fn recv_iso885915() { - let data = [0xA4, 0xA6, 0xA8, 0xB4, 0xB8, 0xBC, 0xBD, 0xBE]; - let conn = MockConnection::from_byte_vec({ - let mut vec = Vec::new(); - vec.extend("PRIVMSG test :".as_bytes()); - vec.extend(data.iter()); - vec.extend("\r\n".as_bytes()); - vec.into_iter().collect::>() - }); - assert_eq!( - &conn.recv("l9").unwrap()[..], - "PRIVMSG test :€ŠšŽžŒœŸ\r\n" - ); - } -} diff --git a/src/client/mod.rs b/src/client/mod.rs index c4a08b8..5b953e1 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,6 +1,5 @@ //! A simple, thread-safe IRC client library. -pub mod async; pub mod conn; pub mod data; pub mod server; diff --git a/src/client/server/mod.rs b/src/client/server/mod.rs index c56ac9e..6fe9c4a 100644 --- a/src/client/server/mod.rs +++ b/src/client/server/mod.rs @@ -6,7 +6,7 @@ use std::sync::{Arc, Mutex, RwLock}; use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; use error; -use client::async::Connection; +use client::conn::Connection; use client::data::{Command, Config, Message, Response, User}; use client::data::Command::{JOIN, NICK, NICKSERV, PART, PING, PONG, PRIVMSG, MODE, QUIT}; use client::server::utils::ServerExt;