uptime-kuma-api/uptime_kuma_api/api.py

2553 lines
88 KiB
Python
Raw Normal View History

2022-07-05 22:12:37 +02:00
import json
import random
import string
import time
from contextlib import contextmanager
from copy import deepcopy
from typing import Any
2022-07-02 16:00:54 +02:00
2022-08-26 14:04:43 +02:00
import requests
2022-07-02 16:00:54 +02:00
import socketio
2022-09-07 13:03:10 +02:00
from packaging.version import parse as parse_version
2022-07-02 16:00:54 +02:00
from . import AuthMethod
from . import DockerType
from . import Event
from . import IncidentStyle
2022-07-02 16:00:54 +02:00
from . import MonitorType
2022-09-07 13:03:10 +02:00
from . import NotificationType, notification_provider_options, notification_provider_conditions
2022-07-02 20:26:18 +02:00
from . import ProxyProtocol
from . import UptimeKumaException
from .docstrings import append_docstring, monitor_docstring, notification_docstring, proxy_docstring, \
docker_host_docstring
2022-07-05 22:12:37 +02:00
2022-08-03 11:56:02 +02:00
def int_to_bool(data, keys) -> None:
2022-07-05 22:12:37 +02:00
if type(data) == list:
for d in data:
int_to_bool(d, keys)
else:
for key in keys:
if key in data:
data[key] = True if data[key] == 1 else False
def gen_secret(length: int) -> str:
chars = string.ascii_uppercase + string.ascii_lowercase + string.digits
return ''.join(random.choice(chars) for _ in range(length))
def _convert_monitor_return(monitor) -> None:
if type(monitor["notificationIDList"]) == dict:
monitor["notificationIDList"] = [int(i) for i in monitor["notificationIDList"].keys()]
def _convert_monitor_input(kwargs) -> None:
2022-09-06 13:41:51 +02:00
if not kwargs["accepted_statuscodes"]:
kwargs["accepted_statuscodes"] = ["200-299"]
dict_notification_ids = {}
if kwargs["notificationIDList"]:
for notification_id in kwargs["notificationIDList"]:
dict_notification_ids[notification_id] = True
kwargs["notificationIDList"] = dict_notification_ids
2022-07-05 22:12:37 +02:00
2022-09-07 13:03:10 +02:00
if not kwargs["databaseConnectionString"]:
if kwargs["type"] == MonitorType.SQLSERVER:
kwargs["databaseConnectionString"] = "Server=<hostname>,<port>;Database=<your database>;User Id=<your user id>;Password=<your password>;Encrypt=<true/false>;TrustServerCertificate=<Yes/No>;Connection Timeout=<int>"
elif kwargs["type"] == MonitorType.POSTGRES:
kwargs["databaseConnectionString"] = "postgres://username:password@host:port/database"
if kwargs["type"] == MonitorType.PUSH and not kwargs.get("pushToken"):
kwargs["pushToken"] = gen_secret(10)
2022-07-05 22:12:37 +02:00
2022-08-02 11:58:49 +02:00
def _build_notification_data(
name: str,
2022-08-03 11:56:02 +02:00
type: NotificationType,
isDefault: bool = False,
applyExisting: bool = False,
2022-08-02 11:58:49 +02:00
**kwargs
) -> dict:
allowed_kwargs = []
for keys in notification_provider_options.values():
allowed_kwargs.extend(keys)
for key in kwargs.keys():
if key not in allowed_kwargs:
raise TypeError(f"unknown argument '{key}'")
data = {
2022-07-05 22:12:37 +02:00
"name": name,
2022-08-03 11:56:02 +02:00
"type": type,
"isDefault": isDefault,
"applyExisting": applyExisting,
2022-07-05 22:12:37 +02:00
**kwargs
}
return data
2022-07-05 22:12:37 +02:00
def _build_proxy_data(
protocol: ProxyProtocol,
host: str,
port: str,
auth: bool = False,
username: str = None,
password: str = None,
active: bool = True,
default: bool = False,
2022-08-03 11:56:02 +02:00
applyExisting: bool = False,
) -> dict:
data = {
2022-07-05 22:12:37 +02:00
"protocol": protocol,
"host": host,
"port": port,
"auth": auth,
"username": username,
"password": password,
"active": active,
"default": default,
2022-08-03 11:56:02 +02:00
"applyExisting": applyExisting
2022-07-05 22:12:37 +02:00
}
return data
2022-07-02 16:00:54 +02:00
2022-07-06 21:29:40 +02:00
def _build_status_page_data(
slug: str,
# config
2022-08-03 11:56:02 +02:00
id: int,
2022-07-06 21:29:40 +02:00
title: str,
description: str = None,
theme: str = "light",
2022-07-06 21:29:40 +02:00
published: bool = True,
2022-08-03 11:56:02 +02:00
showTags: bool = False,
domainNameList: list = None,
customCSS: str = "",
footerText: str = None,
showPoweredBy: bool = True,
2022-07-06 21:29:40 +02:00
2022-08-03 11:56:02 +02:00
icon: str = "/icon.svg",
2022-08-26 14:04:43 +02:00
publicGroupList: list = None
) -> (str, dict, str, list):
if theme not in ["light", "dark"]:
raise ValueError
2022-08-03 11:56:02 +02:00
if not domainNameList:
domainNameList = []
2022-08-26 14:04:43 +02:00
if not publicGroupList:
publicGroupList = []
2022-07-06 21:29:40 +02:00
config = {
2022-08-03 11:56:02 +02:00
"id": id,
2022-07-06 21:29:40 +02:00
"slug": slug,
"title": title,
"description": description,
2022-08-03 11:56:02 +02:00
"icon": icon,
"theme": theme,
2022-07-06 21:29:40 +02:00
"published": published,
2022-08-03 11:56:02 +02:00
"showTags": showTags,
"domainNameList": domainNameList,
"customCSS": customCSS,
"footerText": footerText,
"showPoweredBy": showPoweredBy
2022-07-06 21:29:40 +02:00
}
2022-08-26 14:04:43 +02:00
return slug, config, icon, publicGroupList
2022-07-06 21:29:40 +02:00
def _convert_docker_host_input(kwargs) -> None:
2022-09-07 13:03:10 +02:00
if not kwargs["dockerDaemon"]:
if kwargs["dockerType"] == DockerType.SOCKET:
kwargs["dockerDaemon"] = "/var/run/docker.sock"
elif kwargs["dockerType"] == DockerType.TCP:
kwargs["dockerDaemon"] = "tcp://localhost:2375"
def _build_docker_host_data(
name: str,
dockerType: DockerType,
dockerDaemon: str = None
) -> dict:
2022-09-07 13:03:10 +02:00
data = {
"name": name,
"dockerType": dockerType,
"dockerDaemon": dockerDaemon
}
return data
def _check_missing_arguments(required_params, kwargs) -> None:
2022-07-09 22:15:41 +02:00
missing_arguments = []
for required_param in required_params:
2022-08-03 11:56:02 +02:00
if kwargs.get(required_param) is None:
2022-07-09 22:15:41 +02:00
missing_arguments.append(required_param)
if missing_arguments:
missing_arguments_str = ", ".join([f"'{i}'" for i in missing_arguments])
raise TypeError(f"missing {len(missing_arguments)} required argument: {missing_arguments_str}")
def _check_argument_conditions(valid_params, kwargs) -> None:
2022-07-10 18:07:11 +02:00
for valid_param in valid_params:
2022-08-03 11:56:02 +02:00
if valid_param in kwargs:
value = kwargs[valid_param]
2022-07-10 18:07:11 +02:00
conditions = valid_params[valid_param]
min_ = conditions.get("min")
max_ = conditions.get("max")
if min_ is not None and value < min_:
raise ValueError(f"the value of {valid_param} must not be less than {min_}")
if max_ is not None and value > max_:
raise ValueError(f"the value of {valid_param} must not be larger than {max_}")
def _check_arguments_monitor(kwargs) -> None:
2022-07-10 18:07:11 +02:00
required_args = [
2022-08-03 11:56:02 +02:00
"type",
2022-07-09 22:15:41 +02:00
"name",
2022-08-03 11:56:02 +02:00
"interval",
"maxretries",
"retryInterval"
2022-07-09 22:15:41 +02:00
]
2022-08-03 11:56:02 +02:00
_check_missing_arguments(required_args, kwargs)
2022-07-09 22:15:41 +02:00
2022-07-10 18:07:11 +02:00
required_args_by_type = {
2022-08-03 11:56:02 +02:00
MonitorType.HTTP: ["url", "maxredirects"],
2022-07-09 22:15:41 +02:00
MonitorType.PORT: ["hostname", "port"],
MonitorType.PING: ["hostname"],
2022-08-03 11:56:02 +02:00
MonitorType.KEYWORD: ["url", "keyword", "maxredirects"],
2022-07-09 22:15:41 +02:00
MonitorType.DNS: ["hostname", "dns_resolve_server", "port"],
MonitorType.PUSH: [],
MonitorType.STEAM: ["hostname", "port"],
2022-08-03 11:56:02 +02:00
MonitorType.MQTT: ["hostname", "port", "mqttTopic"],
2022-07-09 22:15:41 +02:00
MonitorType.SQLSERVER: [],
2022-09-07 13:03:10 +02:00
MonitorType.POSTGRES: [],
MonitorType.DOCKER: ["docker_container", "docker_host"],
MonitorType.RADIUS: ["radiusUsername", "radiusPassword", "radiusSecret", "radiusCalledStationId", "radiusCallingStationId"]
2022-07-09 22:15:41 +02:00
}
2022-08-03 11:56:02 +02:00
type_ = kwargs["type"]
2022-07-10 18:07:11 +02:00
required_args = required_args_by_type[type_]
2022-08-03 11:56:02 +02:00
_check_missing_arguments(required_args, kwargs)
2022-07-10 18:07:11 +02:00
conditions = dict(
interval=dict(
min=20,
),
maxretries=dict(
min=0,
),
retryInterval=dict(
min=20,
),
maxredirects=dict(
min=0,
),
port=dict(
min=0,
max=65535,
),
)
2022-08-03 11:56:02 +02:00
_check_argument_conditions(conditions, kwargs)
2022-07-10 18:07:11 +02:00
def _check_arguments_notification(kwargs) -> None:
2022-08-03 11:56:02 +02:00
required_args = ["type", "name"]
_check_missing_arguments(required_args, kwargs)
2022-07-10 18:07:11 +02:00
# TODO: collect required notification args from /src/components/notifications/*
# type_ = kwargs["type"]
# required_args = notification_provider_options[type_]
# _check_missing_arguments(required_args, kwargs)
2022-09-07 13:03:10 +02:00
_check_argument_conditions(notification_provider_conditions, kwargs)
2022-07-09 22:15:41 +02:00
def _check_arguments_proxy(kwargs) -> None:
2022-07-10 18:07:11 +02:00
required_args = ["protocol", "host", "port"]
2022-08-02 23:47:56 +02:00
if kwargs.get("auth"):
2022-07-10 18:07:11 +02:00
required_args.extend(["username", "password"])
2022-08-03 11:56:02 +02:00
_check_missing_arguments(required_args, kwargs)
2022-07-10 18:07:11 +02:00
conditions = dict(
port=dict(
min=0,
max=65535,
)
)
2022-08-03 11:56:02 +02:00
_check_argument_conditions(conditions, kwargs)
2022-07-09 22:15:41 +02:00
2022-07-02 16:00:54 +02:00
class UptimeKumaApi(object):
"""This class is used to communicate with Uptime Kuma.
Example::
Import UptimeKumaApi from the library and specify the Uptime Kuma server url (e.g. 'http://127.0.0.1:3001'), username and password to initialize the connection.
>>> from uptime_kuma_api import UptimeKumaApi
>>> api = UptimeKumaApi('INSERT_URL')
>>> api.login('INSERT_USERNAME', 'INSERT_PASSWORD')
Now you can call one of the existing methods of the instance. For example create a new monitor:
>>> api.add_monitor(
... type=MonitorType.HTTP,
... name="Google",
... url="https://google.com"
... )
{
'msg': 'Added Successfully.',
'monitorId': 1
}
At the end, the connection to the API must be disconnected so that the program does not block.
>>> api.disconnect()
:param url: The url to the Uptime Kuma instance. For example ``http://127.0.0.1:3001``
:type url: str
"""
def __init__(self, url: str) -> None:
self.url = url
2022-07-02 16:00:54 +02:00
self.sio = socketio.Client()
2022-08-02 21:32:28 +02:00
self._event_data: dict = {
2022-08-05 15:52:19 +02:00
Event.MONITOR_LIST: None,
Event.NOTIFICATION_LIST: None,
Event.PROXY_LIST: None,
Event.STATUS_PAGE_LIST: None,
Event.HEARTBEAT_LIST: None,
Event.IMPORTANT_HEARTBEAT_LIST: None,
Event.AVG_PING: None,
Event.UPTIME: None,
Event.HEARTBEAT: None,
Event.INFO: None,
2022-09-07 13:03:10 +02:00
Event.CERT_INFO: None,
Event.DOCKER_HOST_LIST: None,
Event.AUTO_LOGIN: None
2022-07-02 16:00:54 +02:00
}
2022-08-05 15:52:19 +02:00
self.sio.on(Event.CONNECT, self._event_connect)
self.sio.on(Event.DISCONNECT, self._event_disconnect)
self.sio.on(Event.MONITOR_LIST, self._event_monitor_list)
self.sio.on(Event.NOTIFICATION_LIST, self._event_notification_list)
self.sio.on(Event.PROXY_LIST, self._event_proxy_list)
self.sio.on(Event.STATUS_PAGE_LIST, self._event_status_page_list)
self.sio.on(Event.HEARTBEAT_LIST, self._event_heartbeat_list)
self.sio.on(Event.IMPORTANT_HEARTBEAT_LIST, self._event_important_heartbeat_list)
self.sio.on(Event.AVG_PING, self._event_avg_ping)
self.sio.on(Event.UPTIME, self._event_uptime)
self.sio.on(Event.HEARTBEAT, self._event_heartbeat)
self.sio.on(Event.INFO, self._event_info)
self.sio.on(Event.CERT_INFO, self._event_cert_info)
2022-09-07 13:03:10 +02:00
self.sio.on(Event.DOCKER_HOST_LIST, self._event_docker_host_list)
self.sio.on(Event.AUTO_LOGIN, self._event_auto_login)
2022-07-02 16:00:54 +02:00
self.connect()
2022-07-02 16:00:54 +02:00
@contextmanager
def wait_for_event(self, event: Event) -> None:
retries = 200
event_data_before = deepcopy(self._event_data)
try:
yield
except:
raise
else:
counter = 0
while event_data_before[event] == self._event_data[event]:
time.sleep(0.01)
counter += 1
if counter >= retries:
print("wait_for_event timeout")
break
def _get_event_data(self, event) -> Any:
2022-08-05 15:52:19 +02:00
monitor_events = [Event.AVG_PING, Event.UPTIME, Event.HEARTBEAT_LIST, Event.IMPORTANT_HEARTBEAT_LIST, Event.CERT_INFO, Event.HEARTBEAT]
2022-07-05 22:12:37 +02:00
while self._event_data[event] is None:
# do not wait for events that are not sent
2022-08-05 15:52:19 +02:00
if self._event_data[Event.MONITOR_LIST] == {} and event in monitor_events:
return []
2022-07-02 16:00:54 +02:00
time.sleep(0.01)
time.sleep(0.05) # wait for multiple messages
return deepcopy(self._event_data[event])
2022-07-02 16:00:54 +02:00
def _call(self, event, data=None) -> Any:
r = self.sio.call(event, data)
if type(r) == dict and "ok" in r:
if not r["ok"]:
raise UptimeKumaException(r["msg"])
r.pop("ok")
return r
2022-07-02 16:00:54 +02:00
# event handlers
def _event_connect(self) -> None:
2022-07-02 16:00:54 +02:00
pass
def _event_disconnect(self) -> None:
2022-07-02 16:00:54 +02:00
pass
def _event_monitor_list(self, data) -> None:
2022-08-05 15:52:19 +02:00
self._event_data[Event.MONITOR_LIST] = data
2022-07-02 16:00:54 +02:00
def _event_notification_list(self, data) -> None:
2022-08-05 15:52:19 +02:00
self._event_data[Event.NOTIFICATION_LIST] = data
2022-07-02 16:00:54 +02:00
def _event_proxy_list(self, data) -> None:
2022-08-05 15:52:19 +02:00
self._event_data[Event.PROXY_LIST] = data
2022-07-02 16:00:54 +02:00
def _event_status_page_list(self, data) -> None:
2022-08-05 15:52:19 +02:00
self._event_data[Event.STATUS_PAGE_LIST] = data
2022-07-02 16:00:54 +02:00
def _event_heartbeat_list(self, id_, data, bool_) -> None:
2022-08-05 15:52:19 +02:00
if self._event_data[Event.HEARTBEAT_LIST] is None:
self._event_data[Event.HEARTBEAT_LIST] = []
self._event_data[Event.HEARTBEAT_LIST].append({
2022-07-02 16:00:54 +02:00
"id": id_,
"data": data,
"bool": bool_,
})
def _event_important_heartbeat_list(self, id_, data, bool_) -> None:
2022-08-05 15:52:19 +02:00
if self._event_data[Event.IMPORTANT_HEARTBEAT_LIST] is None:
self._event_data[Event.IMPORTANT_HEARTBEAT_LIST] = []
self._event_data[Event.IMPORTANT_HEARTBEAT_LIST].append({
2022-07-02 16:00:54 +02:00
"id": id_,
"data": data,
"bool": bool_,
})
def _event_avg_ping(self, id_, data) -> None:
2022-08-05 15:52:19 +02:00
if self._event_data[Event.AVG_PING] is None:
self._event_data[Event.AVG_PING] = []
self._event_data[Event.AVG_PING].append({
2022-07-02 16:00:54 +02:00
"id": id_,
"data": data,
})
def _event_uptime(self, monitor_id, duration, uptime) -> None:
2022-08-05 15:52:19 +02:00
if self._event_data[Event.UPTIME] is None:
self._event_data[Event.UPTIME] = []
self._event_data[Event.UPTIME].append({
"id": monitor_id,
"duration": duration,
"uptime": uptime,
2022-07-02 16:00:54 +02:00
})
def _event_heartbeat(self, data) -> None:
2022-08-05 15:52:19 +02:00
if self._event_data[Event.HEARTBEAT] is None:
self._event_data[Event.HEARTBEAT] = []
self._event_data[Event.HEARTBEAT].append(data)
2022-07-02 16:00:54 +02:00
def _event_info(self, data) -> None:
2022-08-05 15:52:19 +02:00
self._event_data[Event.INFO] = data
2022-07-02 16:00:54 +02:00
def _event_cert_info(self, id_, data) -> None:
2022-08-05 15:52:19 +02:00
if self._event_data[Event.CERT_INFO] is None:
self._event_data[Event.CERT_INFO] = []
self._event_data[Event.CERT_INFO].append({
"id": id_,
"data": data,
})
def _event_docker_host_list(self, data) -> None:
2022-09-07 13:03:10 +02:00
self._event_data[Event.DOCKER_HOST_LIST] = data
def _event_auto_login(self) -> None:
self._event_data[Event.AUTO_LOGIN] = True
2022-07-02 16:00:54 +02:00
# connection
def connect(self) -> None:
"""
Connects to Uptime Kuma.
Called automatically when the UptimeKumaApi instance is created.
"""
url = self.url.rstrip("/")
try:
self.sio.connect(f'{url}/socket.io/')
except:
raise UptimeKumaException("unable to connect")
def disconnect(self) -> None:
"""
Disconnects from Uptime Kuma.
2022-07-02 16:00:54 +02:00
Needs to be called to prevent blocking the program.
"""
2022-07-02 16:00:54 +02:00
self.sio.disconnect()
2022-09-07 13:03:10 +02:00
# builder
@property
def version(self) -> str:
2022-09-07 13:03:10 +02:00
info = self.info()
return info["version"]
def _build_monitor_data(
self,
type: MonitorType,
name: str,
interval: int = 60,
retryInterval: int = 60,
resendInterval: int = 0,
maxretries: int = 0,
upsideDown: bool = False,
notificationIDList: list = None,
# HTTP, KEYWORD
url: str = None,
expiryNotification: bool = False,
ignoreTls: bool = False,
maxredirects: int = 10,
accepted_statuscodes: list = None,
proxyId: int = None,
method: str = "GET",
body: str = None,
headers: str = None,
authMethod: AuthMethod = AuthMethod.NONE,
basic_auth_user: str = None,
basic_auth_pass: str = None,
authDomain: str = None,
authWorkstation: str = None,
# KEYWORD
keyword: str = None,
# DNS, PING, STEAM, MQTT
hostname: str = None,
# DNS, STEAM, MQTT
port: int = 53,
# DNS
dns_resolve_server: str = "1.1.1.1",
dns_resolve_type: str = "A",
# MQTT
mqttUsername: str = None,
mqttPassword: str = None,
mqttTopic: str = None,
mqttSuccessMessage: str = None,
# SQLSERVER, POSTGRES
databaseConnectionString: str = None,
databaseQuery: str = None,
# DOCKER
docker_container: str = "",
docker_host: int = None,
# RADIUS
radiusUsername: str = None,
radiusPassword: str = None,
radiusSecret: str = None,
radiusCalledStationId: str = None,
radiusCallingStationId: str = None
) -> dict:
2022-09-07 13:03:10 +02:00
data = {
"type": type,
"name": name,
"interval": interval,
"retryInterval": retryInterval,
"maxretries": maxretries,
"notificationIDList": notificationIDList,
"upsideDown": upsideDown,
}
if parse_version(self.version) >= parse_version("1.18"):
data.update({
"resendInterval": resendInterval
})
if type == MonitorType.KEYWORD:
data.update({
"keyword": keyword,
})
# HTTP, KEYWORD
data.update({
"url": url,
"expiryNotification": expiryNotification,
"ignoreTls": ignoreTls,
"maxredirects": maxredirects,
"accepted_statuscodes": accepted_statuscodes,
"proxyId": proxyId,
"method": method,
"body": body,
"headers": headers,
"authMethod": authMethod,
})
if authMethod in [AuthMethod.HTTP_BASIC, AuthMethod.NTLM]:
data.update({
"basic_auth_user": basic_auth_user,
"basic_auth_pass": basic_auth_pass,
})
if authMethod == AuthMethod.NTLM:
data.update({
"authDomain": authDomain,
"authWorkstation": authWorkstation,
})
# PORT, PING, DNS, STEAM, MQTT
2022-09-07 13:03:10 +02:00
data.update({
"hostname": hostname,
})
# PORT, DNS, STEAM, MQTT
2022-09-07 13:03:10 +02:00
data.update({
"port": port,
})
# DNS
data.update({
"dns_resolve_server": dns_resolve_server,
"dns_resolve_type": dns_resolve_type,
})
# MQTT
data.update({
"mqttUsername": mqttUsername,
"mqttPassword": mqttPassword,
"mqttTopic": mqttTopic,
"mqttSuccessMessage": mqttSuccessMessage,
})
# SQLSERVER, POSTGRES
data.update({
"databaseConnectionString": databaseConnectionString
})
if type in [MonitorType.SQLSERVER, MonitorType.POSTGRES]:
data.update({
"databaseQuery": databaseQuery,
})
# DOCKER
if type == MonitorType.DOCKER:
data.update({
"docker_container": docker_container,
"docker_host": docker_host
})
# RADIUS
if type == MonitorType.RADIUS:
data.update({
"radiusUsername": radiusUsername,
"radiusPassword": radiusPassword,
"radiusSecret": radiusSecret,
"radiusCalledStationId": radiusCalledStationId,
"radiusCallingStationId": radiusCallingStationId
})
return data
# monitor
2022-07-02 16:00:54 +02:00
def get_monitors(self) -> list:
"""
Get all monitors.
:return: A list of monitors.
Example::
>>> api.get_monitors()
[
{
'id': 1,
'name': 'Google',
'url': 'https://google.com',
'method': 'GET',
'hostname': None,
'port': 53,
'maxretries': 0,
'weight': 2000,
'active': True,
'type': 'http',
'interval': 60,
'retryInterval': 60,
'resendInterval': 0,
'keyword': None,
'expiryNotification': False,
'ignoreTls': False,
'upsideDown': False,
'maxredirects': 10,
'accepted_statuscodes': [
'200-299'
],
'dns_resolve_type': 'A',
'dns_resolve_server': '1.1.1.1',
'dns_last_result': None,
'pushToken': None,
'docker_container': None,
'docker_host': None,
'proxyId': None,
'notificationIDList': [],
'tags': [],
'mqttUsername': None,
'mqttPassword': None,
'mqttTopic': None,
'mqttSuccessMessage': None,
'databaseConnectionString': None,
'databaseQuery': None,
'authMethod': '',
'authWorkstation': None,
'authDomain': None,
'radiusUsername': None,
'radiusPassword': None,
'radiusCalledStationId': None,
'radiusCallingStationId': None,
'radiusSecret': None,
'headers': None,
'body': None,
'basic_auth_user': None,
'basic_auth_pass': None
}
]
"""
2022-08-05 15:52:19 +02:00
r = list(self._get_event_data(Event.MONITOR_LIST).values())
for monitor in r:
_convert_monitor_return(monitor)
int_to_bool(r, ["active"])
return r
2022-07-02 16:00:54 +02:00
def get_monitor(self, id_: int) -> dict:
"""
Get a monitor.
:param id_: The monitor id.
:type id_: int
:return: The monitor.
Example::
>>> api.get_monitor(1)
{
'id': 1,
'name': 'Google',
'url': 'https://google.com',
'method': 'GET',
'hostname': None,
'port': 53,
'maxretries': 0,
'weight': 2000,
'active': True,
'type': 'http',
'interval': 60,
'retryInterval': 60,
'resendInterval': 0,
'keyword': None,
'expiryNotification': False,
'ignoreTls': False,
'upsideDown': False,
'maxredirects': 10,
'accepted_statuscodes': [
'200-299'
],
'dns_resolve_type': 'A',
'dns_resolve_server': '1.1.1.1',
'dns_last_result': None,
'pushToken': None,
'docker_container': None,
'docker_host': None,
'proxyId': None,
'notificationIDList': [],
'tags': [],
'mqttUsername': None,
'mqttPassword': None,
'mqttTopic': None,
'mqttSuccessMessage': None,
'databaseConnectionString': None,
'databaseQuery': None,
'authMethod': '',
'authWorkstation': None,
'authDomain': None,
'radiusUsername': None,
'radiusPassword': None,
'radiusCalledStationId': None,
'radiusCallingStationId': None,
'radiusSecret': None,
'headers': None,
'body': None,
'basic_auth_user': None,
'basic_auth_pass': None
}
"""
r = self._call('getMonitor', id_)["monitor"]
_convert_monitor_return(r)
int_to_bool(r, ["active"])
return r
2022-07-02 16:00:54 +02:00
def pause_monitor(self, id_: int) -> dict:
"""
Pauses a monitor.
:param id_: The monitor id.
:type id_: int
:return: The server response.
Example::
>>> api.pause_monitor(1)
{
'msg': 'Paused Successfully.'
}
"""
2022-07-09 19:52:21 +02:00
return self._call('pauseMonitor', id_)
2022-07-02 16:00:54 +02:00
def resume_monitor(self, id_: int) -> dict:
"""
Resumes a monitor.
:param id_: The monitor id.
:type id_: int
:return: The server response.
Example::
>>> api.resume_monitor(1)
{
'msg': 'Resumed Successfully.'
}
"""
2022-07-09 19:52:21 +02:00
return self._call('resumeMonitor', id_)
2022-07-02 16:00:54 +02:00
def delete_monitor(self, id_: int) -> dict:
"""
Deletes a monitor.
:param id_: The monitor id.
:type id_: int
:return: The server response.
Example::
>>> api.delete_monitor(1)
{
'msg': 'Deleted Successfully.'
}
"""
with self.wait_for_event(Event.MONITOR_LIST):
return self._call('deleteMonitor', id_)
2022-07-02 16:00:54 +02:00
def get_monitor_beats(self, id_: int, hours: int) -> list:
"""
Get monitor beats for a specific monitor in a time range.
:param id_: The monitor id.
:type id_: int
:param hours: Period time in hours from now.
:type hours: int
:return: The server response.
Example::
>>> api.get_monitor_beats(1, 6)
[
{
'down_count': 0,
'duration': 0,
'id': 25,
'important': True,
'monitor_id': 1,
'msg': '200 - OK',
'ping': 201,
'status': True,
'time': '2022-12-15 12:38:42.661'
},
{
'down_count': 0,
'duration': 60,
'id': 26,
'important': False,
'monitor_id': 1,
'msg': '200 - OK',
'ping': 193,
'status': True,
'time': '2022-12-15 12:39:42.878'
},
...
]
"""
r = self._call('getMonitorBeats', (id_, hours))["data"]
int_to_bool(r, ["important", "status"])
return r
2022-07-02 16:00:54 +02:00
@append_docstring(monitor_docstring("add"))
def add_monitor(self, **kwargs) -> dict:
"""
Adds a new monitor.
:return: The server response.
Example::
>>> api.add_monitor(
... type=MonitorType.HTTP,
... name="Google",
... url="https://google.com"
... )
{
'msg': 'Added Successfully.',
'monitorID': 1
}
"""
2022-09-07 13:03:10 +02:00
data = self._build_monitor_data(**kwargs)
_convert_monitor_input(data)
2022-07-10 18:07:11 +02:00
_check_arguments_monitor(data)
with self.wait_for_event(Event.MONITOR_LIST):
return self._call('add', data)
2022-07-02 16:00:54 +02:00
@append_docstring(monitor_docstring("edit"))
def edit_monitor(self, id_: int, **kwargs) -> dict:
"""
Edits an existing monitor.
:param id_: The monitor id.
:type id_: int
:return: The server response.
Example::
>>> api.edit_monitor(1, interval=20)
{
'monitorID': 1,
'msg': 'Saved.'
}
"""
data = self.get_monitor(id_)
2022-07-09 19:52:21 +02:00
data.update(kwargs)
_convert_monitor_input(data)
2022-07-10 18:07:11 +02:00
_check_arguments_monitor(data)
with self.wait_for_event(Event.MONITOR_LIST):
return self._call('editMonitor', data)
2022-07-02 16:00:54 +02:00
# monitor tags
def add_monitor_tag(self, tag_id: int, monitor_id: int, value: str = "") -> dict:
"""
Add a tag to a monitor.
:param tag_id: Id of the tag.
:type tag_id: int
:param monitor_id: Id of the monitor to add the tag to.
:type monitor_id: int
:param value: (optional) Value of the tag.
:type value: str
:return: The server response.
Example::
>>> api.add_monitor_tag(
... tag_id=1,
... monitor_id=1,
... value="test"
... )
{
'msg': 'Added Successfully.'
}
"""
r = self._call('addMonitorTag', (tag_id, monitor_id, value))
# the monitor list event does not send the updated tags
self._event_data[Event.MONITOR_LIST][str(monitor_id)] = self.get_monitor(monitor_id)
return r
2022-07-02 16:00:54 +02:00
# editMonitorTag is unused in uptime-kuma
2022-07-05 22:12:37 +02:00
# def edit_monitor_tag(self, tag_id: int, monitor_id: int, value=""):
# return self._call('editMonitorTag', (tag_id, monitor_id, value))
2022-07-02 16:00:54 +02:00
def delete_monitor_tag(self, tag_id: int, monitor_id: int, value: str = "") -> dict:
"""
Delete a tag from a monitor.
:param tag_id: Id of the tag to remove.
:type tag_id: id
:param monitor_id: Id of monitor to remove the tag from.
:type monitor_id: id
:param value: (optional) Value of the tag.
:type value: str
:return: The server response.
Example::
>>> api.delete_monitor_tag(
... tag_id=1,
... monitor_id=1,
... value="test"
... )
{
'msg': 'Deleted Successfully.'
}
"""
r = self._call('deleteMonitorTag', (tag_id, monitor_id, value))
# the monitor list event does not send the updated tags
self._event_data[Event.MONITOR_LIST][str(monitor_id)] = self.get_monitor(monitor_id)
return r
2022-07-02 16:00:54 +02:00
# notification
2022-07-02 16:00:54 +02:00
def get_notifications(self) -> list:
"""
Get all notifications.
:return: All notifications.
Example::
>>> api.get_notifications()
[
{
'active': True,
'applyExisting': True,
'id': 1,
'isDefault': True,
'name': 'notification 1',
'pushAPIKey': '123456789',
'type': 'PushByTechulus',
'userId': 1
}
]
"""
2022-08-05 15:52:19 +02:00
notifications = self._get_event_data(Event.NOTIFICATION_LIST)
r = []
for notification_raw in notifications:
2022-07-05 22:12:37 +02:00
notification = notification_raw.copy()
config = json.loads(notification["config"])
del notification["config"]
notification.update(config)
r.append(notification)
return r
2022-07-05 22:12:37 +02:00
def get_notification(self, id_: int) -> dict:
"""
Get a notification.
:param id_: Id of the notification to get.
:type id_: int
:return: The notification.
Example::
>>> api.get_notification(1)
{
'active': True,
'applyExisting': True,
'id': 1,
'isDefault': True,
'name': 'notification 1',
'pushAPIKey': '123456789',
'type': 'PushByTechulus',
'userId': 1
}
"""
2022-07-05 22:12:37 +02:00
notifications = self.get_notifications()
for notification in notifications:
if notification["id"] == id_:
return notification
raise UptimeKumaException("notification does not exist")
2022-07-02 16:00:54 +02:00
@append_docstring(notification_docstring("test"))
def test_notification(self, **kwargs) -> dict:
"""
Test a notification.
:return: The server response.
Example::
>>> api.test_notification(
... name="notification 1",
... isDefault=True,
... applyExisting=True,
... type=NotificationType.PUSHBYTECHULUS,
... pushAPIKey="INSERT_PUSH_API_KEY"
... )
{
'ok': True,
'msg': 'Sent Successfully.'
}
"""
2022-07-09 22:15:41 +02:00
data = _build_notification_data(**kwargs)
2022-07-10 18:07:11 +02:00
_check_arguments_notification(data)
return self._call('testNotification', data)
2022-07-02 16:00:54 +02:00
@append_docstring(notification_docstring("add"))
def add_notification(self, **kwargs) -> dict:
"""
Add a notification.
:return: The server response.
Example::
>>> api.add_notification(
... name="notification 1",
... isDefault=True,
... applyExisting=True,
... type=NotificationType.PUSHBYTECHULUS,
... pushAPIKey="123456789"
... )
{
'id': 1,
'msg': 'Saved'
}
"""
2022-07-09 22:15:41 +02:00
data = _build_notification_data(**kwargs)
2022-07-10 18:07:11 +02:00
_check_arguments_notification(data)
with self.wait_for_event(Event.NOTIFICATION_LIST):
return self._call('addNotification', (data, None))
2022-07-02 16:00:54 +02:00
@append_docstring(notification_docstring("edit"))
def edit_notification(self, id_: int, **kwargs) -> dict:
"""
Edit a notification.
:param id_: Id of the notification to edit.
:type id_: int
:return: The server response.
Example::
>>> api.edit_notification(
... name="notification 1 edited",
... isDefault=False,
... applyExisting=False,
... type=NotificationType.PUSHDEER,
... pushdeerKey="987654321"
... )
{
'id': 1,
'msg': 'Saved'
}
"""
2022-07-05 22:12:37 +02:00
notification = self.get_notification(id_)
2022-08-03 11:56:02 +02:00
# remove old notification provider options from notification object
if "type" in kwargs and kwargs["type"] != notification["type"]:
for provider in notification_provider_options:
provider_options = notification_provider_options[provider]
2022-08-03 11:56:02 +02:00
if provider != kwargs["type"]:
for option in provider_options:
if option in notification:
del notification[option]
notification.update(kwargs)
2022-07-10 18:07:11 +02:00
_check_arguments_notification(notification)
with self.wait_for_event(Event.NOTIFICATION_LIST):
return self._call('addNotification', (notification, id_))
2022-07-02 16:00:54 +02:00
def delete_notification(self, id_: int) -> dict:
"""
Delete a notification.
:param id_: Id of the notification to delete.
:type id_: int
:return: The server response.
Example::
>>> api.delete_notification(1)
{
'msg': 'Deleted'
}
"""
with self.wait_for_event(Event.NOTIFICATION_LIST):
return self._call('deleteNotification', id_)
2022-07-02 16:00:54 +02:00
def check_apprise(self) -> bool:
"""
Check if apprise exists.
:return: The server response.
Example::
>>> api.check_apprise()
True
"""
return self._call('checkApprise')
2022-07-02 16:00:54 +02:00
# proxy
def get_proxies(self) -> list:
"""
Get all proxies.
:return: All proxies.
Example::
>>> api.get_proxies()
[
{
'active': True,
'auth': True,
'createdDate': '2022-12-15 16:24:24',
'default': False,
'host': '127.0.0.1',
'id': 1,
'password': 'password',
'port': 8080,
'protocol': 'http',
'userId': 1,
'username': 'username'
}
]
"""
2022-08-05 15:52:19 +02:00
r = self._get_event_data(Event.PROXY_LIST)
2022-08-03 11:56:02 +02:00
int_to_bool(r, ["auth", "active", "default", "applyExisting"])
return r
2022-07-05 22:12:37 +02:00
def get_proxy(self, id_: int) -> dict:
"""
Get a proxy.
:param id_: Id of the proxy to get.
:type id_: int
:return: The proxy.
Example::
>>> api.get_proxy(1)
{
'active': True,
'auth': True,
'createdDate': '2022-12-15 16:24:24',
'default': False,
'host': '127.0.0.1',
'id': 1,
'password': 'password',
'port': 8080,
'protocol': 'http',
'userId': 1,
'username': 'username'
}
"""
2022-07-05 22:12:37 +02:00
proxies = self.get_proxies()
for proxy in proxies:
if proxy["id"] == id_:
return proxy
raise UptimeKumaException("proxy does not exist")
2022-07-02 16:00:54 +02:00
@append_docstring(proxy_docstring("add"))
def add_proxy(self, **kwargs) -> dict:
"""
Add a proxy.
:return: The server response.
Example::
>>> api.add_proxy(
... protocol=ProxyProtocol.HTTP,
... host="127.0.0.1",
... port=8080,
... auth=True,
... username="username",
... password="password",
... active=True,
... default=False,
... applyExisting=False
... )
{
'id': 1,
'msg': 'Saved'
}
"""
2022-07-09 22:15:41 +02:00
data = _build_proxy_data(**kwargs)
2022-07-10 18:07:11 +02:00
_check_arguments_proxy(data)
with self.wait_for_event(Event.PROXY_LIST):
return self._call('addProxy', (data, None))
2022-07-02 20:26:18 +02:00
@append_docstring(proxy_docstring("edit"))
def edit_proxy(self, id_: int, **kwargs) -> dict:
"""
Edit a proxy.
:param id_: Id of the proxy to edit.
:type id_: int
:return: The server response.
Example::
>>> api.edit_proxy(1,
... protocol=ProxyProtocol.HTTPS,
... host="127.0.0.2",
... port=8888
... )
{
'id': 1,
'msg': 'Saved'
}
"""
proxy = self.get_proxy(id_)
proxy.update(kwargs)
2022-07-10 18:07:11 +02:00
_check_arguments_proxy(proxy)
with self.wait_for_event(Event.PROXY_LIST):
return self._call('addProxy', (proxy, id_))
2022-07-02 20:26:18 +02:00
def delete_proxy(self, id_: int) -> dict:
"""
Delete a proxy.
:param id_: Id of the proxy to delete.
:type id_: int
:return: The server response.
Example::
>>> api.delete_proxy(1)
{
'msg': 'Deleted'
}
"""
with self.wait_for_event(Event.PROXY_LIST):
return self._call('deleteProxy', id_)
2022-07-02 20:26:18 +02:00
2022-07-02 16:00:54 +02:00
# status page
def get_status_pages(self) -> list:
"""
Get all status pages.
:return: All status pages.
Example::
>>> api.get_status_pages()
[
{
'customCSS': '',
'description': 'description 1',
'domainNameList': [],
'footerText': None,
'icon': '/icon.svg',
'id': 1,
'published': True,
'showPoweredBy': False,
'showTags': False,
'slug': 'slug1',
'theme': 'light',
'title': 'status page 1'
}
]
"""
return list(self._get_event_data(Event.STATUS_PAGE_LIST).values())
2022-07-02 20:40:14 +02:00
def get_status_page(self, slug: str) -> dict:
"""
Get a status page.
:param slug: Slug
:type slug: str
:return: The status page.
Example::
>>> api.get_status_page("slug1")
{
'customCSS': '',
'description': 'description 1',
'domainNameList': [],
'footerText': None,
'icon': '/icon.svg',
'id': 1,
'incident': {
'content': 'content 1',
'createdDate': '2022-12-15 16:51:43',
'id': 1,
'lastUpdatedDate': None,
'pin': 1,
'style': 'danger',
'title': 'title 1'
},
'publicGroupList': [
{
'id': 1,
'monitorList': [
{
'id': 1,
'name': 'monitor 1',
'sendUrl': 0
}
],
'name': 'Services',
'weight': 1
}
],
'published': True,
'showPoweredBy': False,
'showTags': False,
'slug': 'slug1',
'theme': 'light',
'title': 'status page 1'
}
"""
2022-08-26 14:04:43 +02:00
r1 = self._call('getStatusPage', slug)
r2 = requests.get(f"{self.url}/api/status-page/{slug}").json()
config = r1["config"]
config.update(r2["config"])
return {
**config,
"incident": r2["incident"],
"publicGroupList": r2["publicGroupList"]
}
2022-07-06 21:29:40 +02:00
def add_status_page(self, slug: str, title: str) -> dict:
"""
Add a status page.
:param slug: Slug
:type slug: str
:param title: Title
:type title: str
:return: The server response.
Example::
>>> api.add_status_page("slug1", "status page 1")
{
'msg': 'OK!'
}
"""
with self.wait_for_event(Event.STATUS_PAGE_LIST):
return self._call('addStatusPage', (title, slug))
2022-07-05 22:12:37 +02:00
def delete_status_page(self, slug: str) -> dict:
"""
Delete a status page.
:param slug: Slug
:type slug: str
:return: The server response.
Example::
>>> api.delete_status_page("slug1")
{}
"""
r = self._call('deleteStatusPage', slug)
# uptime kuma does not send the status page list event when a status page is deleted
for status_page in self._event_data[Event.STATUS_PAGE_LIST].values():
if status_page["slug"] == slug:
status_page_id = status_page["id"]
del self._event_data[Event.STATUS_PAGE_LIST][str(status_page_id)]
break
return r
2022-07-05 22:12:37 +02:00
def save_status_page(self, slug: str, **kwargs) -> dict:
"""
Save a status page.
:param slug: Slug
:type slug: str
:param id: Id of the status page to save
:type id: int
:param title: Title
:type title: str
:param description: (optional) Description
:type description: str
:param theme: (optional) Switch Theme
:type theme: str
:param published: (optional) Published
:type published: bool
:param showTags: (optional) Show Tags
:type showTags: bool
:param domainNameList: (optional) Domain Names
:type domainNameList: list
:param customCSS: (optional) Custom CSS
:type customCSS: str
:param footerText: (optional) Custom Footer
:type footerText: str
:param showPoweredBy: (optional) Show Powered By
:type showPoweredBy: bool
:param icon: (optional) Icon
:type icon: str
:param publicGroupList: (optional) Public Group List
:type publicGroupList: list
:return: The server response.
Example::
>>> monitor_id = 1
>>> api.save_status_page(
... slug="slug1",
... title="status page 1",
... description="description 1",
... theme="light",
... published=True,
... showTags=False,
... domainNameList=[],
... customCSS="",
... footerText=None,
... showPoweredBy=False,
... icon="/icon.svg",
... publicGroupList=[
... {
... 'name': 'Services',
... 'weight': 1,
... 'monitorList': [
... {
... "id": monitor_id
... }
... ]
... }
... ]
... )
{
'publicGroupList': [
{
'id': 1,
'monitorList': [
{
'id': 1
}
],
'name': 'Services',
'weight': 1
}
]
}
"""
2022-07-06 21:29:40 +02:00
status_page = self.get_status_page(slug)
2022-08-26 14:04:43 +02:00
status_page.pop("incident")
status_page.update(kwargs)
2022-07-06 21:29:40 +02:00
data = _build_status_page_data(**status_page)
r = self._call('saveStatusPage', data)
# uptime kuma does not send the status page list event when a status page is saved
status_page = self._call('getStatusPage', slug)["config"]
status_page_id = status_page["id"]
if self._event_data[Event.STATUS_PAGE_LIST] is None:
self._event_data[Event.STATUS_PAGE_LIST] = {}
self._event_data[Event.STATUS_PAGE_LIST][str(status_page_id)] = status_page
return r
2022-07-02 20:40:14 +02:00
2022-07-05 22:12:37 +02:00
def post_incident(
self,
slug: str,
title: str,
content: str,
style: IncidentStyle = IncidentStyle.PRIMARY
) -> dict:
"""
Post an incident to status page.
:param slug: Slug
:type slug: str
:param title: Title
:type title: str
:param content: Content
:type content: str
:param style: (optional) Style
:type style: IncidentStyle.DANGER
:return: The server response.
Example::
>>> api.post_incident(
... slug="slug1",
... title="title 1",
... content="content 1",
... style=IncidentStyle.DANGER
... )
{
'content': 'content 1',
'createdDate': '2022-12-15 16:51:43',
'id': 1,
'pin': True,
'style': 'danger',
'title': 'title 1'
}
"""
2022-07-05 22:12:37 +02:00
incident = {
"title": title,
"content": content,
"style": style
}
r = self._call('postIncident', (slug, incident))["incident"]
2022-07-06 21:29:40 +02:00
self.save_status_page(slug)
return r
2022-07-05 22:12:37 +02:00
def unpin_incident(self, slug: str) -> dict:
"""
Unpin an incident from a status page.
:param slug: Slug
:type slug: str
:return: The server response.
Example::
>>> api.unpin_incident(slug="slug1")
{}
"""
r = self._call('unpinIncident', slug)
2022-07-06 21:29:40 +02:00
self.save_status_page(slug)
return r
2022-07-02 20:40:14 +02:00
2022-07-02 16:00:54 +02:00
# heartbeat
def get_heartbeats(self) -> list:
"""
Get heartbeats.
:return: The heartbeats.
Example::
>>> api.get_heartbeats()
[
{
'bool': False,
'data': [
{
'down_count': 0,
'duration': 0,
'id': 1,
'important': True,
'monitor_id': 1,
'msg': 'connect ECONNREFUSED 127.0.0.1:80',
'ping': None,
'status': False,
'time': '2022-12-15 16:51:41.782'
},
{
'down_count': 0,
'duration': 60,
'id': 2,
'important': False,
'monitor_id': 1,
'msg': 'connect ECONNREFUSED 127.0.0.1:80',
'ping': None,
'status': False,
'time': '2022-12-15 16:52:41.799'
},
...
],
'id': '1'
}
]
"""
2022-08-05 15:52:19 +02:00
r = self._get_event_data(Event.HEARTBEAT_LIST)
for i in r:
int_to_bool(i["data"], ["important", "status"])
return r
2022-07-02 16:00:54 +02:00
def get_important_heartbeats(self) -> list:
"""
Get important heartbeats.
:return: The important heartbeats.
Example::
>>> api.get_important_heartbeats()
[
{
'bool': False,
'data': [
{
'duration': 0,
'important': True,
'monitorID': 1,
'msg': 'connect ECONNREFUSED 127.0.0.1:80',
'ping': None,
'status': False,
'time': '2022-12-15 16:51:41.782'
}
],
'id': '1'
}
]
"""
2022-08-05 15:52:19 +02:00
r = self._get_event_data(Event.IMPORTANT_HEARTBEAT_LIST)
for i in r:
int_to_bool(i["data"], ["important", "status"])
return r
2022-07-02 16:00:54 +02:00
def get_heartbeat(self) -> list:
"""
Get heartbeat.
:return: The heartbeat.
Example::
>>> api.get_heartbeat()
[
{
'duration': 60,
'important': False,
'monitorID': 1,
'msg': 'connect ECONNREFUSED 127.0.0.1:80',
'status': False,
'time': '2022-12-15 17:17:42.099'
}
]
"""
2022-08-05 15:52:19 +02:00
r = self._get_event_data(Event.HEARTBEAT)
int_to_bool(r, ["important", "status"])
return r
2022-07-02 16:00:54 +02:00
# avg ping
def avg_ping(self) -> list:
"""
Get average ping.
:return: The average ping.
Example::
>>> api.avg_ping()
[
{
'id': '1',
'data': 67
}
]
"""
2022-08-05 15:52:19 +02:00
return self._get_event_data(Event.AVG_PING)
2022-07-02 16:00:54 +02:00
# cert info
def cert_info(self) -> list:
"""
Get certificate info.
:return: Certificate info.
Example::
>>> api.cert_info()
[
{
'id': '2',
'data': '{"valid":true,"certInfo":{"subject":{"C":"US","ST":"California","L":"San Francisco","O":"Cloudflare, Inc.","CN":"cloudflare-dns.com"},"issuer":{"C":"US","O":"DigiCert Inc","CN":"DigiCert TLS Hybrid ECC SHA384 2020 CA1"},"subjectaltname":"DNS:cloudflare-dns.com, DNS:*.cloudflare-dns.com, DNS:one.one.one.one, IP Address:1.0.0.1, IP Address:1.1.1.1, IP Address:162.159.36.1, IP Address:162.159.46.1, IP Address:2606:4700:4700:0:0:0:0:1001, IP Address:2606:4700:4700:0:0:0:0:1111, IP Address:2606:4700:4700:0:0:0:0:64, IP Address:2606:4700:4700:0:0:0:0:6400","infoAccess":{"OCSP - URI":["http://ocsp.digicert.com"],"CA Issuers - URI":["http://cacerts.digicert.com/DigiCertTLSHybridECCSHA3842020CA1-1.crt"]},"bits":256,"pubkey":{"type":"Buffer","data":[4,252,62,81,239,116,29,198,218,120,186,174,165,138,74,221,217,11,230,226,91,87,49,87,222,211,191,182,217,138,59,79,210,84,84,136,207,189,46,101,231,102,235,197,223,208,49,84,82,167,44,238,18,134,163,154,102,193,234,6,121,3,186,27,240]},"asn1Curve":"prime256v1","nistCurve":"P-256","valid_from":"Sep 13 00:00:00 2022 GMT","valid_to":"Sep 13 23:59:59 2023 GMT","fingerprint":"D1:D4:67:E7:BC:0E:AC:CD:C8:87:A6:12:B8:B2:BC:15:C1:69:04:6B","fingerprint256":"66:67:73:19:84:78:03:0C:56:FB:23:76:8E:48:19:C2:B7:5C:32:2C:D3:BE:A4:A8:34:6B:B0:3C:22:8D:4F:18","fingerprint512":"C2:76:3A:C5:AC:64:76:BB:BF:9F:AB:3A:B9:04:55:06:A4:8D:13:67:08:26:10:A0:FE:22:B5:E2:26:E0:67:1F:EC:17:B7:C2:59:18:1E:7B:46:99:7C:54:A4:9E:4B:C6:58:B4:16:B4:88:6F:0C:5B:60:D1:78:AD:E9:CE:28:1C","ext_key_usage":["1.3.6.1.5.5.7.3.1","1.3.6.1.5.5.7.3.2"],"serialNumber":"0D1C7AF28E5F2717DBB27F410820BDF7","raw":{"type":"Buffer","data":[48,130,5,247,48,130,5,125,160,3,2,1,2,2,16,13,28,122,242,142,95,39,23,219,178,127,65,8,32,189,247,48,10,6,8,42,134,72,206,61,4,3,3,48,86,49,11,48,9,6,3,85,4,6,19,2,85,83,49,21,48,19,6,3,85,4,10,19,12,68,105,103,105,67,101,114,116,32,73,110,99,49,48,48,46,6,3,85,4,3,19,39,68,105,103,105,67,101,114,116,32,84,76,83,32,72,121,98,114,105,100,32,69,67,67,32,83,72,65,51,56,52,32,50,48,50,48,32,67,65,49,48,30,23,13,50,50,48,57,49,51,48,48,48,48,48,48,90,23,13,50,51,48,57,49,51,50,51,53,57,53,57,90,48,114,49,11,48,9,6,3,85,4,6,19,2,85,83,49,19,48,17,6,3,85,4,8,19,10,67,97,108,105,102,111,114,110,105,97,49,22,48,20,6,3,85,4,7,19,13,83,97,110,32,70,114,97,110,99,105,115,99,111,49,25,48,23,6,3,85,4,10,19,16,67,108,111,117,100,102,108,97,114,101,44,32,73,110,99,46,49,27,48,25,6,3,85,4,3,19,18,99,108,111,117,100,102,108,97,114,101,45,100,110,115,46,99,111,109,48,89,48,19,6,7,42,134,72,206,61,2,1,6,8,42,134,72,206,61,3,1,7,3,66,0,4,252,62,81,239,116,29,198,218,120,186,174,165,138,74,221,217,11,230,226,91,87,49,87,222,211,191,182,217,138,59,79,210,84,84,136,207,189,46,101,231,102,235,197,223,208,49,84,82,167,44,238,18,134,163,154,102,193,234,6,121,3,186,27,240,163,130,4,15,48,130,4,11,48,31,6,3,85,29,35,4,24,48,22,128,20,10,188,8,41,23,140,165,57,109,122,14,206,51,199,46,179,237,251,195,122,48,29,6,3,85,29,14,4,22,4,20,210,99,186,148,214,84,127,76,133,20,8,58,28,133,86,41,239,89,143,204,48,129,166,6,3,85,29,17,4,129,158,48,129,155,130,18,99,108,111,117,100,102,108,97,114,101,45,100,110,115,46,99,111,109,130,20,42,46,99,108,111,117,100,102,108,97,114,101,45,100,110,115,46,99,111,109,130,15,111,110,101,46,111,110,101,46,111,110,101,46,111,110,101,135,4,1,0,0,1,135,4,1,1,1,1,135,4,162,159,36,1,135,4,162,159,46,1,135,16,38,6,71,0,71,0,0,0,0,0,0,0,0,0,16,1,135,16,38,6,71,0,71,0,0,0,0,0,0,0,0,0,17,17,135,16,38,6,71,0,71,0,0,0,0,0,0,0,0,0,0,100,135,16,38,6,71,0,71,0,0,0,0,0,0,0,0,0,100,0,48,14,6,3,85,29,15,1,1,255,4,4,3,2,7,128,48,29,6,3,85,29,37,4,22,48,20,6,8,43,6,1,5,5,7,3,1,6,8,43,6,1,5,5,7,3,2,48,129,155,6,3,85,29,31,4,129,147,48,129,144,48,70,160,68,160,66,134,64,104,116,116,112,58,47,47,99,114,108,51,46,100,105,103,105,99,101,114,116,46,99,111,109,47,68,105,103,105,67,101,114,116,84,76,83,72,121,98,114,105,100,69,67,67,83,72,65,51,56,52,50,48,50,48,67,65,49,45,49,46,99,114,108,48,70,160,68,160,66,134,64,104,116,116,112,58,47,47,99,114,108,52,46,100,105,103,105,99,101,114,116,46,99,111,109,
}
]
"""
2022-08-05 15:52:19 +02:00
return self._get_event_data(Event.CERT_INFO)
2022-07-02 16:00:54 +02:00
# uptime
def uptime(self) -> list:
"""
Get monitor uptime.
:return: Monitor uptime.
Example::
>>> api.uptime()
[
{
'id': '2',
'duration': 24,
'uptime': 1
},
{
'id': '2',
'duration': 720,
'uptime': 1
}
]
"""
2022-08-05 15:52:19 +02:00
return self._get_event_data(Event.UPTIME)
2022-07-02 16:00:54 +02:00
# info
2022-09-07 13:03:10 +02:00
def info(self) -> dict:
"""
Get server info.
:return: Server info.
Example::
>>> api.info()
{
'version': '1.18.5',
'latestVersion': '1.18.5',
'primaryBaseURL': None
}
"""
2022-08-05 15:52:19 +02:00
r = self._get_event_data(Event.INFO)
return r
2022-07-02 16:00:54 +02:00
# clear
def clear_events(self, monitor_id: int) -> dict:
"""
Clear monitor events.
:param monitor_id: Id of the monitor to clear events.
:type monitor_id: int
:return: The server response.
Example::
>>> api.clear_events(1)
{}
"""
return self._call('clearEvents', monitor_id)
2022-07-02 16:00:54 +02:00
def clear_heartbeats(self, monitor_id: int) -> dict:
"""
Clear monitor heartbeats.
:param monitor_id: Id of the monitor to clear heartbeats.
:type monitor_id: int
:return: The server response.
Example::
>>> api.clear_heartbeats(1)
{}
"""
return self._call('clearHeartbeats', monitor_id)
2022-07-02 16:00:54 +02:00
def clear_statistics(self) -> dict:
"""
Clear statistics.
:return: The server response.
Example::
>>> api.clear_statistics()
{}
"""
return self._call('clearStatistics')
2022-07-02 16:00:54 +02:00
# tags
def get_tags(self) -> list:
"""
Get all tags.
:return: All tags.
Example::
>>> api.get_tags()
[
{
'color': '#ffffff',
'id': 1,
'name': 'tag 1'
}
]
"""
return self._call('getTags')["tags"]
2022-07-02 16:00:54 +02:00
def get_tag(self, id_: int) -> dict:
"""
Get a tag.
:param id_: Id of the monitor to get.
:type id_: int
:return: The tag.
Example::
>>> api.get_tag(1)
{
'color': '#ffffff',
'id': 1,
'name': 'tag 1'
}
"""
2022-07-05 22:12:37 +02:00
tags = self.get_tags()
for tag in tags:
if tag["id"] == id_:
return tag
raise UptimeKumaException("tag does not exist")
2022-07-05 22:12:37 +02:00
# not working, monitor id required?
2022-07-05 22:12:37 +02:00
# def edit_tag(self, id_: int, name: str, color: str):
# return self._call('editTag', {
2022-07-05 22:12:37 +02:00
# "id": id_,
# "name": name,
# "color": color
# })
def delete_tag(self, id_: int) -> dict:
"""
Delete a tag.
:param id_: Id of the monitor to delete.
:type id_: int
:return: The server response.
Example::
>>> api.delete_tag(1)
{
'msg': 'Deleted Successfully.'
}
"""
return self._call('deleteTag', id_)
2022-07-02 16:00:54 +02:00
def add_tag(self, name: str, color: str) -> dict:
"""
Add a tag.
:param name: Tag name
:type name: str
:param color: Tag color
:type color: str
:return: The server response.
Example::
>>> api.add_tag(
... name="tag 1",
... color="#ffffff"
... )
{
'color': '#ffffff',
'id': 1,
'name': 'tag 1'
}
"""
return self._call('addTag', {
2022-07-02 16:00:54 +02:00
"name": name,
2022-07-05 22:12:37 +02:00
"color": color,
2022-07-02 16:00:54 +02:00
"new": True
})["tag"]
2022-07-02 16:00:54 +02:00
# settings
def get_settings(self) -> dict:
"""
Get settings.
:return: Settings.
Example::
>>> api.get_settings()
{
'checkUpdate': False,
'checkBeta': False,
'keepDataPeriodDays': 180,
'entryPage': 'dashboard',
'searchEngineIndex': False,
'primaryBaseURL': '',
'steamAPIKey': '',
'tlsExpiryNotifyDays': [
7,
14,
21
],
'disableAuth': False,
'trustProxy': False
}
"""
r = self._call('getSettings')["data"]
return r
def set_settings(
self,
password: str = None, # only required if disableAuth is true
2022-07-02 16:00:54 +02:00
# about
2022-08-03 11:56:02 +02:00
checkUpdate: bool = True,
checkBeta: bool = False,
# monitor history
2022-08-03 11:56:02 +02:00
keepDataPeriodDays: int = 180,
# general
2022-08-03 11:56:02 +02:00
entryPage: str = "dashboard",
searchEngineIndex: bool = False,
primaryBaseURL: str = "",
steamAPIKey: str = "",
# notifications
2022-08-03 11:56:02 +02:00
tlsExpiryNotifyDays: list = None,
# security
2022-09-07 13:03:10 +02:00
disableAuth: bool = False,
# reverse proxy
trustProxy: bool = False
) -> dict:
"""
Set settings.
:param password: (optional) Password
:type password: str
:param checkUpdate: (optional) Show update if available
:type checkUpdate: bool
:param checkBeta: (optional) Also check beta release
:type checkBeta: bool
:param keepDataPeriodDays: (optional) Keep monitor history data for X days.
:type keepDataPeriodDays: int
:param entryPage: (optional) Entry Page
:type entryPage: str
:param searchEngineIndex: (optional) Search Engine Visibility
:type searchEngineIndex: bool
:param primaryBaseURL: (optional) Primary Base URL
:type primaryBaseURL: str
:param steamAPIKey: (optional) Steam API Key. For monitoring a Steam Game Server you need a Steam Web-API key.
:type steamAPIKey: str
:param tlsExpiryNotifyDays: (optional) TLS Certificate Expiry. HTTPS Monitors trigger notification when TLS certificate expires in.
:type tlsExpiryNotifyDays: list
:param disableAuth: (optional) Disable Authentication
:type disableAuth: bool
:param trustProxy: (optional) Trust Proxy. Trust 'X-Forwarded-*' headers. If you want to get the correct client IP and your Uptime Kuma is behind such as Nginx or Apache, you should enable this.
:type trustProxy: bool
:return: The server response.
Example::
>>> api.set_settings(
... checkUpdate=False,
... checkBeta=False,
... keepDataPeriodDays=180,
... entryPage="dashboard",
... searchEngineIndex=False,
... primaryBaseURL="",
... steamAPIKey="",
... tlsExpiryNotifyDays=[7, 14, 21],
... disableAuth=False,
... trustProxy=False
... )
{
'msg': 'Saved'
}
"""
2022-08-03 11:56:02 +02:00
if not tlsExpiryNotifyDays:
tlsExpiryNotifyDays = [7, 14, 21]
data = {
2022-08-03 11:56:02 +02:00
"checkUpdate": checkUpdate,
"checkBeta": checkBeta,
"keepDataPeriodDays": keepDataPeriodDays,
"entryPage": entryPage,
"searchEngineIndex": searchEngineIndex,
"primaryBaseURL": primaryBaseURL,
"steamAPIKey": steamAPIKey,
"tlsExpiryNotifyDays": tlsExpiryNotifyDays,
"disableAuth": disableAuth
}
2022-09-07 13:03:10 +02:00
if parse_version(self.version) >= parse_version("1.18"):
data.update({
"trustProxy": trustProxy
})
return self._call('setSettings', (data, password))
2022-07-02 16:00:54 +02:00
def change_password(self, old_password: str, new_password: str) -> dict:
"""
Change password.
:param old_password: Old password
:type old_password: str
:param new_password: New password
:type new_password: str
:return: The server response.
Example::
>>> api.change_password(
... old_password="secret123",
... new_password="321terces"
... )
{
'msg': 'Password has been updated successfully.'
}
"""
return self._call('changePassword', {
2022-07-02 16:00:54 +02:00
"currentPassword": old_password,
"newPassword": new_password,
})
def upload_backup(self, json_data: str, import_handle: str = "skip") -> dict:
"""
Import Backup.
:param json_data: Backup data as json string.
:type json_data: str
:param import_handle: (optional) Choose "skip" if you want to skip every monitor or notification with the same name. "overwrite" will delete every existing monitor and notification. "keep" will keep both.
:type import_handle: str
:return: The server response.
Example::
>>> json_data = json.dumps({
... "version": "1.17.1",
... "notificationList": [],
... "monitorList": [],
... "proxyList": []
... })
>>> api.upload_backup(
... json_data=json_data,
... import_handle="overwrite"
... )
{
'msg': 'Backup successfully restored.'
}
"""
2022-07-02 16:00:54 +02:00
if import_handle not in ["overwrite", "skip", "keep"]:
raise ValueError()
return self._call('uploadBackup', (json_data, import_handle))
2022-07-02 16:00:54 +02:00
# 2FA
def twofa_status(self) -> dict:
"""
Get current 2FA status.
:return: The server response.
Example::
>>> api.twofa_status()
{
'status': False
}
"""
return self._call('twoFAStatus')
2022-07-02 16:00:54 +02:00
def prepare_2fa(self, password: str) -> dict:
"""
Prepare 2FA configuration.
:param password: Current password.
:type password: str
:return: The server response.
Example::
>>> password = "secret123"
>>> r = api.prepare_2fa(password)
>>> r
{
'uri': 'otpauth://totp/Uptime%20Kuma:admin?secret=NBGVQNSNNRXWQ3LJJN4DIWSWIIYW45CZJRXXORSNOY3USSKXO5RG4MDPI5ZUK5CWJFIFOVCBGZVG24TSJ5LDE2BTMRLXOZBSJF3TISA'
}
>>> uri = r["uri"]
>>>
>>> from urllib import parse
>>> def parse_secret(uri):
... query = parse.urlsplit(uri).query
... params = dict(parse.parse_qsl(query))
... return params["secret"]
>>> secret = parse_secret(uri)
>>> secret
NBGVQNSNNRXWQ3LJJN4DIWSWIIYW45CZJRXXORSNOY3USSKXO5RG4MDPI5ZUK5CWJFIFOVCBGZVG24TSJ5LDE2BTMRLXOZBSJF3TISA
"""
return self._call('prepare2FA', password)
2022-07-02 16:00:54 +02:00
def verify_token(self, token: str, password: str) -> dict:
"""
Verify the provided 2FA token.
:param token: 2FA token.
:type token: str
:param password: Current password.
:type password: str
:return: The server response.
Example::
>>> import pyotp
>>> def generate_token(secret):
... totp = pyotp.TOTP(secret)
... return totp.now()
>>> token = generate_token(secret)
>>> token
526564
>>> api.verify_token(token, password)
{
'valid': True
}
"""
2022-08-05 14:33:28 +02:00
return self._call('verifyToken', (token, password))
def save_2fa(self, password: str) -> dict:
"""
Save the current 2FA configuration.
:param password: Current password.
:type password: str
:return: The server response.
Example::
>>> api.save_2fa(password)
{
'msg': '2FA Enabled.'
}
"""
return self._call('save2FA', password)
2022-07-02 16:00:54 +02:00
def disable_2fa(self, password: str) -> dict:
"""
Disable 2FA for this user.
:param password: Current password.
:type password: str
:return: The server response.
Example::
>>> api.disable_2fa(password)
{
'msg': '2FA Disabled.'
}
"""
return self._call('disable2FA', password)
2022-07-02 16:00:54 +02:00
# login
def login(self, username: str = None, password: str = None, token: str = "") -> dict:
"""
Login.
If username and password is not provided, auto login is performed if disableAuth is enabled.
:param username: (optional) Username. Must be None if disableAuth is enabled.
:type username: str
:param password: (optional) Password. Must be None if disableAuth is enabled.
:type password: str
:param token: (optional) 2FA Token. Required if 2FA is enabled.
:type token: str
:return: The server response.
Example::
>>> username = "admin"
>>> password = "secret123"
>>> api.login(username, password)
{
'token': 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6ImFkbWluIiwiaWF0IjoxNjcxMTk3MjkzfQ.lpho_LuKMnoltXOdA7-jZ98gXOU-UbEIuxMwMRm4Nz0'
}
"""
# autologin
if username is None and password is None:
with self.wait_for_event(Event.AUTO_LOGIN):
return {}
return self._call('login', {
2022-07-02 16:00:54 +02:00
"username": username,
"password": password,
2022-08-26 17:02:55 +02:00
"token": token
2022-07-02 16:00:54 +02:00
})
def login_by_token(self, token: str) -> dict:
"""
Login by token.
:param token: Login token generated by :meth:`~login`
:type token: str
:return: The server response.
Example::
>>> api.login_by_token(token)
{}
"""
return self._call('loginByToken', token)
2022-07-02 16:00:54 +02:00
def logout(self) -> None:
"""
Logout.
:return: The server response.
Example::
>>> api.logout()
None
"""
return self._call('logout')
2022-07-02 16:00:54 +02:00
# setup
def need_setup(self) -> bool:
"""
Check if the server has already been set up.
:return: The server response.
Example::
>>> api.need_setup()
True
"""
return self._call('needSetup')
2022-07-02 16:00:54 +02:00
def setup(self, username: str, password: str) -> dict:
"""
Set up the server.
:param username: Username
:type username: str
:param password: Password
:type password: str
:return: The server response.
Example::
>>> api.setup(username, password)
{
'msg': 'Added Successfully.'
}
"""
return self._call("setup", (username, password))
2022-07-02 20:40:14 +02:00
# database
def get_database_size(self) -> dict:
"""
Get database size.
:return: The server response.
Example::
>>> api.get_database_size()
{
'size': 61440
}
"""
return self._call('getDatabaseSize')
2022-07-02 20:40:14 +02:00
def shrink_database(self) -> dict:
"""
Shrink database.
Trigger database VACUUM for SQLite. If your database is created after 1.10.0, AUTO_VACUUM is already enabled and this action is not needed.
:return: The server response.
Example::
>>> api.shrink_database()
{}
"""
return self._call('shrinkDatabase')
2022-09-07 13:03:10 +02:00
# docker host
def get_docker_hosts(self) -> list:
"""
Get all docker hosts.
:return: All docker hosts.
Example::
>>> api.get_docker_hosts()
[
{
'dockerDaemon': '/var/run/docker.sock',
'dockerType': 'socket',
'id': 1,
'name': 'name 1',
'userID': 1
}
]
"""
2022-09-07 13:03:10 +02:00
r = self._get_event_data(Event.DOCKER_HOST_LIST)
return r
def get_docker_host(self, id_: int) -> dict:
"""
Get a docker host.
:param id_: Id of the docker host to get.
:type id_: int
:return: The docker host.
Example::
>>> api.get_docker_host(1)
{
'dockerDaemon': '/var/run/docker.sock',
'dockerType': 'socket',
'id': 1,
'name': 'name 1',
'userID': 1
}
"""
2022-09-07 13:03:10 +02:00
docker_hosts = self.get_docker_hosts()
for docker_host in docker_hosts:
if docker_host["id"] == id_:
return docker_host
raise UptimeKumaException("docker host does not exist")
@append_docstring(docker_host_docstring("test"))
def test_docker_host(self, **kwargs) -> dict:
"""
Test a docker host.
:return: The server response.
Example::
>>> api.test_docker_host(
... name="name 1",
... dockerType=DockerType.SOCKET,
... dockerDaemon="/var/run/docker.sock"
... )
{
'msg': 'Connected Successfully. Amount of containers: 10'
}
"""
2022-09-07 13:03:10 +02:00
data = _build_docker_host_data(**kwargs)
return self._call('testDockerHost', data)
@append_docstring(docker_host_docstring("add"))
def add_docker_host(self, **kwargs) -> dict:
"""
Add a docker host.
:return: The server response.
Example::
>>> api.add_docker_host(
... name="name 1",
... dockerType=DockerType.SOCKET,
... dockerDaemon="/var/run/docker.sock"
... )
{
'id': 1,
'msg': 'Saved'
}
"""
2022-09-07 13:03:10 +02:00
data = _build_docker_host_data(**kwargs)
_convert_docker_host_input(data)
with self.wait_for_event(Event.DOCKER_HOST_LIST):
return self._call('addDockerHost', (data, None))
2022-09-07 13:03:10 +02:00
@append_docstring(docker_host_docstring("edit"))
def edit_docker_host(self, id_: int, **kwargs) -> dict:
"""
Edit a docker host.
:param id_: Id of the docker host to edit.
:type id_: int
:return: The server response.
Example::
>>> api.edit_docker_host(1, name="name 2")
{
'id': 1,
'msg': 'Saved'
}
"""
2022-09-07 13:03:10 +02:00
data = self.get_docker_host(id_)
data.update(kwargs)
_convert_docker_host_input(data)
with self.wait_for_event(Event.DOCKER_HOST_LIST):
return self._call('addDockerHost', (data, id_))
2022-09-07 13:03:10 +02:00
def delete_docker_host(self, id_: int) -> dict:
"""
Delete a docker host.
:param id_: Id of the docker host to delete.
:type id_: int
:return: The server response.
Example::
>>> api.delete_docker_host(1)
{
'msg': 'Deleted'
}
"""
with self.wait_for_event(Event.DOCKER_HOST_LIST):
return self._call('deleteDockerHost', id_)