refactor: Replace reqwest library with ureq

This replaces reqwest with the more simplistic ureq library for
performing required HTTP requests.

Reqwest comes with a lot of (tokio-based) machinery for
high-performance requesting that is a bit out of scope for
journaldriver's needs.

This clocks in at 62 (!) fewer dependencies after the change, with
equivalent functionality. Wew.
This commit is contained in:
Vincent Ambo 2018-10-05 23:37:22 +02:00 committed by Vincent Ambo
parent a4084bf1e0
commit 86c25cc226
4 changed files with 390 additions and 1009 deletions

1312
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -8,15 +8,14 @@ license = "GPL-3.0-or-later"
chrono = { version = "0.4", features = [ "serde" ]} chrono = { version = "0.4", features = [ "serde" ]}
env_logger = "0.5" env_logger = "0.5"
failure = "0.1" failure = "0.1"
hyper = "0.11"
lazy_static = "1.0" lazy_static = "1.0"
log = "0.4" log = "0.4"
medallion = "2.2" medallion = "2.2"
reqwest = "0.8"
serde = "1.0" serde = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
serde_json = "1.0" serde_json = "1.0"
systemd = "0.3" systemd = "0.3"
ureq = { version = "0.6.2", features = [ "json" ]}
[build-dependencies] [build-dependencies]
pkg-config = "0.3" pkg-config = "0.3"

View file

