remove converter

This commit is contained in:
lucasheld 2022-08-03 11:56:02 +02:00
parent a1a483efce
commit 8d2c0ec4f7
10 changed files with 273 additions and 579 deletions

View file

@ -6,6 +6,8 @@ uptime-kuma-api is a Python wrapper for the Uptime Kuma WebSocket API.
This package was developed to configure Uptime Kuma with Ansible. The Ansible collection can be found at https://github.com/lucasheld/ansible-uptime-kuma. This package was developed to configure Uptime Kuma with Ansible. The Ansible collection can be found at https://github.com/lucasheld/ansible-uptime-kuma.
Python version 3.6+ is required.
Installation Installation
--- ---
uptime-kuma-api is available on the Python Package Index (PyPI). uptime-kuma-api is available on the Python Package Index (PyPI).
@ -31,7 +33,7 @@ To do so, import `UptimeKumaApi` from the library and specify the Uptime Kuma se
Now you can call one of the existing methods of the instance. For example create a new monitor: Now you can call one of the existing methods of the instance. For example create a new monitor:
```python ```python
>>> result = api.add_monitor(type_=MonitorType.HTTP, name="new monitor", url="http://192.168.1.1") >>> result = api.add_monitor(type=MonitorType.HTTP, name="new monitor", url="http://192.168.1.1")
>>> print(result) >>> print(result)
{'msg': 'Added Successfully.', 'monitor_id': 1} {'msg': 'Added Successfully.', 'monitorId': 1}
``` ```

102
scripts/build_models.py Normal file
View file

@ -0,0 +1,102 @@
import re
from pprint import pprint
def parse_data_keys(data):
keys = []
for line in data.split("\n"):
line = line.strip()
if not line:
continue
match = re.match(r'^([^:]+):', line) # example: "type: this.type,"
if match:
key = match.group(1)
else:
key = line.rstrip(",") # example: "notificationIDList,"
keys.append(key)
return keys
def parse_heartbeat():
with open('uptime-kuma/server/model/heartbeat.js') as f:
content = f.read()
all_keys = []
match = re.search(r'toJSON\(\) {\s+return.*{([^}]+)}', content)
data = match.group(1)
keys = parse_data_keys(data)
all_keys.extend(keys)
match = re.search(r'toPublicJSON\(\) {\s+return.*{([^}]+)}', content)
data = match.group(1)
keys = parse_data_keys(data)
all_keys.extend(keys)
all_keys = list(set(all_keys))
return all_keys
def parse_incident():
with open('uptime-kuma/server/model/incident.js') as f:
content = f.read()
match = re.search(r'toPublicJSON\(\) {\s+return.*{([^}]+)}', content)
data = match.group(1)
keys = parse_data_keys(data)
return keys
def parse_monitor():
# todo: toPublicJSON ???
with open('uptime-kuma/server/model/monitor.js') as f:
content = f.read()
matches = re.findall(r'data = {([^}]+)}', content)
all_keys = []
for match in matches:
keys = parse_data_keys(match)
keys = [i for i in keys if i != "...data"]
all_keys.extend(keys)
return all_keys
def parse_proxy():
with open('uptime-kuma/server/model/proxy.js') as f:
content = f.read()
match = re.search(r'toJSON\(\) {\s+return.*{([^}]+)}', content)
data = match.group(1)
keys = parse_data_keys(data)
return keys
def parse_status_page():
with open('uptime-kuma/server/model/status_page.js') as f:
content = f.read()
all_keys = []
match = re.search(r'toJSON\(\) {\s+return.*{([^}]+)}', content)
data = match.group(1)
keys = parse_data_keys(data)
all_keys.extend(keys)
match = re.search(r'toPublicJSON\(\) {\s+return.*{([^}]+)}', content)
data = match.group(1)
keys = parse_data_keys(data)
all_keys.extend(keys)
all_keys = list(set(all_keys))
return all_keys
def parse_tag():
with open('uptime-kuma/server/model/tag.js') as f:
content = f.read()
match = re.search(r'toJSON\(\) {\s+return.*{([^}]+)}', content)
data = match.group(1)
keys = parse_data_keys(data)
return keys
pprint(parse_heartbeat())
pprint(parse_incident())
pprint(parse_monitor())
pprint(parse_proxy())
pprint(parse_status_page())
pprint(parse_tag())
# TODO:
# https://github.com/louislam/uptime-kuma/blob/2adb142ae25984ecebfa4b51c739fec5e492763a/server/proxy.js#L20
# https://github.com/louislam/uptime-kuma/blob/239611a016a85712305100818d4c7b88a14664a9/server/socket-handlers/status-page-socket-handler.js#L118

View file

@ -0,0 +1,6 @@
notification = [
"type",
"isDefault",
"userId",
"applyExisting",
]

View file

@ -6,19 +6,19 @@ from uptime_kuma_api import UptimeKumaException
class TestMonitor(UptimeKumaTestCase): class TestMonitor(UptimeKumaTestCase):
def test_monitor(self): def test_monitor(self):
expected_monitor = { expected_monitor = {
"type_": "http", "type": "http",
"name": "monitor 1", "name": "monitor 1",
"url": "http://192.168.20.135" "url": "http://192.168.20.135"
} }
# add monitor # add monitor
r = self.api.add_monitor( r = self.api.add_monitor(
type_=expected_monitor["type_"], type=expected_monitor["type"],
name=expected_monitor["name"], name=expected_monitor["name"],
url=expected_monitor["url"] url=expected_monitor["url"]
) )
self.assertEqual(r["msg"], "Added Successfully.") self.assertEqual(r["msg"], "Added Successfully.")
monitor_id = r["monitor_id"] monitor_id = r["monitorID"]
# get monitor # get monitor
monitor = self.api.get_monitor(monitor_id) monitor = self.api.get_monitor(monitor_id)
@ -31,7 +31,7 @@ class TestMonitor(UptimeKumaTestCase):
self.compare(monitor, expected_monitor) self.compare(monitor, expected_monitor)
# edit monitor # edit monitor
expected_monitor["type_"] = "ping" expected_monitor["type"] = "ping"
expected_monitor["name"] = "monitor 1 new" expected_monitor["name"] = "monitor 1 new"
expected_monitor["hostname"] = "127.0.0.1" expected_monitor["hostname"] = "127.0.0.1"
del expected_monitor["url"] del expected_monitor["url"]

