Cleaned up code and added documentation.

This commit is contained in:
Aaron Weiss 2017-06-21 17:15:30 -04:00
parent 073b82feec
commit 3369ef5ff2
No known key found for this signature in database
GPG key ID: 0237035D9BF03AE2
9 changed files with 166 additions and 106 deletions

View file

@ -13,7 +13,9 @@ fn main() {
}; };
let server = IrcServer::from_config(config).unwrap(); let server = IrcServer::from_config(config).unwrap();
server.identify().unwrap(); server.identify().unwrap();
server.stream().for_each(|message| { server
.stream()
.for_each(|message| {
print!("{}", message); print!("{}", message);
match message.command { match message.command {
Command::PRIVMSG(ref target, ref msg) => { Command::PRIVMSG(ref target, ref msg) => {
@ -24,5 +26,7 @@ fn main() {
_ => (), _ => (),
} }
Ok(()) Ok(())
}).wait().unwrap() })
.wait()
.unwrap()
} }

View file

@ -14,7 +14,9 @@ fn main() {
}; };
let server = IrcServer::from_config(config).unwrap(); let server = IrcServer::from_config(config).unwrap();
server.identify().unwrap(); server.identify().unwrap();
server.stream().for_each(|message| { server
.stream()
.for_each(|message| {
print!("{}", message); print!("{}", message);
match message.command { match message.command {
Command::PRIVMSG(ref target, ref msg) => { Command::PRIVMSG(ref target, ref msg) => {
@ -25,5 +27,7 @@ fn main() {
_ => (), _ => (),
} }
Ok(()) Ok(())
}).wait().unwrap() })
.wait()
.unwrap()
} }

View file

@ -1,37 +1,45 @@
//! A module providing IRC connections for use by `IrcServer`s.
use std::fmt; use std::fmt;
use std::thread;
use std::thread::JoinHandle;
use error; use error;
use client::data::Config; use client::data::Config;
use client::transport::IrcTransport; use client::transport::IrcTransport;
use proto::{IrcCodec, Message}; use proto::{IrcCodec, Message};
use futures::future;
use futures::{Async, Poll, Future, Sink, StartSend, Stream}; use futures::{Async, Poll, Future, Sink, StartSend, Stream};
use futures::stream::SplitStream;
use futures::sync::mpsc;
use futures::sync::oneshot;
use futures::sync::mpsc::UnboundedSender;
use native_tls::TlsConnector; use native_tls::TlsConnector;
use tokio_core::reactor::{Core, Handle}; use tokio_core::reactor::Handle;
use tokio_core::net::{TcpStream, TcpStreamNew}; use tokio_core::net::{TcpStream, TcpStreamNew};
use tokio_io::AsyncRead; use tokio_io::AsyncRead;
use tokio_tls::{TlsConnectorExt, TlsStream}; use tokio_tls::{TlsConnectorExt, TlsStream};
/// An IRC connection used internally by `IrcServer`.
pub enum Connection { pub enum Connection {
#[doc(hidden)]
Unsecured(IrcTransport<TcpStream>), Unsecured(IrcTransport<TcpStream>),
#[doc(hidden)]
Secured(IrcTransport<TlsStream<TcpStream>>), Secured(IrcTransport<TlsStream<TcpStream>>),
} }
impl fmt::Debug for Connection { impl fmt::Debug for Connection {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "IrcConnection") write!(
f,
"{}",
match *self {
Connection::Unsecured(_) => "Connection::Unsecured(...)",
Connection::Secured(_) => "Connection::Secured(...)",
}
)
} }
} }
type TlsFuture = Box<Future<Error=error::Error, Item=TlsStream<TcpStream>> + Send>; /// A convenient type alias representing the TlsStream future.
type TlsFuture = Box<Future<Error = error::Error, Item = TlsStream<TcpStream>> + Send>;
/// A future representing an eventual `Connection`.
pub enum ConnectionFuture<'a> { pub enum ConnectionFuture<'a> {
#[doc(hidden)]
Unsecured(&'a Config, TcpStreamNew), Unsecured(&'a Config, TcpStreamNew),
#[doc(hidden)]
Secured(&'a Config, TlsFuture), Secured(&'a Config, TlsFuture),
} }
@ -58,19 +66,28 @@ impl<'a> Future for ConnectionFuture<'a> {
} }
impl Connection { impl Connection {
/// Creates a new `Connection` using the specified `Config` and `Handle`.
pub fn new<'a>(config: &'a Config, handle: &Handle) -> error::Result<ConnectionFuture<'a>> { pub fn new<'a>(config: &'a Config, handle: &Handle) -> error::Result<ConnectionFuture<'a>> {
if config.use_ssl() { if config.use_ssl() {
let domain = format!("{}:{}", config.server(), config.port()); let domain = format!("{}:{}", config.server(), config.port());
let connector = TlsConnector::builder()?.build()?; let connector = TlsConnector::builder()?.build()?;
let stream = TcpStream::connect(&config.socket_addr(), handle).map_err(|e| { let stream = TcpStream::connect(&config.socket_addr(), handle)
.map_err(|e| {
let res: error::Error = e.into(); let res: error::Error = e.into();
res res
}).and_then(move |socket| { })
connector.connect_async(&domain, socket).map_err(|e| e.into()) .and_then(move |socket| {
}).boxed(); connector.connect_async(&domain, socket).map_err(
|e| e.into(),
)
})
.boxed();
Ok(ConnectionFuture::Secured(config, stream)) Ok(ConnectionFuture::Secured(config, stream))
} else { } else {
Ok(ConnectionFuture::Unsecured(config, TcpStream::connect(&config.socket_addr(), handle))) Ok(ConnectionFuture::Unsecured(
config,
TcpStream::connect(&config.socket_addr(), handle),
))
} }
} }
} }

