2018-05-28 22:24:32 +02:00
|
|
|
#[macro_use] extern crate failure;
|
|
|
|
#[macro_use] extern crate hyper;
|
2018-05-27 23:57:24 +02:00
|
|
|
#[macro_use] extern crate log;
|
2018-05-28 22:24:32 +02:00
|
|
|
#[macro_use] extern crate serde_derive;
|
2018-05-27 20:09:37 +02:00
|
|
|
|
2018-05-28 22:24:32 +02:00
|
|
|
extern crate chrono;
|
2018-05-27 23:57:24 +02:00
|
|
|
extern crate env_logger;
|
|
|
|
extern crate systemd;
|
2018-05-28 22:24:32 +02:00
|
|
|
extern crate serde;
|
|
|
|
extern crate serde_json;
|
|
|
|
extern crate reqwest;
|
|
|
|
|
|
|
|
mod stackdriver;
|
2018-05-27 20:09:37 +02:00
|
|
|
|
2018-06-05 15:24:03 +02:00
|
|
|
use std::env;
|
|
|
|
use std::mem;
|
2018-06-14 16:48:43 +02:00
|
|
|
use std::ops::Add;
|
2018-05-27 20:09:37 +02:00
|
|
|
use std::process;
|
2018-05-27 23:57:24 +02:00
|
|
|
use std::time::{Duration, Instant};
|
2018-06-05 15:24:03 +02:00
|
|
|
use systemd::journal::*;
|
2018-05-27 20:09:37 +02:00
|
|
|
|
2018-06-14 16:48:43 +02:00
|
|
|
/// URL of the metadata endpoint that hands out access tokens for the
/// instance's default service account.
const METADATA_TOKEN_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token";
|
|
|
|
|
|
|
|
// Declares the typed `MetadataFlavor` header, sent on the wire as
// `Metadata-Flavor` and carrying a `String` value.
header! { (MetadataFlavor, "Metadata-Flavor") => [String] }
|
|
|
|
|
|
|
|
/// Result alias used in this file; the error type is `failure::Error`.
type Result<T> = std::result::Result<T, failure::Error>;
|
|
|
|
|
2018-05-27 23:57:24 +02:00
|
|
|
/// A journal entry reduced to the handful of fields this program
/// extracts from it.
///
/// Every field is optional: the conversion from a journal record just
/// takes whichever of the corresponding keys is present.
#[derive(Debug)]
struct Record {
    /// Contents of the `MESSAGE` journal field, if any.
    message: Option<String>,

    /// Contents of the `_HOSTNAME` journal field, if any.
    hostname: Option<String>,

    /// Contents of the `_SYSTEMD_UNIT` journal field, if any.
    unit: Option<String>,

