Merged develop onto 0.14 (which was tricky, and this might have introduced bugs?).

This commit is contained in:
Aaron Weiss 2018-09-17 17:50:53 -04:00
parent c69e944033
commit 53fb890a7e
No known key found for this signature in database
GPG key ID: 047D32DF25DC22EF
21 changed files with 425 additions and 367 deletions

View file

@ -16,7 +16,7 @@ notifications:
email: false
irc:
channels:
- "irc.fyrechat.net#vana-commits"
- "ircs://irc.pdgn.co:6697/#commits"
template:
- "%{repository_slug}/%{branch} (%{commit} - %{author}): %{message}"
skip_join: true

View file

@ -33,18 +33,18 @@ encoding = "0.2"
failure = "0.1"
futures = "0.1"
irc-proto = { version = "*", path = "irc-proto" }
log = "0.3"
native-tls = "0.1"
log = "0.4"
native-tls = "0.2"
serde = "1.0"
serde_derive = "1.0"
serde_json = { version = "1.0", optional = true }
serde_yaml = { version = "0.7", optional = true }
tokio = "0.1"
tokio-codec = "0.1"
tokio-core = "0.1"
tokio-io = "0.1"
tokio-mockstream = "1.1"
tokio-timer = "0.1"
tokio-tls = "0.1"
tokio-tls = "0.2"
toml = { version = "0.4", optional = true }
[dev-dependencies]

View file

