From e4adca0880547f2ea825aeec5f64671c6c0324ae Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Sun, 12 Nov 2023 18:09:03 +0200 Subject: [PATCH] feat(users/flokli/nixos/archeology-ec2): automate bucket log parsing This adds a `parse-bucket-logs.{service,timer}`, running once every night at 3AM UTC, figuring out the last time it was run and parsing bucket logs for all previous days. It invokes the `archeology-parse-bucket-logs` script to produce a .parquet file with the bucket logs in `s3://nix-cache-log/log/` for that day (inside a temporary directory), then on success uploads the produced parquet file to `s3://nix-archeologist/nix-cache-bucket-logs/yyyy-mm-dd.parquet`. Change-Id: Ia75ca8c43f8074fbaa34537ffdba68350c504e52 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10011 Reviewed-by: edef Tested-by: BuildkiteCI --- .../nixos/archeology-ec2/configuration.nix | 18 ++++++ .../parse-bucket-logs-continuously.py | 62 +++++++++++++++++++ 2 files changed, 80 insertions(+) create mode 100644 users/flokli/nixos/archeology-ec2/parse-bucket-logs-continuously.py diff --git a/users/flokli/nixos/archeology-ec2/configuration.nix b/users/flokli/nixos/archeology-ec2/configuration.nix index af10771ad..f0fc0c5d0 100644 --- a/users/flokli/nixos/archeology-ec2/configuration.nix +++ b/users/flokli/nixos/archeology-ec2/configuration.nix @@ -6,6 +6,24 @@ ../profiles/archeology.nix ]; + systemd.timers.parse-bucket-logs = { + wantedBy = [ "multi-user.target" ]; + timerConfig.OnCalendar = "*-*-* 03:00:00 UTC"; + }; + + systemd.services.parse-bucket-logs = { + path = [ depot.users.flokli.archeology.parse-bucket-logs ]; + serviceConfig = { + Type = "oneshot"; + ExecStart = (pkgs.writers.writePython3 "parse-bucket-logs-continuously" + { + libraries = [ pkgs.python3Packages.boto3 ]; + } ./parse-bucket-logs-continuously.py); + DynamicUser = "yes"; + StateDirectory = "parse-bucket-logs"; + }; + }; + environment.systemPackages = [ depot.users.flokli.archeology.parse-bucket-logs ]; diff --git a/users/flokli/nixos/archeology-ec2/parse-bucket-logs-continuously.py b/users/flokli/nixos/archeology-ec2/parse-bucket-logs-continuously.py new file mode 100644 index 000000000..f6ec8fb77 --- /dev/null +++ b/users/flokli/nixos/archeology-ec2/parse-bucket-logs-continuously.py @@ -0,0 +1,62 @@ +import boto3 +import datetime +import os +import re +import subprocess +import tempfile + +s3 = boto3.resource('s3') +bucket_name = "nix-archeologist" +prefix = "nix-cache-bucket-logs/" + +bucket = s3.Bucket(bucket_name) + +key_pattern = re.compile(r'.*\/(?P\d{4})-(?P\d{2})-(?P\d{2})\.parquet$') # noqa: E501 + +# get a listing (which is sorted), grab the most recent key +last_elem = list( + o for o in bucket.objects.filter(Prefix=prefix) + if key_pattern.match(o.key) +).pop() + +# extract the date of that key. +m = key_pattern.search(last_elem.key) +last_elem_date = datetime.date(int(m.group("y")), int(m.group("m")), int(m.group("d"))) # noqa: E501 + +# get the current date (UTC) +now = datetime.datetime.now(tz=datetime.UTC) +now_date = datetime.date(now.year, now.month, now.day) + +while True: + # Calculate what date would be processed next. + next_elem_date = last_elem_date + datetime.timedelta(days=1) + + # If that's today, we don't want to process it. + if next_elem_date == now_date: + print("Caught up, would process data from today.") + break + + # If we'd be processing data from yesterday, but it's right after midnight, + # also don't process - data might still be flushed. + if (next_elem_date + datetime.timedelta(days=1) == now_date) and now.hour == 0: # noqa: E501 + print("Not processing data from previous day right after midnight") + break + + src = f"http://nix-cache-log.s3.amazonaws.com/log/{next_elem_date.isoformat()}-*" # noqa: E501 + + # Invoke parse-bucket-logs script inside a tempdir and upload on success. + with tempfile.TemporaryDirectory() as td: + work_file_name = os.path.join(td, "output.parquet") + args = ["archeology-parse-bucket-logs", src, work_file_name] + subprocess.run( + args, + check=True # throw exception if nonzero exit code + ) + + dest_key = f"{prefix}{next_elem_date.isoformat()}.parquet" + + # Upload the file + print(f"uploading to s3://{bucket_name}{dest_key}") + bucket.upload_file(work_file_name, dest_key) + + last_elem_date = next_elem_date