Migrated real IrcServer API to be async based on experiment.

This commit is contained in:
Aaron Weiss 2017-06-21 16:53:28 -04:00
parent 6118516951
commit c363dc7837
No known key found for this signature in database
GPG key ID: 0237035D9BF03AE2
8 changed files with 382 additions and 700 deletions

View file

@ -1,39 +0,0 @@
extern crate futures;
extern crate irc;
use std::default::Default;
use std::thread;
use futures::{Future, Stream};
use irc::client::async::IrcServer;
use irc::client::data::Config;
use irc::proto::{CapSubCommand, Command};
fn main() {
let config = Config {
nickname: Some("pickles".to_owned()),
alt_nicks: Some(vec!["bananas".to_owned(), "apples".to_owned()]),
server: Some("chat.freenode.net".to_owned()),
channels: Some(vec!["##yulli".to_owned()]),
..Default::default()
};
let mut server = IrcServer::new(config).unwrap();
thread::sleep_ms(100);
server.send(Command::CAP(None, CapSubCommand::END, None, None)).unwrap();
server.send(Command::NICK("aatxebot".to_owned())).unwrap();
server.send(Command::USER("aatxebot".to_owned(), "0".to_owned(), "aatxebot".to_owned())).unwrap();
thread::sleep_ms(100);
server.send(Command::JOIN("##yulli".to_owned(), None, None)).unwrap();
server.recv().for_each(|msg| {
print!("{}", msg);
match msg.command {
Command::PRIVMSG(ref target, ref msg) => {
if msg.contains("pickles") {
server.send(Command::PRIVMSG(target.to_owned(), "Hi!".to_owned())).unwrap();
}
}
_ => (),
}
Ok(())
}).wait().unwrap();
}

View file

@ -1,29 +0,0 @@
extern crate irc;
use std::default::Default;
use std::thread::spawn;
use irc::client::prelude::*;
fn main() {
let config = Config {
nickname: Some("pickles".to_owned()),
server: Some("irc.fyrechat.net".to_owned()),
channels: Some(vec!["#irc-crate".to_owned()]),
..Default::default()
};
let server = IrcServer::from_config(config).unwrap();
server.identify().unwrap();
let server = server.clone();
let _ = spawn(move || for message in server.iter() {
let message = message.unwrap(); // We'll just panic if there's an error.
print!("{}", message);
match message.command {
Command::PRIVMSG(ref target, ref msg) => {
if msg.contains("pickles") {
server.send_privmsg(target, "Hi!").unwrap();
}
}
_ => (),
}
}).join(); // You might not want to join here for actual multi-threading.
}

View file

@ -13,8 +13,7 @@ fn main() {
};
let server = IrcServer::from_config(config).unwrap();
server.identify().unwrap();
for message in server.iter() {
let message = message.unwrap(); // We'll just panic if there's an error.
server.stream().for_each(|message| {
print!("{}", message);
match message.command {
Command::PRIVMSG(ref target, ref msg) => {
@ -24,5 +23,6 @@ fn main() {
}
_ => (),
}
}
Ok(())
}).wait().unwrap()
}

View file

@ -14,8 +14,7 @@ fn main() {
};
let server = IrcServer::from_config(config).unwrap();
server.identify().unwrap();
for message in server.iter() {
let message = message.unwrap(); // We'll just panic if there's an error.
server.stream().for_each(|message| {
print!("{}", message);
match message.command {
Command::PRIVMSG(ref target, ref msg) => {
@ -25,5 +24,6 @@ fn main() {
}
_ => (),
}
}
Ok(())
}).wait().unwrap()
}

View file