View file

@ -6,8 +6,8 @@ class TestMonitorTag(UptimeKumaTestCase):
def test_monitor_tag(self): def test_monitor_tag(self):
r = self.api.add_tag(name="tag 1", color="#ffffff") r = self.api.add_tag(name="tag 1", color="#ffffff")
tag_id = r["id"] tag_id = r["id"]
r = self.api.add_monitor(type_="http", name="monitor 1", url="http://127.0.0.1") r = self.api.add_monitor(type="http", name="monitor 1", url="http://127.0.0.1")
monitor_id = r["monitor_id"] monitor_id = r["monitorID"]
expected_monitor_tag = { expected_monitor_tag = {
"tag_id": tag_id, "tag_id": tag_id,

View file

@ -8,9 +8,9 @@ class TestNotification(UptimeKumaTestCase):
expected_notification = { expected_notification = {
"name": "notification 1", "name": "notification 1",
"default": True, "default": True,
"apply_existing": True, "applyExisting": True,
"type_": "push_by_techulus", "type": "PushByTechulus",
"push_by_techulus_apikey": "123456789" "pushAPIKey": "123456789"
} }
# test notification # test notification
@ -35,14 +35,15 @@ class TestNotification(UptimeKumaTestCase):
# edit notification # edit notification
expected_notification["name"] = "notification 1 new" expected_notification["name"] = "notification 1 new"
expected_notification["default"] = False expected_notification["default"] = False
expected_notification["apply_existing"] = False expected_notification["applyExisting"] = False
expected_notification["type_"] = "push_deer" expected_notification["type"] = "PushDeer"
expected_notification["push_deer_deer_key"] = "987654321" expected_notification["pushdeerKey"] = "987654321"
del expected_notification["push_by_techulus_apikey"] del expected_notification["pushAPIKey"]
r = self.api.edit_notification(notification_id, **expected_notification) r = self.api.edit_notification(notification_id, **expected_notification)
self.assertEqual(r["msg"], "Saved") self.assertEqual(r["msg"], "Saved")
notification = self.api.get_notification(notification_id) notification = self.api.get_notification(notification_id)
self.compare(notification, expected_notification) self.compare(notification, expected_notification)
self.assertIsNone(notification.get("pushAPIKey"))
# delete notification # delete notification
r = self.api.delete_notification(notification_id) r = self.api.delete_notification(notification_id)

View file

@ -10,7 +10,7 @@ class TestStatusPage(UptimeKumaTestCase):
"slug": slug, "slug": slug,
"title": "status page 1", "title": "status page 1",
"description": "description 1", "description": "description 1",
"show_powered_by": False "showPoweredBy": False
} }
# slug must be unique # slug must be unique

View file

@ -4,17 +4,5 @@ from .monitor_type import MonitorType
from .notification_providers import NotificationType, notification_provider_options from .notification_providers import NotificationType, notification_provider_options
from .proxy_protocol import ProxyProtocol from .proxy_protocol import ProxyProtocol
from .incident_style import IncidentStyle from .incident_style import IncidentStyle
from .converter import \
convert_from_socket,\
convert_to_socket, \
params_map_monitor, \
params_map_notification,\
params_map_notification_providers,\
params_map_notification_provider_options,\
get_params_map_notification, \
params_map_proxy, \
params_map_status_page, \
params_map_info, \
params_map_settings
from .exceptions import UptimeKumaException from .exceptions import UptimeKumaException
from .api import UptimeKumaApi from .api import UptimeKumaApi

View file

