commit 927738ec71
sinavir, 2024-04-16 22:48:52 +02:00
16 changed files with 534 additions and 0 deletions

11
.gitignore vendored Normal file

@@ -0,0 +1,11 @@
__pycache__
# nix
result
result-*
# Logs
logs
*.log
*.sw?

12
default.nix Normal file

@@ -0,0 +1,12 @@
{ sources ? import ./npins
, nixpkgs ? sources.nixpkgs
, pkgs ? import nixpkgs { overlays = [ (import ./overlay.nix) ]; }
}:
rec {
  shell = pkgs.mkShell {
    packages = [
      python
    ];
  };
  python = pkgs.python3.withPackages (ps: [ ps.click ps.click-log ps.uptime-kuma-api ]);
}

67
npins/default.nix Normal file

@@ -0,0 +1,67 @@
# Generated by npins. Do not modify; will be overwritten regularly
let
  data = builtins.fromJSON (builtins.readFile ./sources.json);
  version = data.version;

  mkSource = spec:
    assert spec ? type; let
      path =
        if spec.type == "Git"
        then mkGitSource spec
        else if spec.type == "GitRelease"
        then mkGitSource spec
        else if spec.type == "PyPi"
        then mkPyPiSource spec
        else if spec.type == "Channel"
        then mkChannelSource spec
        else builtins.throw "Unknown source type ${spec.type}";
    in
      spec // { outPath = path; };

  mkGitSource = {
    repository,
    revision,
    url ? null,
    hash,
    ...
  }:
    assert repository ? type;
    # At the moment, either it is a plain git repository (which has an url), or it is a GitHub/GitLab repository
    # In the latter case, there will always be a URL to the tarball
    if url != null
    then
      (builtins.fetchTarball {
        inherit url;
        sha256 = hash; # FIXME: check nix version & use SRI hashes
      })
    else
      assert repository.type == "Git";
      builtins.fetchGit {
        url = repository.url;
        rev = revision;
        # hash = hash;
      };

  mkPyPiSource = {
    url,
    hash,
    ...
  }:
    builtins.fetchurl {
      inherit url;
      sha256 = hash;
    };

  mkChannelSource = {
    url,
    hash,
    ...
  }:
    builtins.fetchTarball {
      inherit url;
      sha256 = hash;
    };
in
if version == 3
then builtins.mapAttrs (_: mkSource) data.pins
else throw "Unsupported format version ${toString version} in sources.json. Try running `npins upgrade`"

17
npins/sources.json Normal file

@@ -0,0 +1,17 @@
{
  "pins": {
    "nixpkgs": {
      "type": "Git",
      "repository": {
        "type": "GitHub",
        "owner": "NixOS",
        "repo": "nixpkgs"
      },
      "branch": "nixos-unstable",
      "revision": "2726f127c15a4cc9810843b96cad73c7eb39e443",
      "url": "https://github.com/NixOS/nixpkgs/archive/2726f127c15a4cc9810843b96cad73c7eb39e443.tar.gz",
      "hash": "0109bpmax6nbfs2mpfw2axvk47lbvksgx3d0izrjjhw7fn41i9sh"
    }
  },
  "version": 3
}

7
overlay.nix Normal file

@@ -0,0 +1,7 @@
final: prev: {
  python3 = prev.python3.override {
    packageOverrides = python-self: python-super: {
      uptime-kuma-api = python-self.callPackage ./uptime-kuma-api.nix { };
    };
  };
}

26
pyproject.toml Normal file

@@ -0,0 +1,26 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "stateless-uptime-kuma"
version = "0.1.0"
authors = [
    { name="sinavir", email="sinavir@sinavir.fr" },
]
description = "Declare uptime-kuma probes in your nix configuration"
readme = "README.md"
requires-python = ">=3.11"
classifiers = [
    "Programming Language :: Python :: 3",
    "License :: OSI Approved :: European Union Public Licence 1.2 (EUPL 1.2)",
]
dependencies = [
    "uptime-kuma-api",
]

[project.urls]
Homepage = "https://git.dgnum.eu/mdebray/stateless-uptime-kuma"

[project.scripts]
stateless_uptime_kuma = "cli:main"

1
shell.nix Normal file

@@ -0,0 +1 @@
(import ./. {}).shell


@@ -0,0 +1,55 @@
import json
import logging
import sys

import click
import click_log
from uptime_kuma_api import UptimeKumaApi

from .hydratation import hydrate_http_probes
from .tree_gen import from_dict
from .uptime_kuma import Manager

logger = logging.getLogger(__name__)
click_log.basic_config()


@click.group()
def cli():
    pass


@cli.command()
@click_log.simple_verbosity_option()
@click.option(
    "--file",
    "-f",
    help="File to import probes data from",
    type=click.File("r"),
    default=sys.stdin,
)
@click.option(
    "--scrape-http-keywords",
    "-s",
    is_flag=True,
    help="Scrape keywords for http probes",
    default=False,
)
def apply_json(file, scrape_http_keywords):
    """
    Apply JSON probes
    """
    with UptimeKumaApi("http://localhost:3001") as api:
        api.login("admin", "123456789a")
        logger.debug("Reading json")
        data = json.load(file)
        logger.debug("Parsing json")
        tree = from_dict(api, data)
        if scrape_http_keywords:
            hydrate_http_probes(tree)
        logger.debug("Syncing probes")
        Manager(api, tree).process()


if __name__ == "__main__":
    cli()


@@ -0,0 +1,37 @@
import logging
import re
import sys

import requests
from uptime_kuma_api import MonitorType

logger = logging.getLogger(__name__)


def hydrate_http_probes(tree, excludes=[]):
    for probe in tree.get("monitors", []):
        if "type" not in probe.kwargs:
            logger.error("Fatal: probes must have a 'type' parameter")
            sys.exit(1)
        if (
            probe.kwargs["type"] == MonitorType.KEYWORD
            and probe.kwargs.get("keyword", None) is None
        ):
            logger.debug(f"Hydrating {probe.name}")
            if "url" not in probe.kwargs:
                logger.error("Fatal: http probes must provide a URL")
                sys.exit(1)
            if "method" not in probe.kwargs:
                logger.error("Fatal: http probes must provide a method")
                sys.exit(1)
            url = probe.kwargs["url"]
            method = probe.kwargs["method"]
            headers = probe.kwargs.get("headers", None)
            body = probe.kwargs.get("body", None)
            content = requests.request(method, url, headers=headers, data=body).text
            logger.debug(f"Fetched {len(content)} characters from {url}")
            m = re.search("<title>(.*?)</title>", content)
            if m is None:
                logger.info(f"Didn't find a keyword for probe {probe.name}, skipping")
                continue  # without this, m.group(1) below would raise on a missing <title>
            probe.kwargs["keyword"] = m.group(1)
            logger.debug(f"New keyword: {m.group(1)}")
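
The hydration above amounts to fetching the monitored page and taking its first <title> as the keyword. A standalone sketch of that behaviour (not part of this commit; example.org is used purely as an illustration):

import re

import requests

# Fetch the page the probe would monitor and extract its <title>,
# the same way hydrate_http_probes does.
content = requests.get("https://example.org").text
match = re.search("<title>(.*?)</title>", content)
keyword = match.group(1) if match else None  # "Example Domain" for example.org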


@@ -0,0 +1,49 @@
"""
Classes to generate the item tree from json spec
"""

import logging
import sys

from .uptime_kuma import Monitor, Notification, Tag

logger = logging.getLogger(__name__)


def die_tag_format_error():
    logger.error(
        "Fatal: You must provide tags in monitors in the format [[name, value]]"
    )
    sys.exit(1)


def from_dict(api, tree):
    notif = tree.get("notifications", [])
    indexed_notifications = {n["name"]: Notification(api, **n) for n in notif}

    tags = tree.get("tags", [])
    indexed_tags = {t["name"]: Tag(api, **t) for t in tags}

    monitors = tree.get("monitors", [])
    indexed_monitors = {}
    for m in monitors:
        associated_tags = []
        for tag in m.get("tags", []):
            if not isinstance(tag, list):
                die_tag_format_error()
            try:
                associated_tags.append((indexed_tags[tag[0]], tag[1]))
            except IndexError:
                die_tag_format_error()
        m["tags"] = associated_tags

        associated_notifications = [
            indexed_notifications[notif] for notif in m.get("notifications", [])
        ]
        m["notifications"] = associated_notifications

        if "name" not in m:
            logger.error("Fatal: All monitors must have a name")
            sys.exit(1)
        indexed_monitors[m["name"]] = Monitor(api, **m)

    return {
        "monitors": indexed_monitors.values(),
        "tags": indexed_tags.values(),
        "notifications": indexed_notifications.values(),
    }
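
For reference, a minimal sketch (not part of this commit) of the dictionary shape that from_dict consumes; it mirrors tests/02_tags.json further down, and api stands for an already logged-in UptimeKumaApi instance:

# Monitor tags are [[tag_name, value]] pairs and notifications are referenced
# by name; from_dict resolves both to the Tag/Notification objects it builds.
spec = {
    "tags": [{"name": "test"}],
    "notifications": [{"name": "dgn", "type": "ntfy"}],  # full ntfy settings omitted here
    "monitors": [{
        "name": "test_monitor2",
        "type": "ping",
        "hostname": "localhost",
        "tags": [["test", "value"]],
        "notifications": ["dgn"],
    }],
}
tree = from_dict(api, spec)  # {"monitors": ..., "tags": ..., "notifications": ...}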


@@ -0,0 +1,167 @@
"""
Classes to make the needed operations to reach the specified state.
"""

import logging

logger = logging.getLogger(__name__)


class Manager:
    def __init__(self, api, target_tree={}, prune_unused=False):
        self.api = api
        self.prune_unused = prune_unused
        self.target_tree = target_tree

    def process(self):
        self.sync_tags()
        self.sync_notifications()
        self.sync_monitors()
        self.save()

    def save(self):
        for v in self.target_tree.values():
            for i in v:
                i.save()  # this method should be safe to call in any order

    def sync_monitors(self):
        old = self.api.get_monitors()
        new = self.target_tree.get("monitors", [])
        self.sync(new, old)

    def sync_notifications(self):
        old = self.api.get_notifications()
        new = self.target_tree.get("notifications", [])
        self.sync(new, old)

    def sync_tags(self):
        old = self.api.get_tags()
        new = self.target_tree.get("tags", [])
        self.sync(new, old)

    def sync(self, new, old):
        indexed_old = {elem["name"]: elem for elem in old}
        for k in new:
            if k.name in indexed_old:
                k.id = indexed_old[k.name]["id"]
                logger.debug(f"Synced item named {k}")
                if k.old_name is not None:
                    logger.warning(f"Found unused oldName for {k}")
            elif k.old_name in indexed_old:
                k.id = indexed_old[k.old_name]["id"]
                logger.info(f"Found renamed item {k.old_name} -> {k}")
            else:
                k.id = None  # Useless
                logger.debug(f"Creating key {k}")


class Item:
    def __init__(self, api, name, id, old_name=None):
        self.api = api
        self.name = name
        self.id = id
        self.old_name = old_name
        self.saved = False

    def save(self):
        raise NotImplementedError()

    def __setattr__(self, name, value):
        if name != "saved":
            self.saved = False
        object.__setattr__(self, name, value)

    def __str__(self):
        return self.name


class Monitor(Item):
    def __init__(
        self, api, name, id=None, old_name=None, tags=[], notifications=[], **kwargs
    ):
        super().__init__(api, name, id, old_name)
        self.kwargs = kwargs
        self.tags = tags
        self.notifications = notifications
        self.saved = False

    def save(self):
        if self.saved:
            return
        for t, _ in self.tags:
            t.save()
        for n in self.notifications:
            n.save()
        if self.id is None:
            rslt = self.api.add_monitor(
                name=self.name,
                notificationIDList=[i.id for i in self.notifications],
                **self.kwargs,
            )
            self.id = rslt["monitorID"]
            for t, value in self.tags:
                self.api.add_monitor_tag(tag_id=t.id, monitor_id=self.id, value=value)
        else:
            rslt = self.api.edit_monitor(
                self.id,
                name=self.name,
                notificationIDList=[i.id for i in self.notifications],
                **self.kwargs,
            )
            current_tags = set(
                (i["tag_id"], i["value"]) for i in self.api.get_monitor(self.id)["tags"]
            )
            for t, v in self.tags:
                if (t.id, v) not in current_tags:  # compare against the tag ids returned by the API
                    self.api.add_monitor_tag(tag_id=t.id, monitor_id=self.id, value=v)
        self.saved = True

    def __repr__(self):
        return f"Monitor({str(self)})"


class Tag(Item):
    def __init__(self, api, name, id=None, old_name=None, color="#000000"):
        super().__init__(api, name, id, old_name)
        self.tag = name
        self.color = color

    def save(self):
        if self.saved:
            return
        if self.id is None:
            rslt = self.api.add_tag(
                name=self.tag,
                color=self.color,
            )
            self.id = rslt["id"]
        else:
            self.api.edit_tag(
                id_=self.id,
                name=self.tag,
                color=self.color,
            )
        self.saved = True


class Notification(Item):
    def __init__(self, api, name, id=None, old_name=None, **kwargs):
        super().__init__(api, name, id, old_name)
        self.kwargs = kwargs

    def save(self):
        if self.saved:
            return
        if self.id is None:
            rslt = self.api.add_notification(
                name=self.name,
                **self.kwargs,
            )
            self.id = rslt["id"]
        else:
            self.api.edit_notification(
                id_=self.id,
                name=self.name,
                **self.kwargs,
            )
        self.saved = True
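
The classes above can also be driven without the CLI. A minimal sketch (not part of this commit), assuming a local Uptime Kuma instance reachable with the same URL and credentials the CLI uses:

from uptime_kuma_api import UptimeKumaApi

# Tag, Monitor and Manager are the classes defined above.
with UptimeKumaApi("http://localhost:3001") as api:
    api.login("admin", "123456789a")
    tag = Tag(api, name="test")
    monitor = Monitor(
        api,
        name="test_monitor",
        type="ping",
        hostname="localhost",
        tags=[(tag, "value")],
    )
    # process() matches the declared items against the existing state by name,
    # then save() creates or edits each one through the Uptime Kuma API.
    Manager(api, {"tags": [tag], "notifications": [], "monitors": [monitor]}).process()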


@@ -0,0 +1,7 @@
{
  "monitors": [{
    "name": "test_monitor",
    "type": "ping",
    "hostname": "localhost"
  }]
}


@@ -0,0 +1,6 @@
{
  "monitors": [{
    "old_name": "test_monitor",
    "name": "monitor_test"
  }]
}

21
tests/02_tags.json Normal file

@@ -0,0 +1,21 @@
{
  "notifications": [{
    "name": "dgn",
    "type": "ntfy",
    "ntfyAuthenticationMethod": "none",
    "ntfytopic": "dgnum-test",
    "ntfyserverurl": "https://htfy.sh",
    "ntfyPriority": 5
  }],
  "tags": [{
    "name": "test"
  }],
  "monitors": [{
    "tags": [["test", "value"]],
    "notifications": [ "dgn" ],
    "old_name": "test_monitor",
    "name": "test_monitor2",
    "type": "ping",
    "hostname": "localhost"
  }]
}

8
tests/03_keywords.json Normal file

@@ -0,0 +1,8 @@
{
  "monitors": [{
    "name": "google",
    "type": "keyword",
    "url": "https://google.com",
    "method": "get"
  }]
}

43
uptime-kuma-api.nix Normal file

@@ -0,0 +1,43 @@
{ lib
, buildPythonPackage
, fetchFromGitHub
, setuptools
, wheel
, packaging
, python-socketio
, requests
}:

buildPythonPackage rec {
  pname = "uptime-kuma-api";
  version = "1.2.1";
  pyproject = true;

  src = fetchFromGitHub {
    owner = "lucasheld";
    repo = "uptime-kuma-api";
    rev = version;
    hash = "sha256-Mgp4bSQPiEeulK9dAl+Di4Nj1HG3oVFGKr1bIdRZI44=";
  };

  nativeBuildInputs = [
    setuptools
    wheel
  ];

  propagatedBuildInputs = [
    packaging
    python-socketio
    requests
  ];

  pythonImportsCheck = [ "uptime_kuma_api" ];

  meta = with lib; {
    description = "A Python wrapper for the Uptime Kuma Socket.IO API";
    homepage = "https://github.com/lucasheld/uptime-kuma-api";
    changelog = "https://github.com/lucasheld/uptime-kuma-api/blob/${src.rev}/CHANGELOG.md";
    license = licenses.mit;
    maintainers = with maintainers; [ ];
  };
}