feat(main): Implement parsing of JSON payloads
Stackdriver supports structured JSON payloads in addition to simple plain-text payloads. This commit introduces a new feature in which journaldriver will attempt to parse incoming log messages into JSON vaues and forward them as structured payloads if they are JSON objects. Messages that can not be parsed into JSON objects will continue to be forwarded as plain text messages.
This commit is contained in:
parent
71f0afe4b5
commit
10f23a9dfb
2 changed files with 130 additions and 3 deletions
53
src/main.rs
53
src/main.rs
|
@ -57,6 +57,9 @@ use std::process;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
use systemd::journal::*;
|
use systemd::journal::*;
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests;
|
||||||
|
|
||||||
const ENTRIES_WRITE_URL: &str = "https://logging.googleapis.com/v2/entries:write";
|
const ENTRIES_WRITE_URL: &str = "https://logging.googleapis.com/v2/entries:write";
|
||||||
const METADATA_TOKEN_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token";
|
const METADATA_TOKEN_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token";
|
||||||
const METADATA_ID_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/id";
|
const METADATA_ID_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/id";
|
||||||
|
@ -121,13 +124,57 @@ fn get_metadata(url: &str) -> Result<String> {
|
||||||
Ok(output.trim().into())
|
Ok(output.trim().into())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// This structure represents the different types of payloads
|
||||||
|
/// supported by journaldriver.
|
||||||
|
///
|
||||||
|
/// Currently log entries can either contain plain text messages or
|
||||||
|
/// structured payloads in JSON-format.
|
||||||
|
#[derive(Debug, Serialize, PartialEq)]
|
||||||
|
#[serde(untagged)]
|
||||||
|
enum Payload {
|
||||||
|
TextPayload {
|
||||||
|
#[serde(rename = "textPayload")]
|
||||||
|
text_payload: String,
|
||||||
|
},
|
||||||
|
JsonPayload {
|
||||||
|
#[serde(rename = "jsonPaylaod")]
|
||||||
|
json_payload: Value,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Attempt to parse a log message as JSON and return it as a
|
||||||
|
/// structured payload. If parsing fails, return the entry in plain
|
||||||
|
/// text format.
|
||||||
|
fn message_to_payload(message: Option<String>) -> Payload {
|
||||||
|
match message {
|
||||||
|
None => Payload::TextPayload { text_payload: "empty log entry".into() },
|
||||||
|
Some(text_payload) => {
|
||||||
|
// Attempt to deserialize the text payload as a generic
|
||||||
|
// JSON value.
|
||||||
|
if let Ok(json_payload) = serde_json::from_str::<Value>(&text_payload) {
|
||||||
|
// If JSON-parsing succeeded on the payload, check
|
||||||
|
// whether we parsed an object (Stackdriver does not
|
||||||
|
// expect other types of JSON payload) and return it
|
||||||
|
// in that case.
|
||||||
|
if json_payload.is_object() {
|
||||||
|
return Payload::JsonPayload { json_payload }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Payload::TextPayload { text_payload }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// This structure represents a log entry in the format expected by
|
/// This structure represents a log entry in the format expected by
|
||||||
/// the Stackdriver API.
|
/// the Stackdriver API.
|
||||||
#[derive(Debug, Serialize)]
|
#[derive(Debug, Serialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
struct LogEntry {
|
struct LogEntry {
|
||||||
labels: Value,
|
labels: Value,
|
||||||
text_payload: String, // TODO: attempt to parse jsonPayloads
|
#[serde(flatten)]
|
||||||
|
payload: Payload,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<JournalRecord> for LogEntry {
|
impl From<JournalRecord> for LogEntry {
|
||||||
|
@ -139,7 +186,7 @@ impl From<JournalRecord> for LogEntry {
|
||||||
// The message field is technically just a convention, but
|
// The message field is technically just a convention, but
|
||||||
// journald seems to default to it when ingesting unit
|
// journald seems to default to it when ingesting unit
|
||||||
// output.
|
// output.
|
||||||
let message = record.remove("MESSAGE");
|
let payload = message_to_payload(record.remove("MESSAGE"));
|
||||||
|
|
||||||
// Presumably this is always set, but who can be sure
|
// Presumably this is always set, but who can be sure
|
||||||
// about anything in this world.
|
// about anything in this world.
|
||||||
|
@ -157,7 +204,7 @@ impl From<JournalRecord> for LogEntry {
|
||||||
// .map();
|
// .map();
|
||||||
|
|
||||||
LogEntry {
|
LogEntry {
|
||||||
text_payload: message.unwrap_or_else(|| "empty log entry".into()),
|
payload,
|
||||||
labels: json!({
|
labels: json!({
|
||||||
"host": hostname,
|
"host": hostname,
|
||||||
"unit": unit.unwrap_or_else(|| "syslog".into()),
|
"unit": unit.unwrap_or_else(|| "syslog".into()),
|
||||||
|
|
80
src/tests.rs
Normal file
80
src/tests.rs
Normal file
|
@ -0,0 +1,80 @@
|
||||||
|
use super::*;
|
||||||
|
use serde_json::to_string;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_text_entry_serialization() {
|
||||||
|
let entry = LogEntry {
|
||||||
|
labels: Value::Null,
|
||||||
|
payload: Payload::TextPayload {
|
||||||
|
text_payload: "test entry".into(),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let expected = "{\"labels\":null,\"textPayload\":\"test entry\"}";
|
||||||
|
let result = to_string(&entry).expect("serialization failed");
|
||||||
|
|
||||||
|
assert_eq!(expected, result, "Plain text payload should serialize correctly")
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_json_entry_serialization() {
|
||||||
|
let entry = LogEntry {
|
||||||
|
labels: Value::Null,
|
||||||
|
payload: Payload::TextPayload {
|
||||||
|
text_payload: "test entry".into(),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let expected = "{\"labels\":null,\"textPayload\":\"test entry\"}";
|
||||||
|
let result = to_string(&entry).expect("serialization failed");
|
||||||
|
|
||||||
|
assert_eq!(expected, result, "Plain text payload should serialize correctly")
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_plain_text_payload() {
|
||||||
|
let message = "plain text payload".into();
|
||||||
|
let payload = message_to_payload(Some(message));
|
||||||
|
let expected = Payload::TextPayload {
|
||||||
|
text_payload: "plain text payload".into(),
|
||||||
|
};
|
||||||
|
|
||||||
|
assert_eq!(expected, payload, "Plain text payload should be detected correctly");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_empty_payload() {
|
||||||
|
let payload = message_to_payload(None);
|
||||||
|
let expected = Payload::TextPayload {
|
||||||
|
text_payload: "empty log entry".into(),
|
||||||
|
};
|
||||||
|
|
||||||
|
assert_eq!(expected, payload, "Empty payload should be handled correctly");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_json_payload() {
|
||||||
|
let message = "{\"someKey\":\"someValue\", \"otherKey\": 42}".into();
|
||||||
|
let payload = message_to_payload(Some(message));
|
||||||
|
let expected = Payload::JsonPayload {
|
||||||
|
json_payload: json!({
|
||||||
|
"someKey": "someValue",
|
||||||
|
"otherKey": 42
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
|
assert_eq!(expected, payload, "JSON payload should be detected correctly");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_json_no_object() {
|
||||||
|
// This message can be parsed as valid JSON, but it is not an
|
||||||
|
// object - it should be returned as a plain-text payload.
|
||||||
|
let message = "42".into();
|
||||||
|
let payload = message_to_payload(Some(message));
|
||||||
|
let expected = Payload::TextPayload {
|
||||||
|
text_payload: "42".into(),
|
||||||
|
};
|
||||||
|
|
||||||
|
assert_eq!(expected, payload, "Non-object JSON payload should be plain text");
|
||||||
|
}
|
Loading…
Reference in a new issue