Replaced old conn.rs with async.rs.

This commit is contained in:
Aaron Weiss 2017-06-21 16:55:37 -04:00
parent c363dc7837
commit 073b82feec
No known key found for this signature in database
GPG key ID: 0237035D9BF03AE2
4 changed files with 83 additions and 413 deletions

View file

@ -1,107 +0,0 @@
use std::fmt;
use std::thread;
use std::thread::JoinHandle;
use error;
use client::data::Config;
use client::transport::IrcTransport;
use proto::{IrcCodec, Message};
use futures::future;
use futures::{Async, Poll, Future, Sink, StartSend, Stream};
use futures::stream::SplitStream;
use futures::sync::mpsc;
use futures::sync::oneshot;
use futures::sync::mpsc::UnboundedSender;
use native_tls::TlsConnector;
use tokio_core::reactor::{Core, Handle};
use tokio_core::net::{TcpStream, TcpStreamNew};
use tokio_io::AsyncRead;
use tokio_tls::{TlsConnectorExt, TlsStream};
pub enum Connection {
Unsecured(IrcTransport<TcpStream>),
Secured(IrcTransport<TlsStream<TcpStream>>),
}
impl fmt::Debug for Connection {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "IrcConnection")
}
}
type TlsFuture = Box<Future<Error=error::Error, Item=TlsStream<TcpStream>> + Send>;
pub enum ConnectionFuture<'a> {
Unsecured(&'a Config, TcpStreamNew),
Secured(&'a Config, TlsFuture),
}
impl<'a> Future for ConnectionFuture<'a> {
type Item = Connection;
type Error = error::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self {
&mut ConnectionFuture::Unsecured(ref config, ref mut inner) => {
let framed = try_ready!(inner.poll()).framed(IrcCodec::new(config.encoding())?);
let transport = IrcTransport::new(config, framed);
Ok(Async::Ready(Connection::Unsecured(transport)))
}
&mut ConnectionFuture::Secured(ref config, ref mut inner) => {
let framed = try_ready!(inner.poll()).framed(IrcCodec::new(config.encoding())?);
let transport = IrcTransport::new(config, framed);
Ok(Async::Ready(Connection::Secured(transport)))
}
}
}
}
impl Connection {
pub fn new<'a>(config: &'a Config, handle: &Handle) -> error::Result<ConnectionFuture<'a>> {
if config.use_ssl() {
let domain = format!("{}:{}", config.server(), config.port());
let connector = TlsConnector::builder()?.build()?;
let stream = TcpStream::connect(&config.socket_addr(), handle).map_err(|e| {
let res: error::Error = e.into();
res
}).and_then(move |socket| {
connector.connect_async(&domain, socket).map_err(|e| e.into())
}).boxed();
Ok(ConnectionFuture::Secured(config, stream))
} else {
Ok(ConnectionFuture::Unsecured(config, TcpStream::connect(&config.socket_addr(), handle)))
}
}
}
impl Stream for Connection {
type Item = Message;
type Error = error::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self {
&mut Connection::Unsecured(ref mut inner) => inner.poll(),
&mut Connection::Secured(ref mut inner) => inner.poll(),
}
}
}
impl Sink for Connection {
type SinkItem = Message;
type SinkError = error::Error;
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
match self {
&mut Connection::Unsecured(ref mut inner) => inner.start_send(item),
&mut Connection::Secured(ref mut inner) => inner.start_send(item),
}
}
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
match self {
&mut Connection::Unsecured(ref mut inner) => inner.poll_complete(),
&mut Connection::Secured(ref mut inner) => inner.poll_complete(),
}
}
}

View file

