feat(main): Emit output in chunks of max. 1000 records
Required by the Stackdriver API.
parent ef638bfa20
commit 2541d25fba

1 changed file with 15 additions and 9 deletions

src/main.rs (24 changed lines)
@@ -12,10 +12,11 @@ extern crate reqwest;
 mod stackdriver;
 
-use systemd::journal::*;
 use std::env;
+use std::mem;
 use std::process;
 use std::time::{Duration, Instant};
-use std::collections::vec_deque::{VecDeque, Drain};
+use systemd::journal::*;
 
 #[derive(Debug)]
 struct Record {
@@ -53,7 +54,7 @@ impl From<JournalRecord> for Record {
 /// buffer messages for half a second before flushing them to
 /// Stackdriver.
 fn receiver_loop(mut journal: Journal) {
-    let mut buf: VecDeque<Record> = VecDeque::new();
+    let mut buf: Vec<Record> = Vec::new();
     let iteration = Duration::from_millis(500);
 
     loop {
@@ -66,22 +67,27 @@ fn receiver_loop(mut journal: Journal) {
             }
 
             if let Ok(Some(record)) = journal.await_next_record(Some(iteration)) {
-                buf.push_back(record.into());
                 trace!("Received a new record");
+                buf.push(record.into());
             }
         }
 
         if !buf.is_empty() {
-            flush(buf.drain(..));
+            let to_flush = mem::replace(&mut buf, Vec::new());
+            flush(to_flush);
         }
 
         trace!("Done outer iteration");
     }
 }
 
-/// Flushes all drained records to Stackdriver.
-fn flush(drain: Drain<Record>) {
-    let record_count = drain.count();
-    debug!("Flushed {} records", record_count);
+/// Flushes all drained records to Stackdriver. Any Stackdriver
+/// message can at most contain 1000 log entries which means they are
+/// chunked up here.
+fn flush(records: Vec<Record>) {
+    for chunk in records.chunks(1000) {
+        debug!("Flushed {} records", chunk.len())
+    }
 }
 
 fn main () {
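A note on the pattern: switching the buffer from a VecDeque/Drain to a plain Vec is what makes `records.chunks(1000)` possible, since `chunks` is a slice method and a Vec derefs to one contiguous slice (a VecDeque's ring buffer may be split in two). Below is a minimal, self-contained sketch of that chunking behaviour; `Record` is reduced to a bare struct and `write_entries` is a hypothetical stand-in for the real Stackdriver call, not part of this codebase:

// Standalone sketch of the chunking behaviour used by the new
// flush(). `Record` and `write_entries` are simplified stand-ins,
// not the project's real types.
#[derive(Debug)]
struct Record {
    message: String,
}

// Hypothetical sink; the real code would send one Stackdriver
// request per chunk, each within the 1000-entry limit.
fn write_entries(entries: &[Record]) {
    println!("writing {} entries", entries.len());
}

fn flush(records: Vec<Record>) {
    // chunks(1000) yields non-overlapping slices of at most 1000
    // elements; only the final chunk may be shorter, and no chunk
    // is ever empty, so every request stays within the limit.
    for chunk in records.chunks(1000) {
        write_entries(chunk);
    }
}

fn main() {
    let records: Vec<Record> = (0..2500)
        .map(|i| Record { message: format!("record {}", i) })
        .collect();
    flush(records); // writes chunks of 1000, 1000 and 500 entries
}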
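Similarly, the `mem::replace(&mut buf, Vec::new())` change is the standard buffer-swap idiom: because `flush` now takes ownership of a Vec rather than borrowing a Drain, the loop moves the accumulated records out by value while leaving an empty buffer in place for the next iteration. A small sketch, using String in place of the project's Record type:

use std::mem;

// Buffer-swap idiom from the receiver loop: take ownership of the
// accumulated records while leaving a fresh, empty Vec behind, so
// the loop can keep pushing into `buf` on the next iteration.
fn take_buffer(buf: &mut Vec<String>) -> Vec<String> {
    // Equivalent to mem::take(buf) on Rust 1.40 and later.
    mem::replace(buf, Vec::new())
}

fn main() {
    let mut buf = vec!["a".to_string(), "b".to_string()];
    let to_flush = take_buffer(&mut buf);
    assert_eq!(to_flush.len(), 2);
    assert!(buf.is_empty()); // ready for the next batch
}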