View file

@ -52,11 +52,13 @@ pub struct Config {
pub ping_time: Option<u32>, pub ping_time: Option<u32>,
/// The amount of time in seconds for a client to reconnect due to no ping response. /// The amount of time in seconds for a client to reconnect due to no ping response.
pub ping_timeout: Option<u32>, pub ping_timeout: Option<u32>,
/// Whether the client should use NickServ GHOST to reclaim its primary nickname if it is in use. /// Whether the client should use NickServ GHOST to reclaim its primary nickname if it is in
/// This has no effect if `nick_password` is not set. /// use. This has no effect if `nick_password` is not set.
pub should_ghost: Option<bool>, pub should_ghost: Option<bool>,
/// The command(s) that should be sent to NickServ to recover a nickname. The nickname and password will be appended in that order after the command. /// The command(s) that should be sent to NickServ to recover a nickname. The nickname and
/// E.g. `["RECOVER", "RELEASE"]` means `RECOVER nick pass` and `RELEASE nick pass` will be sent in that order. /// password will be appended in that order after the command.
/// E.g. `["RECOVER", "RELEASE"]` means `RECOVER nick pass` and `RELEASE nick pass` will be sent
/// in that order.
pub ghost_sequence: Option<Vec<String>>, pub ghost_sequence: Option<Vec<String>>,
/// A map of additional options to be stored in config. /// A map of additional options to be stored in config.
pub options: Option<HashMap<String, String>>, pub options: Option<HashMap<String, String>>,
@ -149,7 +151,11 @@ impl Config {
/// Gets the server and port as a `SocketAddr`. /// Gets the server and port as a `SocketAddr`.
/// This panics when server is not specified or the address is malformed. /// This panics when server is not specified or the address is malformed.
pub fn socket_addr(&self) -> SocketAddr { pub fn socket_addr(&self) -> SocketAddr {
format!("{}:{}", self.server(), self.port()).to_socket_addrs().unwrap().next().unwrap() format!("{}:{}", self.server(), self.port())
.to_socket_addrs()
.unwrap()
.next()
.unwrap()
} }
/// Gets the server password specified in the configuration. /// Gets the server password specified in the configuration.

View file

@ -1,23 +1,21 @@
//! Interface for working with IRC Servers. //! Interface for working with IRC Servers.
use std::cell::Cell;
use std::collections::HashMap; use std::collections::HashMap;
use std::path::Path; use std::path::Path;
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread; use std::thread;
use error; use error;
use client::conn::Connection; use client::conn::Connection;
use client::data::{Command, Config, Message, Response, User}; use client::data::{Command, Config, Message, Response, User};
use client::data::Command::{JOIN, NICK, NICKSERV, PART, PING, PONG, PRIVMSG, MODE, QUIT}; use client::data::Command::{JOIN, NICK, NICKSERV, PART, PRIVMSG, MODE, QUIT};
use client::server::utils::ServerExt; use client::server::utils::ServerExt;
use futures::{Async, Poll, Future, Sink, StartSend, Stream}; use futures::{Async, Poll, Future, Sink, Stream};
use futures::future; use futures::future;
use futures::stream::{BoxStream, SplitStream}; use futures::stream::SplitStream;
use futures::sync::mpsc; use futures::sync::mpsc;
use futures::sync::oneshot; use futures::sync::oneshot;
use futures::sync::mpsc::UnboundedSender; use futures::sync::mpsc::UnboundedSender;
use time; use time;
use tokio_core::reactor::{Core, Handle}; use tokio_core::reactor::Core;
pub mod utils; pub mod utils;
@ -34,7 +32,8 @@ pub trait Server {
/// Gets a stream of incoming messages from the Server. /// Gets a stream of incoming messages from the Server.
fn stream(&self) -> ServerStream; fn stream(&self) -> ServerStream;
/// Gets a list of currently joined channels. This will be none if tracking is not supported altogether. /// Gets a list of currently joined channels. This will be none if tracking is not supported
/// altogether (such as when the `nochanlists` feature is enabled).
fn list_channels(&self) -> Option<Vec<String>>; fn list_channels(&self) -> Option<Vec<String>>;
/// Gets a list of Users in the specified channel. This will be none if the channel is not /// Gets a list of Users in the specified channel. This will be none if the channel is not
@ -43,6 +42,8 @@ pub trait Server {
fn list_users(&self, channel: &str) -> Option<Vec<User>>; fn list_users(&self, channel: &str) -> Option<Vec<User>>;
} }
/// A stream of `Messages` from the `IrcServer`. Interaction with this stream relies on the
/// `futures` API.
pub struct ServerStream { pub struct ServerStream {
state: Arc<ServerState>, state: Arc<ServerState>,
stream: SplitStream<Connection>, stream: SplitStream<Connection>,
@ -86,8 +87,11 @@ impl<'a> Server for ServerState {
where where
Self: Sized, Self: Sized,
{ {
let msg = &msg.into();
try!(self.handle_sent_message(&msg));
Ok((&self.outgoing).send( Ok((&self.outgoing).send(
ServerState::sanitize(&msg.into().to_string()).into(), ServerState::sanitize(&msg.to_string())
.into(),
)?) )?)
} }
@ -128,8 +132,11 @@ impl<'a> Server for ServerState {
} }
impl ServerState { impl ServerState {
fn new(incoming: SplitStream<Connection>, outgoing: UnboundedSender<Message>, config: Config) -> ServerState fn new(
{ incoming: SplitStream<Connection>,
outgoing: UnboundedSender<Message>,
config: Config,
) -> ServerState {
ServerState { ServerState {
config: config, config: config,
chanlists: Mutex::new(HashMap::new()), chanlists: Mutex::new(HashMap::new()),
@ -140,7 +147,8 @@ impl ServerState {
} }
/// Sanitizes the input string by cutting up to (and including) the first occurence of a line /// Sanitizes the input string by cutting up to (and including) the first occurence of a line
/// terminiating phrase (`\r\n`, `\r`, or `\n`). /// terminiating phrase (`\r\n`, `\r`, or `\n`). This is used in sending messages back to
/// prevent the injection of additional commands.
fn sanitize(data: &str) -> &str { fn sanitize(data: &str) -> &str {
// n.b. ordering matters here to prefer "\r\n" over "\r" // n.b. ordering matters here to prefer "\r\n" over "\r"
if let Some((pos, len)) = ["\r\n", "\r", "\n"] if let Some((pos, len)) = ["\r\n", "\r", "\n"]
@ -164,6 +172,17 @@ impl ServerState {
} }
} }
/// Handles sent messages internally for basic client functionality.
fn handle_sent_message(&self, msg: &Message) -> error::Result<()> {
match msg.command {
PART(ref chan, _) => {
let _ = self.chanlists.lock().unwrap().remove(chan);
}
_ => (),
}
Ok(())
}
/// Handles received messages internally for basic client functionality. /// Handles received messages internally for basic client functionality.
fn handle_message(&self, msg: &Message) -> error::Result<()> { fn handle_message(&self, msg: &Message) -> error::Result<()> {
match msg.command { match msg.command {
@ -225,7 +244,7 @@ impl ServerState {
*index += 1; *index += 1;
} }
} }
_ => () _ => (),
} }
Ok(()) Ok(())
} }
@ -355,7 +374,6 @@ impl ServerState {
} }
} }
/// Handles CTCP requests if the CTCP feature is enabled.
#[cfg(feature = "ctcp")] #[cfg(feature = "ctcp")]
fn handle_ctcp(&self, resp: &str, tokens: Vec<&str>) -> error::Result<()> { fn handle_ctcp(&self, resp: &str, tokens: Vec<&str>) -> error::Result<()> {
if tokens.is_empty() { if tokens.is_empty() {
@ -393,13 +411,11 @@ impl ServerState {
} }
} }
/// Sends a CTCP-escaped message.
#[cfg(feature = "ctcp")] #[cfg(feature = "ctcp")]
fn send_ctcp_internal(&self, target: &str, msg: &str) -> error::Result<()> { fn send_ctcp_internal(&self, target: &str, msg: &str) -> error::Result<()> {
self.send_notice(target, &format!("\u{001}{}\u{001}", msg)) self.send_notice(target, &format!("\u{001}{}\u{001}", msg))
} }
/// Handles CTCP requests if the CTCP feature is enabled.
#[cfg(not(feature = "ctcp"))] #[cfg(not(feature = "ctcp"))]
fn handle_ctcp(&self, _: &str, _: Vec<&str>) -> Result<()> { fn handle_ctcp(&self, _: &str, _: Vec<&str>) -> Result<()> {
Ok(()) Ok(())
@ -422,8 +438,6 @@ impl Server for IrcServer {
where where
Self: Sized, Self: Sized,
{ {
let msg = msg.into();
try!(self.handle_sent_message(&msg));
self.state.send(msg) self.state.send(msg)
} }
@ -488,7 +502,10 @@ impl IrcServer {
// Setting up internal processing stuffs. // Setting up internal processing stuffs.
let handle = reactor.handle(); let handle = reactor.handle();
let (sink, stream) = reactor.run(Connection::new(&cfg, &handle).unwrap()).unwrap().split(); let (sink, stream) = reactor
.run(Connection::new(&cfg, &handle).unwrap())
.unwrap()
.split();
let outgoing_future = sink.send_all(rx_outgoing.map_err(|_| { let outgoing_future = sink.send_all(rx_outgoing.map_err(|_| {
let res: error::Error = error::ErrorKind::ChannelError.into(); let res: error::Error = error::ErrorKind::ChannelError.into();
@ -496,12 +513,7 @@ impl IrcServer {
})); }));
handle.spawn(outgoing_future.map(|_| ()).map_err(|_| ())); handle.spawn(outgoing_future.map(|_| ()).map_err(|_| ()));
// let incoming_future = tx_incoming.sink_map_err(|e| { // Send the stream half back to the original thread.
// let res: error::Error = e.into();
// res
// }).send_all(stream);
// // let incoming_future = stream.forward(tx_incoming);
// handle.spawn(incoming_future.map(|_| ()).map_err(|_| ()));
tx_incoming.send(stream).unwrap(); tx_incoming.send(stream).unwrap();
reactor.run(future::empty::<(), ()>()).unwrap(); reactor.run(future::empty::<(), ()>()).unwrap();
@ -511,18 +523,6 @@ impl IrcServer {
state: Arc::new(ServerState::new(rx_incoming.wait()?, tx_outgoing, config)), state: Arc::new(ServerState::new(rx_incoming.wait()?, tx_outgoing, config)),
}) })
} }
/// Handles sent messages internally for basic client functionality.
fn handle_sent_message(&self, msg: &Message) -> error::Result<()> {
match msg.command {
PART(ref chan, _) => {
let _ = self.state.chanlists.lock().unwrap().remove(chan);
}
_ => (),
}
Ok(())
}
} }
#[cfg(test)] #[cfg(test)]

