From c363dc783794c07f601b6d0329baa3a0b6e2a36f Mon Sep 17 00:00:00 2001 From: Aaron Weiss Date: Wed, 21 Jun 2017 16:53:28 -0400 Subject: [PATCH] Migrated real IrcServer API to be async based on experiment. --- examples/async.rs | 39 -- examples/multithreaded.rs | 29 -- examples/simple.rs | 6 +- examples/simple_ssl.rs | 6 +- examples/tweeter.rs | 8 +- src/client/async.rs | 82 ---- src/client/mod.rs | 1 + src/client/server/mod.rs | 911 ++++++++++++++++---------------------- 8 files changed, 382 insertions(+), 700 deletions(-) delete mode 100644 examples/async.rs delete mode 100644 examples/multithreaded.rs diff --git a/examples/async.rs b/examples/async.rs deleted file mode 100644 index 9db90d2..0000000 --- a/examples/async.rs +++ /dev/null @@ -1,39 +0,0 @@ -extern crate futures; -extern crate irc; - -use std::default::Default; -use std::thread; -use futures::{Future, Stream}; -use irc::client::async::IrcServer; -use irc::client::data::Config; -use irc::proto::{CapSubCommand, Command}; - -fn main() { - let config = Config { - nickname: Some("pickles".to_owned()), - alt_nicks: Some(vec!["bananas".to_owned(), "apples".to_owned()]), - server: Some("chat.freenode.net".to_owned()), - channels: Some(vec!["##yulli".to_owned()]), - ..Default::default() - }; - - let mut server = IrcServer::new(config).unwrap(); - thread::sleep_ms(100); - server.send(Command::CAP(None, CapSubCommand::END, None, None)).unwrap(); - server.send(Command::NICK("aatxebot".to_owned())).unwrap(); - server.send(Command::USER("aatxebot".to_owned(), "0".to_owned(), "aatxebot".to_owned())).unwrap(); - thread::sleep_ms(100); - server.send(Command::JOIN("##yulli".to_owned(), None, None)).unwrap(); - server.recv().for_each(|msg| { - print!("{}", msg); - match msg.command { - Command::PRIVMSG(ref target, ref msg) => { - if msg.contains("pickles") { - server.send(Command::PRIVMSG(target.to_owned(), "Hi!".to_owned())).unwrap(); - } - } - _ => (), - } - Ok(()) - }).wait().unwrap(); -} diff --git a/examples/multithreaded.rs b/examples/multithreaded.rs deleted file mode 100644 index 80c54f7..0000000 --- a/examples/multithreaded.rs +++ /dev/null @@ -1,29 +0,0 @@ -extern crate irc; - -use std::default::Default; -use std::thread::spawn; -use irc::client::prelude::*; - -fn main() { - let config = Config { - nickname: Some("pickles".to_owned()), - server: Some("irc.fyrechat.net".to_owned()), - channels: Some(vec!["#irc-crate".to_owned()]), - ..Default::default() - }; - let server = IrcServer::from_config(config).unwrap(); - server.identify().unwrap(); - let server = server.clone(); - let _ = spawn(move || for message in server.iter() { - let message = message.unwrap(); // We'll just panic if there's an error. - print!("{}", message); - match message.command { - Command::PRIVMSG(ref target, ref msg) => { - if msg.contains("pickles") { - server.send_privmsg(target, "Hi!").unwrap(); - } - } - _ => (), - } - }).join(); // You might not want to join here for actual multi-threading. -} diff --git a/examples/simple.rs b/examples/simple.rs index 3b1fce7..119d349 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -13,8 +13,7 @@ fn main() { }; let server = IrcServer::from_config(config).unwrap(); server.identify().unwrap(); - for message in server.iter() { - let message = message.unwrap(); // We'll just panic if there's an error. + server.stream().for_each(|message| { print!("{}", message); match message.command { Command::PRIVMSG(ref target, ref msg) => { @@ -24,5 +23,6 @@ fn main() { } _ => (), } - } + Ok(()) + }).wait().unwrap() } diff --git a/examples/simple_ssl.rs b/examples/simple_ssl.rs index 054bca3..5949296 100644 --- a/examples/simple_ssl.rs +++ b/examples/simple_ssl.rs @@ -14,8 +14,7 @@ fn main() { }; let server = IrcServer::from_config(config).unwrap(); server.identify().unwrap(); - for message in server.iter() { - let message = message.unwrap(); // We'll just panic if there's an error. + server.stream().for_each(|message| { print!("{}", message); match message.command { Command::PRIVMSG(ref target, ref msg) => { @@ -25,5 +24,6 @@ fn main() { } _ => (), } - } + Ok(()) + }).wait().unwrap() } diff --git a/examples/tweeter.rs b/examples/tweeter.rs index a49b761..a3f3fdb 100644 --- a/examples/tweeter.rs +++ b/examples/tweeter.rs @@ -1,7 +1,7 @@ extern crate irc; use std::default::Default; -use std::thread::{sleep, spawn}; +use std::thread; use std::time::Duration; use irc::client::prelude::*; @@ -16,11 +16,11 @@ fn main() { server.identify().unwrap(); let server2 = server.clone(); // Let's set up a loop that just prints the messages. - spawn(move || { - server2.iter().map(|m| print!("{}", m.unwrap())).count(); + thread::spawn(move || { + server2.stream().map(|m| print!("{}", m)).wait().count(); }); loop { server.send_privmsg("#irc-crate", "TWEET TWEET").unwrap(); - sleep(Duration::new(10, 0)); + thread::sleep(Duration::new(10, 0)); } } diff --git a/src/client/async.rs b/src/client/async.rs index 6b236c0..a5b6766 100644 --- a/src/client/async.rs +++ b/src/client/async.rs @@ -105,85 +105,3 @@ impl Sink for Connection { } } } - -pub struct IrcServer { - config: Config, - handle: JoinHandle<()>, - incoming: Option>, - outgoing: UnboundedSender, -} - -impl IrcServer { - pub fn new(config: Config) -> error::Result { - // Setting up a remote reactor running forever. - let (tx_outgoing, rx_outgoing) = mpsc::unbounded(); - let (tx_incoming, rx_incoming) = oneshot::channel(); - - let cfg = config.clone(); - let handle = thread::spawn(move || { - let mut reactor = Core::new().unwrap(); - - // Setting up internal processing stuffs. - let handle = reactor.handle(); - let (sink, stream) = reactor.run(Connection::new(&cfg, &handle).unwrap()).unwrap().split(); - - let outgoing_future = sink.send_all(rx_outgoing.map_err(|_| { - let res: error::Error = error::ErrorKind::ChannelError.into(); - res - })); - handle.spawn(outgoing_future.map(|_| ()).map_err(|_| ())); - - // let incoming_future = tx_incoming.sink_map_err(|e| { - // let res: error::Error = e.into(); - // res - // }).send_all(stream); - // // let incoming_future = stream.forward(tx_incoming); - // handle.spawn(incoming_future.map(|_| ()).map_err(|_| ())); - tx_incoming.send(stream).unwrap(); - - reactor.run(future::empty::<(), ()>()).unwrap(); - }); - - Ok(IrcServer { - config: config, - handle: handle, - incoming: Some(rx_incoming.wait()?), - outgoing: tx_outgoing, - }) - } - - pub fn send>(&self, msg: M) -> error::Result<()> { - (&self.outgoing).send(msg.into())?; - Ok(()) - } - - pub fn recv(&mut self) -> SplitStream { - self.incoming.take().unwrap() - } - - pub fn join(self) -> () { - self.handle.join().unwrap() - } -} - -impl Stream for IrcServer { - type Item = Message; - type Error = error::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - self.incoming.as_mut().unwrap().poll().map_err(|_| error::ErrorKind::ChannelError.into()) - } -} - -impl Sink for IrcServer { - type SinkItem = Message; - type SinkError = error::Error; - - fn start_send(&mut self, item: Self::SinkItem) -> StartSend { - Ok(self.outgoing.start_send(item)?) - } - - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - Ok(self.outgoing.poll_complete()?) - } -} diff --git a/src/client/mod.rs b/src/client/mod.rs index 7227d96..c4a08b8 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -12,4 +12,5 @@ pub mod prelude { pub use client::server::utils::ServerExt; pub use client::data::{Capability, Command, Config, Message, NegotiationVersion, Response}; pub use client::data::kinds::{IrcRead, IrcWrite}; + pub use futures::{Future, Stream}; } diff --git a/src/client/server/mod.rs b/src/client/server/mod.rs index 0df003e..c56ac9e 100644 --- a/src/client/server/mod.rs +++ b/src/client/server/mod.rs @@ -1,20 +1,23 @@ //! Interface for working with IRC Servers. -use std::borrow::ToOwned; use std::cell::Cell; use std::collections::HashMap; -use std::error::Error as StdError; -use std::io::{Error, ErrorKind}; use std::path::Path; use std::sync::{Arc, Mutex, RwLock}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::thread::{spawn, sleep}; -use std::time::Duration as StdDuration; -use error::Result; -use client::conn::{Connection, NetConnection}; +use std::thread; +use error; +use client::async::Connection; use client::data::{Command, Config, Message, Response, User}; use client::data::Command::{JOIN, NICK, NICKSERV, PART, PING, PONG, PRIVMSG, MODE, QUIT}; use client::server::utils::ServerExt; -use time::{Duration, Timespec, Tm, now}; +use futures::{Async, Poll, Future, Sink, StartSend, Stream}; +use futures::future; +use futures::stream::{BoxStream, SplitStream}; +use futures::sync::mpsc; +use futures::sync::oneshot; +use futures::sync::mpsc::UnboundedSender; +use time; +use tokio_core::reactor::{Core, Handle}; pub mod utils; @@ -24,12 +27,12 @@ pub trait Server { fn config(&self) -> &Config; /// Sends a Command to this Server. - fn send>(&self, message: M) -> Result<()> + fn send>(&self, message: M) -> error::Result<()> where Self: Sized; - /// Gets an iterator over received messages. - fn iter<'a>(&'a self) -> Box> + 'a>; + /// Gets a stream of incoming messages from the Server. + fn stream(&self) -> ServerStream; /// Gets a list of currently joined channels. This will be none if tracking is not supported altogether. fn list_channels(&self) -> Option>; @@ -40,152 +43,38 @@ pub trait Server { fn list_users(&self, channel: &str) -> Option>; } -/// A thread-safe implementation of an IRC Server connection. -pub struct IrcServer { - /// The internal, thread-safe server state. +pub struct ServerStream { state: Arc, - /// A thread-local count of reconnection attempts used for synchronization. - reconnect_count: Cell, + stream: SplitStream, +} + +impl Stream for ServerStream { + type Item = Message; + type Error = error::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + match try_ready!(self.stream.poll()) { + Some(msg) => { + self.state.handle_message(&msg)?; + Ok(Async::Ready(Some(msg))) + } + None => Ok(Async::Ready(None)), + } + } } /// Thread-safe internal state for an IRC server connection. struct ServerState { - /// The thread-safe IRC connection. - conn: Box, /// The configuration used with this connection. config: Config, /// A thread-safe map of channels to the list of users in them. chanlists: Mutex>>, /// A thread-safe index to track the current alternative nickname being used. alt_nick_index: RwLock, - /// A thread-safe count of reconnection attempts used for synchronization. - reconnect_count: Mutex, - /// A thread-safe store for the time of the last action. - last_action_time: Mutex, - /// A thread-safe store for the last ping data. - last_ping_data: Mutex>, - /// A thread-safe check of pong reply. - waiting_pong_reply: AtomicBool, -} - -impl ServerState { - fn new(conn: C, config: Config) -> ServerState - where - C: Connection + Send + Sync + 'static, - { - ServerState { - conn: Box::new(conn), - config: config, - chanlists: Mutex::new(HashMap::new()), - alt_nick_index: RwLock::new(0), - reconnect_count: Mutex::new(0), - last_action_time: Mutex::new(now()), - last_ping_data: Mutex::new(None), - waiting_pong_reply: AtomicBool::new(false), - } - } - - fn reconnect(&self) -> Result<()> { - let mut ping_data = self.last_ping_data.lock().unwrap(); - *ping_data = None; - self.waiting_pong_reply.store(false, Ordering::SeqCst); - self.conn.reconnect() - } - - fn action_taken(&self) { - let mut time = self.last_action_time.lock().unwrap(); - *time = now(); - } - - fn should_ping(&self) -> bool { - let time = self.last_action_time.lock().unwrap(); - (now() - *time) > Duration::seconds(self.config.ping_time() as i64) - } - - fn last_action_time(&self) -> Tm { - *self.last_action_time.lock().unwrap() - } - - fn update_ping_data(&self, data: Timespec) { - let mut ping_data = self.last_ping_data.lock().unwrap(); - *ping_data = Some(data); - self.waiting_pong_reply.store(true, Ordering::SeqCst); - } - - fn last_ping_data(&self) -> Option { - *self.last_ping_data.lock().unwrap() - } - - fn waiting_pong_reply(&self) -> bool { - self.waiting_pong_reply.load(Ordering::SeqCst) - } - - fn check_pong(&self, data: &str) { - if let Some(ref time) = self.last_ping_data() { - let fmt = format!("{}", time.sec); - if fmt == data { - // found matching pong reply - self.waiting_pong_reply.store(false, Ordering::SeqCst); - } - } - } - - fn send_impl(&self, msg: Message) { - while let Err(_) = self.write(msg.clone()) { - let _ = self.reconnect().and_then(|_| self.identify()); - } - self.action_taken(); - } - - /// Sanitizes the input string by cutting up to (and including) the first occurence of a line - /// terminiating phrase (`\r\n`, `\r`, or `\n`). - fn sanitize(data: &str) -> &str { - // n.b. ordering matters here to prefer "\r\n" over "\r" - if let Some((pos, len)) = ["\r\n", "\r", "\n"] - .iter() - .flat_map(|needle| data.find(needle).map(|pos| (pos, needle.len()))) - .min_by_key(|&(pos, _)| pos) - { - data.split_at(pos + len).0 - } else { - data - } - } - - fn write>(&self, msg: M) -> Result<()> { - self.conn.send( - ServerState::sanitize(&msg.into().to_string()), - self.config.encoding(), - ) - } -} - -impl IrcServer { - /// Creates a new IRC Server connection from the configuration at the specified path, - /// connecting immediately. - pub fn new>(config: P) -> Result { - IrcServer::from_config(try!(Config::load(config))) - } - - /// Creates a new IRC server connection from the specified configuration, connecting - /// immediately. - pub fn from_config(config: Config) -> Result { - let conn = try!(if config.use_ssl() { - NetConnection::connect_ssl(config.server(), config.port()) - } else { - NetConnection::connect(config.server(), config.port()) - }); - Ok(IrcServer::from_connection(config, conn)) - } -} - -impl Clone for IrcServer { - fn clone(&self) -> IrcServer { - IrcServer { - state: self.state.clone(), - reconnect_count: self.reconnect_count.clone(), - } - } + /// A thread-safe internal IRC stream used for the reading API. + incoming: Mutex>>, + /// A thread-safe copy of the outgoing channel. + outgoing: UnboundedSender, } impl<'a> Server for ServerState { @@ -193,16 +82,17 @@ impl<'a> Server for ServerState { &self.config } - fn send>(&self, msg: M) -> Result<()> + fn send>(&self, msg: M) -> error::Result<()> where Self: Sized, { - self.send_impl(msg.into()); - Ok(()) + Ok((&self.outgoing).send( + ServerState::sanitize(&msg.into().to_string()).into(), + )?) } - fn iter(&self) -> Box>> { - panic!("unimplemented") + fn stream(&self) -> ServerStream { + unimplemented!() } #[cfg(not(feature = "nochanlists"))] @@ -237,12 +127,298 @@ impl<'a> Server for ServerState { } } +impl ServerState { + fn new(incoming: SplitStream, outgoing: UnboundedSender, config: Config) -> ServerState + { + ServerState { + config: config, + chanlists: Mutex::new(HashMap::new()), + alt_nick_index: RwLock::new(0), + incoming: Mutex::new(Some(incoming)), + outgoing: outgoing, + } + } + + /// Sanitizes the input string by cutting up to (and including) the first occurence of a line + /// terminiating phrase (`\r\n`, `\r`, or `\n`). + fn sanitize(data: &str) -> &str { + // n.b. ordering matters here to prefer "\r\n" over "\r" + if let Some((pos, len)) = ["\r\n", "\r", "\n"] + .iter() + .flat_map(|needle| data.find(needle).map(|pos| (pos, needle.len()))) + .min_by_key(|&(pos, _)| pos) + { + data.split_at(pos + len).0 + } else { + data + } + } + + /// Gets the current nickname in use. + pub fn current_nickname(&self) -> &str { + let alt_nicks = self.config().alternate_nicknames(); + let index = self.alt_nick_index.read().unwrap(); + match *index { + 0 => self.config().nickname(), + i => alt_nicks[i - 1], + } + } + + /// Handles received messages internally for basic client functionality. + fn handle_message(&self, msg: &Message) -> error::Result<()> { + match msg.command { + JOIN(ref chan, _, _) => self.handle_join(msg.source_nickname().unwrap_or(""), chan), + PART(ref chan, _) => self.handle_part(msg.source_nickname().unwrap_or(""), chan), + QUIT(_) => self.handle_quit(msg.source_nickname().unwrap_or("")), + NICK(ref new_nick) => { + self.handle_nick_change(msg.source_nickname().unwrap_or(""), new_nick) + } + MODE(ref chan, ref mode, Some(ref user)) => self.handle_mode(chan, mode, user), + PRIVMSG(ref target, ref body) => { + if body.starts_with('\u{001}') { + let tokens: Vec<_> = { + let end = if body.ends_with('\u{001}') { + body.len() - 1 + } else { + body.len() + }; + body[1..end].split(' ').collect() + }; + if target.starts_with('#') { + try!(self.handle_ctcp(target, tokens)) + } else if let Some(user) = msg.source_nickname() { + try!(self.handle_ctcp(user, tokens)) + } + } + } + Command::Response(Response::RPL_NAMREPLY, ref args, ref suffix) => { + self.handle_namreply(args, suffix) + } + Command::Response(Response::RPL_ENDOFMOTD, _, _) | + Command::Response(Response::ERR_NOMOTD, _, _) => { + self.send_nick_password()?; + self.send_umodes()?; + + let config_chans = self.config().channels(); + for chan in &config_chans { + match self.config().channel_key(chan) { + Some(key) => self.send_join_with_keys(chan, key)?, + None => self.send_join(chan)?, + } + } + let joined_chans = self.chanlists.lock().unwrap(); + for chan in joined_chans.keys().filter( + |x| !config_chans.contains(&x.as_str()), + ) + { + self.send_join(chan)? + } + } + Command::Response(Response::ERR_NICKNAMEINUSE, _, _) | + Command::Response(Response::ERR_ERRONEOUSNICKNAME, _, _) => { + let alt_nicks = self.config().alternate_nicknames(); + let mut index = self.alt_nick_index.write().unwrap(); + if *index >= alt_nicks.len() { + panic!("All specified nicknames were in use or disallowed.") + } else { + try!(self.send(NICK(alt_nicks[*index].to_owned()))); + *index += 1; + } + } + _ => () + } + Ok(()) + } + + fn send_nick_password(&self) -> error::Result<()> { + if self.config().nick_password().is_empty() { + Ok(()) + } else { + let mut index = self.alt_nick_index.write().unwrap(); + if self.config().should_ghost() && *index != 0 { + for seq in &self.config().ghost_sequence() { + try!(self.send(NICKSERV(format!( + "{} {} {}", + seq, + self.config().nickname(), + self.config().nick_password() + )))); + } + *index = 0; + try!(self.send(NICK(self.config().nickname().to_owned()))) + } + self.send(NICKSERV( + format!("IDENTIFY {}", self.config().nick_password()), + )) + } + } + + fn send_umodes(&self) -> error::Result<()> { + if self.config().umodes().is_empty() { + Ok(()) + } else { + self.send_mode(self.current_nickname(), self.config().umodes(), "") + } + } + + #[cfg(feature = "nochanlists")] + fn handle_join(&self, _: &str, _: &str) {} + + #[cfg(not(feature = "nochanlists"))] + fn handle_join(&self, src: &str, chan: &str) { + if let Some(vec) = self.chanlists.lock().unwrap().get_mut(&chan.to_owned()) { + if !src.is_empty() { + vec.push(User::new(src)) + } + } + } + + #[cfg(feature = "nochanlists")] + fn handle_part(&self, src: &str, chan: &str) {} + + #[cfg(not(feature = "nochanlists"))] + fn handle_part(&self, src: &str, chan: &str) { + if let Some(vec) = self.chanlists.lock().unwrap().get_mut(&chan.to_owned()) { + if !src.is_empty() { + if let Some(n) = vec.iter().position(|x| x.get_nickname() == src) { + vec.swap_remove(n); + } + } + } + } + + #[cfg(feature = "nochanlists")] + fn handle_quit(&self, _: &str) {} + + #[cfg(not(feature = "nochanlists"))] + fn handle_quit(&self, src: &str) { + if src.is_empty() { + return; + } + let mut chanlists = self.chanlists.lock().unwrap(); + for channel in chanlists.clone().keys() { + if let Some(vec) = chanlists.get_mut(&channel.to_owned()) { + if let Some(p) = vec.iter().position(|x| x.get_nickname() == src) { + vec.swap_remove(p); + } + } + } + } + + #[cfg(feature = "nochanlists")] + fn handle_nick_change(&self, _: &str, _: &str) {} + + #[cfg(not(feature = "nochanlists"))] + fn handle_nick_change(&self, old_nick: &str, new_nick: &str) { + if old_nick.is_empty() || new_nick.is_empty() { + return; + } + let mut chanlists = self.chanlists.lock().unwrap(); + for channel in chanlists.clone().keys() { + if let Some(vec) = chanlists.get_mut(&channel.to_owned()) { + if let Some(n) = vec.iter().position(|x| x.get_nickname() == old_nick) { + let new_entry = User::new(new_nick); + vec[n] = new_entry; + } + } + } + } + + #[cfg(feature = "nochanlists")] + fn handle_mode(&self, chan: &str, mode: &str, user: &str) {} + + #[cfg(not(feature = "nochanlists"))] + fn handle_mode(&self, chan: &str, mode: &str, user: &str) { + if let Some(vec) = self.chanlists.lock().unwrap().get_mut(chan) { + if let Some(n) = vec.iter().position(|x| x.get_nickname() == user) { + vec[n].update_access_level(mode) + } + } + } + + #[cfg(feature = "nochanlists")] + fn handle_namreply(&self, _: &[String], _: &Option) {} + + #[cfg(not(feature = "nochanlists"))] + fn handle_namreply(&self, args: &[String], suffix: &Option) { + if let Some(ref users) = *suffix { + if args.len() == 3 { + let chan = &args[2]; + for user in users.split(' ') { + let mut chanlists = self.chanlists.lock().unwrap(); + chanlists + .entry(chan.clone()) + .or_insert_with(Vec::new) + .push(User::new(user)) + } + } + } + } + + /// Handles CTCP requests if the CTCP feature is enabled. + #[cfg(feature = "ctcp")] + fn handle_ctcp(&self, resp: &str, tokens: Vec<&str>) -> error::Result<()> { + if tokens.is_empty() { + return Ok(()); + } + match tokens[0] { + "FINGER" => { + self.send_ctcp_internal( + resp, + &format!( + "FINGER :{} ({})", + self.config().real_name(), + self.config().username() + ), + ) + } + "VERSION" => { + self.send_ctcp_internal(resp, &format!("VERSION {}", self.config().version())) + } + "SOURCE" => { + try!(self.send_ctcp_internal( + resp, + &format!("SOURCE {}", self.config().source()), + )); + self.send_ctcp_internal(resp, "SOURCE") + } + "PING" if tokens.len() > 1 => { + self.send_ctcp_internal(resp, &format!("PING {}", tokens[1])) + } + "TIME" => self.send_ctcp_internal(resp, &format!("TIME :{}", time::now().rfc822z())), + "USERINFO" => { + self.send_ctcp_internal(resp, &format!("USERINFO :{}", self.config().user_info())) + } + _ => Ok(()), + } + } + + /// Sends a CTCP-escaped message. + #[cfg(feature = "ctcp")] + fn send_ctcp_internal(&self, target: &str, msg: &str) -> error::Result<()> { + self.send_notice(target, &format!("\u{001}{}\u{001}", msg)) + } + + /// Handles CTCP requests if the CTCP feature is enabled. + #[cfg(not(feature = "ctcp"))] + fn handle_ctcp(&self, _: &str, _: Vec<&str>) -> Result<()> { + Ok(()) + } +} + +/// A thread-safe implementation of an IRC Server connection. +#[derive(Clone)] +pub struct IrcServer { + /// The internal, thread-safe server state. + state: Arc, +} + impl Server for IrcServer { fn config(&self) -> &Config { &self.state.config } - fn send>(&self, msg: M) -> Result<()> + fn send>(&self, msg: M) -> error::Result<()> where Self: Sized, { @@ -251,8 +427,11 @@ impl Server for IrcServer { self.state.send(msg) } - fn iter<'a>(&'a self) -> Box> + 'a> { - Box::new(ServerIterator::new(self)) + fn stream(&self) -> ServerStream { + ServerStream { + state: self.state.clone(), + stream: self.state.incoming.lock().unwrap().take().unwrap(), + } } #[cfg(not(feature = "nochanlists"))] @@ -290,107 +469,51 @@ impl Server for IrcServer { } impl IrcServer { - /// Creates an IRC server from the specified configuration, and any arbitrary sync connection. - pub fn from_connection(config: Config, conn: C) -> IrcServer - where - C: Connection + Send + Sync + 'static, - { - let state = Arc::new(ServerState::new(conn, config)); - let ping_time = (state.config.ping_time() as i64) * 1000; - let ping_timeout = (state.config.ping_timeout() as i64) * 1000; - let weak = Arc::downgrade(&state); + /// Creates a new IRC Server connection from the configuration at the specified path, + /// connecting immediately. + pub fn new>(config: P) -> error::Result { + IrcServer::from_config(Config::load(config)?) + } - spawn(move || while let Some(strong) = weak.upgrade() { - let ping_idle_timespec = { - let last_action = strong.last_action_time().to_timespec(); - if let Some(last_ping) = strong.last_ping_data() { - if last_action < last_ping { - last_ping - } else { - last_action - } - } else { - last_action - } - }; - let now_timespec = now().to_timespec(); - let sleep_dur_ping_time = ping_time - - (now_timespec - ping_idle_timespec).num_milliseconds(); - let sleep_dur_ping_timeout = if let Some(time) = strong.last_ping_data() { - ping_timeout - (now_timespec - time).num_milliseconds() - } else { - ping_timeout - }; + /// Creates a new IRC server connection from the specified configuration, connecting + /// immediately. + pub fn from_config(config: Config) -> error::Result { + // Setting up a remote reactor running forever. + let (tx_outgoing, rx_outgoing) = mpsc::unbounded(); + let (tx_incoming, rx_incoming) = oneshot::channel(); - if strong.waiting_pong_reply() && sleep_dur_ping_timeout < sleep_dur_ping_time { - // timeout check is earlier - let sleep_dur = sleep_dur_ping_timeout; - if sleep_dur > 0 { - sleep(StdDuration::from_millis(sleep_dur as u64)) - } - if strong.waiting_pong_reply() { - let _ = strong.reconnect(); - while let Err(_) = strong.identify() { - let _ = strong.reconnect(); - } - } - } else { - let sleep_dur = sleep_dur_ping_time; - if sleep_dur > 0 { - sleep(StdDuration::from_millis(sleep_dur as u64)) - } - let now_timespec = now().to_timespec(); - if strong.should_ping() { - let data = now_timespec; - strong.update_ping_data(data); - let fmt = format!("{}", data.sec); - while let Err(_) = strong.write(PING(fmt.clone(), None)) { - let _ = strong.reconnect(); - } - } - } + let cfg = config.clone(); + let _ = thread::spawn(move || { + let mut reactor = Core::new().unwrap(); + + // Setting up internal processing stuffs. + let handle = reactor.handle(); + let (sink, stream) = reactor.run(Connection::new(&cfg, &handle).unwrap()).unwrap().split(); + + let outgoing_future = sink.send_all(rx_outgoing.map_err(|_| { + let res: error::Error = error::ErrorKind::ChannelError.into(); + res + })); + handle.spawn(outgoing_future.map(|_| ()).map_err(|_| ())); + + // let incoming_future = tx_incoming.sink_map_err(|e| { + // let res: error::Error = e.into(); + // res + // }).send_all(stream); + // // let incoming_future = stream.forward(tx_incoming); + // handle.spawn(incoming_future.map(|_| ()).map_err(|_| ())); + tx_incoming.send(stream).unwrap(); + + reactor.run(future::empty::<(), ()>()).unwrap(); }); - IrcServer { - state: state, - reconnect_count: Cell::new(0), - } - } - /// Gets a reference to the IRC server's connection. - pub fn conn(&self) -> &Box { - &self.state.conn - } - - /// Reconnects to the IRC server, disconnecting if necessary. - pub fn reconnect(&self) -> Result<()> { - let mut reconnect_count = self.state.reconnect_count.lock().unwrap(); - let res = if self.reconnect_count.get() == *reconnect_count { - *reconnect_count += 1; - self.state.reconnect() - } else { - Ok(()) - }; - self.reconnect_count.set(*reconnect_count); - res - } - - /// Gets the current nickname in use. - pub fn current_nickname(&self) -> &str { - let alt_nicks = self.config().alternate_nicknames(); - let index = self.state.alt_nick_index.read().unwrap(); - match *index { - 0 => self.config().nickname(), - i => alt_nicks[i - 1], - } - } - - /// Returns a reference to the server state's channel lists. - fn chanlists(&self) -> &Mutex>> { - &self.state.chanlists + Ok(IrcServer { + state: Arc::new(ServerState::new(rx_incoming.wait()?, tx_outgoing, config)), + }) } /// Handles sent messages internally for basic client functionality. - fn handle_sent_message(&self, msg: &Message) -> Result<()> { + fn handle_sent_message(&self, msg: &Message) -> error::Result<()> { match msg.command { PART(ref chan, _) => { let _ = self.state.chanlists.lock().unwrap().remove(chan); @@ -400,298 +523,6 @@ impl IrcServer { Ok(()) } - /// Handles received messages internally for basic client functionality. - fn handle_message(&self, msg: &Message) -> Result<()> { - match msg.command { - PING(ref data, _) => try!(self.send_pong(data)), - PONG(ref pingdata, None) | - PONG(_, Some(ref pingdata)) => self.state.check_pong(pingdata), - JOIN(ref chan, _, _) => self.handle_join(msg.source_nickname().unwrap_or(""), chan), - PART(ref chan, _) => self.handle_part(msg.source_nickname().unwrap_or(""), chan), - QUIT(_) => self.handle_quit(msg.source_nickname().unwrap_or("")), - NICK(ref new_nick) => { - self.handle_nick_change(msg.source_nickname().unwrap_or(""), new_nick) - } - MODE(ref chan, ref mode, Some(ref user)) => self.handle_mode(chan, mode, user), - PRIVMSG(ref target, ref body) => { - if body.starts_with('\u{001}') { - let tokens: Vec<_> = { - let end = if body.ends_with('\u{001}') { - body.len() - 1 - } else { - body.len() - }; - body[1..end].split(' ').collect() - }; - if target.starts_with('#') { - try!(self.handle_ctcp(target, tokens)) - } else if let Some(user) = msg.source_nickname() { - try!(self.handle_ctcp(user, tokens)) - } - } - } - Command::Response(Response::RPL_NAMREPLY, ref args, ref suffix) => { - self.handle_namreply(args, suffix) - } - Command::Response(Response::RPL_ENDOFMOTD, _, _) | - Command::Response(Response::ERR_NOMOTD, _, _) => { - try!(self.send_nick_password()); - try!(self.send_umodes()); - - let config_chans = self.config().channels(); - for chan in &config_chans { - match self.config().channel_key(chan) { - Some(key) => try!(self.send_join_with_keys(chan, key)), - None => try!(self.send_join(chan)), - } - } - let joined_chans = self.state.chanlists.lock().unwrap(); - for chan in joined_chans.keys().filter( - |x| !config_chans.contains(&x.as_str()), - ) - { - try!(self.send_join(chan)) - } - } - Command::Response(Response::ERR_NICKNAMEINUSE, _, _) | - Command::Response(Response::ERR_ERRONEOUSNICKNAME, _, _) => { - let alt_nicks = self.config().alternate_nicknames(); - let mut index = self.state.alt_nick_index.write().unwrap(); - if *index >= alt_nicks.len() { - panic!("All specified nicknames were in use or disallowed.") - } else { - try!(self.send(NICK(alt_nicks[*index].to_owned()))); - *index += 1; - } - } - _ => (), - } - Ok(()) - } - - fn send_nick_password(&self) -> Result<()> { - if self.config().nick_password().is_empty() { - Ok(()) - } else { - let mut index = self.state.alt_nick_index.write().unwrap(); - if self.config().should_ghost() && *index != 0 { - for seq in &self.config().ghost_sequence() { - try!(self.send(NICKSERV(format!( - "{} {} {}", - seq, - self.config().nickname(), - self.config().nick_password() - )))); - } - *index = 0; - try!(self.send(NICK(self.config().nickname().to_owned()))) - } - self.send(NICKSERV( - format!("IDENTIFY {}", self.config().nick_password()), - )) - } - } - - fn send_umodes(&self) -> Result<()> { - if self.config().umodes().is_empty() { - Ok(()) - } else { - self.send_mode(self.current_nickname(), self.config().umodes(), "") - } - } - - #[cfg(feature = "nochanlists")] - fn handle_join(&self, _: &str, _: &str) {} - - #[cfg(not(feature = "nochanlists"))] - fn handle_join(&self, src: &str, chan: &str) { - if let Some(vec) = self.chanlists().lock().unwrap().get_mut(&chan.to_owned()) { - if !src.is_empty() { - vec.push(User::new(src)) - } - } - } - - #[cfg(feature = "nochanlists")] - fn handle_part(&self, src: &str, chan: &str) {} - - #[cfg(not(feature = "nochanlists"))] - fn handle_part(&self, src: &str, chan: &str) { - if let Some(vec) = self.chanlists().lock().unwrap().get_mut(&chan.to_owned()) { - if !src.is_empty() { - if let Some(n) = vec.iter().position(|x| x.get_nickname() == src) { - vec.swap_remove(n); - } - } - } - } - - #[cfg(feature = "nochanlists")] - fn handle_quit(&self, _: &str) {} - - #[cfg(not(feature = "nochanlists"))] - fn handle_quit(&self, src: &str) { - if src.is_empty() { - return; - } - let mut chanlists = self.chanlists().lock().unwrap(); - for channel in chanlists.clone().keys() { - if let Some(vec) = chanlists.get_mut(&channel.to_owned()) { - if let Some(p) = vec.iter().position(|x| x.get_nickname() == src) { - vec.swap_remove(p); - } - } - } - } - - #[cfg(feature = "nochanlists")] - fn handle_nick_change(&self, _: &str, _: &str) {} - - #[cfg(not(feature = "nochanlists"))] - fn handle_nick_change(&self, old_nick: &str, new_nick: &str) { - if old_nick.is_empty() || new_nick.is_empty() { - return; - } - let mut chanlists = self.chanlists().lock().unwrap(); - for channel in chanlists.clone().keys() { - if let Some(vec) = chanlists.get_mut(&channel.to_owned()) { - if let Some(n) = vec.iter().position(|x| x.get_nickname() == old_nick) { - let new_entry = User::new(new_nick); - vec[n] = new_entry; - } - } - } - } - - #[cfg(feature = "nochanlists")] - fn handle_mode(&self, chan: &str, mode: &str, user: &str) {} - - #[cfg(not(feature = "nochanlists"))] - fn handle_mode(&self, chan: &str, mode: &str, user: &str) { - if let Some(vec) = self.chanlists().lock().unwrap().get_mut(chan) { - if let Some(n) = vec.iter().position(|x| x.get_nickname() == user) { - vec[n].update_access_level(mode) - } - } - } - - #[cfg(feature = "nochanlists")] - fn handle_namreply(&self, _: &[String], _: &Option) {} - - #[cfg(not(feature = "nochanlists"))] - fn handle_namreply(&self, args: &[String], suffix: &Option) { - if let Some(ref users) = *suffix { - if args.len() == 3 { - let chan = &args[2]; - for user in users.split(' ') { - let mut chanlists = self.state.chanlists.lock().unwrap(); - chanlists - .entry(chan.clone()) - .or_insert_with(Vec::new) - .push(User::new(user)) - } - } - } - } - - /// Handles CTCP requests if the CTCP feature is enabled. - #[cfg(feature = "ctcp")] - fn handle_ctcp(&self, resp: &str, tokens: Vec<&str>) -> Result<()> { - if tokens.is_empty() { - return Ok(()); - } - match tokens[0] { - "FINGER" => { - self.send_ctcp_internal( - resp, - &format!( - "FINGER :{} ({})", - self.config().real_name(), - self.config().username() - ), - ) - } - "VERSION" => { - self.send_ctcp_internal(resp, &format!("VERSION {}", self.config().version())) - } - "SOURCE" => { - try!(self.send_ctcp_internal( - resp, - &format!("SOURCE {}", self.config().source()), - )); - self.send_ctcp_internal(resp, "SOURCE") - } - "PING" if tokens.len() > 1 => { - self.send_ctcp_internal(resp, &format!("PING {}", tokens[1])) - } - "TIME" => self.send_ctcp_internal(resp, &format!("TIME :{}", now().rfc822z())), - "USERINFO" => { - self.send_ctcp_internal(resp, &format!("USERINFO :{}", self.config().user_info())) - } - _ => Ok(()), - } - } - - /// Sends a CTCP-escaped message. - #[cfg(feature = "ctcp")] - fn send_ctcp_internal(&self, target: &str, msg: &str) -> Result<()> { - self.send_notice(target, &format!("\u{001}{}\u{001}", msg)) - } - - /// Handles CTCP requests if the CTCP feature is enabled. - #[cfg(not(feature = "ctcp"))] - fn handle_ctcp(&self, _: &str, _: Vec<&str>) -> Result<()> { - Ok(()) - } -} - -/// An `Iterator` over an `IrcServer`'s incoming `Messages`. -pub struct ServerIterator<'a> { - server: &'a IrcServer, -} - -impl<'a> ServerIterator<'a> { - /// Creates a new ServerIterator for the desired IrcServer. - pub fn new(server: &'a IrcServer) -> ServerIterator { - ServerIterator { server: server } - } - - /// Gets the next line from the connection. - fn get_next_line(&self) -> Result { - self.server.conn().recv(self.server.config().encoding()) - } -} - -impl<'a> Iterator for ServerIterator<'a> { - type Item = Result; - fn next(&mut self) -> Option> { - loop { - match self.get_next_line() { - Ok(msg) => { - match msg.parse() { - Ok(res) => { - match self.server.handle_message(&res) { - Ok(()) => (), - Err(err) => return Some(Err(err)), - } - self.server.state.action_taken(); - return Some(Ok(res)); - } - Err(_) => { - return Some(Err(Error::new( - ErrorKind::InvalidInput, - &format!("Failed to parse message. (Message: {})", msg)[..], - ).into())) - } - } - } - Err(ref err) if err.description() == "EOF" => return None, - Err(_) => { - let _ = self.server.reconnect().and_then(|_| self.server.identify()); - } - } - } - } } #[cfg(test)]