Implemented ping-sending on inactivity.

This commit is contained in:
Aaron Weiss 2016-01-15 12:37:16 -05:00
parent 9ac625c091
commit 3dc15449a8
2 changed files with 114 additions and 49 deletions

View file

@ -10,17 +10,17 @@ use rustc_serialize::json::{decode, encode};
/// Configuration data.
#[derive(Clone, RustcDecodable, RustcEncodable, Default, PartialEq, Debug)]
pub struct Config {
/// A list of the owners of the bot by nickname.
/// A list of the owners of the client by nickname (for bots).
pub owners: Option<Vec<String>>,
/// The bot's nickname.
/// The client's nickname.
pub nickname: Option<String>,
/// The bot's NICKSERV password.
/// The client's NICKSERV password.
pub nick_password: Option<String>,
/// Alternative nicknames for the bots, if the default is taken.
/// Alternative nicknames for the client, if the default is taken.
pub alt_nicks: Option<Vec<String>>,
/// The bot's username.
/// The client's username.
pub username: Option<String>,
/// The bot's real name.
/// The client's real name.
pub realname: Option<String>,
/// The server to connect to.
pub server: Option<String>,
@ -29,7 +29,7 @@ pub struct Config {
/// The password to connect to the server.
pub password: Option<String>,
/// Whether or not to use SSL.
/// Bots will automatically panic if this is enabled without SSL support.
/// Clients will automatically panic if this is enabled without SSL support.
pub use_ssl: Option<bool>,
/// The encoding type used for this connection.
/// This is typically UTF-8, but could be something else.
@ -40,6 +40,10 @@ pub struct Config {
pub umodes: Option<String>,
/// The text that'll be sent in response to CTCP USERINFO requests.
pub user_info: Option<String>,
/// The amount of inactivity in seconds before the client will ping the server.
pub ping_time: Option<u32>,
/// The amount of time in seconds for a client to reconnect due to no ping response.
pub ping_timeout: Option<u32>,
/// A map of additional options to be stored in config.
pub options: Option<HashMap<String, String>>,
}
@ -151,6 +155,19 @@ impl Config {
self.user_info.as_ref().map(|s| &s[..]).unwrap_or("")
}
/// Gets the amount of time in seconds since last activity necessary for the client to ping the
/// server.
/// This defaults to 180 seconds when not specified.
pub fn ping_time(&self) -> u32 {
self.ping_time.as_ref().map(|t| *t).unwrap_or(180)
}
/// Gets the amount of time in seconds for the client to reconnect after no ping response.
/// This defaults to 10 seconds when not specified.
pub fn ping_timeout(&self) -> u32 {
self.ping_timeout.as_ref().map(|t| *t).unwrap_or(10)
}
/// Looks up the specified string in the options map.
/// This uses indexing, and thus panics when the string is not present.
/// This will also panic if used and there are no options.

View file

@ -9,13 +9,13 @@ use std::io::{BufReader, BufWriter, Error, ErrorKind, Result};
use std::iter::Map;
use std::path::Path;
use std::sync::{Arc, Mutex, RwLock};
use std::sync::mpsc::{Sender, Receiver, channel};
use std::sync::mpsc::{Receiver, Sender, TryRecvError, channel};
use std::thread::{JoinHandle, spawn};
use client::conn::{Connection, NetStream, Reconnect};
use client::data::{Command, Config, Message, Response, User};
use client::data::Command::{JOIN, NICK, NICKSERV, PONG, MODE};
use client::data::Command::{JOIN, NICK, NICKSERV, PING, PONG, MODE};
use client::data::kinds::{IrcRead, IrcWrite};
#[cfg(feature = "ctcp")] use time::now;
use time::{Duration, Timespec, Tm, now};
pub mod utils;
@ -59,12 +59,52 @@ struct ServerState<T: IrcRead, U: IrcWrite> {
alt_nick_index: RwLock<usize>,
/// A thread-safe count of reconnection attempts used for synchronization.
reconnect_count: Mutex<u32>,
/// A thread-safe store for the time of the last action.
last_action_time: Mutex<Tm>,
/// A thread-safe store for the last ping data.
last_ping_data: Mutex<Option<Timespec>>,
}
impl<T: IrcRead, U: IrcWrite> ServerState<T, U> where Connection<T, U>: Reconnect {
fn new(conn: Connection<T, U>, config: Config) -> ServerState<T, U> {
ServerState {
conn: conn,
write_handle: Mutex::new(None),
config: config,
chanlists: Mutex::new(HashMap::new()),
alt_nick_index: RwLock::new(0),
reconnect_count: Mutex::new(0),
last_action_time: Mutex::new(now()),
last_ping_data: Mutex::new(None),
}
}
fn reconnect(&self) -> Result<()> {
self.conn.reconnect(self.config.server(), self.config.port())
}
fn action_taken(&self) {
let mut time = self.last_action_time.lock().unwrap();
*time = now();
}
fn should_ping(&self) -> bool {
let time = self.last_action_time.lock().unwrap();
(now() - *time) > Duration::seconds(self.config.ping_time() as i64)
}
fn update_ping_data(&self, data: Timespec) {
let mut ping_data = self.last_ping_data.lock().unwrap();
*ping_data = Some(data);
}
fn ping_timeout_duration(&self) -> Duration {
Duration::seconds(self.config.ping_timeout() as i64)
}
fn last_ping_data(&self) -> Option<Timespec> {
self.last_ping_data.lock().unwrap().clone()
}
}
/// An IrcServer over a buffered NetStream.
@ -85,27 +125,7 @@ impl IrcServer<BufReader<NetStream>, BufWriter<NetStream>> {
} else {
Connection::connect(config.server(), config.port())
});
let (tx, rx): (Sender<Message>, Receiver<Message>) = channel();
let state = Arc::new(ServerState {
conn: conn,
write_handle: Mutex::new(None),
config: config,
chanlists: Mutex::new(HashMap::new()),
alt_nick_index: RwLock::new(0),
reconnect_count: Mutex::new(0),
});
let weak = Arc::downgrade(&state);
let write_handle = spawn(move || while let Ok(msg) = rx.recv() {
if let Some(strong) = weak.upgrade() {
while let Err(_) = IrcServer::write(&strong, msg.clone()) {
let _ = strong.reconnect();
}
}
});
let state2 = state.clone();
let mut handle = state2.write_handle.lock().unwrap();
*handle = Some(write_handle);
Ok(IrcServer { tx: tx, state: state, reconnect_count: Cell::new(0) })
Ok(IrcServer::from_connection(config, conn))
}
}
@ -166,19 +186,35 @@ impl<'a, T: IrcRead, U: IrcWrite> Server<'a, T, U> for IrcServer<T, U> where Con
impl<T: IrcRead, U: IrcWrite> IrcServer<T, U> where Connection<T, U>: Reconnect {
/// Creates an IRC server from the specified configuration, and any arbitrary Connection.
pub fn from_connection(config: Config, conn: Connection<T, U>) -> IrcServer<T, U> {
let (tx, rx) = channel();
let state = Arc::new(ServerState {
conn: conn,
write_handle: Mutex::new(None),
config: config,
chanlists: Mutex::new(HashMap::new()),
alt_nick_index: RwLock::new(0),
reconnect_count: Mutex::new(0),
});
let (tx, rx): (Sender<Message>, Receiver<Message>) = channel();
let state = Arc::new(ServerState::new(conn, config));
let weak = Arc::downgrade(&state);
let write_handle = spawn(move || while let Ok(msg) = rx.recv() {
let write_handle = spawn(move || loop {
if let Some(strong) = weak.upgrade() {
let _ = IrcServer::write(&strong, msg);
if let Some(time) = strong.last_ping_data() {
if now().to_timespec() - time > strong.ping_timeout_duration() {
while let Err(_) = strong.reconnect() {} // Continue trying to reconnect.
}
}
}
match rx.try_recv() {
Ok(msg) => if let Some(strong) = weak.upgrade() {
while let Err(_) = IrcServer::write(&strong, msg.clone()) {
let _ = strong.reconnect();
}
strong.action_taken();
},
Err(TryRecvError::Disconnected) => break,
Err(TryRecvError::Empty) => if let Some(strong) = weak.upgrade() {
if strong.should_ping() {
let data = now().to_timespec();
strong.update_ping_data(data);
let fmt = format!("{}", data.sec);
while let Err(_) = IrcServer::write(&strong, PING(fmt.clone(), None)) {
let _ = strong.reconnect();
}
}
},
}
});
let state2 = state.clone();
@ -264,6 +300,15 @@ impl<T: IrcRead, U: IrcWrite> IrcServer<T, U> where Connection<T, U>: Reconnect
}
if &msg.command[..] == "PING" {
self.send(PONG(msg.suffix.as_ref().unwrap().to_owned(), None)).unwrap();
} else if &msg.command[..] == "PONG" {
if let Ok(data) = msg.suffix.as_ref().unwrap().parse() {
if let Some(timespec) = self.state.last_ping_data() {
if timespec.sec == data {
let mut ping_data = self.state.last_ping_data.lock().unwrap();
ping_data.take();
}
}
}
} else if cfg!(not(feature = "nochanlists")) &&
(&msg.command[..] == "JOIN" || &msg.command[..] == "PART") {
let chan = match msg.suffix {
@ -321,19 +366,21 @@ impl<T: IrcRead, U: IrcWrite> IrcServer<T, U> where Connection<T, U>: Reconnect
msg[1..end].split(" ").collect()
};
match tokens[0] {
"FINGER" => self.send_ctcp_internal(resp, &format!("FINGER :{} ({})",
self.config().real_name(),
self.config().username())),
"FINGER" => self.send_ctcp_internal(resp, &format!(
"FINGER :{} ({})", self.config().real_name(), self.config().username()
)),
"VERSION" => self.send_ctcp_internal(resp, "VERSION irc:git:Rust"),
"SOURCE" => {
self.send_ctcp_internal(resp, "SOURCE https://github.com/aatxe/irc");
self.send_ctcp_internal(resp, "SOURCE");
},
"PING" => self.send_ctcp_internal(resp, &format!("PING {}", tokens[1])),
"TIME" => self.send_ctcp_internal(resp, &format!("TIME :{}",
now().rfc822z())),
"USERINFO" => self.send_ctcp_internal(resp, &format!("USERINFO :{}",
self.config().user_info())),
"TIME" => self.send_ctcp_internal(resp, &format!(
"TIME :{}", now().rfc822z()
)),
"USERINFO" => self.send_ctcp_internal(resp, &format!(
"USERINFO :{}", self.config().user_info()
)),
_ => {}
}
},
@ -388,6 +435,7 @@ impl<'a, T: IrcRead + 'a, U: IrcWrite + 'a> Iterator for ServerIterator<'a, T, U
Ok(msg) => match msg.parse() {
Ok(res) => {
self.server.handle_message(&res);
self.server.state.action_taken();
return Some(Ok(res))
},
Err(_) => return Some(Err(Error::new(ErrorKind::InvalidInput,