From f34a6ba5cf7a724cdcabbd03c8be89a7374b1445 Mon Sep 17 00:00:00 2001 From: Aaron Weiss Date: Sat, 16 Jan 2016 11:15:10 -0500 Subject: [PATCH] Fixed implementation of auto-reconnection by updating message sending to use the write queue. --- src/client/server/mod.rs | 57 +++++++++++++++++++++++++++++++++------- 1 file changed, 47 insertions(+), 10 deletions(-) diff --git a/src/client/server/mod.rs b/src/client/server/mod.rs index e4df0b1..fda7cb9 100644 --- a/src/client/server/mod.rs +++ b/src/client/server/mod.rs @@ -15,6 +15,7 @@ use client::conn::{Connection, NetStream, Reconnect}; use client::data::{Command, Config, Message, Response, User}; use client::data::Command::{JOIN, NICK, NICKSERV, PING, PONG, MODE}; use client::data::kinds::{IrcRead, IrcWrite}; +use client::server::utils::ServerExt; use time::{Duration, Timespec, Tm, now}; pub mod utils; @@ -59,6 +60,8 @@ struct ServerState { alt_nick_index: RwLock, /// A thread-safe count of reconnection attempts used for synchronization. reconnect_count: Mutex, + /// A global copy of the channel for sending messages to write. + tx: Mutex>>, /// A thread-safe store for the time of the last action. last_action_time: Mutex, /// A thread-safe store for the last ping data. @@ -74,6 +77,7 @@ impl ServerState where Connection: Reconnec chanlists: Mutex::new(HashMap::new()), alt_nick_index: RwLock::new(0), reconnect_count: Mutex::new(0), + tx: Mutex::new(None), last_action_time: Mutex::new(now()), last_ping_data: Mutex::new(None), } @@ -148,19 +152,47 @@ impl Drop for ServerState { } } +impl<'a, T: IrcRead, U: IrcWrite> Server<'a, T, U> for ServerState where Connection: Reconnect { + fn config(&self) -> &Config { + &self.config + } + + fn send>(&self, msg: M) -> Result<()> where Self: Sized { + let opt_tx = self.tx.lock().unwrap(); + let ref rf_tx = *opt_tx; + match rf_tx { + &Some(ref tx) => tx.send(msg.into()).map_err(|e| Error::new(ErrorKind::Other, e)), + &None => Err(Error::new(ErrorKind::NotFound, "Channel was not found.")) + } + } + + fn iter(&'a self) -> ServerIterator<'a, T, U> { + panic!("unimplemented") + } + + fn iter_cmd(&'a self) -> ServerCmdIterator<'a, T, U> { + self.iter().map(Command::from_message_io) + } + + #[cfg(not(feature = "nochanlists"))] + fn list_users(&self, chan: &str) -> Option> { + self.chanlists.lock().unwrap().get(&chan.to_owned()).cloned() + } + + + #[cfg(feature = "nochanlists")] + fn list_users(&self, chan: &str) -> Option> { + None + } +} + impl<'a, T: IrcRead, U: IrcWrite> Server<'a, T, U> for IrcServer where Connection: Reconnect { fn config(&self) -> &Config { &self.state.config } - #[cfg(feature = "encode")] - fn send>(&self, msg: M) -> Result<()> { - self.state.conn.send(msg, self.config().encoding()) - } - - #[cfg(not(feature = "encode"))] fn send>(&self, msg: M) -> Result<()> where Self: Sized { - self.state.conn.send(msg) + self.tx.send(msg.into()).map_err(|e| Error::new(ErrorKind::Other, e)) } fn iter(&'a self) -> ServerIterator<'a, T, U> { @@ -193,14 +225,17 @@ impl IrcServer where Connection: Reconnect if let Some(strong) = weak.upgrade() { if let Some(time) = strong.last_ping_data() { if now().to_timespec() - time > strong.ping_timeout_duration() { - while let Err(_) = strong.reconnect() {} // Continue trying to reconnect. + let _ = strong.reconnect(); + while let Err(_) = strong.identify() { + let _ = strong.reconnect(); + } } } } match rx.try_recv() { Ok(msg) => if let Some(strong) = weak.upgrade() { while let Err(_) = IrcServer::write(&strong, msg.clone()) { - let _ = strong.reconnect(); + let _ = strong.reconnect().and_then(|_| strong.identify()); } strong.action_taken(); }, @@ -220,6 +255,8 @@ impl IrcServer where Connection: Reconnect let state2 = state.clone(); let mut handle = state2.write_handle.lock().unwrap(); *handle = Some(write_handle); + let mut state_tx = state2.tx.lock().unwrap(); + *state_tx = Some(tx.clone()); IrcServer { tx: tx, state: state, reconnect_count: Cell::new(0) } } @@ -444,7 +481,7 @@ impl<'a, T: IrcRead + 'a, U: IrcWrite + 'a> Iterator for ServerIterator<'a, T, U }, Err(ref err) if err.description() == "EOF" => return None, Err(_) => { - let _ = self.server.reconnect(); + let _ = self.server.reconnect().and_then(|_| self.server.identify()); } } }