@ -8,19 +8,9 @@ from . import MonitorType
from . import NotificationType, notification_provider_options from . import NotificationType, notification_provider_options
from . import ProxyProtocol from . import ProxyProtocol
from . import IncidentStyle from . import IncidentStyle
from . import \
convert_from_socket,\
convert_to_socket, \
params_map_monitor, \
params_map_notification,\
params_map_notification_providers, \
get_params_map_notification, \
params_map_proxy, \
params_map_status_page, \
params_map_info, \
params_map_settings
from . import UptimeKumaException from . import UptimeKumaException
def int_to_bool(data, keys): def int_to_bool(data, keys):
if type(data) == list: if type(data) == list:
for d in data: for d in data:
@ -32,30 +22,30 @@ def int_to_bool(data, keys):
def _build_monitor_data( def _build_monitor_data(
type_: MonitorType, type: MonitorType,
name: str, name: str,
heartbeat_interval: int = 60, interval: int = 60,
heartbeat_retry_interval: int = 60, retryInterval: int = 60,
retries: int = 0, maxretries: int = 0,
upside_down_mode: bool = False, upsideDown: bool = False,
tags: list = None, tags: list = None,
notification_ids: list = None, notificationIDList: list = None,
# HTTP, KEYWORD # HTTP, KEYWORD
url: str = None, url: str = None,
certificate_expiry_notification: bool = False, expiryNotification: bool = False,
ignore_tls_error: bool = False, ignoreTls: bool = False,
max_redirects: int = 10, maxredirects: int = 10,
accepted_status_codes: list = None, accepted_statuscodes: list = None,
proxy_id: int = None, proxyId: int = None,
http_method: str = "GET", method: str = "GET",
http_body: str = None, body: str = None,
http_headers: str = None, headers: str = None,
auth_method: AuthMethod = AuthMethod.NONE, authMethod: AuthMethod = AuthMethod.NONE,
auth_user: str = None, basic_auth_user: str = None,
auth_pass: str = None, basic_auth_pass: str = None,
auth_domain: str = None, authDomain: str = None,
auth_workstation: str = None, authWorkstation: str = None,
# KEYWORD # KEYWORD
keyword: str = None, keyword: str = None,
@ -71,38 +61,38 @@ def _build_monitor_data(
dns_resolve_type: str = "A", dns_resolve_type: str = "A",
# MQTT # MQTT
mqtt_username: str = None, mqttUsername: str = None,
mqtt_password: str = None, mqttPassword: str = None,
mqtt_topic: str = None, mqttTopic: str = None,
mqtt_success_message: str = None, mqttSuccessMessage: str = None,
# SQLSERVER # SQLSERVER
sqlserver_connection_string: str = "Server=<hostname>,<port>;" databaseConnectionString: str = "Server=<hostname>,<port>;"
"Database=<your database>;" "Database=<your database>;"
"User Id=<your user id>;" "User Id=<your user id>;"
"Password=<your password>;" "Password=<your password>;"
"Encrypt=<true/false>;" "Encrypt=<true/false>;"
"TrustServerCertificate=<Yes/No>;" "TrustServerCertificate=<Yes/No>;"
"Connection Timeout=<int>", "Connection Timeout=<int>",
sqlserver_query: str = None databaseQuery: str = None
): ):
if not accepted_status_codes: if not accepted_statuscodes:
accepted_status_codes = ["200-299"] accepted_statuscodes = ["200-299"]
dict_notification_ids = {} dict_notification_ids = {}
if notification_ids: if notificationIDList:
for notification_id in notification_ids: for notification_id in notificationIDList:
dict_notification_ids[notification_id] = True dict_notification_ids[notification_id] = True
notification_ids = dict_notification_ids notificationIDList = dict_notification_ids
data = { data = {
"type_": type_, "type": type,
"name": name, "name": name,
"heartbeat_interval": heartbeat_interval, "interval": interval,
"heartbeat_retry_interval": heartbeat_retry_interval, "retryInterval": retryInterval,
"retries": retries, "maxretries": maxretries,
"notification_ids": notification_ids, "notificationIDList": notificationIDList,
"upside_down_mode": upside_down_mode, "upsideDown": upsideDown,
} }
if tags: if tags:
@ -110,7 +100,7 @@ def _build_monitor_data(
"tags": tags "tags": tags
}) })
if type_ == MonitorType.KEYWORD: if type == MonitorType.KEYWORD:
data.update({ data.update({
"keyword": keyword, "keyword": keyword,
}) })
@ -118,27 +108,27 @@ def _build_monitor_data(
# HTTP, KEYWORD # HTTP, KEYWORD
data.update({ data.update({
"url": url, "url": url,
"certificate_expiry_notification": certificate_expiry_notification, "expiryNotification": expiryNotification,
"ignore_tls_error": ignore_tls_error, "ignoreTls": ignoreTls,
"max_redirects": max_redirects, "maxredirects": maxredirects,
"accepted_status_codes": accepted_status_codes, "accepted_statuscodes": accepted_statuscodes,
"proxy_id": proxy_id, "proxyId": proxyId,
"http_method": http_method, "method": method,
"http_body": http_body, "body": body,
"http_headers": http_headers, "headers": headers,
"auth_method": auth_method, "authMethod": authMethod,
}) })
if auth_method in [AuthMethod.HTTP_BASIC, AuthMethod.NTLM]: if authMethod in [AuthMethod.HTTP_BASIC, AuthMethod.NTLM]:
data.update({ data.update({
"auth_user": auth_user, "basic_auth_user": basic_auth_user,
"auth_pass": auth_pass, "basic_auth_pass": basic_auth_pass,
}) })
if auth_method == AuthMethod.NTLM: if authMethod == AuthMethod.NTLM:
data.update({ data.update({
"auth_domain": auth_domain, "authDomain": authDomain,
"auth_workstation": auth_workstation, "authWorkstation": authWorkstation,
}) })
# DNS, PING, STEAM, MQTT # DNS, PING, STEAM, MQTT
@ -159,42 +149,38 @@ def _build_monitor_data(
# MQTT # MQTT
data.update({ data.update({
"mqtt_username": mqtt_username, "mqttUsername": mqttUsername,
"mqtt_password": mqtt_password, "mqttPassword": mqttPassword,
"mqtt_topic": mqtt_topic, "mqttTopic": mqttTopic,
"mqtt_success_message": mqtt_success_message, "mqttSuccessMessage": mqttSuccessMessage,
}) })
# SQLSERVER # SQLSERVER
data.update({ data.update({
"sqlserver_connection_string": sqlserver_connection_string "databaseConnectionString": databaseConnectionString
}) })
if type_ == MonitorType.SQLSERVER: if type == MonitorType.SQLSERVER:
data.update({ data.update({
"sqlserver_query": sqlserver_query, "databaseQuery": databaseQuery,
}) })
data = convert_to_socket(params_map_monitor, data)
return data return data
def _build_notification_data( def _build_notification_data(
name: str, name: str,
type_: NotificationType, type: NotificationType,
default: bool = False, isDefault: bool = False,
apply_existing: bool = False, applyExisting: bool = False,
**kwargs **kwargs
): ):
params_map = get_params_map_notification(type_)
type_ = convert_to_socket(params_map, type_)
data = { data = {
"name": name, "name": name,
"type_": type_, "type": type,
"default": default, "isDefault": isDefault,
"apply_existing": apply_existing, "applyExisting": applyExisting,
**kwargs **kwargs
} }
data = convert_to_socket(params_map, data)
return data return data
@ -207,7 +193,7 @@ def _build_proxy_data(
password: str = None, password: str = None,
active: bool = True, active: bool = True,
default: bool = False, default: bool = False,
apply_existing: bool = False, applyExisting: bool = False,
): ):
data = { data = {
"protocol": protocol, "protocol": protocol,
@ -218,9 +204,8 @@ def _build_proxy_data(
"password": password, "password": password,
"active": active, "active": active,
"default": default, "default": default,
"apply_existing": apply_existing "applyExisting": applyExisting
} }
data = convert_to_socket(params_map_proxy, data)
return data return data
@ -228,24 +213,24 @@ def _build_status_page_data(
slug: str, slug: str,
# config # config
id_: int, id: int,
title: str, title: str,
description: str = None, description: str = None,
theme: str = "light", theme: str = "light",
published: bool = True, published: bool = True,
show_tags: bool = False, showTags: bool = False,
domain_name_list: list = None, domainNameList: list = None,
custom_css: str = "", customCSS: str = "",
footer_text: str = None, footerText: str = None,
show_powered_by: bool = True, showPoweredBy: bool = True,
img_data_url: str = "/icon.svg", icon: str = "/icon.svg",
monitors: list = None monitors: list = None
): ):
if theme not in ["light", "dark"]: if theme not in ["light", "dark"]:
raise ValueError raise ValueError
if not domain_name_list: if not domainNameList:
domain_name_list = [] domainNameList = []
public_group_list = [] public_group_list = []
if monitors: if monitors:
public_group_list.append({ public_group_list.append({
@ -253,39 +238,36 @@ def _build_status_page_data(
"monitorList": monitors "monitorList": monitors
}) })
config = { config = {
"id_": id_, "id": id,
"slug": slug, "slug": slug,
"title": title, "title": title,
"description": description, "description": description,
"img_data_url": img_data_url, "icon": icon,
"theme": theme, "theme": theme,
"published": published, "published": published,
"show_tags": show_tags, "showTags": showTags,
"domain_name_list": domain_name_list, "domainNameList": domainNameList,
"custom_css": custom_css, "customCSS": customCSS,
"footer_text": footer_text, "footerText": footerText,
"show_powered_by": show_powered_by "showPoweredBy": showPoweredBy
} }
config = convert_to_socket(params_map_status_page, config) return slug, config, icon, public_group_list
return slug, config, img_data_url, public_group_list
def _check_missing_arguments(required_params, kwargs, params_map): def _check_missing_arguments(required_params, kwargs):
missing_arguments = [] missing_arguments = []
for required_param in required_params: for required_param in required_params:
required_param_sock = convert_to_socket(params_map, required_param) if kwargs.get(required_param) is None:
if kwargs.get(required_param_sock) is None:
missing_arguments.append(required_param) missing_arguments.append(required_param)
if missing_arguments: if missing_arguments:
missing_arguments_str = ", ".join([f"'{i}'" for i in missing_arguments]) missing_arguments_str = ", ".join([f"'{i}'" for i in missing_arguments])
raise TypeError(f"missing {len(missing_arguments)} required argument: {missing_arguments_str}") raise TypeError(f"missing {len(missing_arguments)} required argument: {missing_arguments_str}")
def _check_argument_conditions(valid_params, kwargs, params_map): def _check_argument_conditions(valid_params, kwargs):
for valid_param in valid_params: for valid_param in valid_params:
valid_param_sock = convert_to_socket(params_map, valid_param) if valid_param in kwargs:
if valid_param_sock in kwargs: value = kwargs[valid_param]
value = kwargs[valid_param_sock]
conditions = valid_params[valid_param] conditions = valid_params[valid_param]
min_ = conditions.get("min") min_ = conditions.get("min")
max_ = conditions.get("max") max_ = conditions.get("max")
@ -297,40 +279,40 @@ def _check_argument_conditions(valid_params, kwargs, params_map):
def _check_arguments_monitor(kwargs): def _check_arguments_monitor(kwargs):
required_args = [ required_args = [
"type_", "type",
"name", "name",
"heartbeat_interval", "interval",
"retries", "maxretries",
"heartbeat_retry_interval" "retryInterval"
] ]
_check_missing_arguments(required_args, kwargs, params_map_monitor) _check_missing_arguments(required_args, kwargs)
required_args_by_type = { required_args_by_type = {
MonitorType.HTTP: ["url", "max_redirects"], MonitorType.HTTP: ["url", "maxredirects"],
MonitorType.PORT: ["hostname", "port"], MonitorType.PORT: ["hostname", "port"],
MonitorType.PING: ["hostname"], MonitorType.PING: ["hostname"],
MonitorType.KEYWORD: ["url", "keyword", "max_redirects"], MonitorType.KEYWORD: ["url", "keyword", "maxredirects"],
MonitorType.DNS: ["hostname", "dns_resolve_server", "port"], MonitorType.DNS: ["hostname", "dns_resolve_server", "port"],
MonitorType.PUSH: [], MonitorType.PUSH: [],
MonitorType.STEAM: ["hostname", "port"], MonitorType.STEAM: ["hostname", "port"],
MonitorType.MQTT: ["hostname", "port", "mqtt_topic"], MonitorType.MQTT: ["hostname", "port", "mqttTopic"],
MonitorType.SQLSERVER: [], MonitorType.SQLSERVER: [],
} }
type_ = kwargs[convert_to_socket(params_map_monitor, "type")] type_ = kwargs["type"]
required_args = required_args_by_type[type_] required_args = required_args_by_type[type_]
_check_missing_arguments(required_args, kwargs, params_map_monitor) _check_missing_arguments(required_args, kwargs)
conditions = { conditions = {
"heartbeat_interval": { "interval": {
"min": 20 "min": 20
}, },
"retries": { "maxretries": {
"min": 0 "min": 0
}, },
"heartbeat_retry_interval": { "retryInterval": {
"min": 20 "min": 20
}, },
"max_redirects": { "maxredirects": {
"min": 0 "min": 0
}, },
"port": { "port": {
@ -338,41 +320,39 @@ def _check_arguments_monitor(kwargs):
"max": 65535 "max": 65535
} }
} }
_check_argument_conditions(conditions, kwargs, params_map_monitor) _check_argument_conditions(conditions, kwargs)
def _check_arguments_notification(kwargs): def _check_arguments_notification(kwargs):
required_args = ["type_", "name"] required_args = ["type", "name"]
_check_missing_arguments(required_args, kwargs, params_map_notification) _check_missing_arguments(required_args, kwargs)
type_ = kwargs[convert_to_socket(params_map_notification, "type")] type_ = kwargs["type"]
required_args_sock = notification_provider_options[type_] required_args = notification_provider_options[type_]
params_map = get_params_map_notification(type_sock=type_) _check_missing_arguments(required_args, kwargs)
required_args = convert_from_socket(params_map, required_args_sock)
_check_missing_arguments(required_args, kwargs, params_map)
provider_conditions = { provider_conditions = {
'gotify_priority': { 'gotifyPriority': {
'max': 10, 'max': 10,
'min': 0 'min': 0
}, },
'ntfy_priority': { 'ntfyPriority': {
'max': 5, 'max': 5,
'min': 1 'min': 1
}, },
'smtp_smtp_port': { 'smtpPort': {
'max': 65535, 'max': 65535,
'min': 0 'min': 0
} }
} }
_check_argument_conditions(provider_conditions, kwargs, params_map) _check_argument_conditions(provider_conditions, kwargs)
def _check_arguments_proxy(kwargs): def _check_arguments_proxy(kwargs):
required_args = ["protocol", "host", "port"] required_args = ["protocol", "host", "port"]
if kwargs.get("auth"): if kwargs.get("auth"):
required_args.extend(["username", "password"]) required_args.extend(["username", "password"])
_check_missing_arguments(required_args, kwargs, params_map_proxy) _check_missing_arguments(required_args, kwargs)
conditions = { conditions = {
"port": { "port": {
@ -380,7 +360,7 @@ def _check_arguments_proxy(kwargs):
"max": 65535 "max": 65535
} }
} }
_check_argument_conditions(conditions, kwargs, params_map_proxy) _check_argument_conditions(conditions, kwargs)
class UptimeKumaApi(object): class UptimeKumaApi(object):
@ -505,13 +485,11 @@ class UptimeKumaApi(object):
def get_monitors(self): def get_monitors(self):
r = list(self._get_event_data("monitorList").values()) r = list(self._get_event_data("monitorList").values())
r = convert_from_socket(params_map_monitor, r)
int_to_bool(r, ["active"]) int_to_bool(r, ["active"])
return r return r
def get_monitor(self, id_: int): def get_monitor(self, id_: int):
r = self._call('getMonitor', id_)["monitor"] r = self._call('getMonitor', id_)["monitor"]
r = convert_from_socket(params_map_monitor, r)
int_to_bool(r, ["active"]) int_to_bool(r, ["active"])
return r return r
@ -531,22 +509,15 @@ class UptimeKumaApi(object):
def add_monitor(self, **kwargs): def add_monitor(self, **kwargs):
data = _build_monitor_data(**kwargs) data = _build_monitor_data(**kwargs)
_check_arguments_monitor(data) _check_arguments_monitor(data)
r = self._call('add', data) r = self._call('add', data)
r = convert_from_socket(params_map_monitor, r)
return r return r
def edit_monitor(self, id_: int, **kwargs): def edit_monitor(self, id_: int, **kwargs):
data = self.get_monitor(id_) data = self.get_monitor(id_)
data.update(kwargs) data.update(kwargs)
data = convert_to_socket(params_map_monitor, data)
_check_arguments_monitor(data) _check_arguments_monitor(data)
r = self._call('editMonitor', data) r = self._call('editMonitor', data)
r = convert_from_socket(params_map_monitor, r)
return r return r
# monitor tags # monitor tags
@ -571,10 +542,6 @@ class UptimeKumaApi(object):
config = json.loads(notification["config"]) config = json.loads(notification["config"])
del notification["config"] del notification["config"]
notification.update(config) notification.update(config)
notification["type"] = convert_from_socket(params_map_notification_providers, notification["type"])
params_map = get_params_map_notification(notification["type"])
notification = convert_from_socket(params_map, notification)
r.append(notification) r.append(notification)
return r return r
@ -600,24 +567,16 @@ class UptimeKumaApi(object):
def edit_notification(self, id_: int, **kwargs): def edit_notification(self, id_: int, **kwargs):
notification = self.get_notification(id_) notification = self.get_notification(id_)
if "type_" in kwargs and kwargs["type_"] != notification["type_"]:
# remove old notification provider options from notification object # remove old notification provider options from notification object
if "type" in kwargs and kwargs["type"] != notification["type"]:
for provider in notification_provider_options: for provider in notification_provider_options:
provider_options = notification_provider_options[provider] provider_options = notification_provider_options[provider]
params_map = get_params_map_notification(type_sock=provider) if provider != kwargs["type"]:
provider_options = convert_from_socket(params_map, provider_options)
if provider != kwargs["type_"]:
for option in provider_options: for option in provider_options:
if option in notification: if option in notification:
del notification[option] del notification[option]
# convert type from py to sock
kwargs["type_"] = convert_to_socket(params_map_notification_providers, kwargs["type_"])
notification.update(kwargs) notification.update(kwargs)
params_map = get_params_map_notification(type_sock=kwargs["type_"])
notification = convert_to_socket(params_map, notification)
_check_arguments_notification(notification) _check_arguments_notification(notification)
return self._call('addNotification', (notification, id_)) return self._call('addNotification', (notification, id_))
@ -631,8 +590,7 @@ class UptimeKumaApi(object):
def get_proxies(self): def get_proxies(self):
r = self._get_event_data("proxyList") r = self._get_event_data("proxyList")
r = convert_from_socket(params_map_proxy, r) int_to_bool(r, ["auth", "active", "default", "applyExisting"])
int_to_bool(r, ["auth", "active", "default", "apply_existing"])
return r return r
def get_proxy(self, id_: int): def get_proxy(self, id_: int):
@ -651,8 +609,6 @@ class UptimeKumaApi(object):
def edit_proxy(self, id_: int, **kwargs): def edit_proxy(self, id_: int, **kwargs):
proxy = self.get_proxy(id_) proxy = self.get_proxy(id_)
proxy.update(kwargs) proxy.update(kwargs)
proxy = convert_to_socket(params_map_proxy, proxy)
_check_arguments_proxy(proxy) _check_arguments_proxy(proxy)
return self._call('addProxy', (proxy, id_)) return self._call('addProxy', (proxy, id_))
@ -663,7 +619,6 @@ class UptimeKumaApi(object):
def get_status_pages(self): def get_status_pages(self):
r = list(self._get_event_data("statusPageList").values()) r = list(self._get_event_data("statusPageList").values())
r = convert_from_socket(params_map_status_page, r)
return r return r
def get_status_page(self, slug: str): def get_status_page(self, slug: str):
@ -671,7 +626,6 @@ class UptimeKumaApi(object):
config = r["config"] config = r["config"]
del r["config"] del r["config"]
r.update(config) r.update(config)
r = convert_from_socket(params_map_status_page, r)
return r return r
def add_status_page(self, slug: str, title: str): def add_status_page(self, slug: str, title: str):
@ -699,7 +653,6 @@ class UptimeKumaApi(object):
"style": style "style": style
} }
r = self._call('postIncident', (slug, incident))["incident"] r = self._call('postIncident', (slug, incident))["incident"]
r = convert_from_socket(params_map_status_page, r)
self.save_status_page(slug) self.save_status_page(slug)
return r return r
@ -741,7 +694,6 @@ class UptimeKumaApi(object):
def info(self): def info(self):
r = self._get_event_data("info") r = self._get_event_data("info")
r = convert_from_socket(params_map_info, r)
return r return r
# clear # clear
@ -789,7 +741,6 @@ class UptimeKumaApi(object):
def get_settings(self): def get_settings(self):
r = self._call('getSettings')["data"] r = self._call('getSettings')["data"]
r = convert_from_socket(params_map_settings, r)
return r return r
def set_settings( def set_settings(
@ -797,39 +748,38 @@ class UptimeKumaApi(object):
password: str, password: str,
# about # about
check_update: bool = True, checkUpdate: bool = True,
check_beta: bool = False, checkBeta: bool = False,
# monitor history # monitor history
keep_data_period_days: int = 180, keepDataPeriodDays: int = 180,
# general # general
entry_page: str = "dashboard", entryPage: str = "dashboard",
search_engine_index: bool = False, searchEngineIndex: bool = False,
primary_base_url: str = "", primaryBaseURL: str = "",
steam_api_key: str = "", steamAPIKey: str = "",
# notifications # notifications
tls_expiry_notify_days: list = None, tlsExpiryNotifyDays: list = None,
# security # security
disable_auth: bool = False disableAuth: bool = False
): ):
if not tls_expiry_notify_days: if not tlsExpiryNotifyDays:
tls_expiry_notify_days = [7, 14, 21] tlsExpiryNotifyDays = [7, 14, 21]
data = { data = {
"check_update": check_update, "checkUpdate": checkUpdate,
"check_beta": check_beta, "checkBeta": checkBeta,
"keep_data_period_days": keep_data_period_days, "keepDataPeriodDays": keepDataPeriodDays,
"entry_page": entry_page, "entryPage": entryPage,
"search_engine_index": search_engine_index, "searchEngineIndex": searchEngineIndex,
"primary_base_url": primary_base_url, "primaryBaseURL": primaryBaseURL,
"steam_api_key": steam_api_key, "steamAPIKey": steamAPIKey,
"tls_expiry_notify_days": tls_expiry_notify_days, "tlsExpiryNotifyDays": tlsExpiryNotifyDays,
"disable_auth": disable_auth "disableAuth": disableAuth
} }
data = convert_to_socket(params_map_settings, data)
return self._call('setSettings', (data, password)) return self._call('setSettings', (data, password))
def change_password(self, old_password: str, new_password: str): def change_password(self, old_password: str, new_password: str):

