Write directly from threads (through a mutex) instead of using a writing thread

The writing thread now becomes a simple pinging thread. The pinging
thread will exit when the weak reference to the ServerState can no
longer be upgraded i.e. when the ServerState has been dropped i.e. when
the IrcServer has been dropped.

This fixes #43 as well.

This is the first in a series of changes that will reduce allocations by
the library.
This commit is contained in:
angelsl 2016-02-16 21:52:40 +08:00 committed by Aaron Weiss
parent 7d7e09fa8e
commit 46f9136c93

View file

@ -6,8 +6,8 @@ use std::error::Error as StdError;
use std::io::{Error, ErrorKind, Result};
use std::path::Path;
use std::sync::{Arc, Mutex, RwLock};
use std::sync::mpsc::{Receiver, Sender, TryRecvError, channel};
use std::thread::{JoinHandle, spawn};
use std::thread::{spawn, sleep};
use std::time::Duration as StdDuration;
use client::conn::{Connection, NetConnection};
use client::data::{Command, Config, Message, Response, User};
use client::data::Command::{JOIN, NICK, NICKSERV, PART, PING, PRIVMSG, MODE};
@ -35,8 +35,6 @@ pub trait Server {
/// A thread-safe implementation of an IRC Server connection.
pub struct IrcServer {
/// The channel for sending messages to write.
tx: Sender<Message>,
/// The internal, thread-safe server state.
state: Arc<ServerState>,
/// A thread-local count of reconnection attempts used for synchronization.
@ -45,12 +43,8 @@ pub struct IrcServer {
/// Thread-safe internal state for an IRC server connection.
struct ServerState {
/// A global copy of the channel for sending messages to write.
tx: Mutex<Option<Sender<Message>>>,
/// The thread-safe IRC connection.
conn: Box<Connection + Send + Sync>,
/// The handle for the message sending thread.
write_handle: Mutex<Option<JoinHandle<()>>>,
/// The configuration used with this connection.
config: Config,
/// A thread-safe map of channels to the list of users in them.
@ -68,9 +62,7 @@ struct ServerState {
impl ServerState {
fn new<C>(conn: C, config: Config) -> ServerState where C: Connection + Send + Sync + 'static {
ServerState {
tx: Mutex::new(None),
conn: Box::new(conn),
write_handle: Mutex::new(None),
config: config,
chanlists: Mutex::new(HashMap::new()),
alt_nick_index: RwLock::new(0),
@ -106,6 +98,23 @@ impl ServerState {
fn last_ping_data(&self) -> Option<Timespec> {
self.last_ping_data.lock().unwrap().clone()
}
fn send_impl(&self, msg: Message) {
while let Err(_) = self.write(msg.clone()) {
let _ = self.reconnect().and_then(|_| self.identify());
}
self.action_taken();
}
#[cfg(feature = "encode")]
fn write<M: Into<Message>>(&self, msg: M) -> Result<()> {
self.conn.send(&msg.into().into_string(), self.config.encoding())
}
#[cfg(not(feature = "encode"))]
fn write<M: Into<Message>>(&self, msg: M) -> Result<()> {
self.conn.send(&msg.into().into_string())
}
}
impl IrcServer {
@ -130,35 +139,20 @@ impl IrcServer {
impl Clone for IrcServer {
fn clone(&self) -> IrcServer {
IrcServer {
tx: self.tx.clone(),
state: self.state.clone(),
reconnect_count: self.reconnect_count.clone()
}
}
}
impl Drop for ServerState {
fn drop(&mut self) {
let _ = self.tx.lock().unwrap().take();
let mut guard = self.write_handle.lock().unwrap();
if let Some(handle) = guard.take() {
handle.join().unwrap()
}
}
}
impl<'a> Server for ServerState {
fn config(&self) -> &Config {
&self.config
}
fn send<M: Into<Message>>(&self, msg: M) -> Result<()> where Self: Sized {
let opt_tx = self.tx.lock().unwrap();
let ref rf_tx = *opt_tx;
match rf_tx {
&Some(ref tx) => tx.send(msg.into()).map_err(|e| Error::new(ErrorKind::Other, e)),
&None => Err(Error::new(ErrorKind::NotFound, "Channel was not found."))
}
self.send_impl(msg.into());
Ok(())
}
fn iter(&self) -> Box<Iterator<Item = Result<Message>>> {
@ -183,7 +177,7 @@ impl Server for IrcServer {
}
fn send<M: Into<Message>>(&self, msg: M) -> Result<()> where Self: Sized {
self.tx.send(msg.into()).map_err(|e| Error::new(ErrorKind::Other, e))
self.state.send(msg)
}
fn iter<'a>(&'a self) -> Box<Iterator<Item = Result<Message>> + 'a> {
@ -206,46 +200,36 @@ impl IrcServer {
/// Creates an IRC server from the specified configuration, and any arbitrary sync connection.
pub fn from_connection<C>(config: Config, conn: C) -> IrcServer
where C: Connection + Send + Sync + 'static {
let (tx, rx): (Sender<Message>, Receiver<Message>) = channel();
let state = Arc::new(ServerState::new(conn, config));
let ping_time = (state.config.ping_time() as i64) * 1000;
let weak = Arc::downgrade(&state);
let write_handle = spawn(move || loop {
if let Some(strong) = weak.upgrade() {
if let Some(time) = strong.last_ping_data() {
spawn(move || while let Some(strong) = weak.upgrade() {
let sleep_dur = if let Some(time) = strong.last_ping_data() {
if now().to_timespec() - time > strong.ping_timeout_duration() {
let _ = strong.reconnect();
while let Err(_) = strong.identify() {
let _ = strong.reconnect();
}
}
ping_time - (now().to_timespec() - time).num_milliseconds()
} else {
ping_time
};
if sleep_dur > 0 {
sleep(StdDuration::from_millis(sleep_dur as u64))
}
}
match rx.try_recv() {
Ok(msg) => if let Some(strong) = weak.upgrade() {
while let Err(_) = IrcServer::write(&strong, msg.clone()) {
let _ = strong.reconnect().and_then(|_| strong.identify());
}
strong.action_taken();
},
Err(TryRecvError::Disconnected) => break,
Err(TryRecvError::Empty) => if let Some(strong) = weak.upgrade() {
if strong.should_ping() {
let data = now().to_timespec();
strong.update_ping_data(data);
let fmt = format!("{}", data.sec);
while let Err(_) = IrcServer::write(&strong, PING(fmt.clone(), None)) {
while let Err(_) = strong.write(PING(fmt.clone(), None)) {
let _ = strong.reconnect();
}
}
},
}
});
let state2 = state.clone();
let mut handle = state2.write_handle.lock().unwrap();
*handle = Some(write_handle);
let mut state_tx = state2.tx.lock().unwrap();
*state_tx = Some(tx.clone());
IrcServer { tx: tx, state: state, reconnect_count: Cell::new(0) }
IrcServer { state: state, reconnect_count: Cell::new(0) }
}
/// Gets a reference to the IRC server's connection.
@ -276,16 +260,6 @@ impl IrcServer {
}
}
#[cfg(feature = "encode")]
fn write<M: Into<Message>>(state: &Arc<ServerState>, msg: M) -> Result<()> {
state.conn.send(&msg.into().into_string(), state.config.encoding())
}
#[cfg(not(feature = "encode"))]
fn write<M: Into<Message>>(state: &Arc<ServerState>, msg: M) -> Result<()> {
state.conn.send(&msg.into().into_string())
}
/// Returns a reference to the server state's channel lists.
fn chanlists(&self) -> &Mutex<HashMap<String, Vec<User>>> {
&self.state.chanlists
@ -422,22 +396,6 @@ impl IrcServer {
fn handle_ctcp(&self, _: &str, _: Vec<&str>) -> Result<()> {
Ok(())
}
/// Waits for the completion of all writes for messages in this server's write queue. This
/// function may cause unusual behavior when called on a server with operations being performed
/// on other threads. This function is destructive, and is primarily intended for writing unit
/// tests. Use it with care.
pub fn await_writing(&mut self) {
let _ = self.state.tx.lock().unwrap().take();
// This is a terrible hack to get the real channel to drop.
// Otherwise, joining would never finish.
let (tx, _) = channel();
self.tx = tx;
let mut guard = self.state.write_handle.lock().unwrap();
if let Some(handle) = guard.take() {
handle.join().unwrap()
}
}
}
/// An Iterator over an IrcServer's incoming Messages.
@ -513,14 +471,12 @@ mod test {
}
#[cfg(feature = "encode")]
pub fn get_server_value(mut server: IrcServer) -> String {
server.await_writing();
pub fn get_server_value(server: IrcServer) -> String {
server.conn().written(server.config().encoding()).unwrap()
}
#[cfg(not(feature = "encode"))]
pub fn get_server_value(mut server: IrcServer) -> String {
server.await_writing();
pub fn get_server_value(server: IrcServer) -> String {
server.conn().written().unwrap()
}