    /// Contents of the `_SOURCE_REALTIME_TIMESTAMP` journal field, if
    /// any, kept as the raw string.
    timestamp: Option<String>,
}
|
|
|
|
|
|
|
|
impl From<JournalRecord> for Record {
|
|
|
|
fn from(mut record: JournalRecord) -> Record {
|
|
|
|
Record {
|
|
|
|
// The message field is technically just a convention, but
|
|
|
|
// journald seems to default to it when ingesting unit
|
|
|
|
// output.
|
|
|
|
message: record.remove("MESSAGE"),
|
|
|
|
|
|
|
|
// Presumably this is always set, but who can be sure
|
|
|
|
// about anything in this world.
|
|
|
|
hostname: record.remove("_HOSTNAME"),
|
|
|
|
|
|
|
|
// The unit is seemingly missing on kernel entries, but
|
|
|
|
// present on all others.
|
|
|
|
unit: record.remove("_SYSTEMD_UNIT"),
|
|
|
|
|
|
|
|
// This timestamp is present on most log entries
|
|
|
|
// (seemingly all that are ingested from the output
|
|
|
|
// systemd units).
|
|
|
|
timestamp: record.remove("_SOURCE_REALTIME_TIMESTAMP"),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-05-28 09:45:07 +02:00
|
|
|
/// This function starts a double-looped, blocking receiver. It will
|
|
|
|
/// buffer messages for half a second before flushing them to
|
|
|
|
/// Stackdriver.
|
|
|
|
fn receiver_loop(mut journal: Journal) {
|
2018-06-05 15:24:03 +02:00
|
|
|
let mut buf: Vec<Record> = Vec::new();
|
2018-05-27 23:57:24 +02:00
|
|
|
let iteration = Duration::from_millis(500);
|
|
|
|
|
|
|
|
loop {
|
|
|
|
trace!("Beginning outer iteration");
|
|
|
|
let now = Instant::now();
|
|
|
|
|
|
|
|
loop {
|
|
|
|
if now.elapsed() > iteration {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
2018-05-28 09:45:07 +02:00
|
|
|
if let Ok(Some(record)) = journal.await_next_record(Some(iteration)) {
|
2018-06-05 15:24:03 +02:00
|
|
|
trace!("Received a new record");
|
|
|
|
buf.push(record.into());
|
2018-05-27 23:57:24 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if !buf.is_empty() {
|
2018-06-05 15:24:03 +02:00
|
|
|
let to_flush = mem::replace(&mut buf, Vec::new());
|
|
|
|
flush(to_flush);
|
2018-05-27 23:57:24 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
trace!("Done outer iteration");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-06-05 15:24:03 +02:00
|
|
|
/// Flushes all drained records to Stackdriver. Any Stackdriver
|
|
|
|
/// message can at most contain 1000 log entries which means they are
|
|
|
|
/// chunked up here.
|
|
|
|
fn flush(records: Vec<Record>) {
|
|
|
|
for chunk in records.chunks(1000) {
|
|
|
|
debug!("Flushed {} records", chunk.len())
|
|
|
|
}
|
2018-05-27 23:57:24 +02:00
|
|
|
}
|
|
|
|
|
2018-06-14 16:48:43 +02:00
|
|
|
/// Retrieves an access token from the GCP metadata service.
|
|
|
|
#[derive(Deserialize)]
|
|
|
|
struct TokenResponse {
|
|
|
|
#[serde(rename = "type")]
|
|
|
|
expires_in: u64,
|
|
|
|
access_token: String,
|
|
|
|
}
|
|
|
|
|
|
|
|
/// An access token paired with the point in time at which it should
/// be renewed, expressed as an `Instant` rather than a raw lifetime.
struct Token {
    /// The access token string.
    token: String,

    /// Moment from which the token should be replaced by a fresh one.
    renew_at: Instant,
}
|
|
|
|
|
|
|
|
fn get_metadata_token(client: &reqwest::Client) -> Result<Token> {
|
|
|
|
let now = Instant::now();
|
|
|
|
|
|
|
|
let token: TokenResponse = client.get(METADATA_TOKEN_URL)
|
|
|
|
.header(MetadataFlavor("Google".into()))
|
|
|
|
.send()?.json()?;
|
|
|
|
|
|
|
|
debug!("Fetched new token from metadata service");
|
|
|
|
|
|
|
|
let renew_at = now.add(Duration::from_secs(token.expires_in / 2));
|
|
|
|
|
|
|
|
Ok(Token {
|
|
|
|
renew_at,
|
|
|
|
token: token.access_token,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2018-05-27 23:57:24 +02:00
|
|
|
fn main () {
|
|
|
|
env_logger::init();
|
|
|
|
|
|
|
|
let mut journal = Journal::open(JournalFiles::All, false, true)
|
|
|
|
.expect("Failed to open systemd journal");
|
|
|
|
|
|
|
|
match journal.seek(JournalSeek::Tail) {
|
|
|
|
Ok(cursor) => info!("Opened journal at cursor '{}'", cursor),
|
|
|
|
Err(err) => {
|
|
|
|
error!("Failed to set initial journal position: {}", err);
|
|
|
|
process::exit(1)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-05-28 09:45:07 +02:00
|
|
|
receiver_loop(journal)
|
2018-05-27 20:09:37 +02:00
|
|
|
}
|