1057 lines
32 KiB
Python
1057 lines
32 KiB
Python
import json
|
|
import random
|
|
import string
|
|
import time
|
|
from contextlib import contextmanager
|
|
from copy import deepcopy
|
|
|
|
import requests
|
|
import socketio
|
|
from packaging.version import parse as parse_version
|
|
|
|
from . import AuthMethod
|
|
from . import DockerType
|
|
from . import Event
|
|
from . import IncidentStyle
|
|
from . import MonitorType
|
|
from . import NotificationType, notification_provider_options, notification_provider_conditions
|
|
from . import ProxyProtocol
|
|
from . import UptimeKumaException
|
|
|
|
|
|
def int_to_bool(data, keys):
|
|
if type(data) == list:
|
|
for d in data:
|
|
int_to_bool(d, keys)
|
|
else:
|
|
for key in keys:
|
|
if key in data:
|
|
data[key] = True if data[key] == 1 else False
|
|
|
|
|
|
def gen_secret(length: int) -> str:
|
|
chars = string.ascii_uppercase + string.ascii_lowercase + string.digits
|
|
return ''.join(random.choice(chars) for _ in range(length))
|
|
|
|
|
|
def _convert_monitor_return(monitor):
|
|
if type(monitor["notificationIDList"]) == dict:
|
|
monitor["notificationIDList"] = [int(i) for i in monitor["notificationIDList"].keys()]
|
|
|
|
|
|
def _convert_monitor_input(kwargs):
|
|
if not kwargs["accepted_statuscodes"]:
|
|
kwargs["accepted_statuscodes"] = ["200-299"]
|
|
|
|
dict_notification_ids = {}
|
|
if kwargs["notificationIDList"]:
|
|
for notification_id in kwargs["notificationIDList"]:
|
|
dict_notification_ids[notification_id] = True
|
|
kwargs["notificationIDList"] = dict_notification_ids
|
|
|
|
if not kwargs["databaseConnectionString"]:
|
|
if kwargs["type"] == MonitorType.SQLSERVER:
|
|
kwargs["databaseConnectionString"] = "Server=<hostname>,<port>;Database=<your database>;User Id=<your user id>;Password=<your password>;Encrypt=<true/false>;TrustServerCertificate=<Yes/No>;Connection Timeout=<int>"
|
|
elif kwargs["type"] == MonitorType.POSTGRES:
|
|
kwargs["databaseConnectionString"] = "postgres://username:password@host:port/database"
|
|
|
|
if kwargs["type"] == MonitorType.PUSH and not kwargs.get("pushToken"):
|
|
kwargs["pushToken"] = gen_secret(10)
|
|
|
|
|
|
def _build_notification_data(
|
|
name: str,
|
|
type: NotificationType,
|
|
isDefault: bool = False,
|
|
applyExisting: bool = False,
|
|
**kwargs
|
|
):
|
|
allowed_kwargs = []
|
|
for keys in notification_provider_options.values():
|
|
allowed_kwargs.extend(keys)
|
|
|
|
for key in kwargs.keys():
|
|
if key not in allowed_kwargs:
|
|
raise TypeError(f"unknown argument '{key}'")
|
|
|
|
data = {
|
|
"name": name,
|
|
"type": type,
|
|
"isDefault": isDefault,
|
|
"applyExisting": applyExisting,
|
|
**kwargs
|
|
}
|
|
return data
|
|
|
|
|
|
def _build_proxy_data(
|
|
protocol: ProxyProtocol,
|
|
host: str,
|
|
port: str,
|
|
auth: bool = False,
|
|
username: str = None,
|
|
password: str = None,
|
|
active: bool = True,
|
|
default: bool = False,
|
|
applyExisting: bool = False,
|
|
):
|
|
data = {
|
|
"protocol": protocol,
|
|
"host": host,
|
|
"port": port,
|
|
"auth": auth,
|
|
"username": username,
|
|
"password": password,
|
|
"active": active,
|
|
"default": default,
|
|
"applyExisting": applyExisting
|
|
}
|
|
return data
|
|
|
|
|
|
def _build_status_page_data(
|
|
slug: str,
|
|
|
|
# config
|
|
id: int,
|
|
title: str,
|
|
description: str = None,
|
|
theme: str = "light",
|
|
published: bool = True,
|
|
showTags: bool = False,
|
|
domainNameList: list = None,
|
|
customCSS: str = "",
|
|
footerText: str = None,
|
|
showPoweredBy: bool = True,
|
|
|
|
icon: str = "/icon.svg",
|
|
publicGroupList: list = None
|
|
):
|
|
if theme not in ["light", "dark"]:
|
|
raise ValueError
|
|
if not domainNameList:
|
|
domainNameList = []
|
|
if not publicGroupList:
|
|
publicGroupList = []
|
|
config = {
|
|
"id": id,
|
|
"slug": slug,
|
|
"title": title,
|
|
"description": description,
|
|
"icon": icon,
|
|
"theme": theme,
|
|
"published": published,
|
|
"showTags": showTags,
|
|
"domainNameList": domainNameList,
|
|
"customCSS": customCSS,
|
|
"footerText": footerText,
|
|
"showPoweredBy": showPoweredBy
|
|
}
|
|
return slug, config, icon, publicGroupList
|
|
|
|
|
|
def _convert_docker_host_input(kwargs):
|
|
if not kwargs["dockerDaemon"]:
|
|
if kwargs["dockerType"] == DockerType.SOCKET:
|
|
kwargs["dockerDaemon"] = "/var/run/docker.sock"
|
|
elif kwargs["dockerType"] == DockerType.TCP:
|
|
kwargs["dockerDaemon"] = "tcp://localhost:2375"
|
|
|
|
|
|
def _build_docker_host_data(
|
|
name: str,
|
|
dockerType: DockerType,
|
|
dockerDaemon: str = None
|
|
):
|
|
data = {
|
|
"name": name,
|
|
"dockerType": dockerType,
|
|
"dockerDaemon": dockerDaemon
|
|
}
|
|
return data
|
|
|
|
|
|
def _check_missing_arguments(required_params, kwargs):
|
|
missing_arguments = []
|
|
for required_param in required_params:
|
|
if kwargs.get(required_param) is None:
|
|
missing_arguments.append(required_param)
|
|
if missing_arguments:
|
|
missing_arguments_str = ", ".join([f"'{i}'" for i in missing_arguments])
|
|
raise TypeError(f"missing {len(missing_arguments)} required argument: {missing_arguments_str}")
|
|
|
|
|
|
def _check_argument_conditions(valid_params, kwargs):
|
|
for valid_param in valid_params:
|
|
if valid_param in kwargs:
|
|
value = kwargs[valid_param]
|
|
conditions = valid_params[valid_param]
|
|
min_ = conditions.get("min")
|
|
max_ = conditions.get("max")
|
|
if min_ is not None and value < min_:
|
|
raise ValueError(f"the value of {valid_param} must not be less than {min_}")
|
|
if max_ is not None and value > max_:
|
|
raise ValueError(f"the value of {valid_param} must not be larger than {max_}")
|
|
|
|
|
|
def _check_arguments_monitor(kwargs):
|
|
required_args = [
|
|
"type",
|
|
"name",
|
|
"interval",
|
|
"maxretries",
|
|
"retryInterval"
|
|
]
|
|
_check_missing_arguments(required_args, kwargs)
|
|
|
|
required_args_by_type = {
|
|
MonitorType.HTTP: ["url", "maxredirects"],
|
|
MonitorType.PORT: ["hostname", "port"],
|
|
MonitorType.PING: ["hostname"],
|
|
MonitorType.KEYWORD: ["url", "keyword", "maxredirects"],
|
|
MonitorType.DNS: ["hostname", "dns_resolve_server", "port"],
|
|
MonitorType.PUSH: [],
|
|
MonitorType.STEAM: ["hostname", "port"],
|
|
MonitorType.MQTT: ["hostname", "port", "mqttTopic"],
|
|
MonitorType.SQLSERVER: [],
|
|
MonitorType.POSTGRES: [],
|
|
MonitorType.DOCKER: ["docker_container", "docker_host"],
|
|
MonitorType.RADIUS: ["radiusUsername", "radiusPassword", "radiusSecret", "radiusCalledStationId", "radiusCallingStationId"]
|
|
}
|
|
type_ = kwargs["type"]
|
|
required_args = required_args_by_type[type_]
|
|
_check_missing_arguments(required_args, kwargs)
|
|
|
|
conditions = dict(
|
|
interval=dict(
|
|
min=20,
|
|
),
|
|
maxretries=dict(
|
|
min=0,
|
|
),
|
|
retryInterval=dict(
|
|
min=20,
|
|
),
|
|
maxredirects=dict(
|
|
min=0,
|
|
),
|
|
port=dict(
|
|
min=0,
|
|
max=65535,
|
|
),
|
|
)
|
|
_check_argument_conditions(conditions, kwargs)
|
|
|
|
|
|
def _check_arguments_notification(kwargs):
|
|
required_args = ["type", "name"]
|
|
_check_missing_arguments(required_args, kwargs)
|
|
|
|
# TODO: collect required notification args from /src/components/notifications/*
|
|
# type_ = kwargs["type"]
|
|
# required_args = notification_provider_options[type_]
|
|
# _check_missing_arguments(required_args, kwargs)
|
|
_check_argument_conditions(notification_provider_conditions, kwargs)
|
|
|
|
|
|
def _check_arguments_proxy(kwargs):
|
|
required_args = ["protocol", "host", "port"]
|
|
if kwargs.get("auth"):
|
|
required_args.extend(["username", "password"])
|
|
_check_missing_arguments(required_args, kwargs)
|
|
|
|
conditions = dict(
|
|
port=dict(
|
|
min=0,
|
|
max=65535,
|
|
)
|
|
)
|
|
_check_argument_conditions(conditions, kwargs)
|
|
|
|
|
|
class UptimeKumaApi(object):
|
|
def __init__(self, url):
|
|
self.url = url
|
|
self.sio = socketio.Client()
|
|
|
|
self._event_data: dict = {
|
|
Event.MONITOR_LIST: None,
|
|
Event.NOTIFICATION_LIST: None,
|
|
Event.PROXY_LIST: None,
|
|
Event.STATUS_PAGE_LIST: None,
|
|
Event.HEARTBEAT_LIST: None,
|
|
Event.IMPORTANT_HEARTBEAT_LIST: None,
|
|
Event.AVG_PING: None,
|
|
Event.UPTIME: None,
|
|
Event.HEARTBEAT: None,
|
|
Event.INFO: None,
|
|
Event.CERT_INFO: None,
|
|
Event.DOCKER_HOST_LIST: None,
|
|
Event.AUTO_LOGIN: None
|
|
}
|
|
|
|
self.sio.on(Event.CONNECT, self._event_connect)
|
|
self.sio.on(Event.DISCONNECT, self._event_disconnect)
|
|
self.sio.on(Event.MONITOR_LIST, self._event_monitor_list)
|
|
self.sio.on(Event.NOTIFICATION_LIST, self._event_notification_list)
|
|
self.sio.on(Event.PROXY_LIST, self._event_proxy_list)
|
|
self.sio.on(Event.STATUS_PAGE_LIST, self._event_status_page_list)
|
|
self.sio.on(Event.HEARTBEAT_LIST, self._event_heartbeat_list)
|
|
self.sio.on(Event.IMPORTANT_HEARTBEAT_LIST, self._event_important_heartbeat_list)
|
|
self.sio.on(Event.AVG_PING, self._event_avg_ping)
|
|
self.sio.on(Event.UPTIME, self._event_uptime)
|
|
self.sio.on(Event.HEARTBEAT, self._event_heartbeat)
|
|
self.sio.on(Event.INFO, self._event_info)
|
|
self.sio.on(Event.CERT_INFO, self._event_cert_info)
|
|
self.sio.on(Event.DOCKER_HOST_LIST, self._event_docker_host_list)
|
|
self.sio.on(Event.AUTO_LOGIN, self._event_auto_login)
|
|
|
|
self.connect()
|
|
|
|
@contextmanager
|
|
def wait_for_event(self, event: Event):
|
|
retries = 200
|
|
event_data_before = deepcopy(self._event_data)
|
|
|
|
try:
|
|
yield
|
|
except:
|
|
raise
|
|
else:
|
|
counter = 0
|
|
while event_data_before[event] == self._event_data[event]:
|
|
time.sleep(0.01)
|
|
counter += 1
|
|
if counter >= retries:
|
|
print("wait_for_event timeout")
|
|
break
|
|
|
|
def _get_event_data(self, event):
|
|
monitor_events = [Event.AVG_PING, Event.UPTIME, Event.HEARTBEAT_LIST, Event.IMPORTANT_HEARTBEAT_LIST, Event.CERT_INFO, Event.HEARTBEAT]
|
|
while self._event_data[event] is None:
|
|
# do not wait for events that are not sent
|
|
if self._event_data[Event.MONITOR_LIST] == {} and event in monitor_events:
|
|
return []
|
|
time.sleep(0.01)
|
|
time.sleep(0.05) # wait for multiple messages
|
|
return deepcopy(self._event_data[event])
|
|
|
|
def _call(self, event, data=None):
|
|
r = self.sio.call(event, data)
|
|
if type(r) == dict and "ok" in r:
|
|
if not r["ok"]:
|
|
raise UptimeKumaException(r["msg"])
|
|
r.pop("ok")
|
|
return r
|
|
|
|
# event handlers
|
|
|
|
def _event_connect(self):
|
|
pass
|
|
|
|
def _event_disconnect(self):
|
|
pass
|
|
|
|
def _event_monitor_list(self, data):
|
|
self._event_data[Event.MONITOR_LIST] = data
|
|
|
|
def _event_notification_list(self, data):
|
|
self._event_data[Event.NOTIFICATION_LIST] = data
|
|
|
|
def _event_proxy_list(self, data):
|
|
self._event_data[Event.PROXY_LIST] = data
|
|
|
|
def _event_status_page_list(self, data):
|
|
self._event_data[Event.STATUS_PAGE_LIST] = data
|
|
|
|
def _event_heartbeat_list(self, id_, data, bool_):
|
|
if self._event_data[Event.HEARTBEAT_LIST] is None:
|
|
self._event_data[Event.HEARTBEAT_LIST] = []
|
|
self._event_data[Event.HEARTBEAT_LIST].append({
|
|
"id": id_,
|
|
"data": data,
|
|
"bool": bool_,
|
|
})
|
|
|
|
def _event_important_heartbeat_list(self, id_, data, bool_):
|
|
if self._event_data[Event.IMPORTANT_HEARTBEAT_LIST] is None:
|
|
self._event_data[Event.IMPORTANT_HEARTBEAT_LIST] = []
|
|
self._event_data[Event.IMPORTANT_HEARTBEAT_LIST].append({
|
|
"id": id_,
|
|
"data": data,
|
|
"bool": bool_,
|
|
})
|
|
|
|
def _event_avg_ping(self, id_, data):
|
|
if self._event_data[Event.AVG_PING] is None:
|
|
self._event_data[Event.AVG_PING] = []
|
|
self._event_data[Event.AVG_PING].append({
|
|
"id": id_,
|
|
"data": data,
|
|
})
|
|
|
|
def _event_uptime(self, id_, hours_24, days_30):
|
|
if self._event_data[Event.UPTIME] is None:
|
|
self._event_data[Event.UPTIME] = []
|
|
self._event_data[Event.UPTIME].append({
|
|
"id": id_,
|
|
"hours_24": hours_24,
|
|
"days_30": days_30,
|
|
})
|
|
|
|
def _event_heartbeat(self, data):
|
|
if self._event_data[Event.HEARTBEAT] is None:
|
|
self._event_data[Event.HEARTBEAT] = []
|
|
self._event_data[Event.HEARTBEAT].append(data)
|
|
|
|
def _event_info(self, data):
|
|
self._event_data[Event.INFO] = data
|
|
|
|
def _event_cert_info(self, id_, data):
|
|
if self._event_data[Event.CERT_INFO] is None:
|
|
self._event_data[Event.CERT_INFO] = []
|
|
self._event_data[Event.CERT_INFO].append({
|
|
"id": id_,
|
|
"data": data,
|
|
})
|
|
|
|
def _event_docker_host_list(self, data):
|
|
self._event_data[Event.DOCKER_HOST_LIST] = data
|
|
|
|
def _event_auto_login(self):
|
|
self._event_data[Event.AUTO_LOGIN] = True
|
|
|
|
# connection
|
|
|
|
def connect(self):
|
|
url = self.url.rstrip("/")
|
|
try:
|
|
self.sio.connect(f'{url}/socket.io/')
|
|
except:
|
|
print("")
|
|
|
|
def disconnect(self):
|
|
self.sio.disconnect()
|
|
|
|
# builder
|
|
|
|
@property
|
|
def version(self):
|
|
info = self.info()
|
|
return info["version"]
|
|
|
|
def _build_monitor_data(
|
|
self,
|
|
type: MonitorType,
|
|
name: str,
|
|
interval: int = 60,
|
|
retryInterval: int = 60,
|
|
resendInterval: int = 0,
|
|
maxretries: int = 0,
|
|
upsideDown: bool = False,
|
|
notificationIDList: list = None,
|
|
|
|
# HTTP, KEYWORD
|
|
url: str = None,
|
|
expiryNotification: bool = False,
|
|
ignoreTls: bool = False,
|
|
maxredirects: int = 10,
|
|
accepted_statuscodes: list = None,
|
|
proxyId: int = None,
|
|
method: str = "GET",
|
|
body: str = None,
|
|
headers: str = None,
|
|
authMethod: AuthMethod = AuthMethod.NONE,
|
|
basic_auth_user: str = None,
|
|
basic_auth_pass: str = None,
|
|
authDomain: str = None,
|
|
authWorkstation: str = None,
|
|
|
|
# KEYWORD
|
|
keyword: str = None,
|
|
|
|
# DNS, PING, STEAM, MQTT
|
|
hostname: str = None,
|
|
|
|
# DNS, STEAM, MQTT
|
|
port: int = 53,
|
|
|
|
# DNS
|
|
dns_resolve_server: str = "1.1.1.1",
|
|
dns_resolve_type: str = "A",
|
|
|
|
# MQTT
|
|
mqttUsername: str = None,
|
|
mqttPassword: str = None,
|
|
mqttTopic: str = None,
|
|
mqttSuccessMessage: str = None,
|
|
|
|
# SQLSERVER, POSTGRES
|
|
databaseConnectionString: str = None,
|
|
databaseQuery: str = None,
|
|
|
|
# DOCKER
|
|
docker_container: str = "",
|
|
docker_host: int = None,
|
|
|
|
# RADIUS
|
|
radiusUsername: str = None,
|
|
radiusPassword: str = None,
|
|
radiusSecret: str = None,
|
|
radiusCalledStationId: str = None,
|
|
radiusCallingStationId: str = None
|
|
):
|
|
data = {
|
|
"type": type,
|
|
"name": name,
|
|
"interval": interval,
|
|
"retryInterval": retryInterval,
|
|
"maxretries": maxretries,
|
|
"notificationIDList": notificationIDList,
|
|
"upsideDown": upsideDown,
|
|
}
|
|
|
|
if parse_version(self.version) >= parse_version("1.18"):
|
|
data.update({
|
|
"resendInterval": resendInterval
|
|
})
|
|
|
|
if type == MonitorType.KEYWORD:
|
|
data.update({
|
|
"keyword": keyword,
|
|
})
|
|
|
|
# HTTP, KEYWORD
|
|
data.update({
|
|
"url": url,
|
|
"expiryNotification": expiryNotification,
|
|
"ignoreTls": ignoreTls,
|
|
"maxredirects": maxredirects,
|
|
"accepted_statuscodes": accepted_statuscodes,
|
|
"proxyId": proxyId,
|
|
"method": method,
|
|
"body": body,
|
|
"headers": headers,
|
|
"authMethod": authMethod,
|
|
})
|
|
|
|
if authMethod in [AuthMethod.HTTP_BASIC, AuthMethod.NTLM]:
|
|
data.update({
|
|
"basic_auth_user": basic_auth_user,
|
|
"basic_auth_pass": basic_auth_pass,
|
|
})
|
|
|
|
if authMethod == AuthMethod.NTLM:
|
|
data.update({
|
|
"authDomain": authDomain,
|
|
"authWorkstation": authWorkstation,
|
|
})
|
|
|
|
# PORT, PING, DNS, STEAM, MQTT
|
|
data.update({
|
|
"hostname": hostname,
|
|
})
|
|
|
|
# PORT, DNS, STEAM, MQTT
|
|
data.update({
|
|
"port": port,
|
|
})
|
|
|
|
# DNS
|
|
data.update({
|
|
"dns_resolve_server": dns_resolve_server,
|
|
"dns_resolve_type": dns_resolve_type,
|
|
})
|
|
|
|
# MQTT
|
|
data.update({
|
|
"mqttUsername": mqttUsername,
|
|
"mqttPassword": mqttPassword,
|
|
"mqttTopic": mqttTopic,
|
|
"mqttSuccessMessage": mqttSuccessMessage,
|
|
})
|
|
|
|
# SQLSERVER, POSTGRES
|
|
data.update({
|
|
"databaseConnectionString": databaseConnectionString
|
|
})
|
|
if type in [MonitorType.SQLSERVER, MonitorType.POSTGRES]:
|
|
data.update({
|
|
"databaseQuery": databaseQuery,
|
|
})
|
|
|
|
# DOCKER
|
|
if type == MonitorType.DOCKER:
|
|
data.update({
|
|
"docker_container": docker_container,
|
|
"docker_host": docker_host
|
|
})
|
|
|
|
# RADIUS
|
|
if type == MonitorType.RADIUS:
|
|
data.update({
|
|
"radiusUsername": radiusUsername,
|
|
"radiusPassword": radiusPassword,
|
|
"radiusSecret": radiusSecret,
|
|
"radiusCalledStationId": radiusCalledStationId,
|
|
"radiusCallingStationId": radiusCallingStationId
|
|
})
|
|
|
|
return data
|
|
|
|
# monitor
|
|
|
|
def get_monitors(self):
|
|
r = list(self._get_event_data(Event.MONITOR_LIST).values())
|
|
for monitor in r:
|
|
_convert_monitor_return(monitor)
|
|
int_to_bool(r, ["active"])
|
|
return r
|
|
|
|
def get_monitor(self, id_: int):
|
|
r = self._call('getMonitor', id_)["monitor"]
|
|
_convert_monitor_return(r)
|
|
int_to_bool(r, ["active"])
|
|
return r
|
|
|
|
def pause_monitor(self, id_: int):
|
|
return self._call('pauseMonitor', id_)
|
|
|
|
def resume_monitor(self, id_: int):
|
|
return self._call('resumeMonitor', id_)
|
|
|
|
def delete_monitor(self, id_: int):
|
|
with self.wait_for_event(Event.MONITOR_LIST):
|
|
return self._call('deleteMonitor', id_)
|
|
|
|
def get_monitor_beats(self, id_: int, hours: int):
|
|
r = self._call('getMonitorBeats', (id_, hours))["data"]
|
|
int_to_bool(r, ["important", "status"])
|
|
return r
|
|
|
|
def add_monitor(self, **kwargs):
|
|
data = self._build_monitor_data(**kwargs)
|
|
_convert_monitor_input(data)
|
|
_check_arguments_monitor(data)
|
|
with self.wait_for_event(Event.MONITOR_LIST):
|
|
return self._call('add', data)
|
|
|
|
def edit_monitor(self, id_: int, **kwargs):
|
|
data = self.get_monitor(id_)
|
|
data.update(kwargs)
|
|
_convert_monitor_input(data)
|
|
_check_arguments_monitor(data)
|
|
with self.wait_for_event(Event.MONITOR_LIST):
|
|
return self._call('editMonitor', data)
|
|
|
|
# monitor tags
|
|
|
|
def add_monitor_tag(self, tag_id: int, monitor_id: int, value=""):
|
|
r = self._call('addMonitorTag', (tag_id, monitor_id, value))
|
|
# the monitor list event does not send the updated tags
|
|
self._event_data[Event.MONITOR_LIST][str(monitor_id)] = self.get_monitor(monitor_id)
|
|
return r
|
|
|
|
# editMonitorTag is unused in uptime-kuma
|
|
# def edit_monitor_tag(self, tag_id: int, monitor_id: int, value=""):
|
|
# return self._call('editMonitorTag', (tag_id, monitor_id, value))
|
|
|
|
def delete_monitor_tag(self, tag_id: int, monitor_id: int, value=""):
|
|
r = self._call('deleteMonitorTag', (tag_id, monitor_id, value))
|
|
# the monitor list event does not send the updated tags
|
|
self._event_data[Event.MONITOR_LIST][str(monitor_id)] = self.get_monitor(monitor_id)
|
|
return r
|
|
|
|
# notification
|
|
|
|
def get_notifications(self):
|
|
notifications = self._get_event_data(Event.NOTIFICATION_LIST)
|
|
r = []
|
|
for notification_raw in notifications:
|
|
notification = notification_raw.copy()
|
|
config = json.loads(notification["config"])
|
|
del notification["config"]
|
|
notification.update(config)
|
|
r.append(notification)
|
|
return r
|
|
|
|
def get_notification(self, id_: int):
|
|
notifications = self.get_notifications()
|
|
for notification in notifications:
|
|
if notification["id"] == id_:
|
|
return notification
|
|
raise UptimeKumaException("notification does not exist")
|
|
|
|
def test_notification(self, **kwargs):
|
|
data = _build_notification_data(**kwargs)
|
|
|
|
_check_arguments_notification(data)
|
|
return self._call('testNotification', data)
|
|
|
|
def add_notification(self, **kwargs):
|
|
data = _build_notification_data(**kwargs)
|
|
|
|
_check_arguments_notification(data)
|
|
with self.wait_for_event(Event.NOTIFICATION_LIST):
|
|
return self._call('addNotification', (data, None))
|
|
|
|
def edit_notification(self, id_: int, **kwargs):
|
|
notification = self.get_notification(id_)
|
|
|
|
# remove old notification provider options from notification object
|
|
if "type" in kwargs and kwargs["type"] != notification["type"]:
|
|
for provider in notification_provider_options:
|
|
provider_options = notification_provider_options[provider]
|
|
if provider != kwargs["type"]:
|
|
for option in provider_options:
|
|
if option in notification:
|
|
del notification[option]
|
|
|
|
notification.update(kwargs)
|
|
_check_arguments_notification(notification)
|
|
with self.wait_for_event(Event.NOTIFICATION_LIST):
|
|
return self._call('addNotification', (notification, id_))
|
|
|
|
def delete_notification(self, id_: int):
|
|
with self.wait_for_event(Event.NOTIFICATION_LIST):
|
|
return self._call('deleteNotification', id_)
|
|
|
|
def check_apprise(self):
|
|
return self._call('checkApprise')
|
|
|
|
# proxy
|
|
|
|
def get_proxies(self):
|
|
r = self._get_event_data(Event.PROXY_LIST)
|
|
int_to_bool(r, ["auth", "active", "default", "applyExisting"])
|
|
return r
|
|
|
|
def get_proxy(self, id_: int):
|
|
proxies = self.get_proxies()
|
|
for proxy in proxies:
|
|
if proxy["id"] == id_:
|
|
return proxy
|
|
raise UptimeKumaException("proxy does not exist")
|
|
|
|
def add_proxy(self, **kwargs):
|
|
data = _build_proxy_data(**kwargs)
|
|
|
|
_check_arguments_proxy(data)
|
|
with self.wait_for_event(Event.PROXY_LIST):
|
|
return self._call('addProxy', (data, None))
|
|
|
|
def edit_proxy(self, id_: int, **kwargs):
|
|
proxy = self.get_proxy(id_)
|
|
proxy.update(kwargs)
|
|
_check_arguments_proxy(proxy)
|
|
with self.wait_for_event(Event.PROXY_LIST):
|
|
return self._call('addProxy', (proxy, id_))
|
|
|
|
def delete_proxy(self, id_: int):
|
|
with self.wait_for_event(Event.PROXY_LIST):
|
|
return self._call('deleteProxy', id_)
|
|
|
|
# status page
|
|
|
|
def get_status_pages(self):
|
|
return list(self._get_event_data(Event.STATUS_PAGE_LIST).values())
|
|
|
|
def get_status_page(self, slug: str):
|
|
r1 = self._call('getStatusPage', slug)
|
|
r2 = requests.get(f"{self.url}/api/status-page/{slug}").json()
|
|
|
|
config = r1["config"]
|
|
config.update(r2["config"])
|
|
return {
|
|
**config,
|
|
"incident": r2["incident"],
|
|
"publicGroupList": r2["publicGroupList"]
|
|
}
|
|
|
|
def add_status_page(self, slug: str, title: str):
|
|
with self.wait_for_event(Event.STATUS_PAGE_LIST):
|
|
return self._call('addStatusPage', (title, slug))
|
|
|
|
def delete_status_page(self, slug: str):
|
|
r = self._call('deleteStatusPage', slug)
|
|
|
|
# uptime kuma does not send the status page list event when a status page is deleted
|
|
for status_page in self._event_data[Event.STATUS_PAGE_LIST].values():
|
|
if status_page["slug"] == slug:
|
|
status_page_id = status_page["id"]
|
|
del self._event_data[Event.STATUS_PAGE_LIST][str(status_page_id)]
|
|
break
|
|
|
|
return r
|
|
|
|
def save_status_page(self, slug: str, **kwargs):
|
|
status_page = self.get_status_page(slug)
|
|
status_page.pop("incident")
|
|
status_page.update(kwargs)
|
|
data = _build_status_page_data(**status_page)
|
|
r = self._call('saveStatusPage', data)
|
|
|
|
# uptime kuma does not send the status page list event when a status page is saved
|
|
status_page = self._call('getStatusPage', slug)["config"]
|
|
status_page_id = status_page["id"]
|
|
if self._event_data[Event.STATUS_PAGE_LIST] is None:
|
|
self._event_data[Event.STATUS_PAGE_LIST] = {}
|
|
self._event_data[Event.STATUS_PAGE_LIST][str(status_page_id)] = status_page
|
|
|
|
return r
|
|
|
|
def post_incident(
|
|
self,
|
|
slug: str,
|
|
title: str,
|
|
content: str,
|
|
style: IncidentStyle = IncidentStyle.PRIMARY
|
|
):
|
|
incident = {
|
|
"title": title,
|
|
"content": content,
|
|
"style": style
|
|
}
|
|
r = self._call('postIncident', (slug, incident))["incident"]
|
|
self.save_status_page(slug)
|
|
return r
|
|
|
|
def unpin_incident(self, slug: str):
|
|
r = self._call('unpinIncident', slug)
|
|
self.save_status_page(slug)
|
|
return r
|
|
|
|
# heartbeat
|
|
|
|
def get_heartbeats(self):
|
|
r = self._get_event_data(Event.HEARTBEAT_LIST)
|
|
for i in r:
|
|
int_to_bool(i["data"], ["important", "status"])
|
|
return r
|
|
|
|
def get_important_heartbeats(self):
|
|
r = self._get_event_data(Event.IMPORTANT_HEARTBEAT_LIST)
|
|
for i in r:
|
|
int_to_bool(i["data"], ["important", "status"])
|
|
return r
|
|
|
|
def get_heartbeat(self):
|
|
r = self._get_event_data(Event.HEARTBEAT)
|
|
int_to_bool(r, ["important", "status"])
|
|
return r
|
|
|
|
# avg ping
|
|
|
|
def avg_ping(self):
|
|
return self._get_event_data(Event.AVG_PING)
|
|
|
|
# cert info
|
|
|
|
def cert_info(self):
|
|
return self._get_event_data(Event.CERT_INFO)
|
|
|
|
# uptime
|
|
|
|
def uptime(self):
|
|
return self._get_event_data(Event.UPTIME)
|
|
|
|
# info
|
|
|
|
def info(self) -> dict:
|
|
r = self._get_event_data(Event.INFO)
|
|
return r
|
|
|
|
# clear
|
|
|
|
def clear_events(self, monitor_id: int):
|
|
return self._call('clearEvents', monitor_id)
|
|
|
|
def clear_heartbeats(self, monitor_id: int):
|
|
return self._call('clearHeartbeats', monitor_id)
|
|
|
|
def clear_statistics(self):
|
|
return self._call('clearStatistics')
|
|
|
|
# tags
|
|
|
|
def get_tags(self):
|
|
return self._call('getTags')["tags"]
|
|
|
|
def get_tag(self, id_: int):
|
|
tags = self.get_tags()
|
|
for tag in tags:
|
|
if tag["id"] == id_:
|
|
return tag
|
|
raise UptimeKumaException("tag does not exist")
|
|
|
|
# not working, monitor id required?
|
|
# def edit_tag(self, id_: int, name: str, color: str):
|
|
# return self._call('editTag', {
|
|
# "id": id_,
|
|
# "name": name,
|
|
# "color": color
|
|
# })
|
|
|
|
def delete_tag(self, id_: int):
|
|
return self._call('deleteTag', id_)
|
|
|
|
def add_tag(self, name: str, color: str):
|
|
return self._call('addTag', {
|
|
"name": name,
|
|
"color": color,
|
|
"new": True
|
|
})["tag"]
|
|
|
|
# settings
|
|
|
|
def get_settings(self):
|
|
r = self._call('getSettings')["data"]
|
|
return r
|
|
|
|
def set_settings(
|
|
self,
|
|
password: str = None, # only required if disableAuth is true
|
|
|
|
# about
|
|
checkUpdate: bool = True,
|
|
checkBeta: bool = False,
|
|
|
|
# monitor history
|
|
keepDataPeriodDays: int = 180,
|
|
|
|
# general
|
|
entryPage: str = "dashboard",
|
|
searchEngineIndex: bool = False,
|
|
primaryBaseURL: str = "",
|
|
steamAPIKey: str = "",
|
|
|
|
# notifications
|
|
tlsExpiryNotifyDays: list = None,
|
|
|
|
# security
|
|
disableAuth: bool = False,
|
|
|
|
# reverse proxy
|
|
trustProxy: bool = False
|
|
):
|
|
if not tlsExpiryNotifyDays:
|
|
tlsExpiryNotifyDays = [7, 14, 21]
|
|
|
|
data = {
|
|
"checkUpdate": checkUpdate,
|
|
"checkBeta": checkBeta,
|
|
"keepDataPeriodDays": keepDataPeriodDays,
|
|
"entryPage": entryPage,
|
|
"searchEngineIndex": searchEngineIndex,
|
|
"primaryBaseURL": primaryBaseURL,
|
|
"steamAPIKey": steamAPIKey,
|
|
"tlsExpiryNotifyDays": tlsExpiryNotifyDays,
|
|
"disableAuth": disableAuth
|
|
}
|
|
if parse_version(self.version) >= parse_version("1.18"):
|
|
data.update({
|
|
"trustProxy": trustProxy
|
|
})
|
|
return self._call('setSettings', (data, password))
|
|
|
|
def change_password(self, old_password: str, new_password: str):
|
|
return self._call('changePassword', {
|
|
"currentPassword": old_password,
|
|
"newPassword": new_password,
|
|
})
|
|
|
|
def upload_backup(self, json_data, import_handle: str = "skip"):
|
|
if import_handle not in ["overwrite", "skip", "keep"]:
|
|
raise ValueError()
|
|
return self._call('uploadBackup', (json_data, import_handle))
|
|
|
|
# 2FA
|
|
|
|
def twofa_status(self):
|
|
return self._call('twoFAStatus')
|
|
|
|
def prepare_2fa(self, password: str):
|
|
return self._call('prepare2FA', password)
|
|
|
|
def verify_token(self, token: str, password: str):
|
|
return self._call('verifyToken', (token, password))
|
|
|
|
def save_2fa(self, password: str):
|
|
return self._call('save2FA', password)
|
|
|
|
def disable_2fa(self, password: str):
|
|
return self._call('disable2FA', password)
|
|
|
|
# login
|
|
|
|
def _wait_for_auto_login(self):
|
|
while self._event_data[Event.AUTO_LOGIN] is None:
|
|
time.sleep(0.01)
|
|
|
|
def login(self, username: str = None, password: str = None, token: str = ""):
|
|
# autologin
|
|
if username is None and password is None:
|
|
self._wait_for_auto_login()
|
|
return
|
|
|
|
return self._call('login', {
|
|
"username": username,
|
|
"password": password,
|
|
"token": token
|
|
})
|
|
|
|
def login_by_token(self, token: str):
|
|
return self._call('loginByToken', token)
|
|
|
|
def logout(self):
|
|
return self._call('logout')
|
|
|
|
# setup
|
|
|
|
def need_setup(self):
|
|
return self._call('needSetup')
|
|
|
|
def setup(self, username: str, password: str):
|
|
return self._call("setup", (username, password))
|
|
|
|
# database
|
|
|
|
def get_database_size(self):
|
|
return self._call('getDatabaseSize')
|
|
|
|
def shrink_database(self):
|
|
return self._call('shrinkDatabase')
|
|
|
|
# docker host
|
|
|
|
def get_docker_hosts(self):
|
|
r = self._get_event_data(Event.DOCKER_HOST_LIST)
|
|
return r
|
|
|
|
def get_docker_host(self, id_: int):
|
|
docker_hosts = self.get_docker_hosts()
|
|
for docker_host in docker_hosts:
|
|
if docker_host["id"] == id_:
|
|
return docker_host
|
|
raise UptimeKumaException("docker host does not exist")
|
|
|
|
def test_docker_host(self, **kwargs):
|
|
data = _build_docker_host_data(**kwargs)
|
|
return self._call('testDockerHost', data)
|
|
|
|
def add_docker_host(self, **kwargs):
|
|
data = _build_docker_host_data(**kwargs)
|
|
_convert_docker_host_input(data)
|
|
with self.wait_for_event(Event.DOCKER_HOST_LIST):
|
|
return self._call('addDockerHost', (data, None))
|
|
|
|
def edit_docker_host(self, id_: int, **kwargs):
|
|
data = self.get_docker_host(id_)
|
|
data.update(kwargs)
|
|
_convert_docker_host_input(data)
|
|
with self.wait_for_event(Event.DOCKER_HOST_LIST):
|
|
return self._call('addDockerHost', (data, id_))
|
|
|
|
def delete_docker_host(self, id_: int):
|
|
with self.wait_for_event(Event.DOCKER_HOST_LIST):
|
|
return self._call('deleteDockerHost', id_)
|