tvl-depot/ops/journaldriver/src/main.rs
Vincent Ambo ede837b687 refactor(journaldriver): Use anyhow instead of failure
Apparently failure is not hip anymore, and crate updates are forcing
the use of anyhow now. Whatever.

The functionality basically stays the same, maybe error messages will
look a little bit different now.

Change-Id: I173d644688785339c16161ddeec47a534123710f
Reviewed-on: https://cl.tvl.fyi/c/depot/+/5307
Tested-by: BuildkiteCI
Reviewed-by: sterni <sternenseemann@systemli.org>
Autosubmit: tazjin <tazjin@tvl.su>
2022-02-18 11:52:24 +00:00

663 lines
22 KiB
Rust

// Copyright (C) 2018 Vincent Ambo <mail@tazj.in>
//
// journaldriver is free software: you can redistribute it and/or
// modify it under the terms of the GNU General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//! This file implements journaldriver, a small application that
//! forwards logs from journald (systemd's log facility) to
//! Stackdriver Logging.
//!
//! Log entries are read continuously from journald and are forwarded
//! to Stackdriver in batches.
//!
//! Stackdriver Logging has a concept of monitored resources. In the
//! simplest case this monitored resource will be the GCE instance on
//! which journaldriver is running.
//!
//! Information about the instance, the project and required security
//! credentials are retrieved from Google's metadata instance on GCP.
//!
//! To run journaldriver on non-GCP machines, users must specify the
//! `GOOGLE_APPLICATION_CREDENTIALS`, `GOOGLE_CLOUD_PROJECT` and
//! `LOG_STREAM` environment variables. `LOG_NAME` may additionally be
//! set to override the default log name (`journaldriver`).
#[macro_use]
extern crate anyhow;
#[macro_use]
extern crate log;
#[macro_use]
extern crate serde_derive;
#[macro_use]
extern crate serde_json;
#[macro_use]
extern crate lazy_static;
extern crate chrono;
extern crate env_logger;
extern crate medallion;
extern crate serde;
extern crate systemd;
extern crate ureq;
use anyhow::{Context, Result};
use chrono::offset::LocalResult;
use chrono::prelude::{DateTime, TimeZone, Utc};
use serde_json::{from_str, Value};
use std::fs::{self, rename, File};
use std::io::{self, ErrorKind, Read, Write};
use std::path::PathBuf;
use std::time::{Duration, Instant};
use std::{env, mem, process};
use systemd::journal::{Journal, JournalFiles, JournalRecord, JournalSeek};
#[cfg(test)]
mod tests;
/// Audience (`aud` claim) placed in self-signed service account tokens.
const LOGGING_SERVICE: &str = "https://logging.googleapis.com/google.logging.v2.LoggingServiceV2";
/// Endpoint to which batches of log entries are posted.
const ENTRIES_WRITE_URL: &str = "https://logging.googleapis.com/v2/entries:write";
/// Metadata server endpoint from which access tokens are fetched.
const METADATA_TOKEN_URL: &str =
"http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token";
/// Metadata server endpoint used to look up the instance ID.
const METADATA_ID_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/id";
/// Metadata server endpoint used to look up the instance zone.
const METADATA_ZONE_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/zone";
/// Metadata server endpoint used to look up the project ID when
/// `GOOGLE_CLOUD_PROJECT` is not set.
const METADATA_PROJECT_URL: &str =
"http://metadata.google.internal/computeMetadata/v1/project/project-id";
/// Representation of static service account credentials for GCP.
///
/// Deserialized from the JSON file referenced by the
/// `GOOGLE_APPLICATION_CREDENTIALS` environment variable.
#[derive(Debug, Deserialize)]
struct Credentials {
/// PEM encoded private key, used to sign service account tokens
private_key: String,
/// `kid` of this private key, sent in the header of signed tokens
private_key_id: String,
/// "email" address of the service account, used as both issuer and
/// subject of signed tokens
client_email: String,
}
lazy_static! {
    /// ID of the GCP project to which to send logs.
    static ref PROJECT_ID: String = get_project_id();

    /// Name of the log to write to (this should only be manually
    /// configured if not running on GCP). Read from `LOG_NAME`,
    /// defaulting to `journaldriver`.
    static ref LOG_NAME: String = env::var("LOG_NAME")
        .unwrap_or_else(|_| "journaldriver".into());

    /// Service account credentials (if configured).
    ///
    /// Loaded from the file named by `GOOGLE_APPLICATION_CREDENTIALS`.
    /// If the variable is set but the file can not be opened or parsed,
    /// a warning is logged and `None` is used, which makes
    /// journaldriver fall back to the metadata server for tokens.
    static ref SERVICE_ACCOUNT_CREDENTIALS: Option<Credentials> =
        env::var("GOOGLE_APPLICATION_CREDENTIALS").ok().and_then(|path| {
            let file = match File::open(&path) {
                Ok(file) => file,
                Err(err) => {
                    warn!("Could not open credentials file '{}': {}", path, err);
                    return None;
                }
            };

            match serde_json::from_reader(file) {
                Ok(credentials) => Some(credentials),
                Err(err) => {
                    warn!("Could not parse credentials file '{}': {}", path, err);
                    None
                }
            }
        });

    /// Descriptor of the currently monitored instance. Refer to the
    /// documentation of `determine_monitored_resource` for more
    /// information.
    static ref MONITORED_RESOURCE: Value = determine_monitored_resource();

    /// Path to the directory in which journaldriver should persist
    /// its cursor state. Read from `CURSOR_POSITION_DIR`, defaulting
    /// to `/var/lib/journaldriver`.
    static ref CURSOR_DIR: PathBuf = env::var("CURSOR_POSITION_DIR")
        .map(PathBuf::from)
        .unwrap_or_else(|_| PathBuf::from("/var/lib/journaldriver"));

    /// Path to the cursor position file itself.
    static ref CURSOR_FILE: PathBuf = CURSOR_DIR.join("cursor.pos");

    /// Path to the temporary file used for cursor position writes.
    static ref CURSOR_TMP_FILE: PathBuf = CURSOR_DIR.join("cursor.tmp");
}
/// Fetches a single value from the metadata server.
///
/// The request carries the `Metadata-Flavor: Google` header. On a
/// successful response the body is returned with surrounding
/// whitespace (such as trailing newlines) removed. Any other response
/// is turned into an error containing the body and the status line.
fn get_metadata(url: &str) -> Result<String> {
    let response = ureq::get(url)
        .set("Metadata-Flavor", "Google")
        .timeout_connect(5000)
        .timeout_read(5000)
        .call();

    // Handle the failure case first so the happy path reads linearly.
    if !response.ok() {
        let status = response.status_line().to_string();
        let body = match response.into_string() {
            Ok(text) => text,
            Err(e) => format!("Metadata body error: {}", e),
        };
        bail!("Metadata failure: {} ({})", body, status);
    }

    let raw = response
        .into_string()
        .context("Failed to decode metadata response")?;

    Ok(raw.trim().to_string())
}
/// Determines the project ID, preferring the `GOOGLE_CLOUD_PROJECT`
/// environment variable and falling back to the metadata server.
///
/// Panics if neither source yields a project ID.
fn get_project_id() -> String {
    match env::var("GOOGLE_CLOUD_PROJECT") {
        Ok(project) => project,
        Err(_) => get_metadata(METADATA_PROJECT_URL).expect("Could not determine project ID"),
    }
}
/// Builds the monitored resource descriptor attached to every write
/// request sent to Stackdriver.
///
/// * If `LOG_STREAM` is set to the special value `global`, a
///   `global`-type descriptor is produced. This is useful in cases
///   where Stackdriver Error Reporting is intended to be used on a
///   non-GCE instance. See [issue #4][] for details.
/// * If `LOG_STREAM` is set to anything else, a `logging_log`
///   descriptor named after the stream is produced.
/// * Otherwise a `gce_instance` descriptor is built from the instance
///   ID and zone returned by the metadata server. This panics if
///   either lookup fails.
///
/// [issue #4]: https://github.com/tazjin/journaldriver/issues/4
fn determine_monitored_resource() -> Value {
    match env::var("LOG_STREAM") {
        Ok(ref stream) if stream.as_str() == "global" => json!({
            "type": "global",
            "labels": {
                "project_id": PROJECT_ID.as_str(),
            }
        }),
        Ok(stream) => json!({
            "type": "logging_log",
            "labels": {
                "project_id": PROJECT_ID.as_str(),
                "name": stream,
            }
        }),
        Err(_) => {
            let instance_id =
                get_metadata(METADATA_ID_URL).expect("Could not determine instance ID");
            let zone =
                get_metadata(METADATA_ZONE_URL).expect("Could not determine instance zone");

            json!({
                "type": "gce_instance",
                "labels": {
                    "project_id": PROJECT_ID.as_str(),
                    "instance_id": instance_id,
                    "zone": zone,
                }
            })
        }
    }
}
/// Represents the response returned by the metadata server's token
/// endpoint. The token is normally valid for an hour.
#[derive(Deserialize)]
struct TokenResponse {
/// Validity of the token as reported by the endpoint; interpreted as
/// a number of seconds when the local expiry is computed.
expires_in: u64,
/// The access token itself.
access_token: String,
}
/// An authentication token paired with the information needed to
/// decide when it has to be renewed.
struct Token {
    /// The token string, sent as a bearer token.
    token: String,
    /// Moment at which this token was obtained.
    fetched_at: Instant,
    /// How long after `fetched_at` the token is still considered usable.
    expires: Duration,
}

