feat(users/flokli/nixos/archeology-ec2): automate bucket log parsing
This adds a `parse-bucket-logs.{service,timer}`, running once every night at 3AM UTC, figuring out the last time it was run and parsing bucket logs for all previous days. It invokes the `archeology-parse-bucket-logs` script to produce a .parquet file with the bucket logs in `s3://nix-cache-log/log/` for that day (inside a temporary directory), then on success uploads the produced parquet file to `s3://nix-archeologist/nix-cache-bucket-logs/yyyy-mm-dd.parquet`. Change-Id: Ia75ca8c43f8074fbaa34537ffdba68350c504e52 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10011 Reviewed-by: edef <edef@edef.eu> Tested-by: BuildkiteCI
This commit is contained in:
parent
3fe455cd4a
commit
e4adca0880
2 changed files with 80 additions and 0 deletions
|
@ -6,6 +6,24 @@
|
||||||
../profiles/archeology.nix
|
../profiles/archeology.nix
|
||||||
];
|
];
|
||||||
|
|
||||||
|
systemd.timers.parse-bucket-logs = {
|
||||||
|
wantedBy = [ "multi-user.target" ];
|
||||||
|
timerConfig.OnCalendar = "*-*-* 03:00:00 UTC";
|
||||||
|
};
|
||||||
|
|
||||||
|
systemd.services.parse-bucket-logs = {
|
||||||
|
path = [ depot.users.flokli.archeology.parse-bucket-logs ];
|
||||||
|
serviceConfig = {
|
||||||
|
Type = "oneshot";
|
||||||
|
ExecStart = (pkgs.writers.writePython3 "parse-bucket-logs-continuously"
|
||||||
|
{
|
||||||
|
libraries = [ pkgs.python3Packages.boto3 ];
|
||||||
|
} ./parse-bucket-logs-continuously.py);
|
||||||
|
DynamicUser = "yes";
|
||||||
|
StateDirectory = "parse-bucket-logs";
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
environment.systemPackages = [
|
environment.systemPackages = [
|
||||||
depot.users.flokli.archeology.parse-bucket-logs
|
depot.users.flokli.archeology.parse-bucket-logs
|
||||||
];
|
];
|
||||||
|
|
|
@ -0,0 +1,62 @@
|
||||||
|
import boto3
|
||||||
|
import datetime
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import subprocess
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
s3 = boto3.resource('s3')
|
||||||
|
bucket_name = "nix-archeologist"
|
||||||
|
prefix = "nix-cache-bucket-logs/"
|
||||||
|
|
||||||
|
bucket = s3.Bucket(bucket_name)
|
||||||
|
|
||||||
|
key_pattern = re.compile(r'.*\/(?P<y>\d{4})-(?P<m>\d{2})-(?P<d>\d{2})\.parquet$') # noqa: E501
|
||||||
|
|
||||||
|
# get a listing (which is sorted), grab the most recent key
|
||||||
|
last_elem = list(
|
||||||
|
o for o in bucket.objects.filter(Prefix=prefix)
|
||||||
|
if key_pattern.match(o.key)
|
||||||
|
).pop()
|
||||||
|
|
||||||
|
# extract the date of that key.
|
||||||
|
m = key_pattern.search(last_elem.key)
|
||||||
|
last_elem_date = datetime.date(int(m.group("y")), int(m.group("m")), int(m.group("d"))) # noqa: E501
|
||||||
|
|
||||||
|
# get the current date (UTC)
|
||||||
|
now = datetime.datetime.now(tz=datetime.UTC)
|
||||||
|
now_date = datetime.date(now.year, now.month, now.day)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
# Calculate what date would be processed next.
|
||||||
|
next_elem_date = last_elem_date + datetime.timedelta(days=1)
|
||||||
|
|
||||||
|
# If that's today, we don't want to process it.
|
||||||
|
if next_elem_date == now_date:
|
||||||
|
print("Caught up, would process data from today.")
|
||||||
|
break
|
||||||
|
|
||||||
|
# If we'd be processing data from yesterday, but it's right after midnight,
|
||||||
|
# also don't process - data might still be flushed.
|
||||||
|
if (next_elem_date + datetime.timedelta(days=1) == now_date) and now.hour == 0: # noqa: E501
|
||||||
|
print("Not processing data from previous day right after midnight")
|
||||||
|
break
|
||||||
|
|
||||||
|
src = f"http://nix-cache-log.s3.amazonaws.com/log/{next_elem_date.isoformat()}-*" # noqa: E501
|
||||||
|
|
||||||
|
# Invoke parse-bucket-logs script inside a tempdir and upload on success.
|
||||||
|
with tempfile.TemporaryDirectory() as td:
|
||||||
|
work_file_name = os.path.join(td, "output.parquet")
|
||||||
|
args = ["archeology-parse-bucket-logs", src, work_file_name]
|
||||||
|
subprocess.run(
|
||||||
|
args,
|
||||||
|
check=True # throw exception if nonzero exit code
|
||||||
|
)
|
||||||
|
|
||||||
|
dest_key = f"{prefix}{next_elem_date.isoformat()}.parquet"
|
||||||
|
|
||||||
|
# Upload the file
|
||||||
|
print(f"uploading to s3://{bucket_name}{dest_key}")
|
||||||
|
bucket.upload_file(work_file_name, dest_key)
|
||||||
|
|
||||||
|
last_elem_date = next_elem_date
|
Loading…
Reference in a new issue