Implemented a major redesign to simplify API and eliminate

overly-complicated generics usage.
This commit is contained in:
Aaron Weiss 2016-02-10 00:15:08 -05:00
parent bacf6d019e
commit 3314c93c3e
5 changed files with 214 additions and 167 deletions

View file

@ -1,35 +1,71 @@
//! Thread-safe connections on IrcStreams. //! Thread-safe connections on IrcStreams.
#[cfg(feature = "ssl")] use std::error::Error as StdError; #[cfg(feature = "ssl")] use std::error::Error as StdError;
use std::io::prelude::*; use std::io::prelude::*;
use std::io::{BufReader, BufWriter, Cursor, Empty, Result, Sink}; use std::io::{BufReader, BufWriter, Cursor, Result};
use std::io::Error; use std::io::Error;
use std::io::ErrorKind; use std::io::ErrorKind;
use std::net::TcpStream; use std::net::TcpStream;
#[cfg(feature = "ssl")] use std::result::Result as StdResult; #[cfg(feature = "ssl")] use std::result::Result as StdResult;
use std::sync::{Mutex, MutexGuard}; use std::sync::Mutex;
#[cfg(feature = "encode")] use encoding::{DecoderTrap, EncoderTrap, Encoding}; #[cfg(feature = "encode")] use encoding::{DecoderTrap, EncoderTrap, Encoding};
#[cfg(feature = "encode")] use encoding::label::encoding_from_whatwg_label; #[cfg(feature = "encode")] use encoding::label::encoding_from_whatwg_label;
use client::data::Message;
use client::data::kinds::{IrcRead, IrcWrite};
#[cfg(feature = "ssl")] use openssl::ssl::{SslContext, SslMethod, SslStream}; #[cfg(feature = "ssl")] use openssl::ssl::{SslContext, SslMethod, SslStream};
#[cfg(feature = "ssl")] use openssl::ssl::error::SslError; #[cfg(feature = "ssl")] use openssl::ssl::error::SslError;
/// A thread-safe connection. /// A connection.
pub struct Connection<T: IrcRead, U: IrcWrite> { pub trait Connection {
reader: Mutex<T>, /// Sends a message over this connection.
writer: Mutex<U>, #[cfg(feature = "encode")]
fn send(&self, msg: &str, encoding: &str) -> Result<()>;
/// Sends a message over this connection.
#[cfg(not(feature = "encode"))]
fn send(&self, msg: &str) -> Result<()>;
/// Receives a single line from this connection.
#[cfg(feature = "encoding")]
fn recv(&self, encoding: &str) -> Result<String>;
/// Receives a single line from this connection.
#[cfg(not(feature = "encoding"))]
fn recv(&self) -> Result<String>;
/// Gets the full record of all sent messages if the Connection records this.
/// This is intended for use in writing tests.
fn written(&self) -> Option<String>;
/// Re-establishes this connection, disconnecting from the existing case if necessary.
fn reconnect(&self) -> Result<()>;
} }
/// A Connection over a buffered NetStream.
pub type NetConnection = Connection<BufReader<NetStream>, BufWriter<NetStream>>;
/// An internal type
type NetReadWritePair = (BufReader<NetStream>, BufWriter<NetStream>);
impl Connection<BufReader<NetStream>, BufWriter<NetStream>> { /// Useful internal type definitions.
type NetReader = BufReader<NetStream>;
type NetWriter = BufWriter<NetStream>;
type NetReadWritePair = (NetReader, NetWriter);
/// A thread-safe connection over a buffered NetStream.
pub struct NetConnection {
host: Mutex<String>,
port: Mutex<u16>,
reader: Mutex<NetReader>,
writer: Mutex<NetWriter>,
}
impl NetConnection {
fn new(host: &str, port: u16, reader: NetReader, writer: NetWriter) -> NetConnection {
NetConnection {
host: Mutex::new(host.to_owned()),
port: Mutex::new(port),
reader: Mutex::new(reader),
writer: Mutex::new(writer),
}
}
/// Creates a thread-safe TCP connection to the specified server. /// Creates a thread-safe TCP connection to the specified server.
pub fn connect(host: &str, port: u16) -> Result<NetConnection> { pub fn connect(host: &str, port: u16) -> Result<NetConnection> {
let (reader, writer) = try!(Connection::connect_internal(host, port)); let (reader, writer) = try!(NetConnection::connect_internal(host, port));
Ok(Connection::new(reader, writer)) Ok(NetConnection::new(host, port, reader, writer))
} }
/// connects to the specified server and returns a reader-writer pair. /// connects to the specified server and returns a reader-writer pair.
@ -42,8 +78,8 @@ impl Connection<BufReader<NetStream>, BufWriter<NetStream>> {
/// Creates a thread-safe TCP connection to the specified server over SSL. /// Creates a thread-safe TCP connection to the specified server over SSL.
/// If the library is compiled without SSL support, this method panics. /// If the library is compiled without SSL support, this method panics.
pub fn connect_ssl(host: &str, port: u16) -> Result<NetConnection> { pub fn connect_ssl(host: &str, port: u16) -> Result<NetConnection> {
let (reader, writer) = try!(Connection::connect_ssl_internal(host, port)); let (reader, writer) = try!(NetConnection::connect_ssl_internal(host, port));
Ok(Connection::new(reader, writer)) Ok(NetConnection::new(host, port, reader, writer))
} }
/// Connects over SSL to the specified server and returns a reader-writer pair. /// Connects over SSL to the specified server and returns a reader-writer pair.
@ -61,46 +97,29 @@ impl Connection<BufReader<NetStream>, BufWriter<NetStream>> {
fn connect_ssl_internal(host: &str, port: u16) -> Result<NetReadWritePair> { fn connect_ssl_internal(host: &str, port: u16) -> Result<NetReadWritePair> {
panic!("Cannot connect to {}:{} over SSL without compiling with SSL support.", host, port) panic!("Cannot connect to {}:{} over SSL without compiling with SSL support.", host, port)
} }
/*
FIXME: removed until set_keepalive is stabilized.
/// Sets the keepalive for the network stream.
#[unstable = "Rust IO has not stabilized."]
pub fn set_keepalive(&self, delay_in_seconds: Option<u32>) -> Result<()> {
self.mod_stream(|tcp| tcp.set_keepalive(delay_in_seconds))
}
/// Modifies the internal TcpStream using a function.
fn mod_stream<F>(&self, f: F) -> Result<()> where F: FnOnce(&mut TcpStream) -> Result<()> {
match self.reader.lock().unwrap().get_mut() {
&mut NetStream::UnsecuredTcpStream(ref mut tcp) => f(tcp),
#[cfg(feature = "ssl")]
&mut NetStream::SslTcpStream(ref mut ssl) => f(ssl.get_mut()),
}
}
*/
} }
impl<T: IrcRead, U: IrcWrite> Connection<T, U> { /// Converts a Result<T, SslError> into an Result<T>.
/// Creates a new connection from an IrcReader and an IrcWriter. #[cfg(feature = "ssl")]
pub fn new(reader: T, writer: U) -> Connection<T, U> { fn ssl_to_io<T>(res: StdResult<T, SslError>) -> Result<T> {
Connection { match res {
reader: Mutex::new(reader), Ok(x) => Ok(x),
writer: Mutex::new(writer), Err(e) => Err(Error::new(ErrorKind::Other,
} &format!("An SSL error occurred. ({})", e.description())[..]
)),
} }
}
/// Sends a Message over this connection. impl Connection for NetConnection {
#[cfg(feature = "encode")] #[cfg(feature = "encode")]
pub fn send<M: Into<Message>>(&self, to_msg: M, encoding: &str) -> Result<()> { fn send(&self, msg: &str, encoding: &str) -> Result<()> {
let encoding = match encoding_from_whatwg_label(encoding) { let encoding = match encoding_from_whatwg_label(encoding) {
Some(enc) => enc, Some(enc) => enc,
None => return Err(Error::new( None => return Err(Error::new(
ErrorKind::InvalidInput, &format!("Failed to find encoder. ({})", encoding)[..] ErrorKind::InvalidInput, &format!("Failed to find encoder. ({})", encoding)[..]
)) ))
}; };
let msg: Message = to_msg.into(); let data = match encoding.encode(msg, EncoderTrap::Replace) {
let data = match encoding.encode(&msg.into_string(), EncoderTrap::Replace) {
Ok(data) => data, Ok(data) => data,
Err(data) => return Err(Error::new(ErrorKind::InvalidInput, Err(data) => return Err(Error::new(ErrorKind::InvalidInput,
&format!("Failed to encode {} as {}.", data, encoding.name())[..] &format!("Failed to encode {} as {}.", data, encoding.name())[..]
@ -111,18 +130,15 @@ impl<T: IrcRead, U: IrcWrite> Connection<T, U> {
writer.flush() writer.flush()
} }
/// Sends a message over this connection.
#[cfg(not(feature = "encode"))] #[cfg(not(feature = "encode"))]
pub fn send<M: Into<Message>>(&self, to_msg: M) -> Result<()> { fn send(&self, msg: &str) -> Result<()> {
let mut writer = self.writer.lock().unwrap(); let mut writer = self.writer.lock().unwrap();
let msg: Message = to_msg.into(); try!(writer.write_all(msg.as_bytes()));
try!(writer.write_all(&msg.into_string().as_bytes()));
writer.flush() writer.flush()
} }
/// Receives a single line from this connection.
#[cfg(feature = "encoding")] #[cfg(feature = "encoding")]
pub fn recv(&self, encoding: &str) -> Result<String> { fn recv(&self, encoding: &str) -> Result<String> {
let encoding = match encoding_from_whatwg_label(encoding) { let encoding = match encoding_from_whatwg_label(encoding) {
Some(enc) => enc, Some(enc) => enc,
None => return Err(Error::new( None => return Err(Error::new(
@ -141,9 +157,8 @@ impl<T: IrcRead, U: IrcWrite> Connection<T, U> {
) )
} }
/// Receives a single line from this connection.
#[cfg(not(feature = "encoding"))] #[cfg(not(feature = "encoding"))]
pub fn recv(&self) -> Result<String> { fn recv(&self) -> Result<String> {
let mut ret = String::new(); let mut ret = String::new();
try!(self.reader.lock().unwrap().read_line(&mut ret)); try!(self.reader.lock().unwrap().read_line(&mut ret));
if ret.is_empty() { if ret.is_empty() {
@ -153,55 +168,22 @@ impl<T: IrcRead, U: IrcWrite> Connection<T, U> {
} }
} }
/// Acquires the Reader lock. fn written(&self) -> Option<String> {
pub fn reader<'a>(&'a self) -> MutexGuard<'a, T> { None
self.reader.lock().unwrap()
} }
/// Acquires the Writer lock. fn reconnect(&self) -> Result<()> {
pub fn writer<'a>(&'a self) -> MutexGuard<'a, U> {
self.writer.lock().unwrap()
}
}
/// Converts a Result<T, SslError> into an Result<T>.
#[cfg(feature = "ssl")]
fn ssl_to_io<T>(res: StdResult<T, SslError>) -> Result<T> {
match res {
Ok(x) => Ok(x),
Err(e) => Err(Error::new(ErrorKind::Other,
&format!("An SSL error occurred. ({})", e.description())[..]
)),
}
}
/// A trait defining the ability to reconnect.
pub trait Reconnect {
/// Reconnects to the specified host and port, dropping the current connection if necessary.
fn reconnect(&self, host: &str, port: u16) -> Result<()>;
}
macro_rules! noop_reconnect {
($T:ty, $U:ty) => {
impl Reconnect for Connection<$T, $U> {
fn reconnect(&self, _: &str, _: u16) -> Result<()> {
Ok(())
}
}
}
}
impl Reconnect for NetConnection {
fn reconnect(&self, host: &str, port: u16) -> Result<()> {
let use_ssl = match self.reader.lock().unwrap().get_ref() { let use_ssl = match self.reader.lock().unwrap().get_ref() {
&NetStream::UnsecuredTcpStream(_) => false, &NetStream::UnsecuredTcpStream(_) => false,
#[cfg(feature = "ssl")] #[cfg(feature = "ssl")]
&NetStream::SslTcpStream(_) => true, &NetStream::SslTcpStream(_) => true,
}; };
let host = self.host.lock().unwrap();
let port = self.port.lock().unwrap();
let (reader, writer) = if use_ssl { let (reader, writer) = if use_ssl {
try!(Connection::connect_ssl_internal(host, port)) try!(NetConnection::connect_ssl_internal(&host, *port))
} else { } else {
try!(Connection::connect_internal(host, port)) try!(NetConnection::connect_internal(&host, *port))
}; };
*self.reader.lock().unwrap() = reader; *self.reader.lock().unwrap() = reader;
*self.writer.lock().unwrap() = writer; *self.writer.lock().unwrap() = writer;
@ -209,11 +191,95 @@ impl Reconnect for NetConnection {
} }
} }
// TODO: replace all this with specialization when possible. :\ /// A mock connection for testing purposes.
noop_reconnect!(Cursor<Vec<u8>>, Vec<u8>); pub struct MockConnection {
noop_reconnect!(Cursor<Vec<u8>>, Sink); reader: Mutex<Cursor<Vec<u8>>>,
noop_reconnect!(BufReader<Empty>, Vec<u8>); writer: Mutex<Vec<u8>>,
noop_reconnect!(BufReader<Empty>, Sink); }
impl MockConnection {
/// Creates a new mock connection with the specified string in the read buffer.
pub fn new(input: &str) -> MockConnection {
MockConnection::from_byte_vec(input.as_bytes().to_vec())
}
/// Creates a new mock connection with the specified bytes in the read buffer.
pub fn from_byte_vec(input: Vec<u8>) -> MockConnection {
MockConnection {
reader: Mutex::new(Cursor::new(input)),
writer: Mutex::new(Vec::new()),
}
}
}
impl Connection for MockConnection {
#[cfg(feature = "encode")]
fn send(&self, msg: &str, encoding: &str) -> Result<()> {
let encoding = match encoding_from_whatwg_label(encoding) {
Some(enc) => enc,
None => return Err(Error::new(
ErrorKind::InvalidInput, &format!("Failed to find encoder. ({})", encoding)[..]
))
};
let data = match encoding.encode(msg, EncoderTrap::Replace) {
Ok(data) => data,
Err(data) => return Err(Error::new(ErrorKind::InvalidInput,
&format!("Failed to encode {} as {}.", data, encoding.name())[..]
))
};
let mut writer = self.writer.lock().unwrap();
try!(writer.write_all(&data));
writer.flush()
}
#[cfg(not(feature = "encode"))]
fn send(&self, msg: &str) -> Result<()> {
let mut writer = self.writer.lock().unwrap();
try!(writer.write_all(msg.as_bytes()));
writer.flush()
}
#[cfg(feature = "encoding")]
fn recv(&self, encoding: &str) -> Result<String> {
let encoding = match encoding_from_whatwg_label(encoding) {
Some(enc) => enc,
None => return Err(Error::new(
ErrorKind::InvalidInput, &format!("Failed to find decoder. ({})", encoding)[..]
))
};
let mut buf = Vec::new();
self.reader.lock().unwrap().read_until(b'\n', &mut buf).and_then(|_|
match encoding.decode(&buf, DecoderTrap::Replace) {
_ if buf.is_empty() => Err(Error::new(ErrorKind::Other, "EOF")),
Ok(data) => Ok(data),
Err(data) => return Err(Error::new(ErrorKind::InvalidInput,
&format!("Failed to decode {} as {}.", data, encoding.name())[..]
))
}
)
}
#[cfg(not(feature = "encoding"))]
fn recv(&self) -> Result<String> {
let mut ret = String::new();
try!(self.reader.lock().unwrap().read_line(&mut ret));
if ret.is_empty() {
Err(Error::new(ErrorKind::Other, "EOF"))
} else {
Ok(ret)
}
}
fn written(&self) -> Option<String> {
String::from_utf8(self.writer.lock().unwrap().clone()).ok()
}
fn reconnect(&self) -> Result<()> {
Ok(())
}
}
/// An abstraction over different networked streams. /// An abstraction over different networked streams.
pub enum NetStream { pub enum NetStream {

View file

@ -174,7 +174,7 @@ impl Config {
self.ping_timeout.as_ref().map(|t| *t).unwrap_or(10) self.ping_timeout.as_ref().map(|t| *t).unwrap_or(10)
} }
/// Gets whether or not to use NickServ GHOST /// Gets whether or not to attempt nickname reclamation using NickServ GHOST.
/// This defaults to false when not specified. /// This defaults to false when not specified.
pub fn should_ghost(&self) -> bool { pub fn should_ghost(&self) -> bool {
self.should_ghost.as_ref().map(|u| *u).unwrap_or(false) self.should_ghost.as_ref().map(|u| *u).unwrap_or(false)

View file

@ -14,6 +14,7 @@ pub mod kinds {
/// Trait describing all possible Writers for this library. /// Trait describing all possible Writers for this library.
pub trait IrcWrite: Write + Sized + Send + 'static {} pub trait IrcWrite: Write + Sized + Send + 'static {}
impl<T> IrcWrite for T where T: Write + Sized + Send + 'static {} impl<T> IrcWrite for T where T: Write + Sized + Send + 'static {}
/// Trait describing all possible Readers for this library. /// Trait describing all possible Readers for this library.
pub trait IrcRead: BufRead + Sized + Send + 'static {} pub trait IrcRead: BufRead + Sized + Send + 'static {}
impl<T> IrcRead for T where T: BufRead + Sized + Send + 'static {} impl<T> IrcRead for T where T: BufRead + Sized + Send + 'static {}

View file

@ -5,28 +5,30 @@ use std::borrow::ToOwned;
use std::cell::Cell; use std::cell::Cell;
use std::collections::HashMap; use std::collections::HashMap;
use std::error::Error as StdError; use std::error::Error as StdError;
use std::io::{BufReader, BufWriter, Error, ErrorKind, Result}; use std::io::{Error, ErrorKind, Result};
use std::path::Path; use std::path::Path;
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use std::sync::mpsc::{Receiver, Sender, TryRecvError, channel}; use std::sync::mpsc::{Receiver, Sender, TryRecvError, channel};
use std::thread::{JoinHandle, spawn}; use std::thread::{JoinHandle, spawn};
use client::conn::{Connection, NetStream, Reconnect}; use client::conn::{Connection, NetConnection};
use client::data::{Command, Config, Message, Response, User}; use client::data::{Command, Config, Message, Response, User};
use client::data::Command::{JOIN, NICK, NICKSERV, PART, PING, PRIVMSG, MODE}; use client::data::Command::{JOIN, NICK, NICKSERV, PART, PING, PRIVMSG, MODE};
use client::data::kinds::{IrcRead, IrcWrite};
use client::server::utils::ServerExt; use client::server::utils::ServerExt;
use time::{Duration, Timespec, Tm, now}; use time::{Duration, Timespec, Tm, now};
pub mod utils; pub mod utils;
/// Trait describing core Server functionality. /// An interface for interacting with an IRC server.
pub trait Server<'a, T: IrcRead, U: IrcWrite> { pub trait Server {
/// Gets the configuration being used with this Server. /// Gets the configuration being used with this Server.
fn config(&self) -> &Config; fn config(&self) -> &Config;
/// Sends a Command to this Server. /// Sends a Command to this Server.
fn send<M: Into<Message>>(&self, message: M) -> Result<()> where Self: Sized; fn send<M: Into<Message>>(&self, message: M) -> Result<()> where Self: Sized;
/// Gets an Iterator over Messages received by this Server.
fn iter(&'a self) -> ServerIterator<'a, T, U>; /// Gets an iterator over received messages.
fn iter<'a>(&'a self) -> Box<Iterator<Item = Result<Message>> + 'a>;
/// Gets a list of Users in the specified channel. This will be none if the channel is not /// Gets a list of Users in the specified channel. This will be none if the channel is not
/// being tracked, or if tracking is not supported altogether. For best results, be sure to /// being tracked, or if tracking is not supported altogether. For best results, be sure to
/// request `multi-prefix` support from the server. /// request `multi-prefix` support from the server.
@ -34,21 +36,21 @@ pub trait Server<'a, T: IrcRead, U: IrcWrite> {
} }
/// A thread-safe implementation of an IRC Server connection. /// A thread-safe implementation of an IRC Server connection.
pub struct IrcServer<T: IrcRead, U: IrcWrite> { pub struct IrcServer {
/// The channel for sending messages to write. /// The channel for sending messages to write.
tx: Sender<Message>, tx: Sender<Message>,
/// The internal, thread-safe server state. /// The internal, thread-safe server state.
state: Arc<ServerState<T, U>>, state: Arc<ServerState>,
/// A thread-local count of reconnection attempts used for synchronization. /// A thread-local count of reconnection attempts used for synchronization.
reconnect_count: Cell<u32>, reconnect_count: Cell<u32>,
} }
/// Thread-safe internal state for an IRC server connection. /// Thread-safe internal state for an IRC server connection.
struct ServerState<T: IrcRead, U: IrcWrite> { struct ServerState {
/// A global copy of the channel for sending messages to write. /// A global copy of the channel for sending messages to write.
tx: Mutex<Option<Sender<Message>>>, tx: Mutex<Option<Sender<Message>>>,
/// The thread-safe IRC connection. /// The thread-safe IRC connection.
conn: Connection<T, U>, conn: Box<Connection + Send + Sync>,
/// The handle for the message sending thread. /// The handle for the message sending thread.
write_handle: Mutex<Option<JoinHandle<()>>>, write_handle: Mutex<Option<JoinHandle<()>>>,
/// The configuration used with this connection. /// The configuration used with this connection.
@ -65,11 +67,11 @@ struct ServerState<T: IrcRead, U: IrcWrite> {
last_ping_data: Mutex<Option<Timespec>>, last_ping_data: Mutex<Option<Timespec>>,
} }
impl<T: IrcRead, U: IrcWrite> ServerState<T, U> where Connection<T, U>: Reconnect { impl ServerState {
fn new(conn: Connection<T, U>, config: Config) -> ServerState<T, U> { fn new<C>(conn: C, config: Config) -> ServerState where C: Connection + Send + Sync + 'static {
ServerState { ServerState {
tx: Mutex::new(None), tx: Mutex::new(None),
conn: conn, conn: Box::new(conn),
write_handle: Mutex::new(None), write_handle: Mutex::new(None),
config: config, config: config,
chanlists: Mutex::new(HashMap::new()), chanlists: Mutex::new(HashMap::new()),
@ -81,7 +83,7 @@ impl<T: IrcRead, U: IrcWrite> ServerState<T, U> where Connection<T, U>: Reconnec
} }
fn reconnect(&self) -> Result<()> { fn reconnect(&self) -> Result<()> {
self.conn.reconnect(self.config.server(), self.config.port()) self.conn.reconnect()
} }
fn action_taken(&self) { fn action_taken(&self) {
@ -108,30 +110,27 @@ impl<T: IrcRead, U: IrcWrite> ServerState<T, U> where Connection<T, U>: Reconnec
} }
} }
/// An IrcServer over a buffered NetStream. impl IrcServer {
pub type NetIrcServer = IrcServer<BufReader<NetStream>, BufWriter<NetStream>>;
impl IrcServer<BufReader<NetStream>, BufWriter<NetStream>> {
/// Creates a new IRC Server connection from the configuration at the specified path, /// Creates a new IRC Server connection from the configuration at the specified path,
/// connecting immediately. /// connecting immediately.
pub fn new<P: AsRef<Path>>(config: P) -> Result<NetIrcServer> { pub fn new<P: AsRef<Path>>(config: P) -> Result<IrcServer> {
IrcServer::from_config(try!(Config::load(config))) IrcServer::from_config(try!(Config::load(config)))
} }
/// Creates a new IRC server connection from the specified configuration, connecting /// Creates a new IRC server connection from the specified configuration, connecting
/// immediately. /// immediately.
pub fn from_config(config: Config) -> Result<NetIrcServer> { pub fn from_config(config: Config) -> Result<IrcServer> {
let conn = try!(if config.use_ssl() { let conn = try!(if config.use_ssl() {
Connection::connect_ssl(config.server(), config.port()) NetConnection::connect_ssl(config.server(), config.port())
} else { } else {
Connection::connect(config.server(), config.port()) NetConnection::connect(config.server(), config.port())
}); });
Ok(IrcServer::from_connection(config, conn)) Ok(IrcServer::from_connection(config, conn))
} }
} }
impl<T: IrcRead, U: IrcWrite> Clone for IrcServer<T, U> { impl Clone for IrcServer {
fn clone(&self) -> IrcServer<T, U> { fn clone(&self) -> IrcServer {
IrcServer { IrcServer {
tx: self.tx.clone(), tx: self.tx.clone(),
state: self.state.clone(), state: self.state.clone(),
@ -140,7 +139,7 @@ impl<T: IrcRead, U: IrcWrite> Clone for IrcServer<T, U> {
} }
} }
impl<T: IrcRead, U: IrcWrite> Drop for ServerState<T, U> { impl Drop for ServerState {
fn drop(&mut self) { fn drop(&mut self) {
let _ = self.tx.lock().unwrap().take(); let _ = self.tx.lock().unwrap().take();
let mut guard = self.write_handle.lock().unwrap(); let mut guard = self.write_handle.lock().unwrap();
@ -150,7 +149,7 @@ impl<T: IrcRead, U: IrcWrite> Drop for ServerState<T, U> {
} }
} }
impl<'a, T: IrcRead, U: IrcWrite> Server<'a, T, U> for ServerState<T, U> where Connection<T, U>: Reconnect { impl<'a> Server for ServerState {
fn config(&self) -> &Config { fn config(&self) -> &Config {
&self.config &self.config
} }
@ -164,7 +163,7 @@ impl<'a, T: IrcRead, U: IrcWrite> Server<'a, T, U> for ServerState<T, U> where C
} }
} }
fn iter(&'a self) -> ServerIterator<'a, T, U> { fn iter(&self) -> Box<Iterator<Item = Result<Message>>> {
panic!("unimplemented") panic!("unimplemented")
} }
@ -180,7 +179,7 @@ impl<'a, T: IrcRead, U: IrcWrite> Server<'a, T, U> for ServerState<T, U> where C
} }
} }
impl<'a, T: IrcRead, U: IrcWrite> Server<'a, T, U> for IrcServer<T, U> where Connection<T, U>: Reconnect { impl Server for IrcServer {
fn config(&self) -> &Config { fn config(&self) -> &Config {
&self.state.config &self.state.config
} }
@ -189,8 +188,8 @@ impl<'a, T: IrcRead, U: IrcWrite> Server<'a, T, U> for IrcServer<T, U> where Con
self.tx.send(msg.into()).map_err(|e| Error::new(ErrorKind::Other, e)) self.tx.send(msg.into()).map_err(|e| Error::new(ErrorKind::Other, e))
} }
fn iter(&'a self) -> ServerIterator<'a, T, U> { fn iter<'a>(&'a self) -> Box<Iterator<Item = Result<Message>> + 'a> {
ServerIterator::new(self) Box::new(ServerIterator::new(self))
} }
#[cfg(not(feature = "nochanlists"))] #[cfg(not(feature = "nochanlists"))]
@ -205,9 +204,10 @@ impl<'a, T: IrcRead, U: IrcWrite> Server<'a, T, U> for IrcServer<T, U> where Con
} }
} }
impl<T: IrcRead, U: IrcWrite> IrcServer<T, U> where Connection<T, U>: Reconnect { impl IrcServer {
/// Creates an IRC server from the specified configuration, and any arbitrary Connection. /// Creates an IRC server from the specified configuration, and any arbitrary sync connection.
pub fn from_connection(config: Config, conn: Connection<T, U>) -> IrcServer<T, U> { pub fn from_connection<C>(config: Config, conn: C) -> IrcServer
where C: Connection + Send + Sync + 'static {
let (tx, rx): (Sender<Message>, Receiver<Message>) = channel(); let (tx, rx): (Sender<Message>, Receiver<Message>) = channel();
let state = Arc::new(ServerState::new(conn, config)); let state = Arc::new(ServerState::new(conn, config));
let weak = Arc::downgrade(&state); let weak = Arc::downgrade(&state);
@ -251,11 +251,11 @@ impl<T: IrcRead, U: IrcWrite> IrcServer<T, U> where Connection<T, U>: Reconnect
} }
/// Gets a reference to the IRC server's connection. /// Gets a reference to the IRC server's connection.
pub fn conn(&self) -> &Connection<T, U> { pub fn conn(&self) -> &Box<Connection + Send + Sync> {
&self.state.conn &self.state.conn
} }
/// Reconnects to the IRC server. /// Reconnects to the IRC server, disconnecting if necessary.
pub fn reconnect(&self) -> Result<()> { pub fn reconnect(&self) -> Result<()> {
let mut reconnect_count = self.state.reconnect_count.lock().unwrap(); let mut reconnect_count = self.state.reconnect_count.lock().unwrap();
let res = if self.reconnect_count.get() == *reconnect_count { let res = if self.reconnect_count.get() == *reconnect_count {
@ -269,13 +269,13 @@ impl<T: IrcRead, U: IrcWrite> IrcServer<T, U> where Connection<T, U>: Reconnect
} }
#[cfg(feature = "encode")] #[cfg(feature = "encode")]
fn write<M: Into<Message>>(state: &Arc<ServerState<T, U>>, msg: M) -> Result<()> { fn write<M: Into<Message>>(state: &Arc<ServerState>, msg: M) -> Result<()> {
state.conn.send(msg, state.config.encoding()) state.conn.send(&msg.into().into_string(), state.config.encoding())
} }
#[cfg(not(feature = "encode"))] #[cfg(not(feature = "encode"))]
fn write<M: Into<Message>>(state: &Arc<ServerState<T, U>>, msg: M) -> Result<()> { fn write<M: Into<Message>>(state: &Arc<ServerState>, msg: M) -> Result<()> {
state.conn.send(msg) state.conn.send(&msg.into().into_string())
} }
/// Returns a reference to the server state's channel lists. /// Returns a reference to the server state's channel lists.
@ -415,33 +415,14 @@ impl<T: IrcRead, U: IrcWrite> IrcServer<T, U> where Connection<T, U>: Reconnect
} }
} }
impl<T: IrcRead, U: IrcWrite + Clone> IrcServer<T, U> where Connection<T, U>: Reconnect {
/// Returns a copy of the server's connection after waiting for all pending messages to be
/// written. This function may cause unusual behavior when called on a server with operations
/// being performed on other threads. This function is destructive, and is primarily intended
/// for writing unit tests. Use it with care.
pub fn extract_writer(mut self) -> U {
let _ = self.state.tx.lock().unwrap().take();
// This is a terrible hack to get the real channel to drop.
// Otherwise, joining would never finish.
let (tx, _) = channel();
self.tx = tx;
let mut guard = self.state.write_handle.lock().unwrap();
if let Some(handle) = guard.take() {
handle.join().unwrap()
}
self.conn().writer().clone()
}
}
/// An Iterator over an IrcServer's incoming Messages. /// An Iterator over an IrcServer's incoming Messages.
pub struct ServerIterator<'a, T: IrcRead + 'a, U: IrcWrite + 'a> { pub struct ServerIterator<'a> {
server: &'a IrcServer<T, U> server: &'a IrcServer
} }
impl<'a, T: IrcRead + 'a, U: IrcWrite + 'a> ServerIterator<'a, T, U> where Connection<T, U>: Reconnect { impl<'a> ServerIterator<'a> {
/// Creates a new ServerIterator for the desired IrcServer. /// Creates a new ServerIterator for the desired IrcServer.
pub fn new(server: &IrcServer<T, U>) -> ServerIterator<T, U> { pub fn new(server: &'a IrcServer) -> ServerIterator {
ServerIterator { server: server } ServerIterator { server: server }
} }
@ -458,7 +439,7 @@ impl<'a, T: IrcRead + 'a, U: IrcWrite + 'a> ServerIterator<'a, T, U> where Conne
} }
} }
impl<'a, T: IrcRead + 'a, U: IrcWrite + 'a> Iterator for ServerIterator<'a, T, U> where Connection<T, U>: Reconnect { impl<'a> Iterator for ServerIterator<'a> {
type Item = Result<Message>; type Item = Result<Message>;
fn next(&mut self) -> Option<Result<Message>> { fn next(&mut self) -> Option<Result<Message>> {
loop { loop {

View file

@ -5,12 +5,11 @@ use client::data::{Capability, NegotiationVersion};
use client::data::Command::{AUTHENTICATE, CAP, INVITE, JOIN, KICK, KILL, MODE, NICK, NOTICE}; use client::data::Command::{AUTHENTICATE, CAP, INVITE, JOIN, KICK, KILL, MODE, NICK, NOTICE};
use client::data::Command::{OPER, PASS, PONG, PRIVMSG, QUIT, SAMODE, SANICK, TOPIC, USER}; use client::data::Command::{OPER, PASS, PONG, PRIVMSG, QUIT, SAMODE, SANICK, TOPIC, USER};
use client::data::command::CapSubCommand::{END, LS, REQ}; use client::data::command::CapSubCommand::{END, LS, REQ};
use client::data::kinds::{IrcRead, IrcWrite};
#[cfg(feature = "ctcp")] use time::get_time; #[cfg(feature = "ctcp")] use time::get_time;
use client::server::Server; use client::server::Server;
/// Extensions for Server capabilities that make it easier to work directly with the protocol. /// Extensions for Server capabilities that make it easier to work directly with the protocol.
pub trait ServerExt<'a, T: IrcRead, U: IrcWrite>: Server<'a, T, U> { pub trait ServerExt: Server {
/// Sends a request for a list of server capabilities for a specific IRCv3 version. /// Sends a request for a list of server capabilities for a specific IRCv3 version.
fn send_cap_ls(&self, version: NegotiationVersion) -> Result<()> where Self: Sized { fn send_cap_ls(&self, version: NegotiationVersion) -> Result<()> where Self: Sized {
self.send(CAP(None, LS, match version { self.send(CAP(None, LS, match version {
@ -219,7 +218,7 @@ pub trait ServerExt<'a, T: IrcRead, U: IrcWrite>: Server<'a, T, U> {
} }
} }
impl<'a, T: IrcRead, U: IrcWrite, K: Server<'a, T, U>> ServerExt<'a, T, U> for K {} impl<S> ServerExt for S where S: Server {}
#[cfg(test)] #[cfg(test)]
mod test { mod test {