View file

@ -1,355 +0,0 @@
# socket -> python
params_map_monitor = {
"type": "type_",
"interval": "heartbeat_interval",
"retryInterval": "heartbeat_retry_interval",
"maxretries": "retries",
"notificationIDList": "notification_ids",
"upsideDown": "upside_down_mode",
"expiryNotification": "certificate_expiry_notification",
"ignoreTls": "ignore_tls_error",
"maxredirects": "max_redirects",
"accepted_statuscodes": "accepted_status_codes",
"proxyId": "proxy_id",
"method": "http_method",
"body": "http_body",
"headers": "http_headers",
"authMethod": "auth_method",
"basicauth-user": "auth_user",
"basicauth-pass": "auth_pass",
"basicauth-domain": "auth_domain",
"basicauth-workstation": "auth_workstation",
"mqttUsername": "mqtt_username",
"mqttPassword": "mqtt_password",
"mqttTopic": "mqtt_topic",
"mqttSuccessMessage": "mqtt_success_message",
"databaseConnectionString": "sqlserver_connection_string",
"sqlserverQuery": "sqlserver_query",
"authDomain": "auth_domain",
"authWorkstation": "auth_workstation",
"databaseQuery": "database_query",
"monitorID": "monitor_id"
}
params_map_notification = {
"type": "type_",
"isDefault": "default",
"userId": "user_id",
"applyExisting": "apply_existing",
}
params_map_notification_providers = {
'alerta': 'alerta',
'AliyunSMS': 'aliyun_sms',
'apprise': 'apprise',
'Bark': 'bark',
'clicksendsms': 'clicksendsms',
'DingDing': 'ding_ding',
'discord': 'discord',
'Feishu': 'feishu',
'GoogleChat': 'google_chat',
'gorush': 'gorush',
'gotify': 'gotify',
'line': 'line',
'lunasea': 'lunasea',
'matrix': 'matrix',
'mattermost': 'mattermost',
'ntfy': 'ntfy',
'octopush': 'octopush',
'OneBot': 'one_bot',
'PagerDuty': 'pager_duty',
'promosms': 'promosms',
'pushbullet': 'pushbullet',
'PushDeer': 'push_deer',
'pushover': 'pushover',
'pushy': 'pushy',
'rocket.chat': 'rocket_chat',
'serwersms': 'serwersms',
'signal': 'signal',
'slack': 'slack',
'smtp': 'smtp',
'stackfield': 'stackfield',
'teams': 'teams',
'PushByTechulus': 'push_by_techulus',
'telegram': 'telegram',
'webhook': 'webhook',
'WeCom': 'we_com'
}
params_map_notification_provider_options = {
'alerta': {
'alertaApiEndpoint': 'alerta_api_endpoint',
'alertaApiKey': 'alerta_api_key',
'alertaEnvironment': 'alerta_environment',
'alertaAlertState': 'alerta_alert_state',
'alertaRecoverState': 'alerta_recover_state',
},
'aliyun_sms': {
'phonenumber': 'aliyun_sms_phonenumber',
'templateCode': 'aliyun_sms_template_code',
'signName': 'aliyun_sms_sign_name',
'accessKeyId': 'aliyun_sms_access_key_id',
'secretAccessKey': 'aliyun_sms_secret_access_key',
},
'apprise': {
'appriseURL': 'apprise_url',
'title': 'apprise_title',
},
'bark': {
'barkEndpoint': 'bark_endpoint',
},
'clicksendsms': {
'clicksendsmsLogin': 'clicksendsms_login',
'clicksendsmsPassword': 'clicksendsms_password',
'clicksendsmsToNumber': 'clicksendsms_to_number',
'clicksendsmsSenderName': 'clicksendsms_sender_name',
},
'ding_ding': {
'webHookUrl': 'ding_ding_web_hook_url',
'secretKey': 'ding_ding_secret_key',
},
'discord': {
'discordUsername': 'discord_username',
'discordWebhookUrl': 'discord_webhook_url',
'discordPrefixMessage': 'discord_prefix_message',
},
'feishu': {
'feishuWebHookUrl': 'feishu_web_hook_url',
},
'google_chat': {
'googleChatWebhookURL': 'google_chat_chat_webhook_url',
},
'gorush': {
'gorushDeviceToken': 'gorush_device_token',
'gorushPlatform': 'gorush_platform',
'gorushTitle': 'gorush_title',
'gorushPriority': 'gorush_priority',
'gorushRetry': 'gorush_retry',
'gorushTopic': 'gorush_topic',
'gorushServerURL': 'gorush_server_url',
},
'gotify': {
'gotifyserverurl': 'gotify_serverurl',
'gotifyapplicationToken': 'gotify_application_token',
'gotifyPriority': 'gotify_priority',
},
'line': {
'lineChannelAccessToken': 'line_channel_access_token',
'lineUserID': 'line_user_id',
},
'lunasea': {
'lunaseaDevice': 'lunasea_device',
},
'matrix': {
'internalRoomId': 'matrix_internal_room_id',
'accessToken': 'matrix_access_token',
'homeserverUrl': 'matrix_homeserver_url',
},
'mattermost': {
'mattermostusername': 'mattermost_username',
'mattermostWebhookUrl': 'mattermost_webhook_url',
'mattermostchannel': 'mattermost_channel',
'mattermosticonemo': 'mattermost_iconemo',
'mattermosticonurl': 'mattermost_iconurl',
},
'ntfy': {
'ntfyserverurl': 'ntfy_serverurl',
'ntfytopic': 'ntfy_topic',
'ntfyPriority': 'ntfy_priority',
},
'octopush': {
'octopushVersion': 'octopush_version',
'octopushAPIKey': 'octopush_apikey',
'octopushLogin': 'octopush_login',
'octopushPhoneNumber': 'octopush_phone_number',
'octopushSMSType': 'octopush_smstype',
'octopushSenderName': 'octopush_sender_name',
'octopushDMLogin': 'octopush_dmlogin',
'octopushDMAPIKey': 'octopush_dmapikey',
'octopushDMPhoneNumber': 'octopush_dmphone_number',
'octopushDMSenderName': 'octopush_dmsender_name',
'octopushDMSMSType': 'octopush_dmsmstype',
},
'one_bot': {
'httpAddr': 'one_bot_http_addr',
'accessToken': 'one_bot_access_token',
'msgType': 'one_bot_msg_type',
'recieverId': 'one_bot_reciever_id',
},
'pager_duty': {
'pagerdutyAutoResolve': 'pager_duty_duty_auto_resolve',
'pagerdutyIntegrationUrl': 'pager_duty_duty_integration_url',
'pagerdutyPriority': 'pager_duty_duty_priority',
'pagerdutyIntegrationKey': 'pager_duty_duty_integration_key',
},
'promosms': {
'promosmsLogin': 'promosms_login',
'promosmsPassword': 'promosms_password',
'promosmsPhoneNumber': 'promosms_phone_number',
'promosmsSMSType': 'promosms_smstype',
'promosmsSenderName': 'promosms_sender_name',
},
'pushbullet': {
'pushbulletAccessToken': 'pushbullet_access_token',
},
'push_deer': {
'pushdeerKey': 'push_deer_deer_key',
},
'pushover': {
'pushoveruserkey': 'pushover_userkey',
'pushoverapptoken': 'pushover_apptoken',
'pushoversounds': 'pushover_sounds',
'pushoverpriority': 'pushover_priority',
'pushovertitle': 'pushover_title',
'pushoverdevice': 'pushover_device',
},
'pushy': {
'pushyAPIKey': 'pushy_apikey',
'pushyToken': 'pushy_token',
},
'rocket_chat': {
'rocketchannel': 'rocket_chat_channel',
'rocketusername': 'rocket_chat_username',
'rocketiconemo': 'rocket_chat_iconemo',
'rocketwebhookURL': 'rocket_chat_webhook_url',
'rocketbutton': 'rocket_chat_button',
},
'serwersms': {
'serwersmsUsername': 'serwersms_username',
'serwersmsPassword': 'serwersms_password',
'serwersmsPhoneNumber': 'serwersms_phone_number',
'serwersmsSenderName': 'serwersms_sender_name',
},
'signal': {
'signalNumber': 'signal_number',
'signalRecipients': 'signal_recipients',
'signalURL': 'signal_url',
},
'slack': {
'slackbutton': 'slack_button',
'slackchannel': 'slack_channel',
'slackusername': 'slack_username',
'slackiconemo': 'slack_iconemo',
'slackwebhookURL': 'slack_webhook_url',
},
'smtp': {
'smtpHost': 'smtp_host',
'smtpPort': 'smtp_port',
'smtpSecure': 'smtp_secure',
'smtpIgnoreTLSError': 'smtp_ignore_tlserror',
'smtpDkimDomain': 'smtp_dkim_domain',
'smtpDkimKeySelector': 'smtp_dkim_key_selector',
'smtpDkimPrivateKey': 'smtp_dkim_private_key',
'smtpDkimHashAlgo': 'smtp_dkim_hash_algo',
'smtpDkimheaderFieldNames': 'smtp_dkimheader_field_names',
'smtpDkimskipFields': 'smtp_dkimskip_fields',
'smtpUsername': 'smtp_username',
'smtpPassword': 'smtp_password',
'customSubject': 'smtp_custom_subject',
'smtpFrom': 'smtp_from',
'smtpCC': 'smtp_cc',
'smtpBCC': 'smtp_bcc',
'smtpTo': 'smtp_to',
},
'stackfield': {
'stackfieldwebhookURL': 'stackfield_webhook_url',
},
'teams': {
'webhookUrl': 'teams_webhook_url',
},
'push_by_techulus': {
'pushAPIKey': 'push_by_techulus_apikey',
},
'telegram': {
'telegramBotToken': 'telegram_bot_token',
'telegramChatID': 'telegram_chat_id',
},
'webhook': {
'webhookContentType': 'webhook_content_type',
'webhookURL': 'webhook_url',
},
'we_com': {
'weComBotKey': 'we_com_com_bot_key',
},
}
params_map_proxy = {
"applyExisting": "apply_existing",
"createdDate": "created_date",
"userId": "user_id"
}
params_map_status_page = {
"id": "id_",
"slug": "slug",
"title": "title",
"description": "description",
"icon": "img_data_url",
"published": "published",
"showTags": "show_tags",
"domainNameList": "domain_name_list",
"customCSS": "custom_css",
"footerText": "footer_text",
"showPoweredBy": "show_powered_by",
"createdDate": "created_date"
}
params_map_info = {
"latestVersion": "latest_version",
"primaryBaseURL": "primary_base_url"
}
params_map_settings = {
# about
"checkUpdate": "check_update",
"checkBeta": "check_beta",
# monitor history
"keepDataPeriodDays": "keep_data_period_days",
# general
"entryPage": "entry_page",
"searchEngineIndex": "search_engine_index",
"primaryBaseURL": "primary_base_url",
"steamAPIKey": "steam_api_key",
# notifications
"tlsExpiryNotifyDays": "tls_expiry_notify_days",
# security
"disableAuth": "disable_auth"
}
def _convert_to_from_socket(params_map: dict, params, to_socket=False):
if type(params) == list:
out = []
params_list = params
for params in params_list:
params_py = _convert_to_from_socket(params_map, params, to_socket)
out.append(params_py)
else:
if to_socket:
params_map = {v: k for k, v in params_map.items()}
if type(params) == dict:
out = {}
for key, value in params.items():
key = params_map.get(key, key)
out[key] = value
else:
return params_map.get(params, params)
return out
def convert_from_socket(params_map, params):
return _convert_to_from_socket(params_map, params)
def convert_to_socket(params_map, params):
return _convert_to_from_socket(params_map, params, to_socket=True)
def get_params_map_notification(type_py=None, type_sock=None):
if not type_py:
type_py = convert_from_socket(params_map_notification_providers, type_sock)
return {
**params_map_notification,
**params_map_notification_providers,
**params_map_notification_provider_options[type_py]
}