feat(main): Persist & load persisted cursor positions
Adds support for persisting the cursor position in a file (by default `/var/journaldriver/cursor.pos`, overridable via the environment variable `CURSOR_POSITION_FILE`). This lets journaldriver resume from a known journal position after service restarts. This closes #3
This commit is contained in:
parent
03a6ea742c
commit
e4b4830a04
1 changed files with 66 additions and 9 deletions
75
src/main.rs
75
src/main.rs
|
@ -33,9 +33,6 @@
|
||||||
//! monitored resource descriptor)
|
//! monitored resource descriptor)
|
||||||
//! * TODO 2018-06-15: Extract timestamps from journald instead of
|
//! * TODO 2018-06-15: Extract timestamps from journald instead of
|
||||||
//! relying on ingestion timestamps.
|
//! relying on ingestion timestamps.
|
||||||
//! * TODO 2018-06-15: Persist last known cursor position after
|
|
||||||
//! flushing to allow journaldriver to resume from the same position
|
|
||||||
//! after a restart.
|
|
||||||
|
|
||||||
#[macro_use] extern crate hyper;
|
#[macro_use] extern crate hyper;
|
||||||
#[macro_use] extern crate log;
|
#[macro_use] extern crate log;
|
||||||
|
@ -49,10 +46,14 @@ extern crate systemd;
|
||||||
extern crate serde;
|
extern crate serde;
|
||||||
extern crate reqwest;
|
extern crate reqwest;
|
||||||
|
|
||||||
|
use failure::ResultExt;
|
||||||
use reqwest::{header, Client};
|
use reqwest::{header, Client};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use std::io::Read;
|
use std::env;
|
||||||
|
use std::fs::{self, File};
|
||||||
|
use std::io::{self, Read, ErrorKind, Write};
|
||||||
use std::mem;
|
use std::mem;
|
||||||
|
use std::path::PathBuf;
|
||||||
use std::process;
|
use std::process;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
use systemd::journal::*;
|
use systemd::journal::*;
|
||||||
|
@ -68,7 +69,7 @@ const METADATA_PROJECT_URL: &str = "http://metadata.google.internal/computeMetad
|
||||||
|
|
||||||
// Google's metadata service requires this header to be present on all
|
// Google's metadata service requires this header to be present on all
|
||||||
// calls:
|
// calls:
|
||||||
//g
|
//
|
||||||
// https://cloud.google.com/compute/docs/storing-retrieving-metadata#querying
|
// https://cloud.google.com/compute/docs/storing-retrieving-metadata#querying
|
||||||
header! { (MetadataFlavor, "Metadata-Flavor") => [String] }
|
header! { (MetadataFlavor, "Metadata-Flavor") => [String] }
|
||||||
|
|
||||||
|
@ -112,6 +113,12 @@ lazy_static! {
|
||||||
"zone": ZONE.as_str(),
|
"zone": ZONE.as_str(),
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
/// Path to the file in which journaldriver should persist its
|
||||||
|
/// cursor state.
|
||||||
|
static ref POSITION_FILE: PathBuf = env::var("CURSOR_POSITION_FILE")
|
||||||
|
.unwrap_or("/var/journaldriver/cursor.pos".into())
|
||||||
|
.into();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Convenience helper for retrieving values from the metadata server.
|
/// Convenience helper for retrieving values from the metadata server.
|
||||||
|
@ -240,17 +247,29 @@ fn receiver_loop(mut journal: Journal) -> Result<()> {
|
||||||
|
|
||||||
if !buf.is_empty() {
|
if !buf.is_empty() {
|
||||||
let to_flush = mem::replace(&mut buf, Vec::new());
|
let to_flush = mem::replace(&mut buf, Vec::new());
|
||||||
flush(&client, &mut token, to_flush)?;
|
flush(&client, &mut token, to_flush, journal.cursor()?)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
trace!("Done outer iteration");
|
trace!("Done outer iteration");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Writes the current cursor into `/var/journaldriver/cursor.pos`.
|
||||||
|
fn persist_cursor(cursor: String) -> Result<()> {
|
||||||
|
let mut file = File::create(&*POSITION_FILE)?;
|
||||||
|
write!(file, "{}", cursor).map_err(Into::into)
|
||||||
|
}
|
||||||
|
|
||||||
/// Flushes all drained records to Stackdriver. Any Stackdriver
|
/// Flushes all drained records to Stackdriver. Any Stackdriver
|
||||||
/// message can at most contain 1000 log entries which means they are
|
/// message can at most contain 1000 log entries which means they are
|
||||||
/// chunked up here.
|
/// chunked up here.
|
||||||
fn flush(client: &Client, token: &mut Token, entries: Vec<LogEntry>) -> Result<()> {
|
///
|
||||||
|
/// If flushing is successful the last cursor position will be
|
||||||
|
/// persisted to disk.
|
||||||
|
fn flush(client: &Client,
|
||||||
|
token: &mut Token,
|
||||||
|
entries: Vec<LogEntry>,
|
||||||
|
cursor: String) -> Result<()> {
|
||||||
if token.is_expired() {
|
if token.is_expired() {
|
||||||
debug!("Refreshing Google metadata access token");
|
debug!("Refreshing Google metadata access token");
|
||||||
let new_token = get_metadata_token()?;
|
let new_token = get_metadata_token()?;
|
||||||
|
@ -266,7 +285,7 @@ fn flush(client: &Client, token: &mut Token, entries: Vec<LogEntry>) -> Result<(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
persist_cursor(cursor)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Represents the response returned by the metadata server's token
|
/// Represents the response returned by the metadata server's token
|
||||||
|
@ -330,13 +349,51 @@ fn write_entries(client: &Client, token: &Token, request: Value) -> Result<()> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Attempt to read the initial cursor position from the configured
|
||||||
|
/// file. If there is no initial cursor position set, read from the
|
||||||
|
/// tail of the log.
|
||||||
|
///
|
||||||
|
/// The only "acceptable" error when reading the cursor position is
|
||||||
|
/// the cursor position file not existing, other errors are fatal
|
||||||
|
/// because they indicate a misconfiguration of journaldriver.
|
||||||
|
fn initial_cursor() -> Result<JournalSeek> {
|
||||||
|
let read_result: io::Result<String> = (|| {
|
||||||
|
let mut contents = String::new();
|
||||||
|
let mut file = File::open(&*POSITION_FILE)?;
|
||||||
|
file.read_to_string(&mut contents)?;
|
||||||
|
Ok(contents.trim().into())
|
||||||
|
})();
|
||||||
|
|
||||||
|
match read_result {
|
||||||
|
Ok(cursor) => Ok(JournalSeek::Cursor { cursor }),
|
||||||
|
Err(ref err) if err.kind() == ErrorKind::NotFound => {
|
||||||
|
info!("No previous cursor position, reading from journal tail");
|
||||||
|
Ok(JournalSeek::Tail)
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
(Err(err).context("Could not read cursor position"))?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn main () {
|
fn main () {
|
||||||
env_logger::init();
|
env_logger::init();
|
||||||
|
|
||||||
|
// If the cursor file does not yet exist, the directory structure
|
||||||
|
// leading up to it should be created:
|
||||||
|
let cursor_position_dir = POSITION_FILE.parent()
|
||||||
|
.expect("Invalid cursor position file path");
|
||||||
|
|
||||||
|
fs::create_dir_all(cursor_position_dir)
|
||||||
|
.expect("Could not create directory to store cursor position in");
|
||||||
|
|
||||||
let mut journal = Journal::open(JournalFiles::All, false, true)
|
let mut journal = Journal::open(JournalFiles::All, false, true)
|
||||||
.expect("Failed to open systemd journal");
|
.expect("Failed to open systemd journal");
|
||||||
|
|
||||||
match journal.seek(JournalSeek::Tail) {
|
let seek_position = initial_cursor()
|
||||||
|
.expect("Failed to determine initial cursor position");
|
||||||
|
|
||||||
|
match journal.seek(seek_position) {
|
||||||
Ok(cursor) => info!("Opened journal at cursor '{}'", cursor),
|
Ok(cursor) => info!("Opened journal at cursor '{}'", cursor),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!("Failed to set initial journal position: {}", err);
|
error!("Failed to set initial journal position: {}", err);
|
||||||
|
|
Loading…
Reference in a new issue