stateless-uptime-kuma/stateless_uptime_kuma/hydratation.py
2024-04-19 17:19:50 +02:00

36 lines
1.3 KiB
Python

import logging
import re
import sys
import requests
from uptime_kuma_api import MonitorType
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def hydrate_http_probes(tree, excludes=None):
    """Fill in the missing ``keyword`` of keyword probes from their page title.

    Walks ``tree["monitors"]``. For every probe whose ``kwargs["type"]`` is
    ``MonitorType.KEYWORD`` and whose ``keyword`` is absent or ``None``, the
    probe's URL is requested and the text captured from the first
    ``<title>...</title>`` match in the response body is stored as the
    probe's ``keyword``. Probes are modified in place; nothing is returned.

    Args:
        tree: Mapping that may hold a ``"monitors"`` iterable of probe
            objects exposing ``name`` and a ``kwargs`` dict.
        excludes: Accepted for backward compatibility; not used by this
            function.

    Raises:
        SystemExit: With status 1 when a probe has no ``type``, or when a
            probe that needs hydrating lacks ``url`` or ``method``.
    """
    for probe in tree.get("monitors", []):
        settings = probe.kwargs

        if "type" not in settings:
            logger.error("Fatal: probes must have a 'type' parameter")
            sys.exit(1)

        # Only keyword probes that do not already carry a keyword are hydrated.
        needs_keyword = (
            settings["type"] == MonitorType.KEYWORD
            and settings.get("keyword") is None
        )
        if not needs_keyword:
            continue

        logger.debug("Hydrating %s", probe.name)
        if "url" not in settings:
            logger.error("Fatal: http probe must provide an url")
            sys.exit(1)
        if "method" not in settings:
            logger.error("Fatal: http probe must provide a method")
            sys.exit(1)

        # NOTE(review): no timeout is passed, so an unresponsive server can
        # block this call indefinitely; consider adding one.
        response = requests.request(
            settings["method"],
            settings["url"],
            headers=settings.get("headers"),
            data=settings.get("body"),
        )

        title = re.search(r"<title>(.*?)</title>", response.text)
        if title is None:
            logger.info(
                "Didn't find keywords for probe %s, skipping", probe.name
            )
            # Actually skip: without this the code below would call
            # .group() on None and raise AttributeError.
            continue

        keyword = title.group(1)
        settings["keyword"] = keyword
        logger.debug("New keyword: %s", keyword)