@ -1,329 +1,107 @@
//! Thread-safe connections on `IrcStreams`. use std::fmt;
use std::io; use std::thread;
use std::io::prelude::*; use std::thread::JoinHandle;
use std::io::Cursor; use error;
use std::net::TcpStream; use client::data::Config;
use std::sync::Mutex; use client::transport::IrcTransport;
use error::Result; use proto::{IrcCodec, Message};
use bufstream::BufStream; use futures::future;
use encoding::DecoderTrap; use futures::{Async, Poll, Future, Sink, StartSend, Stream};
use encoding::label::encoding_from_whatwg_label; use futures::stream::SplitStream;
use futures::sync::mpsc;
use futures::sync::oneshot;
use futures::sync::mpsc::UnboundedSender;
use native_tls::TlsConnector;
use tokio_core::reactor::{Core, Handle};
use tokio_core::net::{TcpStream, TcpStreamNew};
use tokio_io::AsyncRead;
use tokio_tls::{TlsConnectorExt, TlsStream};
/// A connection. pub enum Connection {
pub trait Connection { Unsecured(IrcTransport<TcpStream>),
/// Sends a message over this connection. Secured(IrcTransport<TlsStream<TcpStream>>),
fn send(&self, msg: &str, encoding: &str) -> Result<()>;
/// Receives a single line from this connection.
fn recv(&self, encoding: &str) -> Result<String>;
/// Gets the full record of all sent messages if the Connection records this.
/// This is intended for use in writing tests.
fn written(&self, encoding: &str) -> Option<String>;
/// Re-establishes this connection, disconnecting from the existing case if necessary.
fn reconnect(&self) -> Result<()>;
} }
/// Useful internal type definitions. impl fmt::Debug for Connection {
type NetBufStream = BufStream<NetStream>; fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "IrcConnection")
/// A thread-safe connection over a buffered `NetStream`. }
pub struct NetConnection {
host: Mutex<String>,
port: Mutex<u16>,
stream: Mutex<NetBufStream>,
} }
impl NetConnection { type TlsFuture = Box<Future<Error=error::Error, Item=TlsStream<TcpStream>> + Send>;
fn new(host: &str, port: u16, stream: NetBufStream) -> NetConnection {
NetConnection { pub enum ConnectionFuture<'a> {
host: Mutex::new(host.to_owned()), Unsecured(&'a Config, TcpStreamNew),
port: Mutex::new(port), Secured(&'a Config, TlsFuture),
stream: Mutex::new(stream), }
impl<'a> Future for ConnectionFuture<'a> {
type Item = Connection;
type Error = error::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self {
&mut ConnectionFuture::Unsecured(ref config, ref mut inner) => {
let framed = try_ready!(inner.poll()).framed(IrcCodec::new(config.encoding())?);
let transport = IrcTransport::new(config, framed);
Ok(Async::Ready(Connection::Unsecured(transport)))
}
&mut ConnectionFuture::Secured(ref config, ref mut inner) => {
let framed = try_ready!(inner.poll()).framed(IrcCodec::new(config.encoding())?);
let transport = IrcTransport::new(config, framed);
Ok(Async::Ready(Connection::Secured(transport)))
}
} }
} }
/// Creates a thread-safe TCP connection to the specified server.
pub fn connect(host: &str, port: u16) -> Result<NetConnection> {
let stream = try!(NetConnection::connect_internal(host, port));
Ok(NetConnection::new(host, port, stream))
}
/// connects to the specified server and returns a reader-writer pair.
fn connect_internal(host: &str, port: u16) -> Result<NetBufStream> {
let socket = try!(TcpStream::connect((host, port)).into());
Ok(BufStream::new(NetStream::Unsecured(socket)))
}
/// Creates a thread-safe TCP connection to the specified server over SSL.
/// If the library is compiled without SSL support, this method panics.
pub fn connect_ssl(host: &str, port: u16) -> Result<NetConnection> {
let stream = try!(NetConnection::connect_ssl_internal(host, port));
Ok(NetConnection::new(host, port, stream))
}
/// Panics because SSL support is not compiled in.
fn connect_ssl_internal(host: &str, port: u16) -> Result<NetBufStream> {
panic!("Cannot connect to {}:{} over SSL without compiling with SSL support.", host, port)
}
} }
impl Connection for NetConnection { impl Connection {
fn send(&self, msg: &str, encoding: &str) -> Result<()> { pub fn new<'a>(config: &'a Config, handle: &Handle) -> error::Result<ConnectionFuture<'a>> {
imp::send(&self.stream, msg, encoding) if config.use_ssl() {
} let domain = format!("{}:{}", config.server(), config.port());
let connector = TlsConnector::builder()?.build()?;
fn recv(&self, encoding: &str) -> Result<String> { let stream = TcpStream::connect(&config.socket_addr(), handle).map_err(|e| {
imp::recv(&self.stream, encoding) let res: error::Error = e.into();
} res
}).and_then(move |socket| {
fn written(&self, _: &str) -> Option<String> { connector.connect_async(&domain, socket).map_err(|e| e.into())
None }).boxed();
} Ok(ConnectionFuture::Secured(config, stream))
fn reconnect(&self) -> Result<()> {
let use_ssl = match *self.stream.lock().unwrap().get_ref() {
NetStream::Unsecured(_) => false,
};
let host = self.host.lock().unwrap();
let port = self.port.lock().unwrap();
let stream = if use_ssl {
try!(NetConnection::connect_ssl_internal(&host, *port))
} else { } else {
try!(NetConnection::connect_internal(&host, *port)) Ok(ConnectionFuture::Unsecured(config, TcpStream::connect(&config.socket_addr(), handle)))
};
*self.stream.lock().unwrap() = stream;
Ok(())
}
}
/// A mock connection for testing purposes.
pub struct MockConnection {
reader: Mutex<Cursor<Vec<u8>>>,
writer: Mutex<Vec<u8>>,
}
impl MockConnection {
/// Creates a new mock connection with an empty read buffer.
pub fn empty() -> MockConnection {
MockConnection::from_byte_vec(Vec::new())
}
/// Creates a new mock connection with the specified string in the read buffer.
pub fn new(input: &str) -> MockConnection {
MockConnection::from_byte_vec(input.as_bytes().to_vec())
}
/// Creates a new mock connection with the specified bytes in the read buffer.
pub fn from_byte_vec(input: Vec<u8>) -> MockConnection {
MockConnection {
reader: Mutex::new(Cursor::new(input)),
writer: Mutex::new(Vec::new()),
} }
} }
} }
impl Connection for MockConnection { impl Stream for Connection {
fn send(&self, msg: &str, encoding: &str) -> Result<()> { type Item = Message;
imp::send(&self.writer, msg, encoding) type Error = error::Error;
}
fn recv(&self, encoding: &str) -> Result<String> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
imp::recv(&self.reader, encoding) match self {
} &mut Connection::Unsecured(ref mut inner) => inner.poll(),
&mut Connection::Secured(ref mut inner) => inner.poll(),
fn written(&self, encoding: &str) -> Option<String> {
encoding_from_whatwg_label(encoding).and_then(|enc| {
enc.decode(&self.writer.lock().unwrap(), DecoderTrap::Replace)
.ok()
})
}
fn reconnect(&self) -> Result<()> {
Ok(())
}
}
mod imp {
use std::io::{Error, ErrorKind};
use std::sync::Mutex;
use encoding::{DecoderTrap, EncoderTrap};
use encoding::label::encoding_from_whatwg_label;
use error::Result;
use client::data::kinds::{IrcRead, IrcWrite};
pub fn send<T: IrcWrite>(writer: &Mutex<T>, msg: &str, encoding: &str) -> Result<()> {
let encoding = match encoding_from_whatwg_label(encoding) {
Some(enc) => enc,
None => {
return Err(Error::new(
ErrorKind::InvalidInput,
&format!("Failed to find encoder. ({})", encoding)[..],
).into())
}
};
let data = match encoding.encode(msg, EncoderTrap::Replace) {
Ok(data) => data,
Err(data) => {
return Err(Error::new(
ErrorKind::InvalidInput,
&format!(
"Failed to encode {} as {}.",
data,
encoding.name()
)
[..],
).into())
}
};
let mut writer = writer.lock().unwrap();
try!(writer.write_all(&data));
writer.flush().map_err(|e| e.into())
}
pub fn recv<T: IrcRead>(reader: &Mutex<T>, encoding: &str) -> Result<String> {
let encoding = match encoding_from_whatwg_label(encoding) {
Some(enc) => enc,
None => {
return Err(Error::new(
ErrorKind::InvalidInput,
&format!("Failed to find decoder. ({})", encoding)[..],
).into())
}
};
let mut buf = Vec::new();
reader
.lock()
.unwrap()
.read_until(b'\n', &mut buf)
.and_then(|_| match encoding.decode(&buf, DecoderTrap::Replace) {
_ if buf.is_empty() => Err(Error::new(ErrorKind::Other, "EOF")),
Ok(data) => Ok(data),
Err(data) => Err(Error::new(
ErrorKind::InvalidInput,
&format!(
"Failed to decode {} as {}.",
data,
encoding.name()
)
[..],
)),
})
.map_err(|e| e.into())
}
}
/// An abstraction over different networked streams.
pub enum NetStream {
/// An unsecured TcpStream.
Unsecured(TcpStream),
}
impl Read for NetStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match *self {
NetStream::Unsecured(ref mut stream) => stream.read(buf),
#[cfg(feature = "ssl")]
NetStream::Ssl(ref mut stream) => stream.read(buf),
} }
} }
} }
impl Write for NetStream { impl Sink for Connection {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { type SinkItem = Message;
match *self { type SinkError = error::Error;
NetStream::Unsecured(ref mut stream) => stream.write(buf),
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
match self {
&mut Connection::Unsecured(ref mut inner) => inner.start_send(item),
&mut Connection::Secured(ref mut inner) => inner.start_send(item),
} }
} }
fn flush(&mut self) -> io::Result<()> { fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
match *self { match self {
NetStream::Unsecured(ref mut stream) => stream.flush(), &mut Connection::Unsecured(ref mut inner) => inner.poll_complete(),
&mut Connection::Secured(ref mut inner) => inner.poll_complete(),
} }
} }
} }
#[cfg(test)]
mod test {
use super::{Connection, MockConnection};
use error::Result;
use client::data::Message;
use client::data::Command::PRIVMSG;
fn send_to<C: Connection, M: Into<Message>>(conn: &C, msg: M, encoding: &str) -> Result<()> {
conn.send(&msg.into().to_string(), encoding)
}
#[test]
fn send_utf8() {
let conn = MockConnection::empty();
assert!(
send_to(
&conn,
PRIVMSG("test".to_owned(), "€ŠšŽžŒœŸ".to_owned()),
"UTF-8",
).is_ok()
);
let data = conn.written("UTF-8").unwrap();
assert_eq!(&data[..], "PRIVMSG test :€ŠšŽžŒœŸ\r\n");
}
#[test]
fn send_utf8_str() {
let exp = "PRIVMSG test :€ŠšŽžŒœŸ\r\n";
let conn = MockConnection::empty();
assert!(send_to(&conn, exp, "UTF-8").is_ok());
let data = conn.written("UTF-8").unwrap();
assert_eq!(&data[..], exp);
}
#[test]
fn send_iso885915() {
let conn = MockConnection::empty();
assert!(
send_to(
&conn,
PRIVMSG("test".to_owned(), "€ŠšŽžŒœŸ".to_owned()),
"l9",
).is_ok()
);
let data = conn.written("l9").unwrap();
assert_eq!(&data[..], "PRIVMSG test :€ŠšŽžŒœŸ\r\n");
}
#[test]
fn send_iso885915_str() {
let exp = "PRIVMSG test :€ŠšŽžŒœŸ\r\n";
let conn = MockConnection::empty();
assert!(send_to(&conn, exp, "l9").is_ok());
let data = conn.written("l9").unwrap();
assert_eq!(&data[..], exp);
}
#[test]
fn recv_utf8() {
let conn = MockConnection::new("PRIVMSG test :Testing!\r\n");
assert_eq!(
&conn.recv("UTF-8").unwrap()[..],
"PRIVMSG test :Testing!\r\n"
);
}
#[test]
fn recv_iso885915() {
let data = [0xA4, 0xA6, 0xA8, 0xB4, 0xB8, 0xBC, 0xBD, 0xBE];
let conn = MockConnection::from_byte_vec({
let mut vec = Vec::new();
vec.extend("PRIVMSG test :".as_bytes());
vec.extend(data.iter());
vec.extend("\r\n".as_bytes());
vec.into_iter().collect::<Vec<_>>()
});
assert_eq!(
&conn.recv("l9").unwrap()[..],
"PRIVMSG test :€ŠšŽžŒœŸ\r\n"
);
}
}

View file

@ -1,6 +1,5 @@
//! A simple, thread-safe IRC client library. //! A simple, thread-safe IRC client library.
pub mod async;
pub mod conn; pub mod conn;
pub mod data; pub mod data;
pub mod server; pub mod server;

View file

@ -6,7 +6,7 @@ use std::sync::{Arc, Mutex, RwLock};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::thread; use std::thread;
use error; use error;
use client::async::Connection; use client::conn::Connection;
use client::data::{Command, Config, Message, Response, User}; use client::data::{Command, Config, Message, Response, User};
use client::data::Command::{JOIN, NICK, NICKSERV, PART, PING, PONG, PRIVMSG, MODE, QUIT}; use client::data::Command::{JOIN, NICK, NICKSERV, PART, PING, PONG, PRIVMSG, MODE, QUIT};
use client::server::utils::ServerExt; use client::server::utils::ServerExt;