feat(main): Parse timestamps out of journald entries

Instead of relying on Stackdriver's ingestion timestamps, parse
timestamps out of journal entries and provide those to Stackdriver.

If a timestamp could not be parsed out of a log entry, the ingestion
time is used as the fallback.
This commit is contained in:
Vincent Ambo 2018-06-17 03:29:14 +02:00 committed by Vincent Ambo
parent f626d88438
commit 87ab3c806c
2 changed files with 51 additions and 9 deletions

View file

@ -40,12 +40,15 @@
#[macro_use] extern crate serde_json;
#[macro_use] extern crate lazy_static;
extern crate failure;
extern crate chrono;
extern crate env_logger;
extern crate systemd;
extern crate serde;
extern crate failure;
extern crate reqwest;
extern crate serde;
extern crate systemd;
use chrono::offset::LocalResult;
use chrono::prelude::*;
use failure::ResultExt;
use reqwest::{header, Client};
use serde_json::Value;
@ -174,12 +177,36 @@ fn message_to_payload(message: Option<String>) -> Payload {
}
}
/// Attempt to parse journald's microsecond timestamps into a UTC
/// timestamp.
///
/// Parse errors are dismissed and returned as empty options: There
/// simply aren't any useful fallback mechanisms other than defaulting
/// to ingestion time for journaldriver's use-case.
fn parse_microseconds(input: String) -> Option<DateTime<Utc>> {
if input.len() != 16 {
return None;
}
let seconds: i64 = (&input[..10]).parse().ok()?;
let micros: u32 = (&input[10..]).parse().ok()?;
match Utc.timestamp_opt(seconds, micros * 1000) {
LocalResult::Single(time) => Some(time),
_ => None,
}
}
/// This structure represents a log entry in the format expected by
/// the Stackdriver API.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct LogEntry {
labels: Value,
#[serde(skip_serializing_if = "Option::is_none")]
timestamp: Option<DateTime<Utc>>,
#[serde(flatten)]
payload: Payload,
}
@ -203,15 +230,19 @@ impl From<JournalRecord> for LogEntry {
// present on all others.
let unit = record.remove("_SYSTEMD_UNIT");
// TODO: This timestamp (in microseconds) should be parsed
// into a DateTime<Utc> and used instead of the ingestion
// time.
// let timestamp = record
// .remove("_SOURCE_REALTIME_TIMESTAMP")
// .map();
// The source timestamp (if present) is specified in
// microseconds since epoch.
//
// If it is not present or can not be parsed, journaldriver
// will not send a timestamp for the log entry and it will
// default to the ingestion time.
let timestamp = record
.remove("_SOURCE_REALTIME_TIMESTAMP")
.and_then(parse_microseconds);
LogEntry {
payload,
timestamp,
labels: json!({
"host": hostname,
"unit": unit.unwrap_or_else(|| "syslog".into()),

View file

@ -5,6 +5,7 @@ use serde_json::to_string;
fn test_text_entry_serialization() {
let entry = LogEntry {
labels: Value::Null,
timestamp: None,
payload: Payload::TextPayload {
text_payload: "test entry".into(),
}
@ -20,6 +21,7 @@ fn test_text_entry_serialization() {
fn test_json_entry_serialization() {
let entry = LogEntry {
labels: Value::Null,
timestamp: None,
payload: Payload::TextPayload {
text_payload: "test entry".into(),
}
@ -78,3 +80,12 @@ fn test_json_no_object() {
assert_eq!(expected, payload, "Non-object JSON payload should be plain text");
}
#[test]
fn test_parse_microseconds() {
let input: String = "1529175149291187".into();
let expected: DateTime<Utc> = "2018-06-16T18:52:29.291187Z"
.to_string().parse().unwrap();
assert_eq!(Some(expected), parse_microseconds(input));
}