Fixed implementation of auto-reconnection by updating message sending to

use the write queue.
This commit is contained in:
Aaron Weiss 2016-01-16 11:15:10 -05:00
parent 3dc15449a8
commit f34a6ba5cf

View file

@ -15,6 +15,7 @@ use client::conn::{Connection, NetStream, Reconnect};
use client::data::{Command, Config, Message, Response, User};
use client::data::Command::{JOIN, NICK, NICKSERV, PING, PONG, MODE};
use client::data::kinds::{IrcRead, IrcWrite};
use client::server::utils::ServerExt;
use time::{Duration, Timespec, Tm, now};
pub mod utils;
@ -59,6 +60,8 @@ struct ServerState<T: IrcRead, U: IrcWrite> {
alt_nick_index: RwLock<usize>,
/// A thread-safe count of reconnection attempts used for synchronization.
reconnect_count: Mutex<u32>,
/// A global copy of the channel for sending messages to write.
tx: Mutex<Option<Sender<Message>>>,
/// A thread-safe store for the time of the last action.
last_action_time: Mutex<Tm>,
/// A thread-safe store for the last ping data.
@ -74,6 +77,7 @@ impl<T: IrcRead, U: IrcWrite> ServerState<T, U> where Connection<T, U>: Reconnec
chanlists: Mutex::new(HashMap::new()),
alt_nick_index: RwLock::new(0),
reconnect_count: Mutex::new(0),
tx: Mutex::new(None),
last_action_time: Mutex::new(now()),
last_ping_data: Mutex::new(None),
}
@ -148,19 +152,47 @@ impl<T: IrcRead, U: IrcWrite> Drop for ServerState<T, U> {
}
}
impl<'a, T: IrcRead, U: IrcWrite> Server<'a, T, U> for ServerState<T, U> where Connection<T, U>: Reconnect {
fn config(&self) -> &Config {
&self.config
}
fn send<M: Into<Message>>(&self, msg: M) -> Result<()> where Self: Sized {
let opt_tx = self.tx.lock().unwrap();
let ref rf_tx = *opt_tx;
match rf_tx {
&Some(ref tx) => tx.send(msg.into()).map_err(|e| Error::new(ErrorKind::Other, e)),
&None => Err(Error::new(ErrorKind::NotFound, "Channel was not found."))
}
}
fn iter(&'a self) -> ServerIterator<'a, T, U> {
panic!("unimplemented")
}
fn iter_cmd(&'a self) -> ServerCmdIterator<'a, T, U> {
self.iter().map(Command::from_message_io)
}
#[cfg(not(feature = "nochanlists"))]
fn list_users(&self, chan: &str) -> Option<Vec<User>> {
self.chanlists.lock().unwrap().get(&chan.to_owned()).cloned()
}
#[cfg(feature = "nochanlists")]
fn list_users(&self, chan: &str) -> Option<Vec<User>> {
None
}
}
impl<'a, T: IrcRead, U: IrcWrite> Server<'a, T, U> for IrcServer<T, U> where Connection<T, U>: Reconnect {
fn config(&self) -> &Config {
&self.state.config
}
#[cfg(feature = "encode")]
fn send<M: Into<Message>>(&self, msg: M) -> Result<()> {
self.state.conn.send(msg, self.config().encoding())
}
#[cfg(not(feature = "encode"))]
fn send<M: Into<Message>>(&self, msg: M) -> Result<()> where Self: Sized {
self.state.conn.send(msg)
self.tx.send(msg.into()).map_err(|e| Error::new(ErrorKind::Other, e))
}
fn iter(&'a self) -> ServerIterator<'a, T, U> {
@ -193,14 +225,17 @@ impl<T: IrcRead, U: IrcWrite> IrcServer<T, U> where Connection<T, U>: Reconnect
if let Some(strong) = weak.upgrade() {
if let Some(time) = strong.last_ping_data() {
if now().to_timespec() - time > strong.ping_timeout_duration() {
while let Err(_) = strong.reconnect() {} // Continue trying to reconnect.
let _ = strong.reconnect();
while let Err(_) = strong.identify() {
let _ = strong.reconnect();
}
}
}
}
match rx.try_recv() {
Ok(msg) => if let Some(strong) = weak.upgrade() {
while let Err(_) = IrcServer::write(&strong, msg.clone()) {
let _ = strong.reconnect();
let _ = strong.reconnect().and_then(|_| strong.identify());
}
strong.action_taken();
},
@ -220,6 +255,8 @@ impl<T: IrcRead, U: IrcWrite> IrcServer<T, U> where Connection<T, U>: Reconnect
let state2 = state.clone();
let mut handle = state2.write_handle.lock().unwrap();
*handle = Some(write_handle);
let mut state_tx = state2.tx.lock().unwrap();
*state_tx = Some(tx.clone());
IrcServer { tx: tx, state: state, reconnect_count: Cell::new(0) }
}
@ -444,7 +481,7 @@ impl<'a, T: IrcRead + 'a, U: IrcWrite + 'a> Iterator for ServerIterator<'a, T, U
},
Err(ref err) if err.description() == "EOF" => return None,
Err(_) => {
let _ = self.server.reconnect();
let _ = self.server.reconnect().and_then(|_| self.server.identify());
}
}
}