stateless-uptime-kuma/stateless_uptime_kuma/tree_gen.py

81 lines
3 KiB
Python
Raw Permalink Normal View History

2024-04-16 22:48:52 +02:00
"""
Classes to generate the item tree from json spec
"""
import logging
import sys
from .uptime_kuma import Monitor, Notification, Settings, StatusPage, Tag # type: ignore
2024-04-16 22:48:52 +02:00
logger = logging.getLogger(__name__)
def die_tag_format_error():
logger.error(
2024-04-18 20:26:18 +02:00
"Fatal: You must provide tags in monitors in the format [{name:str, value:str}] (value is optional)"
2024-04-16 22:48:52 +02:00
)
sys.exit(1)
def from_dict(api, tree, autocreate_tags=True):
2024-04-16 22:48:52 +02:00
notif = tree.get("notifications", [])
indexed_notifications = {
name: Notification(api, name, **kwargs) for name, kwargs in notif.items()
}
2024-04-16 22:48:52 +02:00
tags = tree.get("tags", [])
indexed_tags = {name: Tag(api, name, **kwargs) for name, kwargs in tags.items()}
2024-04-16 22:48:52 +02:00
monitors = tree.get("monitors", [])
indexed_monitors = {}
for monitor_name, monitor_kwargs in monitors.items():
2024-04-16 22:48:52 +02:00
associated_tags = []
for tag in monitor_kwargs.get("tags", []):
2024-04-18 20:26:18 +02:00
if not isinstance(tag, dict) or "name" not in tag:
2024-04-16 22:48:52 +02:00
die_tag_format_error()
try:
if autocreate_tags and tag["name"] not in indexed_tags:
indexed_tags[tag["name"]] = Tag(api, name=tag["name"])
2024-04-18 20:26:18 +02:00
associated_tags.append((indexed_tags[tag["name"]], tag.get("value")))
2024-04-16 22:48:52 +02:00
except IndexError:
die_tag_format_error()
monitor_kwargs["tags"] = associated_tags
2024-04-16 22:48:52 +02:00
associated_notifications = [
indexed_notifications[notif]
for notif in monitor_kwargs.get("notifications", [])
2024-04-16 22:48:52 +02:00
]
monitor_kwargs["notifications"] = associated_notifications
indexed_monitors[monitor_name] = Monitor(api, monitor_name, **monitor_kwargs)
status_pages = tree.get("status_pages", [])
indexed_status_pages = {}
for slug, kwargs in status_pages.items():
for group in kwargs.get("publicGroupList", []):
if "monitorList" in group:
monitorList = []
for monitor in group["monitorList"]:
if monitor not in indexed_monitors:
logger.error(
"Fatal: status page is referencing a monitor that doesn't exist"
)
sys.exit(1)
monitorList.append(indexed_monitors[monitor])
group["monitorList"] = monitorList
indexed_status_pages[slug] = StatusPage(api, slug, **kwargs)
settings = []
if "settings" in tree:
parsed_settings = {}
for k, v in tree["settings"].items():
if k.endswith("__FILE"):
with open(v, "rt") as f:
parsed_settings[k[:-len("__FILE")]] = f.read().strip()
else:
parsed_settings[k] = v
settings = [Settings(api, **parsed_settings)]
2024-04-16 22:48:52 +02:00
return {
"monitors": indexed_monitors.values(),
"tags": indexed_tags.values(),
"notifications": indexed_notifications.values(),
"status_pages": indexed_status_pages.values(),
"settings": settings,
2024-04-16 22:48:52 +02:00
}