impl Token {
    /// Returns `true` once more time than `expires` has passed since
    /// the token was fetched, i.e. when it needs to be renewed.
    fn is_expired(&self) -> bool {
        let age = self.fetched_at.elapsed();
        self.expires < age
    }
}
/// Retrieves a token from the GCP metadata service. Retrieving these
/// tokens requires no additional authentication.
fn get_metadata_token() -> Result<Token> {
let body = get_metadata(METADATA_TOKEN_URL)?;
let token: TokenResponse = from_str(&body)?;
debug!("Fetched new token from metadata service");
Ok(Token {
fetched_at: Instant::now(),
expires: Duration::from_secs(token.expires_in / 2),
token: token.access_token,
})
}
/// Creates a self-signed token from the static credentials of a
/// service account. This service account must have been given the
/// `Log Writer` role in Google Cloud IAM.
///
/// The token is signed with RS256, names the service account as both
/// issuer and subject, and is addressed to the logging service. Its
/// `exp` claim lies 3600 seconds after `iat`; locally the token is
/// treated as expired after 3000 seconds so that it is renewed early.
///
/// The process for creating and signing these tokens is described
/// here:
///
/// https://developers.google.com/identity/protocols/OAuth2ServiceAccount#jwt-auth
fn sign_service_account_token(credentials: &Credentials) -> Result<Token> {
    use medallion::{Algorithm, Header, Payload};

    let issued_at = Utc::now();
    let expires_at = match issued_at.checked_add_signed(chrono::Duration::seconds(3600)) {
        Some(time) => time,
        None => return Err(format_err!("Failed to calculate token expiry")),
    };

    let account = &credentials.client_email;
    let claims: Payload<()> = Payload {
        iss: Some(account.clone()),
        sub: Some(account.clone()),
        aud: Some(LOGGING_SERVICE.to_string()),
        iat: Some(issued_at.timestamp() as u64),
        exp: Some(expires_at.timestamp() as u64),
        ..Default::default()
    };

    let header = Header {
        alg: Algorithm::RS256,
        headers: Some(json!({
            "kid": credentials.private_key_id,
        })),
    };

    let signed = medallion::Token::new(header, claims)
        .sign(credentials.private_key.as_bytes())
        .context("Signing service account token failed")?;

    debug!("Signed new service account token");

    Ok(Token {
        token: signed,
        fetched_at: Instant::now(),
        expires: Duration::from_secs(3000),
    })
}
/// Obtains an authentication token, either by signing one with static
/// service account credentials or by asking the metadata server.
///
/// Static credentials are used whenever they were loaded, which is
/// controlled by the environment variable
/// `GOOGLE_APPLICATION_CREDENTIALS`: it should point at a JSON private
/// key file if service account authentication is to be used.
fn get_token() -> Result<Token> {
    match *SERVICE_ACCOUNT_CREDENTIALS {
        Some(ref credentials) => sign_service_account_token(credentials),
        None => get_metadata_token(),
    }
}
/// This structure represents the different types of payloads
/// supported by journaldriver.
///
/// Currently log entries can either contain plain text messages or
/// structured payloads in JSON-format.
///
/// The enum is serialised untagged, so only the renamed field of the
/// active variant (`textPayload` or `jsonPayload`) appears in the
/// output.
#[derive(Debug, Serialize, PartialEq)]
#[serde(untagged)]
enum Payload {
// Plain text log message.
TextPayload {
#[serde(rename = "textPayload")]
text_payload: String,
},
// Structured log message; `message_to_payload` only creates this
// variant for JSON objects.
JsonPayload {
#[serde(rename = "jsonPayload")]
json_payload: Value,
},
}
/// Turns an optional log message into a payload.
///
/// * A missing message becomes the text payload `empty log entry`.
/// * A message that parses as a JSON *object* becomes a structured
///   payload (Stackdriver does not expect other types of JSON
///   payload).
/// * Everything else is passed on unchanged as a text payload.
fn message_to_payload(message: Option<String>) -> Payload {
    let text_payload = match message {
        Some(text) => text,
        None => {
            return Payload::TextPayload {
                text_payload: "empty log entry".into(),
            }
        }
    };

    match serde_json::from_str::<Value>(&text_payload) {
        Ok(json_payload) if json_payload.is_object() => Payload::JsonPayload { json_payload },
        // Either not JSON at all, or valid JSON that is not an object.
        _ => Payload::TextPayload { text_payload },
    }
}
/// Attempt to parse journald's microsecond timestamps into a UTC
/// timestamp.
///
/// The input must consist of exactly 16 ASCII digits: the first ten
/// are taken as seconds since the epoch, the remaining six as
/// microseconds. Anything else (wrong length, signs, whitespace,
/// non-ASCII characters) yields `None`.
///
/// Parse errors are dismissed and returned as empty options: There
/// simply aren't any useful fallback mechanisms other than defaulting
/// to ingestion time for journaldriver's use-case.
fn parse_microseconds(input: String) -> Option<DateTime<Utc>> {
    // Checking for ASCII digits (rather than only the byte length)
    // guarantees that byte offset 10 is a character boundary, so the
    // split below can never panic on malformed journal fields. It also
    // rejects signed values such as "+123..." which the integer parser
    // would otherwise accept.
    if input.len() != 16 || !input.bytes().all(|byte| byte.is_ascii_digit()) {
        return None;
    }

    let (seconds, micros) = input.split_at(10);
    let seconds: i64 = seconds.parse().ok()?;
    let micros: u32 = micros.parse().ok()?;

    // `micros` is at most 999_999, so the conversion to nanoseconds
    // cannot overflow a u32.
    match Utc.timestamp_opt(seconds, micros * 1000) {
        LocalResult::Single(time) => Some(time),
        _ => None,
    }
}
/// Maps a journald message priority onto the corresponding
/// Stackdriver severity number.
///
/// Both Stackdriver and journald specify equivalent
/// severities/priorities. Conveniently, the names are the same.
/// Inconveniently, the numbers are not.
///
/// For more information on the journald priorities, consult these
/// man-pages:
///
/// * systemd.journal-fields(7) (section 'PRIORITY')
/// * sd-daemon(3)
/// * systemd.exec(5) (section 'SyslogLevelPrefix')
///
/// Note that priorities can be logged by applications via the prefix
/// concept described in these man pages, without interfering with
/// structured JSON-payloads.
///
/// For more information on the Stackdriver severity levels, please
/// consult Google's documentation:
///
/// https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogSeverity
///
/// Only the exact strings `"0"` through `"7"` are recognised; any
/// other value results in no severity being set.
fn priority_to_severity(priority: String) -> Option<u32> {
    // journald priority => Stackdriver severity
    const SEVERITIES: [(&str, u32); 8] = [
        ("0", 800), // emerg
        ("1", 700), // alert
        ("2", 600), // crit
        ("3", 500), // err
        ("4", 400), // warning
        ("5", 300), // notice
        ("6", 200), // info
        ("7", 100), // debug
    ];

    SEVERITIES
        .iter()
        .find(|entry| entry.0 == priority)
        .map(|entry| entry.1)
}
/// This structure represents a log entry in the format expected by
/// the Stackdriver API.
///
/// Field names are serialised in camelCase; optional fields are left
/// out of the output entirely when they are `None`.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct LogEntry {
// Labels attached to the entry (the `From<JournalRecord>` conversion
// sets `host` and `unit`).
labels: Value,
// Source timestamp of the entry, if one could be determined.
#[serde(skip_serializing_if = "Option::is_none")]
timestamp: Option<DateTime<Utc>>,
// The payload's own field is merged directly into the entry object.
#[serde(flatten)]
payload: Payload,
// https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogSeverity
#[serde(skip_serializing_if = "Option::is_none")]
severity: Option<u32>,
}
impl From<JournalRecord> for LogEntry {
    // Builds a Stackdriver log entry out of the fields of a journald
    // record. The fields used here are documented in
    // systemd.journal-fields(7).
    fn from(mut record: JournalRecord) -> LogEntry {
        // journald borrows syslog's notion of priority. It is optional
        // in the Stackdriver API, so it is simply left out whenever it
        // is missing or unrecognised.
        let severity = record.remove("PRIORITY").and_then(priority_to_severity);

        // The source timestamp, when present, is given in microseconds
        // since the epoch. Without a parseable value no timestamp is
        // sent and the entry defaults to its ingestion time.
        let timestamp = record
            .remove("_SOURCE_REALTIME_TIMESTAMP")
            .and_then(parse_microseconds);

        // Kernel entries seemingly carry no unit; they are labelled
        // "syslog" instead.
        let unit = match record.remove("_SYSTEMD_UNIT") {
            Some(name) => name,
            None => String::from("syslog"),
        };

        // Presumably always set, but not relied upon.
        let host = record.remove("_HOSTNAME");

        // MESSAGE is only a convention, but it is what journald seems
        // to use when ingesting unit output.
        let payload = message_to_payload(record.remove("MESSAGE"));

        let labels = json!({
            "host": host,
            "unit": unit,
        });

        LogEntry {
            labels,
            timestamp,
            payload,
            severity,
        }
    }
}
/// Reads the next record from the journal. If a record is already
/// available it is returned right away; otherwise this waits for a
/// new one for at most `timeout`.
fn receive_next_record(timeout: Duration, journal: &mut Journal) -> Result<Option<JournalRecord>> {
    let pending = journal.next_record()?;

    match pending {
        Some(record) => Ok(Some(record)),
        None => Ok(journal.await_next_record(Some(timeout))?),
    }
}
/// This function starts a double-looped, blocking receiver. It will
/// buffer messages for half a second before flushing them to
/// Stackdriver.
fn receiver_loop(mut journal: Journal) -> Result<()> {
let mut token = get_token()?;
let mut buf: Vec<LogEntry> = Vec::new();
let iteration = Duration::from_millis(500);
loop {
trace!("Beginning outer iteration");
let now = Instant::now();
loop {
if now.elapsed() > iteration {
break;
}
if let Ok(Some(entry)) = receive_next_record(iteration, &mut journal) {
trace!("Received a new entry");
buf.push(entry.into());
}
}
if !buf.is_empty() {
let to_flush = mem::replace(&mut buf, Vec::new());
flush(&mut token, to_flush, journal.cursor()?)?;
}
trace!("Done outer iteration");
}
}
/// Writes the current cursor into `/var/journaldriver/cursor.pos`. To
/// avoid issues with journaldriver being terminated while the cursor
/// is still being written, this will first write the cursor into a
/// temporary file and then move it.
fn persist_cursor(cursor: String) -> Result<()> {
// This code exists to aid in tracking down if there are other
// causes of issue #2 than what has already been taken care of.
//
// One theory is that journald (or the Rust library to interface
// with it) may occasionally return empty cursor strings. If this
// is ever the case, we would like to know about it.
if cursor.is_empty() {
error!("Received empty journald cursor position, refusing to persist!");
error!("Please report this message at https://github.com/tazjin/journaldriver/issues/2");
return Ok(());
}
let mut file = File::create(&*CURSOR_TMP_FILE).context("Failed to create cursor file")?;
write!(file, "{}", cursor).context("Failed to write cursor file")?;
rename(&*CURSOR_TMP_FILE, &*CURSOR_FILE)
.context("Failed to move cursor file")
.map_err(Into::into)
}
/// Flushes all drained records to Stackdriver. Any Stackdriver
/// message can at most contain 1000 log entries which means they are
/// chunked up here.
///
/// In some cases large payloads seem to cause errors in Stackdriver -
/// the chunks are therefore made smaller here.
///
/// If flushing is successful the last cursor position will be
/// persisted to disk.
fn flush(token: &mut Token, entries: Vec<LogEntry>, cursor: String) -> Result<()> {
if token.is_expired() {
debug!("Refreshing Google metadata access token");
let new_token = get_token()?;
*token = new_token;
}
for chunk in entries.chunks(750) {
let request = prepare_request(chunk);
if let Err(write_error) = write_entries(token, request) {
error!("Failed to write {} entries: {}", chunk.len(), write_error)
} else {
debug!("Wrote {} entries to Stackdriver", chunk.len())
}
}
persist_cursor(cursor)
}
/// Convert a slice of log entries into the format expected by
/// Stackdriver. This format is documented here:
///
/// https://cloud.google.com/logging/docs/reference/v2/rest/v2/entries/write
fn prepare_request(entries: &[LogEntry]) -> Value {
json!({
"logName": format!("projects/{}/logs/{}", PROJECT_ID.as_str(), LOG_NAME.as_str()),
"resource": &*MONITORED_RESOURCE,
"entries": entries,
"partialSuccess": true
})
}
/// Sends a prepared write request to Stackdriver Logging, using the
/// given token as bearer authorisation.
///
/// Any response that is not successful is returned as an error
/// containing the response body and the status line.
fn write_entries(token: &Token, request: Value) -> Result<()> {
    let authorization = format!("Bearer {}", token.token);

    // The timeouts are deliberately generous. Stackdriver is not
    // expected to be slow; they merely ensure that network trouble
    // eventually surfaces as an error. Presumably no request in a
    // functioning environment will ever hit these limits.
    let response = ureq::post(ENTRIES_WRITE_URL)
        .set("Authorization", authorization.as_str())
        .timeout_connect(2000)
        .timeout_read(5000)
        .send_json(request);

    if response.ok() {
        return Ok(());
    }

    let status = response.status_line().to_string();
    let body = match response.into_string() {
        Ok(text) => text,
        Err(_) => "no response body".into(),
    };

    bail!("Write failure: {} ({})", body, status)
}
/// Determines where in the journal reading should start.
///
/// If the cursor position file can be read, its trimmed contents are
/// used as the cursor to seek to. If the file does not exist, reading
/// starts at the tail of the journal.
///
/// A missing file is the only "acceptable" error; any other error
/// while reading the file is returned, because it indicates a
/// misconfiguration of journaldriver.
fn initial_cursor() -> Result<JournalSeek> {
    // Reads the whole cursor file and strips surrounding whitespace.
    fn read_cursor_file() -> io::Result<String> {
        let mut file = File::open(&*CURSOR_FILE)?;
        let mut contents = String::new();
        file.read_to_string(&mut contents)?;
        Ok(contents.trim().to_string())
    }

    match read_cursor_file() {
        Ok(cursor) => Ok(JournalSeek::Cursor { cursor }),
        Err(err) => {
            if err.kind() == ErrorKind::NotFound {
                info!("No previous cursor position, reading from journal tail");
                Ok(JournalSeek::Tail)
            } else {
                Err(err).context("Could not read cursor position")
            }
        }
    }
}
/// Entry point: sets up logging, checks the cursor directory, opens
/// the journal at the initial cursor position and then hands over to
/// the receiver loop.
///
/// The process exits with status 1 if the cursor directory is missing
/// or the journal can not be positioned; other setup failures panic.
fn main() {
    env_logger::init();

    // The directory in which cursor positions are persisted is
    // expected to exist already.
    if !CURSOR_DIR.exists() {
        error!("Cursor directory at '{:?}' does not exist", *CURSOR_DIR);
        process::exit(1);
    }

    let cursor_parent = CURSOR_FILE
        .parent()
        .expect("Invalid cursor position file path");
    fs::create_dir_all(cursor_parent)
        .expect("Could not create directory to store cursor position in");

    let mut journal =
        Journal::open(JournalFiles::All, false, true).expect("Failed to open systemd journal");

    let start = initial_cursor().expect("Failed to determine initial cursor position");
    let cursor = journal.seek(start).unwrap_or_else(|err| {
        error!("Failed to set initial journal position: {}", err);
        process::exit(1)
    });
    info!("Opened journal at cursor '{}'", cursor);

    receiver_loop(journal).expect("log receiver encountered an unexpected error");
}