feat(main): Implement record conversion & flushing to API

This implements the "meat" of the initial version of journaldriver.

Records from journald are converted into the representation required
by Stackdriver and forwarded to the API.

In this initial version journaldriver is only supported on instances
running on GCP.
This commit is contained in:
Vincent Ambo 2018-06-15 16:45:17 +02:00
parent 2d8e057118
commit 4ef98fc2ba
4 changed files with 192 additions and 172 deletions

29
Cargo.lock generated
View file

@ -98,17 +98,6 @@ name = "cfg-if"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "chrono"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"num-integer 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)",
"num-traits 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.62 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "core-foundation"
version = "0.2.3"
@ -339,10 +328,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
name = "journaldriver"
version = "0.1.0"
dependencies = [
"chrono 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.5.10 (registry+https://github.com/rust-lang/crates.io-index)",
"failure 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper 0.11.27 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.41 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"pkg-config 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)",
@ -525,19 +514,6 @@ name = "nodrop"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "num-integer"
version = "0.1.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"num-traits 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "num-traits"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "num_cpus"
version = "1.8.0"
@ -1269,7 +1245,6 @@ dependencies = [
"checksum bytes 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7dd32989a66957d3f0cba6588f15d4281a733f4e9ffc43fcd2385f57d3bf99ff"
"checksum cc 1.0.15 (registry+https://github.com/rust-lang/crates.io-index)" = "0ebb87d1116151416c0cf66a0e3fb6430cccd120fd6300794b4dfaa050ac40ba"
"checksum cfg-if 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "405216fd8fe65f718daa7102ea808a946b6ce40c742998fbfd3463645552de18"
"checksum chrono 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1cce36c92cb605414e9b824f866f5babe0a0368e39ea07393b9b63cf3844c0e6"
"checksum core-foundation 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "25bfd746d203017f7d5cbd31ee5d8e17f94b6521c7af77ece6c9e4b2d4b16c67"
"checksum core-foundation-sys 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "065a5d7ffdcbc8fa145d6f0746f3555025b9097a9e9cda59f7467abae670c78d"
"checksum crc 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d663548de7f5cca343f1e0a48d14dcfb0e9eb4e079ec58883b7251539fa10aeb"
@ -1316,8 +1291,6 @@ dependencies = [
"checksum native-tls 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "f74dbadc8b43df7864539cedb7bc91345e532fdd913cfdc23ad94f4d2d40fbc0"
"checksum net2 0.2.32 (registry+https://github.com/rust-lang/crates.io-index)" = "9044faf1413a1057267be51b5afba8eb1090bd2231c693664aa1db716fe1eae0"
"checksum nodrop 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "9a2228dca57108069a5262f2ed8bd2e82496d2e074a06d1ccc7ce1687b6ae0a2"
"checksum num-integer 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)" = "6ac0ea58d64a89d9d6b7688031b3be9358d6c919badcf7fbb0527ccfd891ee45"
"checksum num-traits 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "775393e285254d2f5004596d69bb8bc1149754570dcc08cf30cabeba67955e28"
"checksum num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c51a3322e4bca9d212ad9a158a02abc6934d005490c054a2778df73a70aa0a30"
"checksum openssl 0.9.24 (registry+https://github.com/rust-lang/crates.io-index)" = "a3605c298474a3aa69de92d21139fb5e2a81688d308262359d85cdd0d12a7985"
"checksum openssl-sys 0.9.31 (registry+https://github.com/rust-lang/crates.io-index)" = "a4d6a27d108b29befe1822d40e2e22f85518dac59acbf7f30fdc532f48fd0a77"

View file

@ -13,8 +13,8 @@ serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
reqwest = "0.8"
chrono = { version = "0.4", features = ["serde"] }
hyper = "*" # whatever reqwest drags in
hyper = "0.11" # whatever reqwest drags in
lazy_static = "1.0"
[build-dependencies]
pkg-config = "0.3"

View file

@ -1,58 +1,152 @@
#[macro_use] extern crate failure;
//! This file implements journaldriver, a small application that
//! forwards logs from journald (systemd's log facility) to
//! Stackdriver Logging.
//!
//! Log entries are read continously from journald and are forwarded
//! to Stackdriver in batches.
//!
//! Stackdriver Logging has a concept of monitored resources. In the
//! simplest (and currently only supported) case this monitored
//! resource will be the GCE instance on which journaldriver is
//! running.
//!
//! Information about the instance, the project and required security
//! credentials are retrieved from Google's metadata instance on GCP.
//!
//! Things left to do:
//! * TODO 2018-06-15: Support non-GCP instances (see comment on
//! monitored resource descriptor)
//! * TODO 2018-06-15: Extract timestamps from journald instead of
//! relying on ingestion timestamps.
//! * TODO 2018-06-15: Persist last known cursor position after
//! flushing to allow journaldriver to resume from the same position
//! after a restart.
#[macro_use] extern crate hyper;
#[macro_use] extern crate log;
#[macro_use] extern crate serde_derive;
#[macro_use] extern crate serde_json;
#[macro_use] extern crate lazy_static;
extern crate chrono;
extern crate failure;
extern crate env_logger;
extern crate systemd;
extern crate serde;
extern crate serde_json;
extern crate reqwest;
mod stackdriver;
use std::env;
use reqwest::{header, Client};
use serde_json::Value;
use std::io::Read;
use std::mem;
use std::ops::Add;
use std::process;
use std::time::{Duration, Instant};
use systemd::journal::*;
const ENTRIES_WRITE_URL: &str = "https://logging.googleapis.com/v2/entries:write";
const METADATA_TOKEN_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token";
const METADATA_ID_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/id";
const METADATA_ZONE_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/zone";
const METADATA_PROJECT_URL: &str = "http://metadata.google.internal/computeMetadata/v1/project/project-id";
// Google's metadata service requires this header to be present on all
// calls:
//g
// https://cloud.google.com/compute/docs/storing-retrieving-metadata#querying
header! { (MetadataFlavor, "Metadata-Flavor") => [String] }
/// Convenience type alias for results using failure's `Error` type.
type Result<T> = std::result::Result<T, failure::Error>;
#[derive(Debug)]
struct Record {
message: Option<String>,
hostname: Option<String>,
unit: Option<String>,
timestamp: Option<String>,
lazy_static! {
/// HTTP client instance preconfigured with the metadata header
/// required by Google.
static ref METADATA_CLIENT: Client = {
let mut headers = header::Headers::new();
headers.set(MetadataFlavor("Google".into()));
Client::builder().default_headers(headers)
.build().expect("Could not create metadata client")
};
/// ID of the GCP project in which this instance is running.
static ref PROJECT_ID: String = get_metadata(METADATA_PROJECT_URL)
.expect("Could not determine project ID");
/// ID of the current GCP instance.
static ref INSTANCE_ID: String = get_metadata(METADATA_ID_URL)
.expect("Could not determine instance ID");
/// GCP zone in which this instance is running.
static ref ZONE: String = get_metadata(METADATA_ZONE_URL)
.expect("Could not determine instance zone");
/// Descriptor of the currently monitored instance.
///
/// For GCE instances, this will be the GCE instance ID. For
/// non-GCE machines a sensible solution may be using the machine
/// hostname as a Cloud Logging log name, but this is not yet
/// implemented.
static ref MONITORED_RESOURCE: Value = json!({
"type": "gce_instance",
"labels": {
"project_id": PROJECT_ID.as_str(),
"instance_id": INSTANCE_ID.as_str(),
"zone": ZONE.as_str(),
}
});
}
impl From<JournalRecord> for Record {
fn from(mut record: JournalRecord) -> Record {
Record {
// The message field is technically just a convention, but
// journald seems to default to it when ingesting unit
// output.
message: record.remove("MESSAGE"),
/// Convenience helper for retrieving values from the metadata server.
fn get_metadata(url: &str) -> Result<String> {
let mut output = String::new();
METADATA_CLIENT.get(url).send()?
.error_for_status()?
.read_to_string(&mut output)?;
// Presumably this is always set, but who can be sure
// about anything in this world.
hostname: record.remove("_HOSTNAME"),
Ok(output.trim().into())
}
// The unit is seemingly missing on kernel entries, but
// present on all others.
unit: record.remove("_SYSTEMD_UNIT"),
/// This structure represents a log entry in the format expected by
/// the Stackdriver API.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct LogEntry {
labels: Value,
text_payload: String, // TODO: attempt to parse jsonPayloads
}
// This timestamp is present on most log entries
// (seemingly all that are ingested from the output
// systemd units).
timestamp: record.remove("_SOURCE_REALTIME_TIMESTAMP"),
impl From<JournalRecord> for LogEntry {
// Converts from the fields contained in a journald record to the
// representation required by Stackdriver Logging.
//
// The fields are documented in systemd.journal-fields(7).
fn from(mut record: JournalRecord) -> LogEntry {
// The message field is technically just a convention, but
// journald seems to default to it when ingesting unit
// output.
let message = record.remove("MESSAGE");
// Presumably this is always set, but who can be sure
// about anything in this world.
let hostname = record.remove("_HOSTNAME");
// The unit is seemingly missing on kernel entries, but
// present on all others.
let unit = record.remove("_SYSTEMD_UNIT");
// TODO: This timestamp (in microseconds) should be parsed
// into a DateTime<Utc> and used instead of the ingestion
// time.
// let timestamp = record
// .remove("_SOURCE_REALTIME_TIMESTAMP")
// .map();
LogEntry {
text_payload: message.unwrap_or_else(|| "empty log entry".into()),
labels: json!({
"host": hostname,
"unit": unit.unwrap_or_else(|| "syslog".into()),
}),
}
}
}
@ -60,8 +154,11 @@ impl From<JournalRecord> for Record {
/// This function starts a double-looped, blocking receiver. It will
/// buffer messages for half a second before flushing them to
/// Stackdriver.
fn receiver_loop(mut journal: Journal) {
let mut buf: Vec<Record> = Vec::new();
fn receiver_loop(mut journal: Journal) -> Result<()> {
let mut token = get_metadata_token()?;
let client = reqwest::Client::new();
let mut buf: Vec<LogEntry> = Vec::new();
let iteration = Duration::from_millis(500);
loop {
@ -73,15 +170,15 @@ fn receiver_loop(mut journal: Journal) {
break;
}
if let Ok(Some(record)) = journal.await_next_record(Some(iteration)) {
trace!("Received a new record");
buf.push(record.into());
if let Ok(Some(entry)) = journal.await_next_record(Some(iteration)) {
trace!("Received a new entry");
buf.push(entry.into());
}
}
if !buf.is_empty() {
let to_flush = mem::replace(&mut buf, Vec::new());
flush(to_flush);
flush(&client, &mut token, to_flush)?;
}
trace!("Done outer iteration");
@ -91,16 +188,29 @@ fn receiver_loop(mut journal: Journal) {
/// Flushes all drained records to Stackdriver. Any Stackdriver
/// message can at most contain 1000 log entries which means they are
/// chunked up here.
fn flush(records: Vec<Record>) {
for chunk in records.chunks(1000) {
debug!("Flushed {} records", chunk.len())
fn flush(client: &Client, token: &mut Token, entries: Vec<LogEntry>) -> Result<()> {
if token.is_expired() {
debug!("Refreshing Google metadata access token");
let new_token = get_metadata_token()?;
mem::replace(token, new_token);
}
for chunk in entries.chunks(1000) {
let request = prepare_request(chunk);
if let Err(write_error) = write_entries(client, token, request) {
error!("Failed to write {} entries: {}", chunk.len(), write_error)
} else {
debug!("Wrote {} entries to Stackdriver", chunk.len())
}
}
Ok(())
}
/// Retrieves an access token from the GCP metadata service.
/// Represents the response returned by the metadata server's token
/// endpoint. The token is normally valid for an hour.
#[derive(Deserialize)]
struct TokenResponse {
#[serde(rename = "type")]
expires_in: u64,
access_token: String,
}
@ -109,26 +219,55 @@ struct TokenResponse {
/// representation of when it expires.
struct Token {
token: String,
renew_at: Instant,
fetched_at: Instant,
expires: Duration,
}
fn get_metadata_token(client: &reqwest::Client) -> Result<Token> {
let now = Instant::now();
impl Token {
/// Does this token need to be renewed?
fn is_expired(&self) -> bool {
self.fetched_at.elapsed() > self.expires
}
}
let token: TokenResponse = client.get(METADATA_TOKEN_URL)
.header(MetadataFlavor("Google".into()))
fn get_metadata_token() -> Result<Token> {
let token: TokenResponse = METADATA_CLIENT
.get(METADATA_TOKEN_URL)
.send()?.json()?;
debug!("Fetched new token from metadata service");
let renew_at = now.add(Duration::from_secs(token.expires_in / 2));
Ok(Token {
renew_at,
fetched_at: Instant::now(),
expires: Duration::from_secs(token.expires_in / 2),
token: token.access_token,
})
}
/// Convert a slice of log entries into the format expected by
/// Stackdriver. This format is documented here:
///
/// https://cloud.google.com/logging/docs/reference/v2/rest/v2/entries/write
fn prepare_request(entries: &[LogEntry]) -> Value {
json!({
"logName": format!("projects/{}/logs/journaldriver", PROJECT_ID.as_str()),
"resource": &*MONITORED_RESOURCE,
"entries": entries,
"partialSuccess": true
})
}
/// Perform the log entry insertion in Stackdriver Logging.
fn write_entries(client: &Client, token: &Token, request: Value) -> Result<()> {
client.post(ENTRIES_WRITE_URL)
.header(header::Authorization(format!("Bearer {}", token.token)))
.json(&request)
.send()?
.error_for_status()?;
Ok(())
}
fn main () {
env_logger::init();
@ -143,5 +282,5 @@ fn main () {
}
}
receiver_loop(journal)
receiver_loop(journal).expect("log receiver encountered an unexpected error");
}

View file

@ -1,92 +0,0 @@
//! This module defines types and functions for submitting log entries
//! to the Stackdriver Logging API.
//!
//! Initially this will use the HTTP & JSON API instead of gRPC.
//!
//! Documentation for the relevant endpoint is available at:
//! https://cloud.google.com/logging/docs/reference/v2/rest/v2/entries/write
use chrono::{DateTime, Utc};
use failure::{Error, ResultExt};
use std::collections::HashMap;
use reqwest::Client;
const WRITE_ENTRY_URL: &str = "https://logging.googleapis.com/v2/entries:write";
const TOKEN_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token";
/// Represents an OAuth 2 access token as returned by Google's APIs.
#[derive(Deserialize)]
struct Token {
access_token: String,
expires_in: usize,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct MonitoredResource {
/// The monitored resource type. This field must match the type
/// field of a MonitoredResourceDescriptor object.
#[serde(rename = "type")]
resource_type: String,
/// Values for all of the labels listed in the associated
/// monitored resource descriptor.
labels: HashMap<String, String>,
}
/// This type represents a single Stackdriver log entry.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct LogEntry<'a> {
/// The resource name of the log to which this log entry belongs.
log_name: String,
/// The primary monitored resource associated with this log entry.
resource: &'a MonitoredResource,
/// The time the event described by the log entry occurred.
timestamp: DateTime<Utc>,
/// A set of user-defined (key, value) data that provides
/// additional information about the log entry.
labels: HashMap<String, String>,
}
/// This type represents the request sent to the Stackdriver API to
/// insert a batch of log records.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct WriteEntriesRequest<'a> {
/// The log entries to send to Stackdriver Logging.
entries: Vec<LogEntry<'a>>,
/// Whether valid entries should be written even if some other
/// entries fail due to `INVALID_ARGUMENT` or `PERMISSION_DENIED`
/// errors.
partial_success: bool,
/// Default labels that are added to the labels field of all log
/// entries in entries. If a log entry already has a label with
/// the same key as a label in this parameter, then the log
/// entry's label is not changed.
labels: HashMap<String, String>,
/// The log entry payload, represented as a Unicode string.
text_payload: String,
}
// Define the metadata header required by Google's service:
header! { (MetadataFlavor, "Metadata-Flavor") => [String] }
/// This function is used to fetch a new authentication token from
/// Google. Currently only tokens retrieved via instance metadata are
/// supported.
fn fetch_token() -> Result<Token, Error> {
let token: Token = Client::new()
.get(TOKEN_URL)
.header(MetadataFlavor("Google".to_string()))
.send().context("Requesting token failed")?
.json().context("Deserializing token failed")?;
Ok(token)
}