diff --git a/src/client/server/mod.rs b/src/client/server/mod.rs index 3407d7f..2c9f2eb 100644 --- a/src/client/server/mod.rs +++ b/src/client/server/mod.rs @@ -6,8 +6,8 @@ use std::error::Error as StdError; use std::io::{Error, ErrorKind, Result}; use std::path::Path; use std::sync::{Arc, Mutex, RwLock}; -use std::sync::mpsc::{Receiver, Sender, TryRecvError, channel}; -use std::thread::{JoinHandle, spawn}; +use std::thread::{spawn, sleep}; +use std::time::Duration as StdDuration; use client::conn::{Connection, NetConnection}; use client::data::{Command, Config, Message, Response, User}; use client::data::Command::{JOIN, NICK, NICKSERV, PART, PING, PRIVMSG, MODE}; @@ -35,8 +35,6 @@ pub trait Server { /// A thread-safe implementation of an IRC Server connection. pub struct IrcServer { - /// The channel for sending messages to write. - tx: Sender, /// The internal, thread-safe server state. state: Arc, /// A thread-local count of reconnection attempts used for synchronization. @@ -45,12 +43,8 @@ pub struct IrcServer { /// Thread-safe internal state for an IRC server connection. struct ServerState { - /// A global copy of the channel for sending messages to write. - tx: Mutex>>, /// The thread-safe IRC connection. conn: Box, - /// The handle for the message sending thread. - write_handle: Mutex>>, /// The configuration used with this connection. config: Config, /// A thread-safe map of channels to the list of users in them. @@ -68,9 +62,7 @@ struct ServerState { impl ServerState { fn new(conn: C, config: Config) -> ServerState where C: Connection + Send + Sync + 'static { ServerState { - tx: Mutex::new(None), conn: Box::new(conn), - write_handle: Mutex::new(None), config: config, chanlists: Mutex::new(HashMap::new()), alt_nick_index: RwLock::new(0), @@ -106,6 +98,23 @@ impl ServerState { fn last_ping_data(&self) -> Option { self.last_ping_data.lock().unwrap().clone() } + + fn send_impl(&self, msg: Message) { + while let Err(_) = self.write(msg.clone()) { + let _ = self.reconnect().and_then(|_| self.identify()); + } + self.action_taken(); + } + + #[cfg(feature = "encode")] + fn write>(&self, msg: M) -> Result<()> { + self.conn.send(&msg.into().into_string(), self.config.encoding()) + } + + #[cfg(not(feature = "encode"))] + fn write>(&self, msg: M) -> Result<()> { + self.conn.send(&msg.into().into_string()) + } } impl IrcServer { @@ -130,35 +139,20 @@ impl IrcServer { impl Clone for IrcServer { fn clone(&self) -> IrcServer { IrcServer { - tx: self.tx.clone(), state: self.state.clone(), reconnect_count: self.reconnect_count.clone() } } } -impl Drop for ServerState { - fn drop(&mut self) { - let _ = self.tx.lock().unwrap().take(); - let mut guard = self.write_handle.lock().unwrap(); - if let Some(handle) = guard.take() { - handle.join().unwrap() - } - } -} - impl<'a> Server for ServerState { fn config(&self) -> &Config { &self.config } fn send>(&self, msg: M) -> Result<()> where Self: Sized { - let opt_tx = self.tx.lock().unwrap(); - let ref rf_tx = *opt_tx; - match rf_tx { - &Some(ref tx) => tx.send(msg.into()).map_err(|e| Error::new(ErrorKind::Other, e)), - &None => Err(Error::new(ErrorKind::NotFound, "Channel was not found.")) - } + self.send_impl(msg.into()); + Ok(()) } fn iter(&self) -> Box>> { @@ -183,7 +177,7 @@ impl Server for IrcServer { } fn send>(&self, msg: M) -> Result<()> where Self: Sized { - self.tx.send(msg.into()).map_err(|e| Error::new(ErrorKind::Other, e)) + self.state.send(msg) } fn iter<'a>(&'a self) -> Box> + 'a> { @@ -206,46 +200,36 @@ impl IrcServer { /// Creates an IRC server from the specified configuration, and any arbitrary sync connection. pub fn from_connection(config: Config, conn: C) -> IrcServer where C: Connection + Send + Sync + 'static { - let (tx, rx): (Sender, Receiver) = channel(); let state = Arc::new(ServerState::new(conn, config)); + let ping_time = (state.config.ping_time() as i64) * 1000; let weak = Arc::downgrade(&state); - let write_handle = spawn(move || loop { - if let Some(strong) = weak.upgrade() { - if let Some(time) = strong.last_ping_data() { - if now().to_timespec() - time > strong.ping_timeout_duration() { + + spawn(move || while let Some(strong) = weak.upgrade() { + let sleep_dur = if let Some(time) = strong.last_ping_data() { + if now().to_timespec() - time > strong.ping_timeout_duration() { + let _ = strong.reconnect(); + while let Err(_) = strong.identify() { let _ = strong.reconnect(); - while let Err(_) = strong.identify() { - let _ = strong.reconnect(); - } } } + ping_time - (now().to_timespec() - time).num_milliseconds() + } else { + ping_time + }; + + if sleep_dur > 0 { + sleep(StdDuration::from_millis(sleep_dur as u64)) } - match rx.try_recv() { - Ok(msg) => if let Some(strong) = weak.upgrade() { - while let Err(_) = IrcServer::write(&strong, msg.clone()) { - let _ = strong.reconnect().and_then(|_| strong.identify()); - } - strong.action_taken(); - }, - Err(TryRecvError::Disconnected) => break, - Err(TryRecvError::Empty) => if let Some(strong) = weak.upgrade() { - if strong.should_ping() { - let data = now().to_timespec(); - strong.update_ping_data(data); - let fmt = format!("{}", data.sec); - while let Err(_) = IrcServer::write(&strong, PING(fmt.clone(), None)) { - let _ = strong.reconnect(); - } - } - }, + if strong.should_ping() { + let data = now().to_timespec(); + strong.update_ping_data(data); + let fmt = format!("{}", data.sec); + while let Err(_) = strong.write(PING(fmt.clone(), None)) { + let _ = strong.reconnect(); + } } }); - let state2 = state.clone(); - let mut handle = state2.write_handle.lock().unwrap(); - *handle = Some(write_handle); - let mut state_tx = state2.tx.lock().unwrap(); - *state_tx = Some(tx.clone()); - IrcServer { tx: tx, state: state, reconnect_count: Cell::new(0) } + IrcServer { state: state, reconnect_count: Cell::new(0) } } /// Gets a reference to the IRC server's connection. @@ -276,16 +260,6 @@ impl IrcServer { } } - #[cfg(feature = "encode")] - fn write>(state: &Arc, msg: M) -> Result<()> { - state.conn.send(&msg.into().into_string(), state.config.encoding()) - } - - #[cfg(not(feature = "encode"))] - fn write>(state: &Arc, msg: M) -> Result<()> { - state.conn.send(&msg.into().into_string()) - } - /// Returns a reference to the server state's channel lists. fn chanlists(&self) -> &Mutex>> { &self.state.chanlists @@ -422,22 +396,6 @@ impl IrcServer { fn handle_ctcp(&self, _: &str, _: Vec<&str>) -> Result<()> { Ok(()) } - - /// Waits for the completion of all writes for messages in this server's write queue. This - /// function may cause unusual behavior when called on a server with operations being performed - /// on other threads. This function is destructive, and is primarily intended for writing unit - /// tests. Use it with care. - pub fn await_writing(&mut self) { - let _ = self.state.tx.lock().unwrap().take(); - // This is a terrible hack to get the real channel to drop. - // Otherwise, joining would never finish. - let (tx, _) = channel(); - self.tx = tx; - let mut guard = self.state.write_handle.lock().unwrap(); - if let Some(handle) = guard.take() { - handle.join().unwrap() - } - } } /// An Iterator over an IrcServer's incoming Messages. @@ -513,14 +471,12 @@ mod test { } #[cfg(feature = "encode")] - pub fn get_server_value(mut server: IrcServer) -> String { - server.await_writing(); + pub fn get_server_value(server: IrcServer) -> String { server.conn().written(server.config().encoding()).unwrap() } #[cfg(not(feature = "encode"))] - pub fn get_server_value(mut server: IrcServer) -> String { - server.await_writing(); + pub fn get_server_value(server: IrcServer) -> String { server.conn().written().unwrap() }