View file

@ -1,3 +1,4 @@
//! An IRC transport that wraps an IRC-framed stream to provide automatic PING replies.
use std::io; use std::io;
use std::time::Instant; use std::time::Instant;
use error; use error;
@ -7,13 +8,21 @@ use futures::{Async, Poll, Sink, StartSend, Stream};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::Framed; use tokio_io::codec::Framed;
pub struct IrcTransport<T> where T: AsyncRead + AsyncWrite { /// An IRC transport that handles automatically replying to PINGs.
pub struct IrcTransport<T>
where
T: AsyncRead + AsyncWrite,
{
inner: Framed<T, IrcCodec>, inner: Framed<T, IrcCodec>,
ping_timeout: u64, ping_timeout: u64,
last_ping: Instant, last_ping: Instant,
} }
impl<T> IrcTransport<T> where T: AsyncRead + AsyncWrite { impl<T> IrcTransport<T>
where
T: AsyncRead + AsyncWrite,
{
/// Creates a new `IrcTransport` from the given IRC stream.
pub fn new(config: &Config, inner: Framed<T, IrcCodec>) -> IrcTransport<T> { pub fn new(config: &Config, inner: Framed<T, IrcCodec>) -> IrcTransport<T> {
IrcTransport { IrcTransport {
inner: inner, inner: inner,
@ -23,14 +32,19 @@ impl<T> IrcTransport<T> where T: AsyncRead + AsyncWrite {
} }
} }
impl<T> Stream for IrcTransport<T> where T: AsyncRead + AsyncWrite { impl<T> Stream for IrcTransport<T>
where
T: AsyncRead + AsyncWrite,
{
type Item = Message; type Item = Message;
type Error = error::Error; type Error = error::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if self.last_ping.elapsed().as_secs() >= self.ping_timeout { if self.last_ping.elapsed().as_secs() >= self.ping_timeout {
self.close()?; self.close()?;
Err(io::Error::new(io::ErrorKind::ConnectionReset, "Ping timed out.").into()) Err(
io::Error::new(io::ErrorKind::ConnectionReset, "Ping timed out.").into(),
)
} else { } else {
loop { loop {
match try_ready!(self.inner.poll()) { match try_ready!(self.inner.poll()) {
@ -40,14 +54,17 @@ impl<T> Stream for IrcTransport<T> where T: AsyncRead + AsyncWrite {
assert!(result.is_ready()); assert!(result.is_ready());
self.poll_complete()?; self.poll_complete()?;
} }
message => return Ok(Async::Ready(message)) message => return Ok(Async::Ready(message)),
} }
} }
} }
} }
} }
impl<T> Sink for IrcTransport<T> where T: AsyncRead + AsyncWrite { impl<T> Sink for IrcTransport<T>
where
T: AsyncRead + AsyncWrite,
{
type SinkItem = Message; type SinkItem = Message;
type SinkError = error::Error; type SinkError = error::Error;

View file

@ -23,9 +23,9 @@ impl Decoder for IrcCodec {
type Error = error::Error; type Error = error::Error;
fn decode(&mut self, src: &mut BytesMut) -> error::Result<Option<Message>> { fn decode(&mut self, src: &mut BytesMut) -> error::Result<Option<Message>> {
self.inner.decode(src).and_then(|res| res.map_or(Ok(None), |msg| { self.inner.decode(src).and_then(|res| {
msg.parse::<Message>().map(|msg| Some(msg)) res.map_or(Ok(None), |msg| msg.parse::<Message>().map(|msg| Some(msg)))
})) })
} }
} }

View file

@ -15,11 +15,13 @@ pub struct LineCodec {
impl LineCodec { impl LineCodec {
/// Creates a new instance of LineCodec from the specified encoding. /// Creates a new instance of LineCodec from the specified encoding.
pub fn new(label: &str) -> error::Result<LineCodec> { pub fn new(label: &str) -> error::Result<LineCodec> {
encoding_from_whatwg_label(label).map(|enc| LineCodec { encoding: enc }).ok_or( encoding_from_whatwg_label(label)
.map(|enc| LineCodec { encoding: enc })
.ok_or(
io::Error::new( io::Error::new(
io::ErrorKind::InvalidInput, io::ErrorKind::InvalidInput,
&format!("Attempted to use unknown codec {}.", label)[..] &format!("Attempted to use unknown codec {}.", label)[..],
).into() ).into(),
) )
} }
} }
@ -36,10 +38,12 @@ impl Decoder for LineCodec {
// Decode the line using the codec's encoding. // Decode the line using the codec's encoding.
match self.encoding.decode(line.as_ref(), DecoderTrap::Replace) { match self.encoding.decode(line.as_ref(), DecoderTrap::Replace) {
Ok(data) => Ok(Some(data)), Ok(data) => Ok(Some(data)),
Err(data) => Err(io::Error::new( Err(data) => Err(
io::Error::new(
io::ErrorKind::InvalidInput, io::ErrorKind::InvalidInput,
&format!("Failed to decode {} as {}.", data, self.encoding.name())[..] &format!("Failed to decode {} as {}.", data, self.encoding.name())[..],
).into()) ).into(),
),
} }
} else { } else {
Ok(None) Ok(None)
@ -53,10 +57,12 @@ impl Encoder for LineCodec {
fn encode(&mut self, msg: String, dst: &mut BytesMut) -> error::Result<()> { fn encode(&mut self, msg: String, dst: &mut BytesMut) -> error::Result<()> {
// Encode the message using the codec's encoding. // Encode the message using the codec's encoding.
let data: error::Result<Vec<u8>> = self.encoding.encode(&msg, EncoderTrap::Replace).map_err(|data| { let data: error::Result<Vec<u8>> = self.encoding
.encode(&msg, EncoderTrap::Replace)
.map_err(|data| {
io::Error::new( io::Error::new(
io::ErrorKind::InvalidInput, io::ErrorKind::InvalidInput,
&format!("Failed to encode {} as {}.", data, self.encoding.name())[..] &format!("Failed to encode {} as {}.", data, self.encoding.name())[..],
).into() ).into()
}); });

View file

@ -258,14 +258,19 @@ mod test {
prefix: None, prefix: None,
command: PRIVMSG(format!("test"), format!("Testing!")), command: PRIVMSG(format!("test"), format!("Testing!")),
}; };
assert_eq!("PRIVMSG test :Testing!\r\n".parse::<Message>().unwrap(), message); assert_eq!(
"PRIVMSG test :Testing!\r\n".parse::<Message>().unwrap(),
message
);
let message = Message { let message = Message {
tags: None, tags: None,
prefix: Some(format!("test!test@test")), prefix: Some(format!("test!test@test")),
command: PRIVMSG(format!("test"), format!("Still testing!")), command: PRIVMSG(format!("test"), format!("Still testing!")),
}; };
assert_eq!( assert_eq!(
":test!test@test PRIVMSG test :Still testing!\r\n".parse::<Message>().unwrap(), ":test!test@test PRIVMSG test :Still testing!\r\n"
.parse::<Message>()
.unwrap(),
message message
); );
let message = Message { let message = Message {
@ -280,7 +285,8 @@ mod test {
assert_eq!( assert_eq!(
"@aaa=bbb;ccc;example.com/ddd=eee :test!test@test PRIVMSG test :Testing with \ "@aaa=bbb;ccc;example.com/ddd=eee :test!test@test PRIVMSG test :Testing with \
tags!\r\n" tags!\r\n"
.parse::<Message>().unwrap(), .parse::<Message>()
.unwrap(),
message message
) )
} }