Moved to using an internal sending channel for messages.

This commit is contained in:
Aaron Weiss 2016-01-03 07:51:10 -05:00
parent ea0577bbaf
commit 6ece3e25fe

View file

@ -5,9 +5,11 @@ use std::borrow::ToOwned;
use std::collections::HashMap; use std::collections::HashMap;
use std::error::Error as StdError; use std::error::Error as StdError;
use std::io::{BufReader, BufWriter, Error, ErrorKind, Result}; use std::io::{BufReader, BufWriter, Error, ErrorKind, Result};
use std::iter::Map;
use std::path::Path; use std::path::Path;
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use std::iter::Map; use std::sync::mpsc::{Sender, channel};
use std::thread::{JoinHandle, spawn};
use client::conn::{Connection, NetStream}; use client::conn::{Connection, NetStream};
use client::data::{Command, Config, Message, Response, User}; use client::data::{Command, Config, Message, Response, User};
use client::data::Command::{JOIN, NICK, NICKSERV, PONG, MODE}; use client::data::Command::{JOIN, NICK, NICKSERV, PONG, MODE};
@ -35,12 +37,15 @@ pub trait Server<'a, T: IrcRead, U: IrcWrite> {
/// A thread-safe implementation of an IRC Server connection. /// A thread-safe implementation of an IRC Server connection.
pub struct IrcServer<T: IrcRead, U: IrcWrite> { pub struct IrcServer<T: IrcRead, U: IrcWrite> {
state: Arc<ServerState<T, U>>, state: Arc<ServerState<T, U>>,
tx: Sender<Message>,
} }
/// Thread-safe internal state for an IRC server connection. /// Thread-safe internal state for an IRC server connection.
struct ServerState<T: IrcRead, U: IrcWrite> { struct ServerState<T: IrcRead, U: IrcWrite> {
/// The thread-safe IRC connection. /// The thread-safe IRC connection.
conn: Connection<T, U>, conn: Connection<T, U>,
/// The handle for the message sending thread.
write_handle: Mutex<Option<JoinHandle<()>>>,
/// The configuration used with this connection. /// The configuration used with this connection.
config: Config, config: Config,
/// A thread-safe map of channels to the list of users in them. /// A thread-safe map of channels to the list of users in them.
@ -67,13 +72,7 @@ impl IrcServer<BufReader<NetStream>, BufWriter<NetStream>> {
} else { } else {
Connection::connect(config.server(), config.port()) Connection::connect(config.server(), config.port())
}); });
let state = ServerState { Ok(IrcServer::from_connection(config, conn))
config: config,
conn: conn,
chanlists: Mutex::new(HashMap::new()),
alt_nick_index: RwLock::new(0),
};
Ok(IrcServer { state: Arc::new(state) })
} }
/// Reconnects to the IRC server. /// Reconnects to the IRC server.
@ -82,9 +81,18 @@ impl IrcServer<BufReader<NetStream>, BufWriter<NetStream>> {
} }
} }
impl<'a, T: IrcRead, U: IrcWrite> Clone for IrcServer<T, U> { impl<T: IrcRead, U: IrcWrite> Clone for IrcServer<T, U> {
fn clone(&self) -> IrcServer<T, U> { fn clone(&self) -> IrcServer<T, U> {
IrcServer { state: self.state.clone() } IrcServer { state: self.state.clone(), tx: self.tx.clone() }
}
}
impl<T: IrcRead, U: IrcWrite> Drop for ServerState<T, U> {
fn drop(&mut self) {
let mut guard = self.write_handle.lock().unwrap();
if let Some(handle) = guard.take() {
handle.join().unwrap()
}
} }
} }
@ -126,13 +134,27 @@ impl<'a, T: IrcRead, U: IrcWrite> Server<'a, T, U> for IrcServer<T, U> {
impl<T: IrcRead, U: IrcWrite> IrcServer<T, U> { impl<T: IrcRead, U: IrcWrite> IrcServer<T, U> {
/// Creates an IRC server from the specified configuration, and any arbitrary Connection. /// Creates an IRC server from the specified configuration, and any arbitrary Connection.
pub fn from_connection(config: Config, conn: Connection<T, U>) -> IrcServer<T, U> { pub fn from_connection(config: Config, conn: Connection<T, U>) -> IrcServer<T, U> {
let state = ServerState { let (tx, rx) = channel();
let state = Arc::new(ServerState {
conn: conn, conn: conn,
write_handle: Mutex::new(None),
config: config, config: config,
chanlists: Mutex::new(HashMap::new()), chanlists: Mutex::new(HashMap::new()),
alt_nick_index: RwLock::new(0), alt_nick_index: RwLock::new(0),
}; });
IrcServer { state: Arc::new(state) } let weak = Arc::downgrade(&state);
let write_handle = spawn(move || {
while let Some(strong) = weak.upgrade() {
match rx.recv() {
Ok(msg) => { IrcServer::write(&strong, msg); },
Err(_) => ()
}
}
});
let state2 = state.clone();
let mut handle = state2.write_handle.lock().unwrap();
*handle = Some(write_handle);
IrcServer { state: state, tx: tx }
} }
/// Gets a reference to the IRC server's connection. /// Gets a reference to the IRC server's connection.
@ -140,6 +162,17 @@ impl<T: IrcRead, U: IrcWrite> IrcServer<T, U> {
&self.state.conn &self.state.conn
} }
#[cfg(feature = "encode")]
fn write<M: Into<Message>>(state: &Arc<ServerState<T, U>>, msg: M) -> Result<()> {
state.conn.send(msg, state.config.encoding())
}
#[cfg(not(feature = "encode"))]
fn write<M: Into<Message>>(state: &Arc<ServerState<T, U>>, msg: M) -> Result<()> where Self: Sized {
state.conn.send(msg)
}
/// Handles messages internally for basic bot functionality. /// Handles messages internally for basic bot functionality.
fn handle_message(&self, msg: &Message) { fn handle_message(&self, msg: &Message) {
if let Some(resp) = Response::from_message(msg) { if let Some(resp) = Response::from_message(msg) {