forked from DGNum/stateless-uptime-kuma
47 lines
1.8 KiB
Python
47 lines
1.8 KiB
Python
import logging
|
|
import re
|
|
import sys
|
|
|
|
import requests
|
|
from uptime_kuma_api import MonitorType
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def hydrate_http_probes(tree, excludes=[], fallback_to_http = True):
|
|
for probe in tree.get("monitors", []):
|
|
if "type" not in probe.kwargs:
|
|
logger.error(f"Fatal: probe {probe.name} must have a 'type' parameter")
|
|
sys.exit(1)
|
|
if (
|
|
probe.kwargs["type"] == MonitorType.KEYWORD
|
|
and probe.kwargs.get("keyword") is None
|
|
):
|
|
logger.debug(f"Hydrating {probe.name}")
|
|
if "url" not in probe.kwargs:
|
|
logger.error("Fatal: http probe must provide an url")
|
|
sys.exit(1)
|
|
if "method" not in probe.kwargs:
|
|
logger.error("Fatal: http probe must provide a method")
|
|
sys.exit(1)
|
|
url = probe.kwargs["url"]
|
|
method = probe.kwargs["method"]
|
|
headers = probe.kwargs.get("headers")
|
|
body = probe.kwargs.get("body")
|
|
req = requests.request(
|
|
method, url, headers=headers, data=body, allow_redirects=True
|
|
)
|
|
req.encoding = req.apparent_encoding
|
|
if req.status_code not in probe.get_status_codes():
|
|
logger.error(f"{probe.name} is not returning the right status code")
|
|
m = re.search("(?s)<title[^>]*>(.*?)</title>", req.text)
|
|
if m is None:
|
|
logger.warn(
|
|
f"Didn't find keywords for probe {probe.name}, falling back on classic HTTP probe"
|
|
)
|
|
logging.debug(req.text)
|
|
if fallback_to_http:
|
|
probe.kwargs["type"] = "http"
|
|
else:
|
|
probe.kwargs["keyword"] = m.group(1).strip()
|
|
logger.debug(f"New keyword: {m.group(1).strip()}")
|