2018-10-09 10:52:54 +02:00
|
|
|
// Copyright (C) 2018 Vincent Ambo <mail@tazj.in>
|
2018-06-15 17:00:26 +02:00
|
|
|
//
|
|
|
|
// journaldriver is free software: you can redistribute it and/or
|
|
|
|
// modify it under the terms of the GNU General Public License as
|
|
|
|
// published by the Free Software Foundation, either version 3 of the
|
|
|
|
// License, or (at your option) any later version.
|
|
|
|
//
|
|
|
|
// This program is distributed in the hope that it will be useful,
|
|
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
// GNU General Public License for more details.
|
|
|
|
//
|
|
|
|
// You should have received a copy of the GNU General Public License
|
|
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
|
2018-06-15 16:45:17 +02:00
|
|
|
//! This file implements journaldriver, a small application that
|
|
|
|
//! forwards logs from journald (systemd's log facility) to
|
|
|
|
//! Stackdriver Logging.
|
|
|
|
//!
|
|
|
|
//! Log entries are read continously from journald and are forwarded
|
|
|
|
//! to Stackdriver in batches.
|
|
|
|
//!
|
|
|
|
//! Stackdriver Logging has a concept of monitored resources. In the
|
2018-06-17 17:03:41 +02:00
|
|
|
//! simplest case this monitored resource will be the GCE instance on
|
|
|
|
//! which journaldriver is running.
|
2018-06-15 16:45:17 +02:00
|
|
|
//!
|
|
|
|
//! Information about the instance, the project and required security
|
|
|
|
//! credentials are retrieved from Google's metadata instance on GCP.
|
|
|
|
//!
|
2018-06-17 17:03:41 +02:00
|
|
|
//! To run journaldriver on non-GCP machines, users must specify the
|
|
|
|
//! `GOOGLE_APPLICATION_CREDENTIALS`, `GOOGLE_CLOUD_PROJECT` and
|
|
|
|
//! `LOG_NAME` environment variables.
|
2018-06-15 16:45:17 +02:00
|
|
|
|
2022-02-18 10:49:59 +01:00
|
|
|
use anyhow::{bail, Context, Result};
|
2022-02-18 10:23:35 +01:00
|
|
|
use lazy_static::lazy_static;
|
|
|
|
use log::{debug, error, info, trace};
|
|
|
|
use serde::{Deserialize, Serialize};
|
|
|
|
use serde_json::{from_str, json, Value};
|
2022-02-18 10:49:59 +01:00
|
|
|
use std::convert::TryInto;
|
2022-02-07 16:49:59 +01:00
|
|
|
use std::fs::{self, rename, File};
|
|
|
|
use std::io::{self, ErrorKind, Read, Write};
|
2018-06-16 18:40:09 +02:00
|
|
|
use std::path::PathBuf;
|
2018-05-27 23:57:24 +02:00
|
|
|
use std::time::{Duration, Instant};
|
2022-02-07 16:49:59 +01:00
|
|
|
use std::{env, mem, process};
|
2021-02-23 12:59:43 +01:00
|
|
|
use systemd::journal::{Journal, JournalFiles, JournalRecord, JournalSeek};
|
2018-05-27 20:09:37 +02:00
|
|
|
|
2018-06-16 17:57:11 +02:00
|
|
|
#[cfg(test)]
|
|
|
|
mod tests;
|
|
|
|
|
2018-06-17 16:47:43 +02:00
|
|
|
const LOGGING_SERVICE: &str = "https://logging.googleapis.com/google.logging.v2.LoggingServiceV2";
|
2018-06-15 16:45:17 +02:00
|
|
|
const ENTRIES_WRITE_URL: &str = "https://logging.googleapis.com/v2/entries:write";
|
2022-02-07 16:49:59 +01:00
|
|
|
const METADATA_TOKEN_URL: &str =
|
|
|
|
"http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token";
|
2018-06-15 16:45:17 +02:00
|
|
|
const METADATA_ID_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/id";
|
|
|
|
const METADATA_ZONE_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/zone";
|
2022-02-07 16:49:59 +01:00
|
|
|
const METADATA_PROJECT_URL: &str =
|
|
|
|
"http://metadata.google.internal/computeMetadata/v1/project/project-id";
|
2018-06-14 16:48:43 +02:00
|
|
|
|
2018-06-17 16:47:43 +02:00
|
|
|
/// Representation of static service account credentials for GCP.
|
|
|
|
#[derive(Debug, Deserialize)]
|
|
|
|
struct Credentials {
|
|
|
|
/// PEM encoded private key
|
|
|
|
private_key: String,
|
|
|
|
|
|
|
|
/// `kid` of this private key
|
|
|
|
private_key_id: String,
|
|
|
|
|
|
|
|
/// "email" address of the service account
|
|
|
|
client_email: String,
|
|
|
|
}
|
|
|
|
|
2018-06-15 16:45:17 +02:00
|
|
|
lazy_static! {
|
2018-06-17 17:03:41 +02:00
|
|
|
/// ID of the GCP project to which to send logs.
|
|
|
|
static ref PROJECT_ID: String = get_project_id();
|
2018-06-15 16:45:17 +02:00
|
|
|
|
2018-06-17 17:03:41 +02:00
|
|
|
/// Name of the log to write to (this should only be manually
|
|
|
|
/// configured if not running on GCP):
|
|
|
|
static ref LOG_NAME: String = env::var("LOG_NAME")
|
|
|
|
.unwrap_or("journaldriver".into());
|
2018-06-15 16:45:17 +02:00
|
|
|
|
2018-06-17 16:47:43 +02:00
|
|
|
/// Service account credentials (if configured)
|
|
|
|
static ref SERVICE_ACCOUNT_CREDENTIALS: Option<Credentials> =
|
|
|
|
env::var("GOOGLE_APPLICATION_CREDENTIALS").ok()
|
|
|
|
.and_then(|path| File::open(path).ok())
|
|
|
|
.and_then(|file| serde_json::from_reader(file).ok());
|
|
|
|
|
2018-06-17 17:03:41 +02:00
|
|
|
/// Descriptor of the currently monitored instance. Refer to the
|
|
|
|
/// documentation of `determine_monitored_resource` for more
|
|
|
|
/// information.
|
|
|
|
static ref MONITORED_RESOURCE: Value = determine_monitored_resource();
|
2018-06-16 18:40:09 +02:00
|
|
|
|
2018-10-09 11:20:38 +02:00
|
|
|
/// Path to the directory in which journaldriver should persist
|
|
|
|
/// its cursor state.
|
|
|
|
static ref CURSOR_DIR: PathBuf = env::var("CURSOR_POSITION_DIR")
|
|
|
|
.unwrap_or("/var/lib/journaldriver".into())
|
2018-06-16 18:40:09 +02:00
|
|
|
.into();
|
2018-10-09 10:52:54 +02:00
|
|
|
|
2018-10-09 11:20:38 +02:00
|
|
|
/// Path to the cursor position file itself.
|
|
|
|
static ref CURSOR_FILE: PathBuf = {
|
|
|
|
let mut path = CURSOR_DIR.clone();
|
|
|
|
path.push("cursor.pos");
|
|
|
|
path
|
|
|
|
};
|
|
|
|
|
2018-10-09 10:52:54 +02:00
|
|
|
/// Path to the temporary file used for cursor position writes.
|
|
|
|
static ref CURSOR_TMP_FILE: PathBuf = {
|
2018-10-09 11:20:38 +02:00
|
|
|
let mut path = CURSOR_DIR.clone();
|
|
|
|
path.push("cursor.tmp");
|
|
|
|
path
|
2018-10-09 10:52:54 +02:00
|
|
|
};
|
2018-06-15 16:45:17 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Convenience helper for retrieving values from the metadata server.
|
|
|
|
fn get_metadata(url: &str) -> Result<String> {
|
2022-02-19 16:23:29 +01:00
|
|
|
let response = crimp::Request::get(url)
|
|
|
|
.header("Metadata-Flavor", "Google")?
|
|
|
|
.timeout(std::time::Duration::from_secs(5))?
|
|
|
|
.send()?
|
|
|
|
.as_string()?;
|
|
|
|
|
|
|
|
if !response.is_success() {
|
|
|
|
bail!(
|
|
|
|
"Error response ({}) from metadata server: {}",
|
|
|
|
response.status,
|
|
|
|
response.body
|
|
|
|
);
|
2018-10-05 23:37:22 +02:00
|
|
|
}
|
2022-02-19 16:23:29 +01:00
|
|
|
|
|
|
|
Ok(response.body.trim().to_owned())
|
2018-06-15 16:45:17 +02:00
|
|
|
}
|
|
|
|
|
2018-06-17 17:03:41 +02:00
|
|
|
/// Convenience helper for determining the project ID.
|
|
|
|
fn get_project_id() -> String {
|
|
|
|
env::var("GOOGLE_CLOUD_PROJECT")
|
2022-02-18 09:36:53 +01:00
|
|
|
.or_else(|_| get_metadata(METADATA_PROJECT_URL))
|
2018-06-17 17:03:41 +02:00
|
|
|
.expect("Could not determine project ID")
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Determines the monitored resource descriptor used in Stackdriver
|
|
|
|
/// logs. On GCP this will be set to the instance ID as returned by
|
|
|
|
/// the metadata server.
|
|
|
|
///
|
|
|
|
/// On non-GCP machines the value is determined by using the
|
2019-02-04 15:48:53 +01:00
|
|
|
/// `GOOGLE_CLOUD_PROJECT` and `LOG_STREAM` environment variables.
|
|
|
|
///
|
|
|
|
/// [issue #4]: https://github.com/tazjin/journaldriver/issues/4
|
2018-06-17 17:03:41 +02:00
|
|
|
fn determine_monitored_resource() -> Value {
|
2018-06-17 22:36:01 +02:00
|
|
|
if let Ok(log) = env::var("LOG_STREAM") {
|
2019-02-04 15:48:53 +01:00
|
|
|
// The special value `global` is recognised as a log stream name that
|
|
|
|
// results in a `global`-type resource descriptor. This is useful in
|
|
|
|
// cases where Stackdriver Error Reporting is intended to be used on
|
|
|
|
// a non-GCE instance. See [issue #4][] for details.
|
|
|
|
if log == "global" {
|
|
|
|
return json!({
|
|
|
|
"type": "global",
|
|
|
|
"labels": {
|
|
|
|
"project_id": PROJECT_ID.as_str(),
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2018-06-17 17:03:41 +02:00
|
|
|
json!({
|
|
|
|
"type": "logging_log",
|
|
|
|
"labels": {
|
|
|
|
"project_id": PROJECT_ID.as_str(),
|
|
|
|
"name": log,
|
|
|
|
}
|
|
|
|
})
|
|
|
|
} else {
|
2022-02-07 16:49:59 +01:00
|
|
|
let instance_id = get_metadata(METADATA_ID_URL).expect("Could not determine instance ID");
|
2018-06-17 17:03:41 +02:00
|
|
|
|
2022-02-07 16:49:59 +01:00
|
|
|
let zone = get_metadata(METADATA_ZONE_URL).expect("Could not determine instance zone");
|
2018-06-17 17:03:41 +02:00
|
|
|
|
|
|
|
json!({
|
|
|
|
"type": "gce_instance",
|
|
|
|
"labels": {
|
|
|
|
"project_id": PROJECT_ID.as_str(),
|
|
|
|
"instance_id": instance_id,
|
|
|
|
"zone": zone,
|
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-06-17 16:47:43 +02:00
|
|
|
/// Represents the response returned by the metadata server's token
|
|
|
|
/// endpoint. The token is normally valid for an hour.
|
|
|
|
#[derive(Deserialize)]
|
|
|
|
struct TokenResponse {
|
|
|
|
expires_in: u64,
|
|
|
|
access_token: String,
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Struct used to store a token together with a sensible
|
|
|
|
/// representation of when it expires.
|
|
|
|
struct Token {
|
|
|
|
token: String,
|
|
|
|
fetched_at: Instant,
|
|
|
|
expires: Duration,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Token {
|
|
|
|
/// Does this token need to be renewed?
|
|
|
|
fn is_expired(&self) -> bool {
|
|
|
|
self.fetched_at.elapsed() > self.expires
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Retrieves a token from the GCP metadata service. Retrieving these
|
|
|
|
/// tokens requires no additional authentication.
|
|
|
|
fn get_metadata_token() -> Result<Token> {
|
2018-10-05 23:37:22 +02:00
|
|
|
let body = get_metadata(METADATA_TOKEN_URL)?;
|
|
|
|
let token: TokenResponse = from_str(&body)?;
|
2018-06-17 16:47:43 +02:00
|
|
|
|
|
|
|
debug!("Fetched new token from metadata service");
|
|
|
|
|
|
|
|
Ok(Token {
|
|
|
|
fetched_at: Instant::now(),
|
|
|
|
expires: Duration::from_secs(token.expires_in / 2),
|
|
|
|
token: token.access_token,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Signs a token using static client credentials configured for a
|
|
|
|
/// service account. This service account must have been given the
|
|
|
|
/// `Log Writer` role in Google Cloud IAM.
|
|
|
|
///
|
|
|
|
/// The process for creating and signing these tokens is described
|
|
|
|
/// here:
|
|
|
|
///
|
|
|
|
/// https://developers.google.com/identity/protocols/OAuth2ServiceAccount#jwt-auth
|
|
|
|
fn sign_service_account_token(credentials: &Credentials) -> Result<Token> {
|
|
|
|
use medallion::{Algorithm, Header, Payload};
|
|
|
|
|
2022-02-18 10:49:59 +01:00
|
|
|
let iat = time::OffsetDateTime::now_utc();
|
|
|
|
let exp = iat + time::Duration::seconds(3600);
|
2018-06-17 16:47:43 +02:00
|
|
|
|
|
|
|
let header = Header {
|
|
|
|
alg: Algorithm::RS256,
|
|
|
|
headers: Some(json!({
|
|
|
|
"kid": credentials.private_key_id,
|
|
|
|
})),
|
|
|
|
};
|
|
|
|
|
|
|
|
let payload: Payload<()> = Payload {
|
|
|
|
iss: Some(credentials.client_email.clone()),
|
|
|
|
sub: Some(credentials.client_email.clone()),
|
|
|
|
aud: Some(LOGGING_SERVICE.to_string()),
|
2022-02-18 10:49:59 +01:00
|
|
|
iat: Some(iat.unix_timestamp().try_into().unwrap()),
|
|
|
|
exp: Some(exp.unix_timestamp().try_into().unwrap()),
|
2018-06-17 16:47:43 +02:00
|
|
|
..Default::default()
|
|
|
|
};
|
|
|
|
|
|
|
|
let token = medallion::Token::new(header, payload)
|
|
|
|
.sign(credentials.private_key.as_bytes())
|
|
|
|
.context("Signing service account token failed")?;
|
|
|
|
|
|
|
|
debug!("Signed new service account token");
|
|
|
|
|
|
|
|
Ok(Token {
|
|
|
|
token,
|
|
|
|
fetched_at: Instant::now(),
|
|
|
|
expires: Duration::from_secs(3000),
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Retrieve the authentication token either by using static client
|
|
|
|
/// credentials, or by talking to the metadata server.
|
|
|
|
///
|
|
|
|
/// Which behaviour is used is controlled by the environment variable
|
|
|
|
/// `GOOGLE_APPLICATION_CREDENTIALS`, which should be configured to
|
|
|
|
/// point at a JSON private key file if service account authentication
|
|
|
|
/// is to be used.
|
|
|
|
fn get_token() -> Result<Token> {
|
|
|
|
if let Some(credentials) = SERVICE_ACCOUNT_CREDENTIALS.as_ref() {
|
|
|
|
sign_service_account_token(credentials)
|
|
|
|
} else {
|
|
|
|
get_metadata_token()
|
|
|
|
}
|
|
|
|
}
|
2018-06-16 17:57:11 +02:00
|
|
|
|
|
|
|
/// This structure represents the different types of payloads
|
|
|
|
/// supported by journaldriver.
|
|
|
|
///
|
|
|
|
/// Currently log entries can either contain plain text messages or
|
|
|
|
/// structured payloads in JSON-format.
|
|
|
|
#[derive(Debug, Serialize, PartialEq)]
|
|
|
|
#[serde(untagged)]
|
|
|
|
enum Payload {
|
|
|
|
TextPayload {
|
|
|
|
#[serde(rename = "textPayload")]
|
|
|
|
text_payload: String,
|
|
|
|
},
|
|
|
|
JsonPayload {
|
2018-06-17 15:18:45 +02:00
|
|
|
#[serde(rename = "jsonPayload")]
|
2018-06-16 17:57:11 +02:00
|
|
|
json_payload: Value,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Attempt to parse a log message as JSON and return it as a
|
|
|
|
/// structured payload. If parsing fails, return the entry in plain
|
|
|
|
/// text format.
|
|
|
|
fn message_to_payload(message: Option<String>) -> Payload {
|
|
|
|
match message {
|
2022-02-07 16:49:59 +01:00
|
|
|
None => Payload::TextPayload {
|
|
|
|
text_payload: "empty log entry".into(),
|
|
|
|
},
|
2018-06-16 17:57:11 +02:00
|
|
|
Some(text_payload) => {
|
|
|
|
// Attempt to deserialize the text payload as a generic
|
|
|
|
// JSON value.
|
|
|
|
if let Ok(json_payload) = serde_json::from_str::<Value>(&text_payload) {
|
|
|
|
// If JSON-parsing succeeded on the payload, check
|
|
|
|
// whether we parsed an object (Stackdriver does not
|
|
|
|
// expect other types of JSON payload) and return it
|
|
|
|
// in that case.
|
|
|
|
if json_payload.is_object() {
|
2022-02-07 16:49:59 +01:00
|
|
|
return Payload::JsonPayload { json_payload };
|
2018-06-16 17:57:11 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Payload::TextPayload { text_payload }
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-06-17 03:29:14 +02:00
|
|
|
/// Attempt to parse journald's microsecond timestamps into a UTC
|
|
|
|
/// timestamp.
|
|
|
|
///
|
|
|
|
/// Parse errors are dismissed and returned as empty options: There
|
|
|
|
/// simply aren't any useful fallback mechanisms other than defaulting
|
|
|
|
/// to ingestion time for journaldriver's use-case.
|
2022-02-18 10:49:59 +01:00
|
|
|
fn parse_microseconds(input: String) -> Option<time::OffsetDateTime> {
|
2018-06-17 03:29:14 +02:00
|
|
|
if input.len() != 16 {
|
|
|
|
return None;
|
|
|
|
}
|
|
|
|
|
2022-02-18 10:49:59 +01:00
|
|
|
let micros: i128 = input.parse().ok()?;
|
|
|
|
let nanos: i128 = micros * 1000;
|
2018-06-17 03:29:14 +02:00
|
|
|
|
2022-02-18 10:49:59 +01:00
|
|
|
time::OffsetDateTime::from_unix_timestamp_nanos(nanos).ok()
|
2018-06-17 03:29:14 +02:00
|
|
|
}
|
|
|
|
|
2018-09-24 15:38:43 +02:00
|
|
|
/// Converts a journald log message priority to a
|
|
|
|
/// Stackdriver-compatible severity number.
|
2018-09-24 15:10:13 +02:00
|
|
|
///
|
2018-09-24 15:38:43 +02:00
|
|
|
/// Both Stackdriver and journald specify equivalent
|
|
|
|
/// severities/priorities. Conveniently, the names are the same.
|
|
|
|
/// Inconveniently, the numbers are not.
|
|
|
|
///
|
|
|
|
/// For more information on the journald priorities, consult these
|
|
|
|
/// man-pages:
|
|
|
|
///
|
|
|
|
/// * systemd.journal-fields(7) (section 'PRIORITY')
|
|
|
|
/// * sd-daemon(3)
|
|
|
|
/// * systemd.exec(5) (section 'SyslogLevelPrefix')
|
|
|
|
///
|
|
|
|
/// Note that priorities can be logged by applications via the prefix
|
|
|
|
/// concept described in these man pages, without interfering with
|
|
|
|
/// structured JSON-payloads.
|
|
|
|
///
|
|
|
|
/// For more information on the Stackdriver severity levels, please
|
|
|
|
/// consult Google's documentation:
|
|
|
|
///
|
|
|
|
/// https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogSeverity
|
|
|
|
///
|
|
|
|
/// Any unknown priority values result in no severity being set.
|
2018-09-24 15:10:13 +02:00
|
|
|
fn priority_to_severity(priority: String) -> Option<u32> {
|
|
|
|
match priority.as_ref() {
|
|
|
|
"0" => Some(800), // emerg
|
|
|
|
"1" => Some(700), // alert
|
|
|
|
"2" => Some(600), // crit
|
|
|
|
"3" => Some(500), // err
|
|
|
|
"4" => Some(400), // warning
|
|
|
|
"5" => Some(300), // notice
|
|
|
|
"6" => Some(200), // info
|
|
|
|
"7" => Some(100), // debug
|
|
|
|
_ => None,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-06-15 16:45:17 +02:00
|
|
|
/// This structure represents a log entry in the format expected by
|
|
|
|
/// the Stackdriver API.
|
|
|
|
#[derive(Debug, Serialize)]
|
|
|
|
#[serde(rename_all = "camelCase")]
|
|
|
|
struct LogEntry {
|
|
|
|
labels: Value,
|
2018-06-17 03:29:14 +02:00
|
|
|
|
|
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
2022-02-18 10:49:59 +01:00
|
|
|
#[serde(with = "time::serde::rfc3339::option")]
|
|
|
|
timestamp: Option<time::OffsetDateTime>,
|
2018-06-17 03:29:14 +02:00
|
|
|
|
2018-06-16 17:57:11 +02:00
|
|
|
#[serde(flatten)]
|
|
|
|
payload: Payload,
|
2018-09-24 15:10:13 +02:00
|
|
|
|
|
|
|
// https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogSeverity
|
|
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
|
|
severity: Option<u32>,
|
2018-05-27 23:57:24 +02:00
|
|
|
}
|
|
|
|
|
2018-06-15 16:45:17 +02:00
|
|
|
impl From<JournalRecord> for LogEntry {
|
|
|
|
// Converts from the fields contained in a journald record to the
|
|
|
|
// representation required by Stackdriver Logging.
|
|
|
|
//
|
|
|
|
// The fields are documented in systemd.journal-fields(7).
|
|
|
|
fn from(mut record: JournalRecord) -> LogEntry {
|
|
|
|
// The message field is technically just a convention, but
|
|
|
|
// journald seems to default to it when ingesting unit
|
|
|
|
// output.
|
2018-06-16 17:57:11 +02:00
|
|
|
let payload = message_to_payload(record.remove("MESSAGE"));
|
2018-06-15 16:45:17 +02:00
|
|
|
|
|
|
|
// Presumably this is always set, but who can be sure
|
|
|
|
// about anything in this world.
|
|
|
|
let hostname = record.remove("_HOSTNAME");
|
|
|
|
|
|
|
|
// The unit is seemingly missing on kernel entries, but
|
|
|
|
// present on all others.
|
|
|
|
let unit = record.remove("_SYSTEMD_UNIT");
|
|
|
|
|
2018-06-17 03:29:14 +02:00
|
|
|
// The source timestamp (if present) is specified in
|
|
|
|
// microseconds since epoch.
|
|
|
|
//
|
|
|
|
// If it is not present or can not be parsed, journaldriver
|
|
|
|
// will not send a timestamp for the log entry and it will
|
|
|
|
// default to the ingestion time.
|
|
|
|
let timestamp = record
|
|
|
|
.remove("_SOURCE_REALTIME_TIMESTAMP")
|
|
|
|
.and_then(parse_microseconds);
|
2018-06-15 16:45:17 +02:00
|
|
|
|
2018-09-24 15:10:13 +02:00
|
|
|
// Journald uses syslogd's concept of priority. No idea if this is
|
|
|
|
// always present, but it's optional in the Stackdriver API, so we just
|
|
|
|
// omit it if we can't find or parse it.
|
2022-02-07 16:49:59 +01:00
|
|
|
let severity = record.remove("PRIORITY").and_then(priority_to_severity);
|
2018-09-24 15:10:13 +02:00
|
|
|
|
2018-06-15 16:45:17 +02:00
|
|
|
LogEntry {
|
2018-06-16 17:57:11 +02:00
|
|
|
payload,
|
2018-06-17 03:29:14 +02:00
|
|
|
timestamp,
|
2018-06-15 16:45:17 +02:00
|
|
|
labels: json!({
|
|
|
|
"host": hostname,
|
|
|
|
"unit": unit.unwrap_or_else(|| "syslog".into()),
|
|
|
|
}),
|
2018-09-24 15:10:13 +02:00
|
|
|
severity,
|
2018-05-27 23:57:24 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-06-16 20:47:42 +02:00
|
|
|
/// Attempt to read from the journal. If no new entry is present,
|
|
|
|
/// await the next one up to the specified timeout.
|
2022-02-07 16:49:59 +01:00
|
|
|
fn receive_next_record(timeout: Duration, journal: &mut Journal) -> Result<Option<JournalRecord>> {
|
2018-06-16 20:47:42 +02:00
|
|
|
let next_record = journal.next_record()?;
|
|
|
|
if next_record.is_some() {
|
|
|
|
return Ok(next_record);
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(journal.await_next_record(Some(timeout))?)
|
|
|
|
}
|
|
|
|
|
2018-05-28 09:45:07 +02:00
|
|
|
/// This function starts a double-looped, blocking receiver. It will
|
|
|
|
/// buffer messages for half a second before flushing them to
|
|
|
|
/// Stackdriver.
|
2018-06-15 16:45:17 +02:00
|
|
|
fn receiver_loop(mut journal: Journal) -> Result<()> {
|
2018-06-17 16:47:43 +02:00
|
|
|
let mut token = get_token()?;
|
2018-06-15 16:45:17 +02:00
|
|
|
|
|
|
|
let mut buf: Vec<LogEntry> = Vec::new();
|
2018-05-27 23:57:24 +02:00
|
|
|
let iteration = Duration::from_millis(500);
|
|
|
|
|
|
|
|
loop {
|
|
|
|
trace!("Beginning outer iteration");
|
|
|
|
let now = Instant::now();
|
|
|
|
|
|
|
|
loop {
|
|
|
|
if now.elapsed() > iteration {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
2018-06-16 20:47:42 +02:00
|
|
|
if let Ok(Some(entry)) = receive_next_record(iteration, &mut journal) {
|
2018-06-15 16:45:17 +02:00
|
|
|
trace!("Received a new entry");
|
|
|
|
buf.push(entry.into());
|
2018-05-27 23:57:24 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if !buf.is_empty() {
|
2018-06-05 15:24:03 +02:00
|
|
|
let to_flush = mem::replace(&mut buf, Vec::new());
|
2018-10-05 23:37:22 +02:00
|
|
|
flush(&mut token, to_flush, journal.cursor()?)?;
|
2018-05-27 23:57:24 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
trace!("Done outer iteration");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-10-09 10:52:54 +02:00
|
|
|
/// Writes the current cursor into `/var/journaldriver/cursor.pos`. To
|
|
|
|
/// avoid issues with journaldriver being terminated while the cursor
|
|
|
|
/// is still being written, this will first write the cursor into a
|
|
|
|
/// temporary file and then move it.
|
2018-06-16 18:40:09 +02:00
|
|
|
fn persist_cursor(cursor: String) -> Result<()> {
|
2018-10-09 11:01:33 +02:00
|
|
|
// This code exists to aid in tracking down if there are other
|
|
|
|
// causes of issue #2 than what has already been taken care of.
|
|
|
|
//
|
|
|
|
// One theory is that journald (or the Rust library to interface
|
|
|
|
// with it) may occasionally return empty cursor strings. If this
|
|
|
|
// is ever the case, we would like to know about it.
|
|
|
|
if cursor.is_empty() {
|
|
|
|
error!("Received empty journald cursor position, refusing to persist!");
|
|
|
|
error!("Please report this message at https://github.com/tazjin/journaldriver/issues/2");
|
2022-02-07 16:49:59 +01:00
|
|
|
return Ok(());
|
2018-10-09 11:01:33 +02:00
|
|
|
}
|
|
|
|
|
2022-02-07 16:49:59 +01:00
|
|
|
let mut file = File::create(&*CURSOR_TMP_FILE).context("Failed to create cursor file")?;
|
2018-10-09 11:20:38 +02:00
|
|
|
|
2018-10-09 10:52:54 +02:00
|
|
|
write!(file, "{}", cursor).context("Failed to write cursor file")?;
|
2018-10-09 11:20:38 +02:00
|
|
|
|
2018-10-09 10:52:54 +02:00
|
|
|
rename(&*CURSOR_TMP_FILE, &*CURSOR_FILE)
|
|
|
|
.context("Failed to move cursor file")
|
|
|
|
.map_err(Into::into)
|
2018-06-16 18:40:09 +02:00
|
|
|
}
|
|
|
|
|
2018-06-05 15:24:03 +02:00
|
|
|
/// Flushes all drained records to Stackdriver. Any Stackdriver
|
|
|
|
/// message can at most contain 1000 log entries which means they are
|
|
|
|
/// chunked up here.
|
2018-06-16 18:40:09 +02:00
|
|
|
///
|
2018-06-16 20:55:52 +02:00
|
|
|
/// In some cases large payloads seem to cause errors in Stackdriver -
|
|
|
|
/// the chunks are therefore made smaller here.
|
|
|
|
///
|
2018-06-16 18:40:09 +02:00
|
|
|
/// If flushing is successful the last cursor position will be
|
|
|
|
/// persisted to disk.
|
2022-02-07 16:49:59 +01:00
|
|
|
fn flush(token: &mut Token, entries: Vec<LogEntry>, cursor: String) -> Result<()> {
|
2018-06-15 16:45:17 +02:00
|
|
|
if token.is_expired() {
|
|
|
|
debug!("Refreshing Google metadata access token");
|
2018-06-17 16:47:43 +02:00
|
|
|
let new_token = get_token()?;
|
2021-02-23 12:59:43 +01:00
|
|
|
*token = new_token;
|
2018-06-15 16:45:17 +02:00
|
|
|
}
|
|
|
|
|
2018-06-17 15:24:58 +02:00
|
|
|
for chunk in entries.chunks(750) {
|
2018-06-15 16:45:17 +02:00
|
|
|
let request = prepare_request(chunk);
|
2018-10-05 23:37:22 +02:00
|
|
|
if let Err(write_error) = write_entries(token, request) {
|
2018-06-15 16:45:17 +02:00
|
|
|
error!("Failed to write {} entries: {}", chunk.len(), write_error)
|
|
|
|
} else {
|
|
|
|
debug!("Wrote {} entries to Stackdriver", chunk.len())
|
|
|
|
}
|
2018-06-05 15:24:03 +02:00
|
|
|
}
|
2018-06-15 16:45:17 +02:00
|
|
|
|
2018-06-16 18:40:09 +02:00
|
|
|
persist_cursor(cursor)
|
2018-05-27 23:57:24 +02:00
|
|
|
}
|
|
|
|
|
2018-06-15 16:45:17 +02:00
|
|
|
/// Convert a slice of log entries into the format expected by
|
|
|
|
/// Stackdriver. This format is documented here:
|
|
|
|
///
|
|
|
|
/// https://cloud.google.com/logging/docs/reference/v2/rest/v2/entries/write
|
|
|
|
fn prepare_request(entries: &[LogEntry]) -> Value {
|
|
|
|
json!({
|
2018-06-17 17:03:41 +02:00
|
|
|
"logName": format!("projects/{}/logs/{}", PROJECT_ID.as_str(), LOG_NAME.as_str()),
|
2018-06-15 16:45:17 +02:00
|
|
|
"resource": &*MONITORED_RESOURCE,
|
|
|
|
"entries": entries,
|
|
|
|
"partialSuccess": true
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Perform the log entry insertion in Stackdriver Logging.
|
2018-10-05 23:37:22 +02:00
|
|
|
fn write_entries(token: &Token, request: Value) -> Result<()> {
|
2022-02-19 16:23:29 +01:00
|
|
|
let response = crimp::Request::post(ENTRIES_WRITE_URL)
|
|
|
|
.json(&request)?
|
|
|
|
.header("Authorization", format!("Bearer {}", token.token).as_str())?
|
2018-10-05 23:37:22 +02:00
|
|
|
// The timeout values are set relatively high, not because of
|
|
|
|
// an expectation of Stackdriver being slow but just to
|
2022-02-19 16:23:29 +01:00
|
|
|
// eventually force an error in case of network troubles.
|
2018-10-05 23:37:22 +02:00
|
|
|
// Presumably no request in a functioning environment will
|
|
|
|
// ever hit these limits.
|
2022-02-19 16:23:29 +01:00
|
|
|
.timeout(std::time::Duration::from_secs(5))?
|
|
|
|
.send()?;
|
2018-10-05 23:37:22 +02:00
|
|
|
|
2022-02-19 16:23:29 +01:00
|
|
|
if !response.is_success() {
|
|
|
|
let status = response.status;
|
2022-02-07 16:49:59 +01:00
|
|
|
let body = response
|
2022-02-19 16:23:29 +01:00
|
|
|
.as_string()
|
|
|
|
.map(|r| r.body)
|
|
|
|
.unwrap_or_else(|_| "no valid response body".to_owned());
|
|
|
|
|
|
|
|
bail!("Writing to Stackdriver failed({}): {}", status, body);
|
2018-06-17 03:46:50 +02:00
|
|
|
}
|
2022-02-19 16:23:29 +01:00
|
|
|
|
|
|
|
Ok(())
|
2018-06-15 16:45:17 +02:00
|
|
|
}
|
|
|
|
|
2018-06-16 18:40:09 +02:00
|
|
|
/// Attempt to read the initial cursor position from the configured
|
|
|
|
/// file. If there is no initial cursor position set, read from the
|
|
|
|
/// tail of the log.
|
|
|
|
///
|
|
|
|
/// The only "acceptable" error when reading the cursor position is
|
|
|
|
/// the cursor position file not existing, other errors are fatal
|
|
|
|
/// because they indicate a misconfiguration of journaldriver.
|
|
|
|
fn initial_cursor() -> Result<JournalSeek> {
|
|
|
|
let read_result: io::Result<String> = (|| {
|
|
|
|
let mut contents = String::new();
|
2018-10-09 10:52:54 +02:00
|
|
|
let mut file = File::open(&*CURSOR_FILE)?;
|
2018-06-16 18:40:09 +02:00
|
|
|
file.read_to_string(&mut contents)?;
|
|
|
|
Ok(contents.trim().into())
|
|
|
|
})();
|
|
|
|
|
|
|
|
match read_result {
|
|
|
|
Ok(cursor) => Ok(JournalSeek::Cursor { cursor }),
|
|
|
|
Err(ref err) if err.kind() == ErrorKind::NotFound => {
|
|
|
|
info!("No previous cursor position, reading from journal tail");
|
|
|
|
Ok(JournalSeek::Tail)
|
|
|
|
}
|
2022-02-07 16:49:59 +01:00
|
|
|
Err(err) => (Err(err).context("Could not read cursor position"))?,
|
2018-06-16 18:40:09 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-02-07 16:49:59 +01:00
|
|
|
fn main() {
|
2018-05-27 23:57:24 +02:00
|
|
|
env_logger::init();
|
|
|
|
|
2018-10-09 11:20:38 +02:00
|
|
|
// The directory in which cursor positions are persisted should
|
|
|
|
// have been created:
|
|
|
|
if !CURSOR_DIR.exists() {
|
|
|
|
error!("Cursor directory at '{:?}' does not exist", *CURSOR_DIR);
|
|
|
|
process::exit(1);
|
|
|
|
}
|
|
|
|
|
2022-02-07 16:49:59 +01:00
|
|
|
let cursor_position_dir = CURSOR_FILE
|
|
|
|
.parent()
|
2018-06-16 18:40:09 +02:00
|
|
|
.expect("Invalid cursor position file path");
|
|
|
|
|
|
|
|
fs::create_dir_all(cursor_position_dir)
|
|
|
|
.expect("Could not create directory to store cursor position in");
|
|
|
|
|
2022-02-07 16:49:59 +01:00
|
|
|
let mut journal =
|
|
|
|
Journal::open(JournalFiles::All, false, true).expect("Failed to open systemd journal");
|
2018-05-27 23:57:24 +02:00
|
|
|
|
2022-02-07 16:49:59 +01:00
|
|
|
let seek_position = initial_cursor().expect("Failed to determine initial cursor position");
|
2018-06-16 18:40:09 +02:00
|
|
|
|
|
|
|
match journal.seek(seek_position) {
|
2018-05-27 23:57:24 +02:00
|
|
|
Ok(cursor) => info!("Opened journal at cursor '{}'", cursor),
|
|
|
|
Err(err) => {
|
|
|
|
error!("Failed to set initial journal position: {}", err);
|
|
|
|
process::exit(1)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-06-15 16:45:17 +02:00
|
|
|
receiver_loop(journal).expect("log receiver encountered an unexpected error");
|
2018-05-27 20:09:37 +02:00
|
|
|
}
|