diff --git a/src/main.rs b/src/main.rs index 898f9943d..f6c4ea0d9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -40,12 +40,15 @@ #[macro_use] extern crate serde_json; #[macro_use] extern crate lazy_static; -extern crate failure; +extern crate chrono; extern crate env_logger; -extern crate systemd; -extern crate serde; +extern crate failure; extern crate reqwest; +extern crate serde; +extern crate systemd; +use chrono::offset::LocalResult; +use chrono::prelude::*; use failure::ResultExt; use reqwest::{header, Client}; use serde_json::Value; @@ -174,12 +177,36 @@ fn message_to_payload(message: Option) -> Payload { } } +/// Attempt to parse journald's microsecond timestamps into a UTC +/// timestamp. +/// +/// Parse errors are dismissed and returned as empty options: There +/// simply aren't any useful fallback mechanisms other than defaulting +/// to ingestion time for journaldriver's use-case. +fn parse_microseconds(input: String) -> Option> { + if input.len() != 16 { + return None; + } + + let seconds: i64 = (&input[..10]).parse().ok()?; + let micros: u32 = (&input[10..]).parse().ok()?; + + match Utc.timestamp_opt(seconds, micros * 1000) { + LocalResult::Single(time) => Some(time), + _ => None, + } +} + /// This structure represents a log entry in the format expected by /// the Stackdriver API. #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")] struct LogEntry { labels: Value, + + #[serde(skip_serializing_if = "Option::is_none")] + timestamp: Option>, + #[serde(flatten)] payload: Payload, } @@ -203,15 +230,19 @@ impl From for LogEntry { // present on all others. let unit = record.remove("_SYSTEMD_UNIT"); - // TODO: This timestamp (in microseconds) should be parsed - // into a DateTime and used instead of the ingestion - // time. - // let timestamp = record - // .remove("_SOURCE_REALTIME_TIMESTAMP") - // .map(); + // The source timestamp (if present) is specified in + // microseconds since epoch. + // + // If it is not present or can not be parsed, journaldriver + // will not send a timestamp for the log entry and it will + // default to the ingestion time. + let timestamp = record + .remove("_SOURCE_REALTIME_TIMESTAMP") + .and_then(parse_microseconds); LogEntry { payload, + timestamp, labels: json!({ "host": hostname, "unit": unit.unwrap_or_else(|| "syslog".into()), diff --git a/src/tests.rs b/src/tests.rs index 623cb6b59..6840602ec 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -5,6 +5,7 @@ use serde_json::to_string; fn test_text_entry_serialization() { let entry = LogEntry { labels: Value::Null, + timestamp: None, payload: Payload::TextPayload { text_payload: "test entry".into(), } @@ -20,6 +21,7 @@ fn test_text_entry_serialization() { fn test_json_entry_serialization() { let entry = LogEntry { labels: Value::Null, + timestamp: None, payload: Payload::TextPayload { text_payload: "test entry".into(), } @@ -78,3 +80,12 @@ fn test_json_no_object() { assert_eq!(expected, payload, "Non-object JSON payload should be plain text"); } + +#[test] +fn test_parse_microseconds() { + let input: String = "1529175149291187".into(); + let expected: DateTime = "2018-06-16T18:52:29.291187Z" + .to_string().parse().unwrap(); + + assert_eq!(Some(expected), parse_microseconds(input)); +}