@ -39,7 +39,7 @@ Making your own project? [Submit a pull request](https://github.com/aatxe/irc/pu
## Getting Started
To start using the irc crate with cargo, you can simply add `irc = "0.13"` to your dependencies in
To start using the irc crate with cargo, you can add `irc = "0.13"` to your dependencies in
your Cargo.toml file. The high-level API can be found in [`irc::client::prelude`][irc-prelude].
You'll find a number of examples to help you get started in `examples/`, throughout the
documentation, and below.
@ -174,7 +174,7 @@ cargo run --example convertconf -- -i client_config.json -o client_config.toml
```
Note that the formats are automatically determined based on the selected file extensions. This
tool should make it easy for users to migrate their old configurations to TOML.
tool should make it easier for users to migrate their old configurations to TOML.
## Contributing
the irc crate is a free, open source library that relies on contributions from its maintainers,

View file

@ -7,15 +7,15 @@ use irc::client::prelude::*;
fn main() {
let cfg1 = Config {
nickname: Some("pickles".to_owned()),
server: Some("irc.fyrechat.net".to_owned()),
channels: Some(vec!["#irc-crate".to_owned()]),
server: Some("irc.mozilla.org".to_owned()),
channels: Some(vec!["#rust-spam".to_owned()]),
..Default::default()
};
let cfg2 = Config {
nickname: Some("bananas".to_owned()),
server: Some("irc.fyrechat.net".to_owned()),
channels: Some(vec!["#irc-crate".to_owned()]),
server: Some("irc.mozilla.org".to_owned()),
channels: Some(vec!["#rust-spam".to_owned()]),
..Default::default()
};
@ -26,7 +26,7 @@ fn main() {
for config in configs {
// Immediate errors like failure to resolve the server's domain or to establish any connection will
// manifest here in the result of prepare_client_and_connect.
let client = reactor.prepare_client_and_connect(config).unwrap();
let client = reactor.prepare_client_and_connect(&config).unwrap();
client.identify().unwrap();
// Here, we tell the reactor to setup this client for future handling (in run) using the specified
// handler function process_msg.

View file

@ -8,13 +8,13 @@ fn main() {
let config = Config {
nickname: Some("pickles".to_owned()),
alt_nicks: Some(vec!["bananas".to_owned(), "apples".to_owned()]),
server: Some("irc.fyrechat.net".to_owned()),
channels: Some(vec!["#irc-crate".to_owned()]),
server: Some("irc.mozilla.org".to_owned()),
channels: Some(vec!["#rust-spam".to_owned()]),
..Default::default()
};
let mut reactor = IrcReactor::new().unwrap();
let client = reactor.prepare_client_and_connect(config).unwrap();
let client = reactor.prepare_client_and_connect(&config).unwrap();
client.identify().unwrap();
reactor.register_client_with_handler(client, |client, message| {

View file

@ -7,15 +7,15 @@ use irc::client::prelude::*;
fn main() {
let cfg1 = Config {
nickname: Some("pickles".to_owned()),
server: Some("irc.fyrechat.net".to_owned()),
channels: Some(vec!["#irc-crate".to_owned()]),
server: Some("irc.mozilla.org".to_owned()),
channels: Some(vec!["#rust-spam".to_owned()]),
..Default::default()
};
let cfg2 = Config {
nickname: Some("bananas".to_owned()),
server: Some("irc.fyrechat.net".to_owned()),
channels: Some(vec!["#irc-crate".to_owned()]),
server: Some("irc.mozilla.org".to_owned()),
channels: Some(vec!["#rust-spam".to_owned()]),
..Default::default()
};
@ -26,7 +26,7 @@ fn main() {
loop {
let res = configs.iter().fold(Ok(()), |acc, config| {
acc.and(
reactor.prepare_client_and_connect(config.clone()).and_then(|client| {
reactor.prepare_client_and_connect(&config).and_then(|client| {
client.identify().and(Ok(client))
}).and_then(|client| {
reactor.register_client_with_handler(client, process_msg);

View file

@ -7,8 +7,8 @@ fn main() {
let config = Config {
nickname: Some("pickles".to_owned()),
alt_nicks: Some(vec!["bananas".to_owned(), "apples".to_owned()]),
server: Some("irc.fyrechat.net".to_owned()),
channels: Some(vec!["#irc-crate".to_owned()]),
server: Some("irc.mozilla.org".to_owned()),
channels: Some(vec!["#rust-spam".to_owned()]),
..Default::default()
};

View file

@ -6,8 +6,8 @@ use irc::client::prelude::*;
fn main() {
let config = Config {
nickname: Some("pickles".to_owned()),
server: Some("irc.fyrechat.net".to_owned()),
channels: Some(vec!["#irc-crate".to_owned()]),
server: Some("irc.mozilla.org".to_owned()),
channels: Some(vec!["#rust-spam".to_owned()]),
use_ssl: Some(true),
..Default::default()
};

51
examples/tooter.rs Normal file
View file

@ -0,0 +1,51 @@
extern crate irc;
extern crate tokio_timer;
use std::default::Default;
use std::time::Duration;
use irc::client::prelude::*;
use irc::error::IrcError;
// NOTE: this example is a conversion of `tweeter.rs` to an asynchronous style with `IrcReactor`.
fn main() {
let config = Config {
nickname: Some("mastodon".to_owned()),
server: Some("irc.mozilla.org".to_owned()),
channels: Some(vec!["#rust-spam".to_owned()]),
..Default::default()
};
// We need to create a reactor first and foremost
let mut reactor = IrcReactor::new().unwrap();
// and then create a client via its API.
let client = reactor.prepare_client_and_connect(&config).unwrap();
// Then, we identify
client.identify().unwrap();
// and clone just as before.
let send_client = client.clone();
// Rather than spawn a thread that reads the messages separately, we register a handler with the
// reactor. just as in the original version, we don't do any real handling and instead just
// print the messages that are received.
reactor.register_client_with_handler(client, |_, message| {
print!("{}", message);
Ok(())
});
// We construct an interval using a wheel timer from tokio_timer. This interval will fire every
// ten seconds (and is roughly accurate to the second).
let send_interval = tokio_timer::wheel()
.tick_duration(Duration::from_secs(1))
.num_slots(256)
.build()
.interval(Duration::from_secs(10));
// And then spawn a new future that performs the given action each time it fires.
reactor.register_future(send_interval.map_err(IrcError::Timer).for_each(move |()| {
// Anything in here will happen every 10 seconds!
send_client.send_privmsg("#rust-spam", "AWOOOOOOOOOO")
}));
// Then, on the main thread, we finally run the reactor which blocks the program indefinitely.
reactor.run().unwrap();
}

View file

@ -5,11 +5,12 @@ use std::thread;
use std::time::Duration;
use irc::client::prelude::*;
// NOTE: you can find an asynchronous version of this example with `IrcReactor` in `tooter.rs`.
fn main() {
let config = Config {
nickname: Some("pickles".to_owned()),
server: Some("irc.fyrechat.net".to_owned()),
channels: Some(vec!["#irc-crate".to_owned()]),
server: Some("irc.mozilla.org".to_owned()),
channels: Some(vec!["#rust-spam".to_owned()]),
..Default::default()
};
let client = IrcClient::from_config(config).unwrap();
@ -20,7 +21,7 @@ fn main() {
client2.stream().map(|m| print!("{}", m)).wait().count();
});
loop {
client.send_privmsg("#irc-crate", "TWEET TWEET").unwrap();
client.send_privmsg("#rust-spam", "TWEET TWEET").unwrap();
thread::sleep(Duration::new(10, 0));
}
}

View file

@ -14,10 +14,11 @@ travis-ci = { repository = "aatxe/irc" }
[features]
default = ["tokio"]
tokio = ["tokio-io", "bytes"]
tokio = ["tokio-codec", "tokio-io", "bytes"]
[dependencies]
bytes = { version = "0.4", optional = true }
encoding = "0.2"
failure = "0.1"
tokio-codec = { version = "0.1", optional = true }
tokio-io = { version = "0.1", optional = true }

View file

@ -1,163 +1,188 @@
//! An extension trait that provides the ability to strip IRC colors from a string
use std::borrow::Cow;
#[derive(PartialEq)]
enum ParserState {
Text,
ColorCode,
Foreground1,
Foreground1(char),
Foreground2,
Comma,
Background1,
Background1(char),
}
struct Parser {
state: ParserState,
}
/// An extension trait giving strings a function to strip IRC colors
pub trait FormattedStringExt {
pub trait FormattedStringExt<'a> {
/// Returns true if the string contains color, bold, underline or italics
fn is_formatted(&self) -> bool;
/// Returns the string with all color, bold, underline and italics stripped
fn strip_formatting(&self) -> Cow<str>;
fn strip_formatting(self) -> Cow<'a, str>;
}
const FORMAT_CHARACTERS: &[char] = &[
'\x02', // bold
'\x1F', // underline
'\x16', // reverse
'\x0F', // normal
'\x03', // color
];
impl FormattedStringExt for str {
impl<'a> FormattedStringExt<'a> for &'a str {
fn is_formatted(&self) -> bool {
self.contains('\x02') || // bold
self.contains('\x1F') || // underline
self.contains('\x16') || // reverse
self.contains('\x0F') || // normal
self.contains('\x03') // color
self.contains(FORMAT_CHARACTERS)
}
fn strip_formatting(&self) -> Cow<str> {
let mut parser = Parser {
fn strip_formatting(self) -> Cow<'a, str> {
if !self.is_formatted() {
return Cow::Borrowed(self);
}
let mut s = String::from(self);
strip_formatting(&mut s);
Cow::Owned(s)
}
}
fn strip_formatting(buf: &mut String) {
let mut parser = Parser::new();
buf.retain(|cur| parser.next(cur));
}
impl Parser {
fn new() -> Self {
Parser {
state: ParserState::Text,
};
let mut prev: char = '\x00';
let result: Cow<str> = self
.chars()
.filter(move |cur| {
let result = match parser.state {
ParserState::Text | ParserState::Foreground1 | ParserState::Foreground2 if *cur == '\x03' => {
parser.state = ParserState::ColorCode;
false
},
ParserState::Text => !['\x02', '\x1F', '\x16', '\x0F'].contains(cur),
ParserState::ColorCode if (*cur).is_digit(10) => {
parser.state = ParserState::Foreground1;
false
},
ParserState::Foreground1 if (*cur).is_digit(6) => {
// can only consume another digit if previous char was 1.
if (prev) == '1' {
parser.state = ParserState::Foreground2;
false
} else {
parser.state = ParserState::Text;
true
}
},
ParserState::Foreground1 if *cur == ',' => {
parser.state = ParserState::Comma;
false
},
ParserState::Foreground2 if *cur == ',' => {
parser.state = ParserState::Comma;
false
},
ParserState::Comma if ((*cur).is_digit(10)) => {
parser.state = ParserState::Background1;
false
},
ParserState::Background1 if (*cur).is_digit(6) => {
// can only consume another digit if previous char was 1.
parser.state = ParserState::Text;
if (prev) == '1' {
false
} else {
true
}
}
_ => {
parser.state = ParserState::Text;
true
}
};
prev = *cur;
return result
})
.collect();
result
}
}
fn next(&mut self, cur: char) -> bool {
use self::ParserState::*;
match self.state {
Text | Foreground1(_) | Foreground2 if cur == '\x03' => {
self.state = ColorCode;
false
}
Text => {
!FORMAT_CHARACTERS.contains(&cur)
}
ColorCode if cur.is_digit(10) => {
self.state = Foreground1(cur);
false
}
Foreground1('1') if cur.is_digit(6) => {
// can only consume another digit if previous char was 1.
self.state = Foreground2;
false
}
Foreground1(_) if cur.is_digit(6) => {
self.state = Text;
true
}
Foreground1(_) if cur == ',' => {
self.state = Comma;
false
}
Foreground2 if cur == ',' => {
self.state = Comma;
false
}
Comma if (cur.is_digit(10)) => {
self.state = Background1(cur);
false
}
Background1(prev) if cur.is_digit(6) => {
// can only consume another digit if previous char was 1.
self.state = Text;
prev != '1'
}
_ => {
self.state = Text;
true
}
}
}
}
impl FormattedStringExt for String {
impl FormattedStringExt<'static> for String {
fn is_formatted(&self) -> bool {
(&self[..]).is_formatted()
self.as_str().is_formatted()
}
fn strip_formatting(&self) -> Cow<str> {
(&self[..]).strip_formatting()
fn strip_formatting(mut self) -> Cow<'static, str> {
if !self.is_formatted() {
return Cow::Owned(self);
}
strip_formatting(&mut self);
Cow::Owned(self)
}
}
#[cfg(test)]
mod test {
use super::FormattedStringExt;
use std::borrow::Cow;
use colors::FormattedStringExt;
#[test]
fn test_strip_bold() {
assert_eq!("l\x02ol".strip_formatting(), "lol");
macro_rules! test_formatted_string_ext {
{ $( $name:ident ( $($line:tt)* ), )* } => {
$(
mod $name {
use super::*;
test_formatted_string_ext!(@ $($line)*);
}
)*
};
(@ $text:expr, should stripped into $expected:expr) => {
#[test]
fn test_formatted() {
assert!($text.is_formatted());
}
#[test]
fn test_strip() {
assert_eq!($text.strip_formatting(), $expected);
}
};
(@ $text:expr, is not formatted) => {
#[test]
fn test_formatted() {
assert!(!$text.is_formatted());
}
#[test]
fn test_strip() {
assert_eq!($text.strip_formatting(), $text);
}
}
}
test_formatted_string_ext! {
blank("", is not formatted),
blank2(" ", is not formatted),
blank3("\t\r\n", is not formatted),
bold("l\x02ol", should stripped into "lol"),
bold_from_string(String::from("l\x02ol"), should stripped into "lol"),
bold_hangul("우왕\x02", should stripped into "우왕굳"),
fg_color("l\x033ol", should stripped into "lol"),
fg_color2("l\x0312ol", should stripped into "lol"),
fg_bg_11("l\x031,2ol", should stripped into "lol"),
fg_bg_21("l\x0312,3ol", should stripped into "lol"),
fg_bg_12("l\x031,12ol", should stripped into "lol"),
fg_bg_22("l\x0312,13ol", should stripped into "lol"),
string_with_multiple_colors("hoo\x034r\x033a\x0312y", should stripped into "hooray"),
string_with_digit_after_color("\x0344\x0355\x0366", should stripped into "456"),
string_with_multiple_2digit_colors("hoo\x0310r\x0311a\x0312y", should stripped into "hooray"),
string_with_digit_after_2digit_color("\x031212\x031111\x031010", should stripped into "121110"),
thinking("🤔...", is not formatted),
unformatted("a plain text", is not formatted),
}
#[test]
fn test_strip_fg_color() {
assert_eq!("l\x033ol".strip_formatting(), "lol");
}
#[test]
fn test_strip_fg_color2() {
assert_eq!("l\x0312ol".strip_formatting(), "lol");
}
#[test]
fn test_strip_fg_bg_11() {
assert_eq!("l\x031,2ol".strip_formatting(), "lol");
}
#[test]
fn test_strip_fg_bg_21() {
assert_eq!("l\x0312,3ol".strip_formatting(), "lol");
}
#[test]
fn test_strip_fg_bg_12() {
assert_eq!("l\x031,12ol".strip_formatting(), "lol");
}
#[test]
fn test_strip_fg_bg_22() {
assert_eq!("l\x0312,13ol".strip_formatting(), "lol");
}
#[test]
fn test_strip_string_with_multiple_colors() {
assert_eq!("hoo\x034r\x033a\x0312y".strip_formatting(), "hooray");
}
#[test]
fn test_strip_string_with_digit_after_color() {
assert_eq!("\x0344\x0355\x0366".strip_formatting(), "456");
}
#[test]
fn test_strip_string_with_multiple_2digit_colors() {
assert_eq!("hoo\x0310r\x0311a\x0312y".strip_formatting(), "hooray");
}
#[test]
fn test_strip_string_with_digit_after_2digit_color() {
assert_eq!("\x031212\x031111\x031010".strip_formatting(), "121110");
fn test_strip_no_allocation_for_unformatted_text() {
if let Cow::Borrowed(formatted) = "plain text".strip_formatting() {
assert_eq!(formatted, "plain text");
} else {
panic!("allocation detected");
}
}
}

View file

@ -1,5 +1,4 @@
//! Enumeration of all available client commands.
use std::ascii::AsciiExt;
use std::str::FromStr;
use error::MessageParseError;
@ -51,8 +50,32 @@ pub enum Command {
// 3.3 Sending messages
/// PRIVMSG msgtarget :message
///
/// ## Responding to a `PRIVMSG`
///
/// When responding to a message, it is not sufficient to simply copy the message target
/// (msgtarget). This will work just fine for responding to messages in channels where the
/// target is the same for all participants. However, when the message is sent directly to a
/// user, this target will be that client's username, and responding to that same target will
/// actually mean sending itself a response. In such a case, you should instead respond to the
/// user sending the message as specified in the message prefix. Since this is a common
/// pattern, there is a utility function
/// [`Message::response_target`](../message/struct.Message.html#method.response_target)
/// which is used for this exact purpose.
PRIVMSG(String, String),
/// NOTICE msgtarget :message
///
/// ## Responding to a `NOTICE`
///
/// When responding to a notice, it is not sufficient to simply copy the message target
/// (msgtarget). This will work just fine for responding to messages in channels where the
/// target is the same for all participants. However, when the message is sent directly to a
/// user, this target will be that client's username, and responding to that same target will
/// actually mean sending itself a response. In such a case, you should instead respond to the
/// user sending the message as specified in the message prefix. Since this is a common
/// pattern, there is a utility function
/// [`Message::response_target`](../message/struct.Message.html#method.response_target)
/// which is used for this exact purpose.
NOTICE(String, String),
// 3.4 Server queries and commands

View file

@ -1,6 +1,6 @@
//! Implementation of IRC codec for Tokio.
use bytes::BytesMut;
use tokio_io::codec::{Decoder, Encoder};
use tokio_codec::{Decoder, Encoder};
use error;
use line::LineCodec;

View file

@ -8,6 +8,8 @@ extern crate encoding;
#[macro_use]
extern crate failure;
#[cfg(feature = "tokio")]
extern crate tokio_codec;
#[cfg(feature = "tokio")]
extern crate tokio_io;
pub mod caps;

View file

@ -12,13 +12,14 @@ use error;
/// A line-based codec parameterized by an encoding.
pub struct LineCodec {
encoding: EncodingRef,
next_index: usize,
}
impl LineCodec {
/// Creates a new instance of LineCodec from the specified encoding.
pub fn new(label: &str) -> error::Result<LineCodec> {
encoding_from_whatwg_label(label)
.map(|enc| LineCodec { encoding: enc })
.map(|enc| LineCodec { encoding: enc, next_index: 0 })
.ok_or_else(|| io::Error::new(
io::ErrorKind::InvalidInput,
&format!("Attempted to use unknown codec {}.", label)[..],
@ -31,9 +32,12 @@ impl Decoder for LineCodec {
type Error = error::ProtocolError;
fn decode(&mut self, src: &mut BytesMut) -> error::Result<Option<String>> {
if let Some(n) = src.as_ref().iter().position(|b| *b == b'\n') {
if let Some(offset) = src[self.next_index..].iter().position(|b| *b == b'\n') {
// Remove the next frame from the buffer.
let line = src.split_to(n + 1);
let line = src.split_to(self.next_index + offset + 1);
// Set the search start index back to 0 since we found a newline.
self.next_index = 0;
// Decode the line using the codec's encoding.
match self.encoding.decode(line.as_ref(), DecoderTrap::Replace) {
@ -46,6 +50,9 @@ impl Decoder for LineCodec {
),
}
} else {
// Set the search start index to the current length since we know that none of the
// characters we've already looked at are newlines.
self.next_index = src.len();
Ok(None)
}
}

View file

@ -5,13 +5,13 @@ use std::io::Read;
use encoding::EncoderTrap;
use encoding::label::encoding_from_whatwg_label;
use futures::future;
use futures::{Async, Poll, Future, Sink, StartSend, Stream};
use native_tls::{Certificate, TlsConnector, Pkcs12};
use tokio::net::{ConnectFuture, TcpStream};
use tokio_io::AsyncRead;
use native_tls::{Certificate, TlsConnector, Identity};
use tokio_codec::Decoder;
use tokio_core::reactor::Handle;
use tokio_core::net::{TcpStream, TcpStreamNew};
use tokio_mockstream::MockStream;
use tokio_tls::{TlsConnectorExt, TlsStream};
use tokio_tls::{self, TlsStream};
use error;
use client::data::Config;
@ -42,19 +42,20 @@ impl fmt::Debug for Connection {
}
}
/// A convenient type alias representing the `TlsStream` future.
type TlsFuture = Box<Future<Error = error::IrcError, Item = TlsStream<TcpStream>> + Send>;
/// A future representing an eventual `Connection`.
pub enum ConnectionFuture {
pub enum ConnectionFuture<'a> {
#[doc(hidden)]
Unsecured(Config, ConnectFuture),
Unsecured(&'a Config, TcpStreamNew),
#[doc(hidden)]
Secured(Config, TlsFuture),
Secured(&'a Config, TlsFuture),
#[doc(hidden)]
Mock(Config),
Mock(&'a Config),
}
impl fmt::Debug for ConnectionFuture {
impl<'a> fmt::Debug for ConnectionFuture<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
@ -65,33 +66,35 @@ impl fmt::Debug for ConnectionFuture {
ConnectionFuture::Mock(_) => "ConnectionFuture::Mock",
},
match *self {
ConnectionFuture::Unsecured(ref cfg, _) |
ConnectionFuture::Secured(ref cfg, _) |
ConnectionFuture::Mock(ref cfg) => cfg,
ConnectionFuture::Unsecured(cfg, _) |
ConnectionFuture::Secured(cfg, _) |
ConnectionFuture::Mock(cfg) => cfg,
}
)
}
}
impl Future for ConnectionFuture {
impl<'a> Future for ConnectionFuture<'a> {
type Item = Connection;
type Error = error::IrcError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match *self {
ConnectionFuture::Unsecured(ref config, ref mut inner) => {
let framed = try_ready!(inner.poll()).framed(IrcCodec::new(config.encoding())?);
ConnectionFuture::Unsecured(config, ref mut inner) => {
let stream = try_ready!(inner.poll());
let framed = IrcCodec::new(config.encoding())?.framed(stream);
let transport = IrcTransport::new(config, framed);
Ok(Async::Ready(Connection::Unsecured(transport)))
}
ConnectionFuture::Secured(ref config, ref mut inner) => {
let framed = try_ready!(inner.poll()).framed(IrcCodec::new(config.encoding())?);
ConnectionFuture::Secured(config, ref mut inner) => {
let stream = try_ready!(inner.poll());
let framed = IrcCodec::new(config.encoding())?.framed(stream);
let transport = IrcTransport::new(config, framed);
Ok(Async::Ready(Connection::Secured(transport)))
}
ConnectionFuture::Mock(ref config) => {
ConnectionFuture::Mock(config) => {
let enc: error::Result<_> = encoding_from_whatwg_label(
config.encoding()
).ok_or_else(|| error::IrcError::UnknownCodec {
@ -108,7 +111,8 @@ impl Future for ConnectionFuture {
})
};
let framed = MockStream::new(&initial?).framed(IrcCodec::new(config.encoding())?);
let stream = MockStream::new(&initial?);
let framed = IrcCodec::new(config.encoding())?.framed(stream);
let transport = IrcTransport::new(config, framed);
Ok(Async::Ready(Connection::Mock(Logged::wrap(transport))))
@ -118,56 +122,46 @@ impl Future for ConnectionFuture {
}
impl Connection {
/// Creates a future yielding a new `Connection` using the specified `Config`.
pub fn new(config: Config) -> impl Future<Item = Connection, Error = error::IrcError> {
future::lazy(move || Connection::new_inner(config))
.and_then(|connection_future| connection_future)
}
fn new_inner(config: Config) -> error::Result<ConnectionFuture> {
/// Creates a new `Connection` using the specified `Config` and `Handle`.
pub fn new<'a>(config: &'a Config, handle: &Handle) -> error::Result<ConnectionFuture<'a>> {
if config.use_mock_connection() {
Ok(ConnectionFuture::Mock(config))
} else if config.use_ssl() {
let domain = format!("{}", config.server()?);
info!("Connecting via SSL to {}.", domain);
let mut builder = TlsConnector::builder()?;
let mut builder = TlsConnector::builder();
if let Some(cert_path) = config.cert_path() {
let mut file = File::open(cert_path)?;
let mut cert_data = vec![];
file.read_to_end(&mut cert_data)?;
let cert = Certificate::from_der(&cert_data)?;
builder.add_root_certificate(cert)?;
builder.add_root_certificate(cert);
info!("Added {} to trusted certificates.", cert_path);
}
if let Some(client_cert_path) = config.client_cert_path() {
let client_cert_pass = config.client_cert_pass();
let mut file = File::open(client_cert_path)?;
let mut client_cert_data = vec![];
file.read_to_end(&mut client_cert_data)?;
let pkcs12_archive = Pkcs12::from_der(&client_cert_data, &client_cert_pass)?;
builder.identity(pkcs12_archive)?;
let pkcs12_archive = Identity::from_pkcs12(&client_cert_data, &client_cert_pass)?;
builder.identity(pkcs12_archive);
info!("Using {} for client certificate authentication.", client_cert_path);
}
let connector = builder.build()?;
let socket_addr = config.socket_addr()?;
let stream = TcpStream::connect(&socket_addr).map_err(|e| {
let connector: tokio_tls::TlsConnector = builder.build()?.into();
let stream = Box::new(TcpStream::connect(&config.socket_addr()?, handle).map_err(|e| {
let res: error::IrcError = e.into();
res
}).and_then(move |socket| {
connector.connect_async(&domain, socket).map_err(|e| e.into())
});
Ok(ConnectionFuture::Secured(config, Box::new(stream)))
connector.connect(&domain, socket).map_err(
|e| e.into(),
)
}));
Ok(ConnectionFuture::Secured(config, stream))
} else {
info!("Connecting to {}.", config.server()?);
let socket_addr = config.socket_addr()?;
Ok(ConnectionFuture::Unsecured(
config,
TcpStream::connect(&socket_addr),
TcpStream::connect(&config.socket_addr()?, handle),
))
}
}

View file

@ -37,7 +37,7 @@
//! # client.identify().unwrap();
//! client.for_each_incoming(|irc_msg| {
//! if let Command::PRIVMSG(channel, message) = irc_msg.command {
//! if message.contains(&*client.current_nickname()) {
//! if message.contains(client.current_nickname()) {
//! client.send_privmsg(&channel, "beep boop").unwrap();
//! }
//! }
@ -45,11 +45,9 @@
//! # }
//! ```
#[cfg(feature = "ctcp")]
use std::ascii::AsciiExt;
use std::collections::HashMap;
use std::path::Path;
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard};
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
#[cfg(feature = "ctcp")]
@ -58,11 +56,11 @@ use futures::{Async, Poll, Future, Sink, Stream};
use futures::stream::SplitStream;
use futures::sync::mpsc;
use futures::sync::oneshot;
use futures::sync::mpsc::UnboundedSender;
use tokio_core::reactor::Core;
use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio_core::reactor::{Core, Handle};
use error;
use client::conn::Connection;
use client::conn::{Connection, ConnectionFuture};
use client::data::{Config, User};
use client::ext::ClientExt;
use client::transport::LogView;
@ -94,7 +92,7 @@ pub mod transport;
/// # client.identify().unwrap();
/// client.stream().for_each_incoming(|irc_msg| {
/// match irc_msg.command {
/// Command::PRIVMSG(channel, message) => if message.contains(&*client.current_nickname()) {
/// Command::PRIVMSG(channel, message) => if message.contains(client.current_nickname()) {
/// client.send_privmsg(&channel, "beep boop").unwrap();
/// }
/// _ => ()
@ -159,7 +157,7 @@ pub trait Client {
/// # client.identify().unwrap();
/// client.for_each_incoming(|irc_msg| {
/// if let Command::PRIVMSG(channel, message) = irc_msg.command {
/// if message.contains(&*client.current_nickname()) {
/// if message.contains(client.current_nickname()) {
/// client.send_privmsg(&channel, "beep boop").unwrap();
/// }
/// }
@ -223,29 +221,17 @@ impl Stream for ClientStream {
}
/// Thread-safe internal state for an IRC server connection.
///
/// Anything that should be synchronized across threads should be stuffed here. As `IrcClient` will
/// hold a single shared instance of `ClientState` using an `Arc`.
#[derive(Debug)]
struct ClientState {
/// The configuration used with this connection.
config: Config,
/// A thread-safe map of channels to the list of users in them. This is used to implement
/// the user tracking for each channel.
/// A thread-safe map of channels to the list of users in them.
chanlists: Mutex<HashMap<String, Vec<User>>>,
/// A thread-safe index into `config.alt_nicks` to handle alternative nickname usage.
/// A thread-safe index to track the current alternative nickname being used.
alt_nick_index: RwLock<usize>,
/// The current nickname in use by this client, which may differ from the one implied by
/// `alt_nick_index`. This can be the case if, for example, a new `NICK` command is sent.
current_nickname: RwLock<String>,
/// The internal IRC stream used for the reading API. This stream can only be given out to one
/// thread, and from then on, the option will be empty. This may change in the future if we
/// split `Message` into an owned and borrowed variant (the latter being cheap to clone) since
/// we might then be able to forward the messages to many stream copies.
/// A thread-safe internal IRC stream used for the reading API.
incoming: Mutex<Option<SplitStream<Connection>>>,
/// The outgoing channel used for the sending API. This channel will send messages to the
/// writing task (which could be on a different thread) that will handle the actual transmission
/// over the wire.
/// A thread-safe copy of the outgoing channel.
outgoing: UnboundedSender<Message>,
}
@ -256,8 +242,6 @@ impl<'a> Client for ClientState {
fn send<M: Into<Message>>(&self, msg: M) -> error::Result<()> where Self: Sized {
let msg = msg.into();
// Before sending any messages to the writing task, we first process them for any special
// library-provided functionality.
self.handle_sent_message(&msg)?;
Ok(self.outgoing.unbounded_send(msg)?)
}
@ -303,29 +287,33 @@ impl ClientState {
incoming: SplitStream<Connection>,
outgoing: UnboundedSender<Message>,
config: Config,
) -> error::Result<ClientState> {
Ok(ClientState {
) -> ClientState {
ClientState {
config: config,
chanlists: Mutex::new(HashMap::new()),
alt_nick_index: RwLock::new(0),
current_nickname: RwLock::new(config.nickname()?.to_owned()),
incoming: Mutex::new(Some(incoming)),
outgoing, config,
})
outgoing: outgoing,
}
}
/// Gets the current nickname in use.
fn current_nickname(&self) -> RwLockReadGuard<String> {
// This should never panic since we should never be poisoning the lock.
self.current_nickname.read().unwrap()
fn current_nickname(&self) -> &str {
let alt_nicks = self.config().alternate_nicknames();
let index = self.alt_nick_index.read().unwrap();
match *index {
0 => self.config().nickname().expect(
"current_nickname should not be callable if nickname is not defined."
),
i => alt_nicks[i - 1],
}
}
/// Handles sent messages internally for basic client functionality.
fn handle_sent_message(&self, msg: &Message) -> error::Result<()> {
trace!("[SENT] {}", msg.to_string());
match msg.command {
// On sending a `PART`, we remove the channel from the channel listing.
PART(ref chan, _) => {
// This should never panic since we should never be poisoning the mutex.
let _ = self.chanlists.lock().unwrap().remove(chan);
}
_ => (),
@ -337,37 +325,14 @@ impl ClientState {
fn handle_message(&self, msg: &Message) -> error::Result<()> {
trace!("[RECV] {}", msg.to_string());
match msg.command {
// On a `JOIN` message, we add the user to the channel in the channel listing. This
// works on the assumption that the client will only see `JOIN` commands for channels it
// is a member of.
JOIN(ref chan, _, _) => self.handle_join(msg.source_nickname().unwrap_or(""), chan),
// On a `PART` message, we remove the user from the channel in the channel listing. This
// works on the assumption that the client will only see `PART` commands for channels it
// is a member of.
PART(ref chan, _) => self.handle_part(msg.source_nickname().unwrap_or(""), chan),
// On a `KICK` message, we remove the user from the channel in the channel listing. This
// works on the assumption that the client will only see `KICK` commands for channels it
// is a member of.
KICK(ref chan, ref user, _) => self.handle_part(user, chan),
// On a `QUIT` message, we remove the user from all channels in the channel listing.
QUIT(_) => self.handle_quit(msg.source_nickname().unwrap_or("")),
// On a `NICK` message, we might update the current nickname (if the `NICK` source is
// the client), and we always update channel tracking accordingly.
NICK(ref new_nick) => {
self.handle_current_nick_change(msg.source_nickname().unwrap_or(""), new_nick);
self.handle_nick_change(msg.source_nickname().unwrap_or(""), new_nick)
}
// On a channel `MODE` message, the access level of a user might have changed (for
// example, in `MODE #pdgn +o awe`, the user `awe` is made an operator). Thus, we have
// to update the user instance in the channel listing accordingly.
ChannelMODE(ref chan, ref modes) => self.handle_mode(chan, modes),
// On `PRIVMSG` commands, we pull out CTCP messages and process them accordingly.
PRIVMSG(ref target, ref body) => {
if body.starts_with('\u{001}') {
let tokens: Vec<_> = {
@ -385,15 +350,9 @@ impl ClientState {
}
}
}
// When we receive `RPL_NAMREPLY`, we use it to populate the channel listing according
// to who is included in the reply.
Command::Response(Response::RPL_NAMREPLY, ref args, ref suffix) => {
self.handle_namreply(args, suffix)
}
// After `RPL_ENDOFMOTD` or `ERR_NOMOTD`, the client is considered "ready" and is
// allowed to perform tasks such as joining channels or setting their usermode.
Command::Response(Response::RPL_ENDOFMOTD, _, _) |
Command::Response(Response::ERR_NOMOTD, _, _) => {
self.send_nick_password()?;
@ -407,14 +366,13 @@ impl ClientState {
}
}
let joined_chans = self.chanlists.lock().unwrap();
for chan in joined_chans.keys().filter(|x| !config_chans.contains(&x.as_str())) {
for chan in joined_chans.keys().filter(
|x| !config_chans.contains(&x.as_str()),
)
{
self.send_join(chan)?
}
}
// When `ERR_NICKNAMEINUSE` or `ERR_ERRONEOUSNICKNAME` occurs, we use the alternative
// nicknames listed in the configuration to try a different `NICK`. Each time it fails,
// we move to the next alternative, until all alternatives are exhausted.
Command::Response(Response::ERR_NICKNAMEINUSE, _, _) |
Command::Response(Response::ERR_ERRONEOUSNICKNAME, _, _) => {
let alt_nicks = self.config().alternate_nicknames();
@ -426,15 +384,11 @@ impl ClientState {
*index += 1;
}
}
_ => (),
}
Ok(())
}
/// If a password for the nickname is registered, send an identification command via `NICKSERV`.
/// This will also attempt to handle the necessary steps to replace an existing user with the
/// given nickname according to the ghost sequence specified in the configuration.
fn send_nick_password(&self) -> error::Result<()> {
if self.config().nick_password().is_empty() {
Ok(())
@ -458,7 +412,6 @@ impl ClientState {
}
}
/// If any user modes are specified in the configuration, this will send them to the server.
fn send_umodes(&self) -> error::Result<()> {
if self.config().umodes().is_empty() {
Ok(())
@ -520,17 +473,6 @@ impl ClientState {
}
}
/// If `old_nick` is the current nickname for this client, we'll update the current nickname to
/// `new_nick`. This should handle both user-initiated nickname changes _and_ server-intiated
/// ones.
fn handle_current_nick_change(&self, old_nick: &str, new_nick: &str) {
if old_nick.is_empty() || new_nick.is_empty() || old_nick != &*self.current_nickname() {
return;
}
let mut nick = self.current_nickname.write().unwrap();
*nick = new_nick.to_owned();
}
#[cfg(feature = "nochanlists")]
fn handle_nick_change(&self, _: &str, _: &str) {}
@ -655,7 +597,10 @@ impl Client for IrcClient {
&self.state.config
}
fn send<M: Into<Message>>(&self, msg: M) -> error::Result<()> where Self: Sized {
fn send<M: Into<Message>>(&self, msg: M) -> error::Result<()>
where
Self: Sized,
{
self.state.send(msg)
}
@ -748,39 +693,34 @@ impl IrcClient {
let cfg = config.clone();
// This thread is run separately to writing outgoing messages to the wire, and hides the
// internal details of the `tokio` reactor from the programmer. However, by virtue of being
// a separate thread hidden from the programmer, its errors cannot be handled gracefully and
// will instead panic.
let _ = thread::spawn(move || {
let mut reactor = Core::new().unwrap();
let conn = reactor.run(Connection::new(cfg)).unwrap();
// Setting up internal processing stuffs.
let handle = reactor.handle();
let conn = reactor.run(Connection::new(&cfg, &handle).unwrap()).unwrap();
tx_view.send(conn.log_view()).unwrap();
let (sink, stream) = conn.split();
// Forward every message from the outgoing channel to the sink.
let outgoing_future = sink.send_all(rx_outgoing.map_err::<error::IrcError, _>(|_| {
unreachable!("futures::sync::mpsc::Receiver should never return Err");
})).map(|_| ()).map_err(|e| panic!("{}", e));
// Send the stream half back to the original thread, to be stored in the client state.
// Send the stream half back to the original thread.
tx_incoming.send(stream).unwrap();
// Run the future that writes outgoing messages to the wire forever or until we panic.
// This will block the thread.
reactor.run(outgoing_future).unwrap();
});
Ok(IrcClient {
state: Arc::new(ClientState::new(rx_incoming.wait()?, tx_outgoing, config)?),
state: Arc::new(ClientState::new(rx_incoming.wait()?, tx_outgoing, config)),
view: rx_view.wait()?,
})
}
/// Creates a `Future` of an `IrcClient` from the specified configuration.
/// This can be used to set up a number of `IrcClients` on a single,
/// shared event loop. It can also be used to take more control over execution and error
/// Creates a `Future` of an `IrcClient` from the specified configuration and on the event loop
/// corresponding to the given handle. This can be used to set up a number of `IrcClients` on a
/// single, shared event loop. It can also be used to take more control over execution and error
/// handling. Connection will not occur until the event loop is run.
///
/// Proper usage requires familiarity with `tokio` and `futures`. You can find more information
@ -796,6 +736,7 @@ impl IrcClient {
/// # extern crate tokio_core;
/// # use std::default::Default;
/// # use irc::client::prelude::*;
/// # use irc::client::PackedIrcClient;
/// # use irc::error;
/// # use tokio_core::reactor::Core;
/// # fn main() {
@ -805,9 +746,9 @@ impl IrcClient {
/// # .. Default::default()
/// # };
/// let mut reactor = Core::new().unwrap();
/// let future = IrcClient::new_future(config);
/// let future = IrcClient::new_future(reactor.handle(), &config).unwrap();
/// // immediate connection errors (like no internet) will turn up here...
/// let (client, future) = reactor.run(future).unwrap();
/// let PackedIrcClient(client, future) = reactor.run(future).unwrap();
/// // runtime errors (like disconnections and so forth) will turn up here...
/// reactor.run(client.stream().for_each(move |irc_msg| {
/// // processing messages works like usual
@ -816,34 +757,22 @@ impl IrcClient {
/// # }
/// # fn process_msg(server: &IrcClient, message: Message) -> error::Result<()> { Ok(()) }
/// ```
pub fn new_future(config: Config) -> impl Future<
Item = (IrcClient, impl Future<Item = (), Error = error::IrcError> + 'static),
Error = error::IrcError
> {
Connection::new(config.clone())
.and_then(move |connection| {
let (tx_outgoing, rx_outgoing) = mpsc::unbounded();
let log_view = connection.log_view();
let (sink, stream) = connection.split();
let outgoing_future = sink.send_all(
rx_outgoing.map_err::<error::IrcError, _>(|()| {
unreachable!("futures::sync::mpsc::Receiver should never return Err");
})
).map(|_| ());
ClientState::new(stream, tx_outgoing, config).map(|state| {
let client = IrcClient {
state: Arc::new(state),
view: log_view,
};
(client, outgoing_future)
})
})
pub fn new_future(handle: Handle, config: &Config) -> error::Result<IrcClientFuture> {
let (tx_outgoing, rx_outgoing) = mpsc::unbounded();
Ok(IrcClientFuture {
conn: Connection::new(config, &handle)?,
_handle: handle,
config: config,
tx_outgoing: Some(tx_outgoing),
rx_outgoing: Some(rx_outgoing),
})
}
/// Gets the current nickname in use. This may be the primary username set in the configuration,
/// or it could be any of the alternative nicknames listed as well. As a result, this is the
/// preferred way to refer to the client's nickname.
pub fn current_nickname(&self) -> RwLockReadGuard<String> {
pub fn current_nickname(&self) -> &str {
self.state.current_nickname()
}
@ -854,6 +783,56 @@ impl IrcClient {
}
}
/// A future representing the eventual creation of an `IrcClient`.
///
/// Interaction with this future relies on the `futures` API, but is only expected for more advanced
/// use cases. To learn more, you can view the documentation for the
/// [`futures`](https://docs.rs/futures/) crate, or the tutorials for
/// [`tokio`](https://tokio.rs/docs/getting-started/futures/). An easy to use abstraction that does
/// not require this knowledge is available via [`IrcReactors`](./reactor/struct.IrcReactor.html).
#[derive(Debug)]
pub struct IrcClientFuture<'a> {
conn: ConnectionFuture<'a>,
_handle: Handle,
config: &'a Config,
tx_outgoing: Option<UnboundedSender<Message>>,
rx_outgoing: Option<UnboundedReceiver<Message>>,
}
impl<'a> Future for IrcClientFuture<'a> {
type Item = PackedIrcClient;
type Error = error::IrcError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let conn = try_ready!(self.conn.poll());
let view = conn.log_view();
let (sink, stream) = conn.split();
let outgoing_future = sink.send_all(
self.rx_outgoing.take().unwrap().map_err::<error::IrcError, _>(|()| {
unreachable!("futures::sync::mpsc::Receiver should never return Err");
})
).map(|_| ());
let server = IrcClient {
state: Arc::new(ClientState::new(
stream, self.tx_outgoing.take().unwrap(), self.config.clone()
)),
view: view,
};
Ok(Async::Ready(PackedIrcClient(server, Box::new(outgoing_future))))
}
}
/// An `IrcClient` packaged with a future that drives its message sending. In order for the client
/// to actually work properly, this future _must_ be running.
///
/// This type should only be used by advanced users who are familiar with the implementation of this
/// crate. An easy to use abstraction that does not require this knowledge is available via
/// [`IrcReactors`](./reactor/struct.IrcReactor.html).
pub struct PackedIrcClient(pub IrcClient, pub Box<Future<Item = (), Error = error::IrcError>>);
#[cfg(test)]
mod test {
use std::collections::HashMap;
@ -1067,22 +1046,6 @@ mod test {
}
}
#[test]
fn current_nickname_tracking() {
let value = ":test!test@test NICK :t3st\r\n\
:t3st!test@test NICK :t35t\r\n";
let client = IrcClient::from_config(Config {
mock_initial_value: Some(value.to_owned()),
..test_config()
}).unwrap();
assert_eq!(&*client.current_nickname(), "test");
client.for_each_incoming(|message| {
println!("{:?}", message);
}).unwrap();
assert_eq!(&*client.current_nickname(), "t35t");
}
#[test]
fn send() {
let client = IrcClient::from_config(test_config()).unwrap();

View file

@ -16,7 +16,7 @@
//! fn main() {
//! let config = Config::default();
//! let mut reactor = IrcReactor::new().unwrap();
//! let client = reactor.prepare_client_and_connect(config).unwrap();
//! let client = reactor.prepare_client_and_connect(&config).unwrap();
//! reactor.register_client_with_handler(client, process_msg);
//! reactor.run().unwrap();
//! }
@ -28,7 +28,7 @@ use futures::future;
use tokio_core::reactor::{Core, Handle};
use client::data::Config;
use client::{IrcClient, Client};
use client::{IrcClient, IrcClientFuture, PackedIrcClient, Client};
use error;
use proto::Message;
@ -65,15 +65,12 @@ impl IrcReactor {
/// # fn main() {
/// # let config = Config::default();
/// let future_client = IrcReactor::new().and_then(|mut reactor| {
/// Ok(reactor.prepare_client(config))
/// reactor.prepare_client(&config)
/// });
/// # }
/// ```
pub fn prepare_client(&mut self, config: Config) -> impl Future<
Item = (IrcClient, impl Future<Item = (), Error = error::IrcError> + 'static),
Error = error::IrcError
> {
IrcClient::new_future(config)
pub fn prepare_client<'a>(&mut self, config: &'a Config) -> error::Result<IrcClientFuture<'a>> {
IrcClient::new_future(self.inner_handle(), config)
}
/// Runs an [`IrcClientFuture`](../struct.IrcClientFuture.html), such as one from
@ -87,17 +84,14 @@ impl IrcReactor {
/// # fn main() {
/// # let config = Config::default();
/// let client = IrcReactor::new().and_then(|mut reactor| {
/// let future = reactor.prepare_client(config);
/// reactor.connect_client(future)
/// reactor.prepare_client(&config).and_then(|future| {
/// reactor.connect_client(future)
/// })
/// });
/// # }
/// ```
pub fn connect_client<F, G>(&mut self, future: F) -> error::Result<IrcClient>
where
F: Future<Item = (IrcClient, G), Error = error::IrcError>,
G: Future<Item = (), Error = error::IrcError> + 'static,
{
self.inner.run(future).map(|(client, future)| {
pub fn connect_client(&mut self, future: IrcClientFuture) -> error::Result<IrcClient> {
self.inner.run(future).map(|PackedIrcClient(client, future)| {
self.register_future(future);
client
})
@ -115,13 +109,12 @@ impl IrcReactor {
/// # fn main() {
/// # let config = Config::default();
/// let client = IrcReactor::new().and_then(|mut reactor| {
/// reactor.prepare_client_and_connect(config)
/// reactor.prepare_client_and_connect(&config)
/// });
/// # }
/// ```
pub fn prepare_client_and_connect(&mut self, config: Config) -> error::Result<IrcClient> {
let client_future = self.prepare_client(config);
self.connect_client(client_future)
pub fn prepare_client_and_connect(&mut self, config: &Config) -> error::Result<IrcClient> {
self.prepare_client(config).and_then(|future| self.connect_client(future))
}
/// Registers the given client with the specified message handler. The reactor will store this
@ -129,8 +122,6 @@ impl IrcReactor {
/// connection indefinitely (or until failure). As registration is consumed by `run`, subsequent
/// calls to run will require new registration.
///
/// **Note**: A client can only be registered once. Subsequent attempts will cause a panic.
///
/// # Example
/// ```no_run
/// # extern crate irc;
@ -139,7 +130,7 @@ impl IrcReactor {
/// # fn main() {
/// # let config = Config::default();
/// let mut reactor = IrcReactor::new().unwrap();
/// let client = reactor.prepare_client_and_connect(config).unwrap();
/// let client = reactor.prepare_client_and_connect(&config).unwrap();
/// reactor.register_client_with_handler(client, |client, msg| {
/// // Message processing happens here.
/// Ok(())
@ -185,7 +176,7 @@ impl IrcReactor {
/// # fn main() {
/// # let config = Config::default();
/// let mut reactor = IrcReactor::new().unwrap();
/// let client = reactor.prepare_client_and_connect(config).unwrap();
/// let client = reactor.prepare_client_and_connect(&config).unwrap();
/// reactor.register_client_with_handler(client, process_msg);
/// reactor.run().unwrap();
/// # }

View file

@ -7,8 +7,8 @@ use std::time::{Duration, Instant};
use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
use chrono::prelude::*;
use tokio_codec::Framed;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::Framed;
use tokio_timer;
use tokio_timer::{Interval, Sleep, Timer};

View file

@ -59,7 +59,7 @@ extern crate serde_derive;
extern crate serde_json;
#[cfg(feature = "yaml")]
extern crate serde_yaml;
extern crate tokio;
extern crate tokio_codec;
extern crate tokio_core;
extern crate tokio_io;
extern crate tokio_mockstream;