Changed ping timeout logic in ping thread

Previously, ping thread might enter infinite reconnect loop.
Consider the following scenario.

1. If at least one ping is sent, last_ping_data is updated.
2. Then there are many activities, so should_ping() becomes false.
3. After some seconds, now().to_timespec() - time > strong.ping_timeout_duration() is satisfied.
4. reconnect()
5. But should_ping() is still false.
6. The condition is still satisfied, so reconnect() again and again.

I made several changes.

Followings are the changes in the code
- Handle PONG message from server.
- Add `waiting_pong_reply' flag in the state. The flag is set if ping is
sent but the corresponding pong did not arrive yet.
- Ping thread checks ping timeout in correct way.
- Sleeping duration for ping is now based on idle time.
- Initialize ping related fields when `reconnect()' is called.

Hopefully, this commit may be related to issue #50.
This commit is contained in:
ChanMin Kim 2016-07-05 06:36:43 +09:00
parent 7032a550c9
commit bba8648252

View file

@ -10,7 +10,7 @@ use std::thread::{spawn, sleep};
use std::time::Duration as StdDuration; use std::time::Duration as StdDuration;
use client::conn::{Connection, NetConnection}; use client::conn::{Connection, NetConnection};
use client::data::{Command, Config, Message, Response, User}; use client::data::{Command, Config, Message, Response, User};
use client::data::Command::{JOIN, NICK, NICKSERV, PART, PING, PRIVMSG, MODE}; use client::data::Command::{JOIN, NICK, NICKSERV, PART, PING, PONG, PRIVMSG, MODE};
use client::server::utils::ServerExt; use client::server::utils::ServerExt;
use time::{Duration, Timespec, Tm, now}; use time::{Duration, Timespec, Tm, now};
@ -57,6 +57,8 @@ struct ServerState {
last_action_time: Mutex<Tm>, last_action_time: Mutex<Tm>,
/// A thread-safe store for the last ping data. /// A thread-safe store for the last ping data.
last_ping_data: Mutex<Option<Timespec>>, last_ping_data: Mutex<Option<Timespec>>,
/// A thread-safe check of pong reply.
waiting_pong_reply: Mutex<bool>,
} }
impl ServerState { impl ServerState {
@ -69,10 +71,15 @@ impl ServerState {
reconnect_count: Mutex::new(0), reconnect_count: Mutex::new(0),
last_action_time: Mutex::new(now()), last_action_time: Mutex::new(now()),
last_ping_data: Mutex::new(None), last_ping_data: Mutex::new(None),
waiting_pong_reply: Mutex::new(false),
} }
} }
fn reconnect(&self) -> Result<()> { fn reconnect(&self) -> Result<()> {
let mut ping_data = self.last_ping_data.lock().unwrap();
*ping_data = None;
let mut waiting_pong_reply = self.waiting_pong_reply.lock().unwrap();
*waiting_pong_reply = false;
self.conn.reconnect() self.conn.reconnect()
} }
@ -86,19 +93,36 @@ impl ServerState {
(now() - *time) > Duration::seconds(self.config.ping_time() as i64) (now() - *time) > Duration::seconds(self.config.ping_time() as i64)
} }
fn last_action_time(&self) -> Tm {
*self.last_action_time.lock().unwrap()
}
fn update_ping_data(&self, data: Timespec) { fn update_ping_data(&self, data: Timespec) {
let mut ping_data = self.last_ping_data.lock().unwrap(); let mut ping_data = self.last_ping_data.lock().unwrap();
*ping_data = Some(data); *ping_data = Some(data);
} let mut waiting_pong_reply = self.waiting_pong_reply.lock().unwrap();
*waiting_pong_reply = true;
fn ping_timeout_duration(&self) -> Duration {
Duration::seconds(self.config.ping_timeout() as i64)
} }
fn last_ping_data(&self) -> Option<Timespec> { fn last_ping_data(&self) -> Option<Timespec> {
*self.last_ping_data.lock().unwrap() *self.last_ping_data.lock().unwrap()
} }
fn waiting_pong_reply(&self) -> bool {
*self.waiting_pong_reply.lock().unwrap()
}
fn check_pong(&self, data: &str) {
if let Some(ref time) = self.last_ping_data() {
let fmt = format!("{}", time.sec);
if fmt == data {
// found matching pong reply
let mut waiting_reply = self.waiting_pong_reply.lock().unwrap();
*waiting_reply = false;
}
}
}
fn send_impl(&self, msg: Message) { fn send_impl(&self, msg: Message) {
while let Err(_) = self.write(msg.clone()) { while let Err(_) = self.write(msg.clone()) {
let _ = self.reconnect().and_then(|_| self.identify()); let _ = self.reconnect().and_then(|_| self.identify());
@ -202,30 +226,55 @@ impl IrcServer {
where C: Connection + Send + Sync + 'static { where C: Connection + Send + Sync + 'static {
let state = Arc::new(ServerState::new(conn, config)); let state = Arc::new(ServerState::new(conn, config));
let ping_time = (state.config.ping_time() as i64) * 1000; let ping_time = (state.config.ping_time() as i64) * 1000;
let ping_timeout = (state.config.ping_timeout() as i64) * 1000;
let weak = Arc::downgrade(&state); let weak = Arc::downgrade(&state);
spawn(move || while let Some(strong) = weak.upgrade() { spawn(move || while let Some(strong) = weak.upgrade() {
let sleep_dur = if let Some(time) = strong.last_ping_data() { let ping_idle_timespec = {
if now().to_timespec() - time > strong.ping_timeout_duration() { let last_action = strong.last_action_time().to_timespec();
if let Some(last_ping) = strong.last_ping_data() {
if last_action < last_ping {
last_ping
} else {
last_action
}
} else {
last_action
}
};
let now_timespec = now().to_timespec();
let sleep_dur_ping_time = ping_time - (now_timespec - ping_idle_timespec).num_milliseconds();
let sleep_dur_ping_timeout = if let Some(time) = strong.last_ping_data() {
ping_timeout - (now_timespec - time).num_milliseconds()
} else {
ping_timeout
};
if strong.waiting_pong_reply() && sleep_dur_ping_timeout < sleep_dur_ping_time {
// timeout check is earlier
let sleep_dur = sleep_dur_ping_timeout;
if sleep_dur > 0 {
sleep(StdDuration::from_millis(sleep_dur as u64))
}
if strong.waiting_pong_reply() {
let _ = strong.reconnect(); let _ = strong.reconnect();
while let Err(_) = strong.identify() { while let Err(_) = strong.identify() {
let _ = strong.reconnect(); let _ = strong.reconnect();
} }
} }
ping_time - (now().to_timespec() - time).num_milliseconds()
} else { } else {
ping_time let sleep_dur = sleep_dur_ping_time;
}; if sleep_dur > 0 {
sleep(StdDuration::from_millis(sleep_dur as u64))
if sleep_dur > 0 { }
sleep(StdDuration::from_millis(sleep_dur as u64)) let now_timespec = now().to_timespec();
} if strong.should_ping() {
if strong.should_ping() { let data = now_timespec;
let data = now().to_timespec(); strong.update_ping_data(data);
strong.update_ping_data(data); let fmt = format!("{}", data.sec);
let fmt = format!("{}", data.sec); while let Err(_) = strong.write(PING(fmt.clone(), None)) {
while let Err(_) = strong.write(PING(fmt.clone(), None)) { let _ = strong.reconnect();
let _ = strong.reconnect(); }
} }
} }
}); });
@ -269,6 +318,8 @@ impl IrcServer {
fn handle_message(&self, msg: &Message) -> Result<()> { fn handle_message(&self, msg: &Message) -> Result<()> {
match msg.command { match msg.command {
PING(ref data, _) => try!(self.send_pong(&data)), PING(ref data, _) => try!(self.send_pong(&data)),
PONG(_, Some(ref pingdata)) => self.state.check_pong(&pingdata),
PONG(ref pingdata, None) => self.state.check_pong(&pingdata),
JOIN(ref chan, _, _) => self.handle_join(msg.source_nickname().unwrap_or(""), chan), JOIN(ref chan, _, _) => self.handle_join(msg.source_nickname().unwrap_or(""), chan),
PART(ref chan, _) => self.handle_part(msg.source_nickname().unwrap_or(""), chan), PART(ref chan, _) => self.handle_part(msg.source_nickname().unwrap_or(""), chan),
MODE(ref chan, ref mode, Some(ref user)) => self.handle_mode(chan, mode, user), MODE(ref chan, ref mode, Some(ref user)) => self.handle_mode(chan, mode, user),