@ -1,7 +1,7 @@
extern crate irc;
use std::default::Default;
use std::thread::{sleep, spawn};
use std::thread;
use std::time::Duration;
use irc::client::prelude::*;
@ -16,11 +16,11 @@ fn main() {
server.identify().unwrap();
let server2 = server.clone();
// Let's set up a loop that just prints the messages.
spawn(move || {
server2.iter().map(|m| print!("{}", m.unwrap())).count();
thread::spawn(move || {
server2.stream().map(|m| print!("{}", m)).wait().count();
});
loop {
server.send_privmsg("#irc-crate", "TWEET TWEET").unwrap();
sleep(Duration::new(10, 0));
thread::sleep(Duration::new(10, 0));
}
}

View file

@ -105,85 +105,3 @@ impl Sink for Connection {
}
}
}
pub struct IrcServer {
config: Config,
handle: JoinHandle<()>,
incoming: Option<SplitStream<Connection>>,
outgoing: UnboundedSender<Message>,
}
impl IrcServer {
pub fn new(config: Config) -> error::Result<IrcServer> {
// Setting up a remote reactor running forever.
let (tx_outgoing, rx_outgoing) = mpsc::unbounded();
let (tx_incoming, rx_incoming) = oneshot::channel();
let cfg = config.clone();
let handle = thread::spawn(move || {
let mut reactor = Core::new().unwrap();
// Setting up internal processing stuffs.
let handle = reactor.handle();
let (sink, stream) = reactor.run(Connection::new(&cfg, &handle).unwrap()).unwrap().split();
let outgoing_future = sink.send_all(rx_outgoing.map_err(|_| {
let res: error::Error = error::ErrorKind::ChannelError.into();
res
}));
handle.spawn(outgoing_future.map(|_| ()).map_err(|_| ()));
// let incoming_future = tx_incoming.sink_map_err(|e| {
// let res: error::Error = e.into();
// res
// }).send_all(stream);
// // let incoming_future = stream.forward(tx_incoming);
// handle.spawn(incoming_future.map(|_| ()).map_err(|_| ()));
tx_incoming.send(stream).unwrap();
reactor.run(future::empty::<(), ()>()).unwrap();
});
Ok(IrcServer {
config: config,
handle: handle,
incoming: Some(rx_incoming.wait()?),
outgoing: tx_outgoing,
})
}
pub fn send<M: Into<Message>>(&self, msg: M) -> error::Result<()> {
(&self.outgoing).send(msg.into())?;
Ok(())
}
pub fn recv(&mut self) -> SplitStream<Connection> {
self.incoming.take().unwrap()
}
pub fn join(self) -> () {
self.handle.join().unwrap()
}
}
impl Stream for IrcServer {
type Item = Message;
type Error = error::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.incoming.as_mut().unwrap().poll().map_err(|_| error::ErrorKind::ChannelError.into())
}
}
impl Sink for IrcServer {
type SinkItem = Message;
type SinkError = error::Error;
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
Ok(self.outgoing.start_send(item)?)
}
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
Ok(self.outgoing.poll_complete()?)
}
}

View file

@ -12,4 +12,5 @@ pub mod prelude {
pub use client::server::utils::ServerExt;
pub use client::data::{Capability, Command, Config, Message, NegotiationVersion, Response};
pub use client::data::kinds::{IrcRead, IrcWrite};
pub use futures::{Future, Stream};
}

View file

