diff --git a/src/client/server/mod.rs b/src/client/server/mod.rs index 7753185..96b2e5d 100644 --- a/src/client/server/mod.rs +++ b/src/client/server/mod.rs @@ -10,7 +10,7 @@ use std::thread::{spawn, sleep}; use std::time::Duration as StdDuration; use client::conn::{Connection, NetConnection}; use client::data::{Command, Config, Message, Response, User}; -use client::data::Command::{JOIN, NICK, NICKSERV, PART, PING, PRIVMSG, MODE}; +use client::data::Command::{JOIN, NICK, NICKSERV, PART, PING, PONG, PRIVMSG, MODE}; use client::server::utils::ServerExt; use time::{Duration, Timespec, Tm, now}; @@ -57,6 +57,8 @@ struct ServerState { last_action_time: Mutex, /// A thread-safe store for the last ping data. last_ping_data: Mutex>, + /// A thread-safe check of pong reply. + waiting_pong_reply: Mutex, } impl ServerState { @@ -69,10 +71,15 @@ impl ServerState { reconnect_count: Mutex::new(0), last_action_time: Mutex::new(now()), last_ping_data: Mutex::new(None), + waiting_pong_reply: Mutex::new(false), } } fn reconnect(&self) -> Result<()> { + let mut ping_data = self.last_ping_data.lock().unwrap(); + *ping_data = None; + let mut waiting_pong_reply = self.waiting_pong_reply.lock().unwrap(); + *waiting_pong_reply = false; self.conn.reconnect() } @@ -86,19 +93,36 @@ impl ServerState { (now() - *time) > Duration::seconds(self.config.ping_time() as i64) } + fn last_action_time(&self) -> Tm { + *self.last_action_time.lock().unwrap() + } + fn update_ping_data(&self, data: Timespec) { let mut ping_data = self.last_ping_data.lock().unwrap(); *ping_data = Some(data); - } - - fn ping_timeout_duration(&self) -> Duration { - Duration::seconds(self.config.ping_timeout() as i64) + let mut waiting_pong_reply = self.waiting_pong_reply.lock().unwrap(); + *waiting_pong_reply = true; } fn last_ping_data(&self) -> Option { *self.last_ping_data.lock().unwrap() } + fn waiting_pong_reply(&self) -> bool { + *self.waiting_pong_reply.lock().unwrap() + } + + fn check_pong(&self, data: &str) { + if let Some(ref time) = self.last_ping_data() { + let fmt = format!("{}", time.sec); + if fmt == data { + // found matching pong reply + let mut waiting_reply = self.waiting_pong_reply.lock().unwrap(); + *waiting_reply = false; + } + } + } + fn send_impl(&self, msg: Message) { while let Err(_) = self.write(msg.clone()) { let _ = self.reconnect().and_then(|_| self.identify()); @@ -202,30 +226,55 @@ impl IrcServer { where C: Connection + Send + Sync + 'static { let state = Arc::new(ServerState::new(conn, config)); let ping_time = (state.config.ping_time() as i64) * 1000; + let ping_timeout = (state.config.ping_timeout() as i64) * 1000; let weak = Arc::downgrade(&state); spawn(move || while let Some(strong) = weak.upgrade() { - let sleep_dur = if let Some(time) = strong.last_ping_data() { - if now().to_timespec() - time > strong.ping_timeout_duration() { + let ping_idle_timespec = { + let last_action = strong.last_action_time().to_timespec(); + if let Some(last_ping) = strong.last_ping_data() { + if last_action < last_ping { + last_ping + } else { + last_action + } + } else { + last_action + } + }; + let now_timespec = now().to_timespec(); + let sleep_dur_ping_time = ping_time - (now_timespec - ping_idle_timespec).num_milliseconds(); + let sleep_dur_ping_timeout = if let Some(time) = strong.last_ping_data() { + ping_timeout - (now_timespec - time).num_milliseconds() + } else { + ping_timeout + }; + + if strong.waiting_pong_reply() && sleep_dur_ping_timeout < sleep_dur_ping_time { + // timeout check is earlier + let sleep_dur = sleep_dur_ping_timeout; + if sleep_dur > 0 { + sleep(StdDuration::from_millis(sleep_dur as u64)) + } + if strong.waiting_pong_reply() { let _ = strong.reconnect(); while let Err(_) = strong.identify() { let _ = strong.reconnect(); } } - ping_time - (now().to_timespec() - time).num_milliseconds() } else { - ping_time - }; - - if sleep_dur > 0 { - sleep(StdDuration::from_millis(sleep_dur as u64)) - } - if strong.should_ping() { - let data = now().to_timespec(); - strong.update_ping_data(data); - let fmt = format!("{}", data.sec); - while let Err(_) = strong.write(PING(fmt.clone(), None)) { - let _ = strong.reconnect(); + let sleep_dur = sleep_dur_ping_time; + if sleep_dur > 0 { + sleep(StdDuration::from_millis(sleep_dur as u64)) + } + let now_timespec = now().to_timespec(); + if strong.should_ping() { + let data = now_timespec; + strong.update_ping_data(data); + let fmt = format!("{}", data.sec); + while let Err(_) = strong.write(PING(fmt.clone(), None)) { + let _ = strong.reconnect(); + } } } }); @@ -269,6 +318,8 @@ impl IrcServer { fn handle_message(&self, msg: &Message) -> Result<()> { match msg.command { PING(ref data, _) => try!(self.send_pong(&data)), + PONG(_, Some(ref pingdata)) => self.state.check_pong(&pingdata), + PONG(ref pingdata, None) => self.state.check_pong(&pingdata), JOIN(ref chan, _, _) => self.handle_join(msg.source_nickname().unwrap_or(""), chan), PART(ref chan, _) => self.handle_part(msg.source_nickname().unwrap_or(""), chan), MODE(ref chan, ref mode, Some(ref user)) => self.handle_mode(chan, mode, user),