forked from DGNum/stateless-uptime-kuma
37 lines
1.3 KiB
Python
37 lines
1.3 KiB
Python
|
import logging
|
||
|
import re
|
||
|
import sys
|
||
|
|
||
|
import requests
|
||
|
from uptime_kuma_api import MonitorType
|
||
|
|
||
|
logger = logging.getLogger(__name__)
|
||
|
|
||
|
|
||
|
def hydrate_http_probes(tree, excludes=[]):
|
||
|
for probe in tree.get("monitors", []):
|
||
|
if "type" not in probe.kwargs:
|
||
|
logger.error("Fatal: probes must have a 'type' parameter")
|
||
|
sys.exit(1)
|
||
|
if (
|
||
|
probe.kwargs["type"] == MonitorType.KEYWORD
|
||
|
and probe.kwargs.get("keyword", None) is None
|
||
|
):
|
||
|
logger.debug(f"Hydrating {probe.name}")
|
||
|
if "url" not in probe.kwargs:
|
||
|
logger.error("Fatal: http probe must provide an url")
|
||
|
sys.exit(1)
|
||
|
if "method" not in probe.kwargs:
|
||
|
logger.error("Fatal: http probe must provide a method")
|
||
|
sys.exit(1)
|
||
|
url = probe.kwargs["url"]
|
||
|
method = probe.kwargs["method"]
|
||
|
headers = probe.kwargs.get("headers", None)
|
||
|
body = probe.kwargs.get("body", None)
|
||
|
content = requests.request(method, url, headers=headers, data=body).text
|
||
|
m = re.search("<title>(.*?)</title>", content)
|
||
|
if m is None:
|
||
|
logger.info(f"Didn't find keywords for probe {probe.name}, skipping")
|
||
|
probe.kwargs["keyword"] = m.group(1)
|
||
|
logger.debug(f"New keyword: {m.group(1)}")
|