@ -1,20 +1,23 @@
//! Interface for working with IRC Servers.
use std::borrow::ToOwned;
use std::cell::Cell;
use std::collections::HashMap;
use std::error::Error as StdError;
use std::io::{Error, ErrorKind};
use std::path::Path;
use std::sync::{Arc, Mutex, RwLock};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{spawn, sleep};
use std::time::Duration as StdDuration;
use error::Result;
use client::conn::{Connection, NetConnection};
use std::thread;
use error;
use client::async::Connection;
use client::data::{Command, Config, Message, Response, User};
use client::data::Command::{JOIN, NICK, NICKSERV, PART, PING, PONG, PRIVMSG, MODE, QUIT};
use client::server::utils::ServerExt;
use time::{Duration, Timespec, Tm, now};
use futures::{Async, Poll, Future, Sink, StartSend, Stream};
use futures::future;
use futures::stream::{BoxStream, SplitStream};
use futures::sync::mpsc;
use futures::sync::oneshot;
use futures::sync::mpsc::UnboundedSender;
use time;
use tokio_core::reactor::{Core, Handle};
pub mod utils;
@ -24,12 +27,12 @@ pub trait Server {
fn config(&self) -> &Config;
/// Sends a Command to this Server.
fn send<M: Into<Message>>(&self, message: M) -> Result<()>
fn send<M: Into<Message>>(&self, message: M) -> error::Result<()>
where
Self: Sized;
/// Gets an iterator over received messages.
fn iter<'a>(&'a self) -> Box<Iterator<Item = Result<Message>> + 'a>;
/// Gets a stream of incoming messages from the Server.
fn stream(&self) -> ServerStream;
/// Gets a list of currently joined channels. This will be none if tracking is not supported altogether.
fn list_channels(&self) -> Option<Vec<String>>;
@ -40,103 +43,102 @@ pub trait Server {
fn list_users(&self, channel: &str) -> Option<Vec<User>>;
}
/// A thread-safe implementation of an IRC Server connection.
pub struct IrcServer {
/// The internal, thread-safe server state.
pub struct ServerStream {
state: Arc<ServerState>,
/// A thread-local count of reconnection attempts used for synchronization.
reconnect_count: Cell<u32>,
stream: SplitStream<Connection>,
}
impl Stream for ServerStream {
type Item = Message;
type Error = error::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match try_ready!(self.stream.poll()) {
Some(msg) => {
self.state.handle_message(&msg)?;
Ok(Async::Ready(Some(msg)))
}
None => Ok(Async::Ready(None)),
}
}
}
/// Thread-safe internal state for an IRC server connection.
struct ServerState {
/// The thread-safe IRC connection.
conn: Box<Connection + Send + Sync>,
/// The configuration used with this connection.
config: Config,
/// A thread-safe map of channels to the list of users in them.
chanlists: Mutex<HashMap<String, Vec<User>>>,
/// A thread-safe index to track the current alternative nickname being used.
alt_nick_index: RwLock<usize>,
/// A thread-safe count of reconnection attempts used for synchronization.
reconnect_count: Mutex<u32>,
/// A thread-safe store for the time of the last action.
last_action_time: Mutex<Tm>,
/// A thread-safe store for the last ping data.
last_ping_data: Mutex<Option<Timespec>>,
/// A thread-safe check of pong reply.
waiting_pong_reply: AtomicBool,
/// A thread-safe internal IRC stream used for the reading API.
incoming: Mutex<Option<SplitStream<Connection>>>,
/// A thread-safe copy of the outgoing channel.
outgoing: UnboundedSender<Message>,
}
impl<'a> Server for ServerState {
fn config(&self) -> &Config {
&self.config
}
fn send<M: Into<Message>>(&self, msg: M) -> error::Result<()>
where
Self: Sized,
{
Ok((&self.outgoing).send(
ServerState::sanitize(&msg.into().to_string()).into(),
)?)
}
fn stream(&self) -> ServerStream {
unimplemented!()
}
#[cfg(not(feature = "nochanlists"))]
fn list_channels(&self) -> Option<Vec<String>> {
Some(
self.chanlists
.lock()
.unwrap()
.keys()
.map(|k| k.to_owned())
.collect(),
)
}
#[cfg(feature = "nochanlists")]
fn list_channels(&self) -> Option<Vec<String>> {
None
}
#[cfg(not(feature = "nochanlists"))]
fn list_users(&self, chan: &str) -> Option<Vec<User>> {
self.chanlists
.lock()
.unwrap()
.get(&chan.to_owned())
.cloned()
}
#[cfg(feature = "nochanlists")]
fn list_users(&self, _: &str) -> Option<Vec<User>> {
None
}
}
impl ServerState {
fn new<C>(conn: C, config: Config) -> ServerState
where
C: Connection + Send + Sync + 'static,
fn new(incoming: SplitStream<Connection>, outgoing: UnboundedSender<Message>, config: Config) -> ServerState
{
ServerState {
conn: Box::new(conn),
config: config,
chanlists: Mutex::new(HashMap::new()),
alt_nick_index: RwLock::new(0),
reconnect_count: Mutex::new(0),
last_action_time: Mutex::new(now()),
last_ping_data: Mutex::new(None),
waiting_pong_reply: AtomicBool::new(false),
incoming: Mutex::new(Some(incoming)),
outgoing: outgoing,
}
}
fn reconnect(&self) -> Result<()> {
let mut ping_data = self.last_ping_data.lock().unwrap();
*ping_data = None;
self.waiting_pong_reply.store(false, Ordering::SeqCst);
self.conn.reconnect()
}
fn action_taken(&self) {
let mut time = self.last_action_time.lock().unwrap();
*time = now();
}
fn should_ping(&self) -> bool {
let time = self.last_action_time.lock().unwrap();
(now() - *time) > Duration::seconds(self.config.ping_time() as i64)
}
fn last_action_time(&self) -> Tm {
*self.last_action_time.lock().unwrap()
}
fn update_ping_data(&self, data: Timespec) {
let mut ping_data = self.last_ping_data.lock().unwrap();
*ping_data = Some(data);
self.waiting_pong_reply.store(true, Ordering::SeqCst);
}
fn last_ping_data(&self) -> Option<Timespec> {
*self.last_ping_data.lock().unwrap()
}
fn waiting_pong_reply(&self) -> bool {
self.waiting_pong_reply.load(Ordering::SeqCst)
}
fn check_pong(&self, data: &str) {
if let Some(ref time) = self.last_ping_data() {
let fmt = format!("{}", time.sec);
if fmt == data {
// found matching pong reply
self.waiting_pong_reply.store(false, Ordering::SeqCst);
}
}
}
fn send_impl(&self, msg: Message) {
while let Err(_) = self.write(msg.clone()) {
let _ = self.reconnect().and_then(|_| self.identify());
}
self.action_taken();
}
/// Sanitizes the input string by cutting up to (and including) the first occurence of a line
/// terminiating phrase (`\r\n`, `\r`, or `\n`).
fn sanitize(data: &str) -> &str {
@ -152,260 +154,19 @@ impl ServerState {
}
}
fn write<M: Into<Message>>(&self, msg: M) -> Result<()> {
self.conn.send(
ServerState::sanitize(&msg.into().to_string()),
self.config.encoding(),
)
}
}
impl IrcServer {
/// Creates a new IRC Server connection from the configuration at the specified path,
/// connecting immediately.
pub fn new<P: AsRef<Path>>(config: P) -> Result<IrcServer> {
IrcServer::from_config(try!(Config::load(config)))
}
/// Creates a new IRC server connection from the specified configuration, connecting
/// immediately.
pub fn from_config(config: Config) -> Result<IrcServer> {
let conn = try!(if config.use_ssl() {
NetConnection::connect_ssl(config.server(), config.port())
} else {
NetConnection::connect(config.server(), config.port())
});
Ok(IrcServer::from_connection(config, conn))
}
}
impl Clone for IrcServer {
fn clone(&self) -> IrcServer {
IrcServer {
state: self.state.clone(),
reconnect_count: self.reconnect_count.clone(),
}
}
}
impl<'a> Server for ServerState {
fn config(&self) -> &Config {
&self.config
}
fn send<M: Into<Message>>(&self, msg: M) -> Result<()>
where
Self: Sized,
{
self.send_impl(msg.into());
Ok(())
}
fn iter(&self) -> Box<Iterator<Item = Result<Message>>> {
panic!("unimplemented")
}
#[cfg(not(feature = "nochanlists"))]
fn list_channels(&self) -> Option<Vec<String>> {
Some(
self.chanlists
.lock()
.unwrap()
.keys()
.map(|k| k.to_owned())
.collect(),
)
}
#[cfg(feature = "nochanlists")]
fn list_channels(&self) -> Option<Vec<String>> {
None
}
#[cfg(not(feature = "nochanlists"))]
fn list_users(&self, chan: &str) -> Option<Vec<User>> {
self.chanlists
.lock()
.unwrap()
.get(&chan.to_owned())
.cloned()
}
#[cfg(feature = "nochanlists")]
fn list_users(&self, _: &str) -> Option<Vec<User>> {
None
}
}
impl Server for IrcServer {
fn config(&self) -> &Config {
&self.state.config
}
fn send<M: Into<Message>>(&self, msg: M) -> Result<()>
where
Self: Sized,
{
let msg = msg.into();
try!(self.handle_sent_message(&msg));
self.state.send(msg)
}
fn iter<'a>(&'a self) -> Box<Iterator<Item = Result<Message>> + 'a> {
Box::new(ServerIterator::new(self))
}
#[cfg(not(feature = "nochanlists"))]
fn list_channels(&self) -> Option<Vec<String>> {
Some(
self.state
.chanlists
.lock()
.unwrap()
.keys()
.map(|k| k.to_owned())
.collect(),
)
}
#[cfg(feature = "nochanlists")]
fn list_channels(&self) -> Option<Vec<String>> {
None
}
#[cfg(not(feature = "nochanlists"))]
fn list_users(&self, chan: &str) -> Option<Vec<User>> {
self.state
.chanlists
.lock()
.unwrap()
.get(&chan.to_owned())
.cloned()
}
#[cfg(feature = "nochanlists")]
fn list_users(&self, _: &str) -> Option<Vec<User>> {
None
}
}
impl IrcServer {
/// Creates an IRC server from the specified configuration, and any arbitrary sync connection.
pub fn from_connection<C>(config: Config, conn: C) -> IrcServer
where
C: Connection + Send + Sync + 'static,
{
let state = Arc::new(ServerState::new(conn, config));
let ping_time = (state.config.ping_time() as i64) * 1000;
let ping_timeout = (state.config.ping_timeout() as i64) * 1000;
let weak = Arc::downgrade(&state);
spawn(move || while let Some(strong) = weak.upgrade() {
let ping_idle_timespec = {
let last_action = strong.last_action_time().to_timespec();
if let Some(last_ping) = strong.last_ping_data() {
if last_action < last_ping {
last_ping
} else {
last_action
}
} else {
last_action
}
};
let now_timespec = now().to_timespec();
let sleep_dur_ping_time = ping_time -
(now_timespec - ping_idle_timespec).num_milliseconds();
let sleep_dur_ping_timeout = if let Some(time) = strong.last_ping_data() {
ping_timeout - (now_timespec - time).num_milliseconds()
} else {
ping_timeout
};
if strong.waiting_pong_reply() && sleep_dur_ping_timeout < sleep_dur_ping_time {
// timeout check is earlier
let sleep_dur = sleep_dur_ping_timeout;
if sleep_dur > 0 {
sleep(StdDuration::from_millis(sleep_dur as u64))
}
if strong.waiting_pong_reply() {
let _ = strong.reconnect();
while let Err(_) = strong.identify() {
let _ = strong.reconnect();
}
}
} else {
let sleep_dur = sleep_dur_ping_time;
if sleep_dur > 0 {
sleep(StdDuration::from_millis(sleep_dur as u64))
}
let now_timespec = now().to_timespec();
if strong.should_ping() {
let data = now_timespec;
strong.update_ping_data(data);
let fmt = format!("{}", data.sec);
while let Err(_) = strong.write(PING(fmt.clone(), None)) {
let _ = strong.reconnect();
}
}
}
});
IrcServer {
state: state,
reconnect_count: Cell::new(0),
}
}
/// Gets a reference to the IRC server's connection.
pub fn conn(&self) -> &Box<Connection + Send + Sync> {
&self.state.conn
}
/// Reconnects to the IRC server, disconnecting if necessary.
pub fn reconnect(&self) -> Result<()> {
let mut reconnect_count = self.state.reconnect_count.lock().unwrap();
let res = if self.reconnect_count.get() == *reconnect_count {
*reconnect_count += 1;
self.state.reconnect()
} else {
Ok(())
};
self.reconnect_count.set(*reconnect_count);
res
}
/// Gets the current nickname in use.
pub fn current_nickname(&self) -> &str {
let alt_nicks = self.config().alternate_nicknames();
let index = self.state.alt_nick_index.read().unwrap();
let index = self.alt_nick_index.read().unwrap();
match *index {
0 => self.config().nickname(),
i => alt_nicks[i - 1],
}
}
/// Returns a reference to the server state's channel lists.
fn chanlists(&self) -> &Mutex<HashMap<String, Vec<User>>> {
&self.state.chanlists
}
/// Handles sent messages internally for basic client functionality.
fn handle_sent_message(&self, msg: &Message) -> Result<()> {
match msg.command {
PART(ref chan, _) => {
let _ = self.state.chanlists.lock().unwrap().remove(chan);
}
_ => (),
}
Ok(())
}
/// Handles received messages internally for basic client functionality.
fn handle_message(&self, msg: &Message) -> Result<()> {
fn handle_message(&self, msg: &Message) -> error::Result<()> {
match msg.command {
PING(ref data, _) => try!(self.send_pong(data)),
PONG(ref pingdata, None) |
PONG(_, Some(ref pingdata)) => self.state.check_pong(pingdata),
JOIN(ref chan, _, _) => self.handle_join(msg.source_nickname().unwrap_or(""), chan),
PART(ref chan, _) => self.handle_part(msg.source_nickname().unwrap_or(""), chan),
QUIT(_) => self.handle_quit(msg.source_nickname().unwrap_or("")),
@ -435,28 +196,28 @@ impl IrcServer {
}
Command::Response(Response::RPL_ENDOFMOTD, _, _) |
Command::Response(Response::ERR_NOMOTD, _, _) => {
try!(self.send_nick_password());
try!(self.send_umodes());
self.send_nick_password()?;
self.send_umodes()?;
let config_chans = self.config().channels();
for chan in &config_chans {
match self.config().channel_key(chan) {
Some(key) => try!(self.send_join_with_keys(chan, key)),
None => try!(self.send_join(chan)),
Some(key) => self.send_join_with_keys(chan, key)?,
None => self.send_join(chan)?,
}
}
let joined_chans = self.state.chanlists.lock().unwrap();
let joined_chans = self.chanlists.lock().unwrap();
for chan in joined_chans.keys().filter(
|x| !config_chans.contains(&x.as_str()),
)
{
try!(self.send_join(chan))
self.send_join(chan)?
}
}
Command::Response(Response::ERR_NICKNAMEINUSE, _, _) |
Command::Response(Response::ERR_ERRONEOUSNICKNAME, _, _) => {
let alt_nicks = self.config().alternate_nicknames();
let mut index = self.state.alt_nick_index.write().unwrap();
let mut index = self.alt_nick_index.write().unwrap();
if *index >= alt_nicks.len() {
panic!("All specified nicknames were in use or disallowed.")
} else {
@ -464,16 +225,16 @@ impl IrcServer {
*index += 1;
}
}
_ => (),
_ => ()
}
Ok(())
}
fn send_nick_password(&self) -> Result<()> {
fn send_nick_password(&self) -> error::Result<()> {
if self.config().nick_password().is_empty() {
Ok(())
} else {
let mut index = self.state.alt_nick_index.write().unwrap();
let mut index = self.alt_nick_index.write().unwrap();
if self.config().should_ghost() && *index != 0 {
for seq in &self.config().ghost_sequence() {
try!(self.send(NICKSERV(format!(
@ -492,7 +253,7 @@ impl IrcServer {
}
}
fn send_umodes(&self) -> Result<()> {
fn send_umodes(&self) -> error::Result<()> {
if self.config().umodes().is_empty() {
Ok(())
} else {
@ -505,7 +266,7 @@ impl IrcServer {
#[cfg(not(feature = "nochanlists"))]
fn handle_join(&self, src: &str, chan: &str) {
if let Some(vec) = self.chanlists().lock().unwrap().get_mut(&chan.to_owned()) {
if let Some(vec) = self.chanlists.lock().unwrap().get_mut(&chan.to_owned()) {
if !src.is_empty() {
vec.push(User::new(src))
}
@ -517,7 +278,7 @@ impl IrcServer {
#[cfg(not(feature = "nochanlists"))]
fn handle_part(&self, src: &str, chan: &str) {
if let Some(vec) = self.chanlists().lock().unwrap().get_mut(&chan.to_owned()) {
if let Some(vec) = self.chanlists.lock().unwrap().get_mut(&chan.to_owned()) {
if !src.is_empty() {
if let Some(n) = vec.iter().position(|x| x.get_nickname() == src) {
vec.swap_remove(n);
@ -534,7 +295,7 @@ impl IrcServer {
if src.is_empty() {
return;
}
let mut chanlists = self.chanlists().lock().unwrap();
let mut chanlists = self.chanlists.lock().unwrap();
for channel in chanlists.clone().keys() {
if let Some(vec) = chanlists.get_mut(&channel.to_owned()) {
if let Some(p) = vec.iter().position(|x| x.get_nickname() == src) {
@ -552,7 +313,7 @@ impl IrcServer {
if old_nick.is_empty() || new_nick.is_empty() {
return;
}
let mut chanlists = self.chanlists().lock().unwrap();
let mut chanlists = self.chanlists.lock().unwrap();
for channel in chanlists.clone().keys() {
if let Some(vec) = chanlists.get_mut(&channel.to_owned()) {
if let Some(n) = vec.iter().position(|x| x.get_nickname() == old_nick) {
@ -568,7 +329,7 @@ impl IrcServer {
#[cfg(not(feature = "nochanlists"))]
fn handle_mode(&self, chan: &str, mode: &str, user: &str) {
if let Some(vec) = self.chanlists().lock().unwrap().get_mut(chan) {
if let Some(vec) = self.chanlists.lock().unwrap().get_mut(chan) {
if let Some(n) = vec.iter().position(|x| x.get_nickname() == user) {
vec[n].update_access_level(mode)
}
@ -584,7 +345,7 @@ impl IrcServer {
if args.len() == 3 {
let chan = &args[2];
for user in users.split(' ') {
let mut chanlists = self.state.chanlists.lock().unwrap();
let mut chanlists = self.chanlists.lock().unwrap();
chanlists
.entry(chan.clone())
.or_insert_with(Vec::new)
@ -596,7 +357,7 @@ impl IrcServer {
/// Handles CTCP requests if the CTCP feature is enabled.
#[cfg(feature = "ctcp")]
fn handle_ctcp(&self, resp: &str, tokens: Vec<&str>) -> Result<()> {
fn handle_ctcp(&self, resp: &str, tokens: Vec<&str>) -> error::Result<()> {
if tokens.is_empty() {
return Ok(());
}
@ -624,7 +385,7 @@ impl IrcServer {
"PING" if tokens.len() > 1 => {
self.send_ctcp_internal(resp, &format!("PING {}", tokens[1]))
}
"TIME" => self.send_ctcp_internal(resp, &format!("TIME :{}", now().rfc822z())),
"TIME" => self.send_ctcp_internal(resp, &format!("TIME :{}", time::now().rfc822z())),
"USERINFO" => {
self.send_ctcp_internal(resp, &format!("USERINFO :{}", self.config().user_info()))
}
@ -634,7 +395,7 @@ impl IrcServer {
/// Sends a CTCP-escaped message.
#[cfg(feature = "ctcp")]
fn send_ctcp_internal(&self, target: &str, msg: &str) -> Result<()> {
fn send_ctcp_internal(&self, target: &str, msg: &str) -> error::Result<()> {
self.send_notice(target, &format!("\u{001}{}\u{001}", msg))
}
@ -645,53 +406,123 @@ impl IrcServer {
}
}
/// An `Iterator` over an `IrcServer`'s incoming `Messages`.
pub struct ServerIterator<'a> {
server: &'a IrcServer,
/// A thread-safe implementation of an IRC Server connection.
#[derive(Clone)]
pub struct IrcServer {
/// The internal, thread-safe server state.
state: Arc<ServerState>,
}
impl<'a> ServerIterator<'a> {
/// Creates a new ServerIterator for the desired IrcServer.
pub fn new(server: &'a IrcServer) -> ServerIterator {
ServerIterator { server: server }
impl Server for IrcServer {
fn config(&self) -> &Config {
&self.state.config
}
/// Gets the next line from the connection.
fn get_next_line(&self) -> Result<String> {
self.server.conn().recv(self.server.config().encoding())
fn send<M: Into<Message>>(&self, msg: M) -> error::Result<()>
where
Self: Sized,
{
let msg = msg.into();
try!(self.handle_sent_message(&msg));
self.state.send(msg)
}
fn stream(&self) -> ServerStream {
ServerStream {
state: self.state.clone(),
stream: self.state.incoming.lock().unwrap().take().unwrap(),
}
}
impl<'a> Iterator for ServerIterator<'a> {
type Item = Result<Message>;
fn next(&mut self) -> Option<Result<Message>> {
loop {
match self.get_next_line() {
Ok(msg) => {
match msg.parse() {
Ok(res) => {
match self.server.handle_message(&res) {
Ok(()) => (),
Err(err) => return Some(Err(err)),
#[cfg(not(feature = "nochanlists"))]
fn list_channels(&self) -> Option<Vec<String>> {
Some(
self.state
.chanlists
.lock()
.unwrap()
.keys()
.map(|k| k.to_owned())
.collect(),
)
}
self.server.state.action_taken();
return Some(Ok(res));
#[cfg(feature = "nochanlists")]
fn list_channels(&self) -> Option<Vec<String>> {
None
}
Err(_) => {
return Some(Err(Error::new(
ErrorKind::InvalidInput,
&format!("Failed to parse message. (Message: {})", msg)[..],
).into()))
#[cfg(not(feature = "nochanlists"))]
fn list_users(&self, chan: &str) -> Option<Vec<User>> {
self.state
.chanlists
.lock()
.unwrap()
.get(&chan.to_owned())
.cloned()
}
#[cfg(feature = "nochanlists")]
fn list_users(&self, _: &str) -> Option<Vec<User>> {
None
}
}
impl IrcServer {
/// Creates a new IRC Server connection from the configuration at the specified path,
/// connecting immediately.
pub fn new<P: AsRef<Path>>(config: P) -> error::Result<IrcServer> {
IrcServer::from_config(Config::load(config)?)
}
Err(ref err) if err.description() == "EOF" => return None,
Err(_) => {
let _ = self.server.reconnect().and_then(|_| self.server.identify());
/// Creates a new IRC server connection from the specified configuration, connecting
/// immediately.
pub fn from_config(config: Config) -> error::Result<IrcServer> {
// Setting up a remote reactor running forever.
let (tx_outgoing, rx_outgoing) = mpsc::unbounded();
let (tx_incoming, rx_incoming) = oneshot::channel();
let cfg = config.clone();
let _ = thread::spawn(move || {
let mut reactor = Core::new().unwrap();
// Setting up internal processing stuffs.
let handle = reactor.handle();
let (sink, stream) = reactor.run(Connection::new(&cfg, &handle).unwrap()).unwrap().split();
let outgoing_future = sink.send_all(rx_outgoing.map_err(|_| {
let res: error::Error = error::ErrorKind::ChannelError.into();
res
}));
handle.spawn(outgoing_future.map(|_| ()).map_err(|_| ()));
// let incoming_future = tx_incoming.sink_map_err(|e| {
// let res: error::Error = e.into();
// res
// }).send_all(stream);
// // let incoming_future = stream.forward(tx_incoming);
// handle.spawn(incoming_future.map(|_| ()).map_err(|_| ()));
tx_incoming.send(stream).unwrap();
reactor.run(future::empty::<(), ()>()).unwrap();
});
Ok(IrcServer {
state: Arc::new(ServerState::new(rx_incoming.wait()?, tx_outgoing, config)),
})
}
/// Handles sent messages internally for basic client functionality.
fn handle_sent_message(&self, msg: &Message) -> error::Result<()> {
match msg.command {
PART(ref chan, _) => {
let _ = self.state.chanlists.lock().unwrap().remove(chan);
}
_ => (),
}
Ok(())
}
}
#[cfg(test)]