@ -11,7 +11,7 @@ with pkgs; rustPlatform.buildRustPackage {
name = "journaldriver"; name = "journaldriver";
version = "1.0.0"; version = "1.0.0";
cargoSha256 = "04llhriwsrjqnkbjgd22nhci6zmhadclnd8r2bw5092gwdamf49k"; cargoSha256 = "03rq96hzv97wh2gbzi8sz796bqgh6pbpvdn0zy6zgq2f2sgkavsl";
src = ./.; src = ./.;

View file

@ -32,7 +32,6 @@
//! `LOG_NAME` environment variables. //! `LOG_NAME` environment variables.
#[macro_use] extern crate failure; #[macro_use] extern crate failure;
#[macro_use] extern crate hyper;
#[macro_use] extern crate log; #[macro_use] extern crate log;
#[macro_use] extern crate serde_derive; #[macro_use] extern crate serde_derive;
#[macro_use] extern crate serde_json; #[macro_use] extern crate serde_json;
@ -41,15 +40,14 @@
extern crate chrono; extern crate chrono;
extern crate env_logger; extern crate env_logger;
extern crate medallion; extern crate medallion;
extern crate reqwest;
extern crate serde; extern crate serde;
extern crate systemd; extern crate systemd;
extern crate ureq;
use chrono::offset::LocalResult; use chrono::offset::LocalResult;
use chrono::prelude::*; use chrono::prelude::*;
use failure::ResultExt; use failure::ResultExt;
use reqwest::{header, Client}; use serde_json::{from_str, Value};
use serde_json::Value;
use std::env; use std::env;
use std::fs::{self, File}; use std::fs::{self, File};
use std::io::{self, Read, ErrorKind, Write}; use std::io::{self, Read, ErrorKind, Write};
@ -69,12 +67,6 @@ const METADATA_ID_URL: &str = "http://metadata.google.internal/computeMetadata/v
const METADATA_ZONE_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/zone"; const METADATA_ZONE_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/zone";
const METADATA_PROJECT_URL: &str = "http://metadata.google.internal/computeMetadata/v1/project/project-id"; const METADATA_PROJECT_URL: &str = "http://metadata.google.internal/computeMetadata/v1/project/project-id";
// Google's metadata service requires this header to be present on all
// calls:
//
// https://cloud.google.com/compute/docs/storing-retrieving-metadata#querying
header! { (MetadataFlavor, "Metadata-Flavor") => [String] }
/// Convenience type alias for results using failure's `Error` type. /// Convenience type alias for results using failure's `Error` type.
type Result<T> = std::result::Result<T, failure::Error>; type Result<T> = std::result::Result<T, failure::Error>;
@ -92,16 +84,6 @@ struct Credentials {
} }
lazy_static! { lazy_static! {
/// HTTP client instance preconfigured with the metadata header
/// required by Google.
static ref METADATA_CLIENT: Client = {
let mut headers = header::Headers::new();
headers.set(MetadataFlavor("Google".into()));
Client::builder().default_headers(headers)
.build().expect("Could not create metadata client")
};
/// ID of the GCP project to which to send logs. /// ID of the GCP project to which to send logs.
static ref PROJECT_ID: String = get_project_id(); static ref PROJECT_ID: String = get_project_id();
@ -130,12 +112,25 @@ lazy_static! {
/// Convenience helper for retrieving values from the metadata server. /// Convenience helper for retrieving values from the metadata server.
fn get_metadata(url: &str) -> Result<String> { fn get_metadata(url: &str) -> Result<String> {
let mut output = String::new(); let response = ureq::get(url)
METADATA_CLIENT.get(url).send()? .set("Metadata-Flavor", "Google")
.error_for_status()? .timeout_connect(5000)
.read_to_string(&mut output)?; .timeout_read(5000)
.call();
Ok(output.trim().into()) if response.ok() {
// Whitespace is trimmed to remove newlines from responses.
let body = response.into_string()
.context("Failed to decode metadata response")?
.trim().to_string();
Ok(body)
} else {
let status = response.status_line().to_string();
let body = response.into_string()
.unwrap_or_else(|e| format!("Metadata body error: {}", e));
bail!("Metadata failure: {} ({})", body, status)
}
} }
/// Convenience helper for determining the project ID. /// Convenience helper for determining the project ID.
@ -205,9 +200,8 @@ impl Token {
/// Retrieves a token from the GCP metadata service. Retrieving these /// Retrieves a token from the GCP metadata service. Retrieving these
/// tokens requires no additional authentication. /// tokens requires no additional authentication.
fn get_metadata_token() -> Result<Token> { fn get_metadata_token() -> Result<Token> {
let token: TokenResponse = METADATA_CLIENT let body = get_metadata(METADATA_TOKEN_URL)?;
.get(METADATA_TOKEN_URL) let token: TokenResponse = from_str(&body)?;
.send()?.json()?;
debug!("Fetched new token from metadata service"); debug!("Fetched new token from metadata service");
@ -460,7 +454,6 @@ fn receive_next_record(timeout: Duration, journal: &mut Journal)
/// Stackdriver. /// Stackdriver.
fn receiver_loop(mut journal: Journal) -> Result<()> { fn receiver_loop(mut journal: Journal) -> Result<()> {
let mut token = get_token()?; let mut token = get_token()?;
let client = reqwest::Client::new();
let mut buf: Vec<LogEntry> = Vec::new(); let mut buf: Vec<LogEntry> = Vec::new();
let iteration = Duration::from_millis(500); let iteration = Duration::from_millis(500);
@ -482,7 +475,7 @@ fn receiver_loop(mut journal: Journal) -> Result<()> {
if !buf.is_empty() { if !buf.is_empty() {
let to_flush = mem::replace(&mut buf, Vec::new()); let to_flush = mem::replace(&mut buf, Vec::new());
flush(&client, &mut token, to_flush, journal.cursor()?)?; flush(&mut token, to_flush, journal.cursor()?)?;
} }
trace!("Done outer iteration"); trace!("Done outer iteration");
@ -504,8 +497,7 @@ fn persist_cursor(cursor: String) -> Result<()> {
/// ///
/// If flushing is successful the last cursor position will be /// If flushing is successful the last cursor position will be
/// persisted to disk. /// persisted to disk.
fn flush(client: &Client, fn flush(token: &mut Token,
token: &mut Token,
entries: Vec<LogEntry>, entries: Vec<LogEntry>,
cursor: String) -> Result<()> { cursor: String) -> Result<()> {
if token.is_expired() { if token.is_expired() {
@ -516,7 +508,7 @@ fn flush(client: &Client,
for chunk in entries.chunks(750) { for chunk in entries.chunks(750) {
let request = prepare_request(chunk); let request = prepare_request(chunk);
if let Err(write_error) = write_entries(client, token, request) { if let Err(write_error) = write_entries(token, request) {
error!("Failed to write {} entries: {}", chunk.len(), write_error) error!("Failed to write {} entries: {}", chunk.len(), write_error)
} else { } else {
debug!("Wrote {} entries to Stackdriver", chunk.len()) debug!("Wrote {} entries to Stackdriver", chunk.len())
@ -540,17 +532,25 @@ fn prepare_request(entries: &[LogEntry]) -> Value {
} }
/// Perform the log entry insertion in Stackdriver Logging. /// Perform the log entry insertion in Stackdriver Logging.
fn write_entries(client: &Client, token: &Token, request: Value) -> Result<()> { fn write_entries(token: &Token, request: Value) -> Result<()> {
let mut response = client.post(ENTRIES_WRITE_URL) let response = ureq::post(ENTRIES_WRITE_URL)
.header(header::Authorization(format!("Bearer {}", token.token))) .set("Authorization", format!("Bearer {}", token.token).as_str())
.json(&request) // The timeout values are set relatively high, not because of
.send()?; // an expectation of Stackdriver being slow but just to
// eventually hit an error case in case of network troubles.
// Presumably no request in a functioning environment will
// ever hit these limits.
.timeout_connect(2000)
.timeout_read(5000)
.send_json(request);
if response.status().is_success() { if response.ok() {
Ok(()) Ok(())
} else { } else {
let body = response.text().unwrap_or_else(|_| "no response body".into()); let status = response.status_line().to_string();
bail!("{} ({})", body, response.status()) let body = response.into_string()
.unwrap_or_else(|_| "no response body".into());
bail!("Write failure: {} ({})", body, status)
} }
} }