From 8d2c0ec4f72021e47a279b92bb5c1e10cbc008d2 Mon Sep 17 00:00:00 2001 From: lucasheld Date: Wed, 3 Aug 2022 11:56:02 +0200 Subject: [PATCH] remove converter --- README.md | 6 +- scripts/build_models.py | 102 ++++++++++ scripts/models/notification.py | 6 + tests/test_monitor.py | 8 +- tests/test_monitor_tag.py | 4 +- tests/test_notification.py | 15 +- tests/test_status_page.py | 2 +- uptime_kuma_api/__init__.py | 12 -- uptime_kuma_api/api.py | 342 ++++++++++++++----------------- uptime_kuma_api/converter.py | 355 --------------------------------- 10 files changed, 273 insertions(+), 579 deletions(-) create mode 100644 scripts/build_models.py create mode 100644 scripts/models/notification.py delete mode 100644 uptime_kuma_api/converter.py diff --git a/README.md b/README.md index 6fad1ea..24e15f8 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,8 @@ uptime-kuma-api is a Python wrapper for the Uptime Kuma WebSocket API. This package was developed to configure Uptime Kuma with Ansible. The Ansible collection can be found at https://github.com/lucasheld/ansible-uptime-kuma. +Python version 3.6+ is required. + Installation --- uptime-kuma-api is available on the Python Package Index (PyPI). @@ -31,7 +33,7 @@ To do so, import `UptimeKumaApi` from the library and specify the Uptime Kuma se Now you can call one of the existing methods of the instance. For example create a new monitor: ```python ->>> result = api.add_monitor(type_=MonitorType.HTTP, name="new monitor", url="http://192.168.1.1") +>>> result = api.add_monitor(type=MonitorType.HTTP, name="new monitor", url="http://192.168.1.1") >>> print(result) -{'msg': 'Added Successfully.', 'monitor_id': 1} +{'msg': 'Added Successfully.', 'monitorId': 1} ``` diff --git a/scripts/build_models.py b/scripts/build_models.py new file mode 100644 index 0000000..a351e2e --- /dev/null +++ b/scripts/build_models.py @@ -0,0 +1,102 @@ +import re +from pprint import pprint + + +def parse_data_keys(data): + keys = [] + for line in data.split("\n"): + line = line.strip() + if not line: + continue + match = re.match(r'^([^:]+):', line) # example: "type: this.type," + if match: + key = match.group(1) + else: + key = line.rstrip(",") # example: "notificationIDList," + keys.append(key) + return keys + + +def parse_heartbeat(): + with open('uptime-kuma/server/model/heartbeat.js') as f: + content = f.read() + all_keys = [] + match = re.search(r'toJSON\(\) {\s+return.*{([^}]+)}', content) + data = match.group(1) + keys = parse_data_keys(data) + all_keys.extend(keys) + match = re.search(r'toPublicJSON\(\) {\s+return.*{([^}]+)}', content) + data = match.group(1) + keys = parse_data_keys(data) + all_keys.extend(keys) + all_keys = list(set(all_keys)) + return all_keys + + +def parse_incident(): + with open('uptime-kuma/server/model/incident.js') as f: + content = f.read() + match = re.search(r'toPublicJSON\(\) {\s+return.*{([^}]+)}', content) + data = match.group(1) + keys = parse_data_keys(data) + return keys + + +def parse_monitor(): + # todo: toPublicJSON ??? + with open('uptime-kuma/server/model/monitor.js') as f: + content = f.read() + matches = re.findall(r'data = {([^}]+)}', content) + all_keys = [] + for match in matches: + keys = parse_data_keys(match) + keys = [i for i in keys if i != "...data"] + all_keys.extend(keys) + return all_keys + + +def parse_proxy(): + with open('uptime-kuma/server/model/proxy.js') as f: + content = f.read() + match = re.search(r'toJSON\(\) {\s+return.*{([^}]+)}', content) + data = match.group(1) + keys = parse_data_keys(data) + return keys + + +def parse_status_page(): + with open('uptime-kuma/server/model/status_page.js') as f: + content = f.read() + all_keys = [] + match = re.search(r'toJSON\(\) {\s+return.*{([^}]+)}', content) + data = match.group(1) + keys = parse_data_keys(data) + all_keys.extend(keys) + match = re.search(r'toPublicJSON\(\) {\s+return.*{([^}]+)}', content) + data = match.group(1) + keys = parse_data_keys(data) + all_keys.extend(keys) + all_keys = list(set(all_keys)) + return all_keys + + +def parse_tag(): + with open('uptime-kuma/server/model/tag.js') as f: + content = f.read() + match = re.search(r'toJSON\(\) {\s+return.*{([^}]+)}', content) + data = match.group(1) + keys = parse_data_keys(data) + return keys + + +pprint(parse_heartbeat()) +pprint(parse_incident()) +pprint(parse_monitor()) +pprint(parse_proxy()) +pprint(parse_status_page()) +pprint(parse_tag()) + + +# TODO: +# https://github.com/louislam/uptime-kuma/blob/2adb142ae25984ecebfa4b51c739fec5e492763a/server/proxy.js#L20 +# https://github.com/louislam/uptime-kuma/blob/239611a016a85712305100818d4c7b88a14664a9/server/socket-handlers/status-page-socket-handler.js#L118 diff --git a/scripts/models/notification.py b/scripts/models/notification.py new file mode 100644 index 0000000..44822e9 --- /dev/null +++ b/scripts/models/notification.py @@ -0,0 +1,6 @@ +notification = [ + "type", + "isDefault", + "userId", + "applyExisting", +] diff --git a/tests/test_monitor.py b/tests/test_monitor.py index f92bfeb..5d475af 100644 --- a/tests/test_monitor.py +++ b/tests/test_monitor.py @@ -6,19 +6,19 @@ from uptime_kuma_api import UptimeKumaException class TestMonitor(UptimeKumaTestCase): def test_monitor(self): expected_monitor = { - "type_": "http", + "type": "http", "name": "monitor 1", "url": "http://192.168.20.135" } # add monitor r = self.api.add_monitor( - type_=expected_monitor["type_"], + type=expected_monitor["type"], name=expected_monitor["name"], url=expected_monitor["url"] ) self.assertEqual(r["msg"], "Added Successfully.") - monitor_id = r["monitor_id"] + monitor_id = r["monitorID"] # get monitor monitor = self.api.get_monitor(monitor_id) @@ -31,7 +31,7 @@ class TestMonitor(UptimeKumaTestCase): self.compare(monitor, expected_monitor) # edit monitor - expected_monitor["type_"] = "ping" + expected_monitor["type"] = "ping" expected_monitor["name"] = "monitor 1 new" expected_monitor["hostname"] = "127.0.0.1" del expected_monitor["url"] diff --git a/tests/test_monitor_tag.py b/tests/test_monitor_tag.py index c946a04..f21d8cf 100644 --- a/tests/test_monitor_tag.py +++ b/tests/test_monitor_tag.py @@ -6,8 +6,8 @@ class TestMonitorTag(UptimeKumaTestCase): def test_monitor_tag(self): r = self.api.add_tag(name="tag 1", color="#ffffff") tag_id = r["id"] - r = self.api.add_monitor(type_="http", name="monitor 1", url="http://127.0.0.1") - monitor_id = r["monitor_id"] + r = self.api.add_monitor(type="http", name="monitor 1", url="http://127.0.0.1") + monitor_id = r["monitorID"] expected_monitor_tag = { "tag_id": tag_id, diff --git a/tests/test_notification.py b/tests/test_notification.py index dd62d82..d34e731 100644 --- a/tests/test_notification.py +++ b/tests/test_notification.py @@ -8,9 +8,9 @@ class TestNotification(UptimeKumaTestCase): expected_notification = { "name": "notification 1", "default": True, - "apply_existing": True, - "type_": "push_by_techulus", - "push_by_techulus_apikey": "123456789" + "applyExisting": True, + "type": "PushByTechulus", + "pushAPIKey": "123456789" } # test notification @@ -35,14 +35,15 @@ class TestNotification(UptimeKumaTestCase): # edit notification expected_notification["name"] = "notification 1 new" expected_notification["default"] = False - expected_notification["apply_existing"] = False - expected_notification["type_"] = "push_deer" - expected_notification["push_deer_deer_key"] = "987654321" - del expected_notification["push_by_techulus_apikey"] + expected_notification["applyExisting"] = False + expected_notification["type"] = "PushDeer" + expected_notification["pushdeerKey"] = "987654321" + del expected_notification["pushAPIKey"] r = self.api.edit_notification(notification_id, **expected_notification) self.assertEqual(r["msg"], "Saved") notification = self.api.get_notification(notification_id) self.compare(notification, expected_notification) + self.assertIsNone(notification.get("pushAPIKey")) # delete notification r = self.api.delete_notification(notification_id) diff --git a/tests/test_status_page.py b/tests/test_status_page.py index 2d986a5..9be29cd 100644 --- a/tests/test_status_page.py +++ b/tests/test_status_page.py @@ -10,7 +10,7 @@ class TestStatusPage(UptimeKumaTestCase): "slug": slug, "title": "status page 1", "description": "description 1", - "show_powered_by": False + "showPoweredBy": False } # slug must be unique diff --git a/uptime_kuma_api/__init__.py b/uptime_kuma_api/__init__.py index 932e01a..edee89e 100644 --- a/uptime_kuma_api/__init__.py +++ b/uptime_kuma_api/__init__.py @@ -4,17 +4,5 @@ from .monitor_type import MonitorType from .notification_providers import NotificationType, notification_provider_options from .proxy_protocol import ProxyProtocol from .incident_style import IncidentStyle -from .converter import \ - convert_from_socket,\ - convert_to_socket, \ - params_map_monitor, \ - params_map_notification,\ - params_map_notification_providers,\ - params_map_notification_provider_options,\ - get_params_map_notification, \ - params_map_proxy, \ - params_map_status_page, \ - params_map_info, \ - params_map_settings from .exceptions import UptimeKumaException from .api import UptimeKumaApi diff --git a/uptime_kuma_api/api.py b/uptime_kuma_api/api.py index 144e62b..f18cd46 100644 --- a/uptime_kuma_api/api.py +++ b/uptime_kuma_api/api.py @@ -8,19 +8,9 @@ from . import MonitorType from . import NotificationType, notification_provider_options from . import ProxyProtocol from . import IncidentStyle -from . import \ - convert_from_socket,\ - convert_to_socket, \ - params_map_monitor, \ - params_map_notification,\ - params_map_notification_providers, \ - get_params_map_notification, \ - params_map_proxy, \ - params_map_status_page, \ - params_map_info, \ - params_map_settings from . import UptimeKumaException + def int_to_bool(data, keys): if type(data) == list: for d in data: @@ -32,30 +22,30 @@ def int_to_bool(data, keys): def _build_monitor_data( - type_: MonitorType, + type: MonitorType, name: str, - heartbeat_interval: int = 60, - heartbeat_retry_interval: int = 60, - retries: int = 0, - upside_down_mode: bool = False, + interval: int = 60, + retryInterval: int = 60, + maxretries: int = 0, + upsideDown: bool = False, tags: list = None, - notification_ids: list = None, + notificationIDList: list = None, # HTTP, KEYWORD url: str = None, - certificate_expiry_notification: bool = False, - ignore_tls_error: bool = False, - max_redirects: int = 10, - accepted_status_codes: list = None, - proxy_id: int = None, - http_method: str = "GET", - http_body: str = None, - http_headers: str = None, - auth_method: AuthMethod = AuthMethod.NONE, - auth_user: str = None, - auth_pass: str = None, - auth_domain: str = None, - auth_workstation: str = None, + expiryNotification: bool = False, + ignoreTls: bool = False, + maxredirects: int = 10, + accepted_statuscodes: list = None, + proxyId: int = None, + method: str = "GET", + body: str = None, + headers: str = None, + authMethod: AuthMethod = AuthMethod.NONE, + basic_auth_user: str = None, + basic_auth_pass: str = None, + authDomain: str = None, + authWorkstation: str = None, # KEYWORD keyword: str = None, @@ -71,38 +61,38 @@ def _build_monitor_data( dns_resolve_type: str = "A", # MQTT - mqtt_username: str = None, - mqtt_password: str = None, - mqtt_topic: str = None, - mqtt_success_message: str = None, + mqttUsername: str = None, + mqttPassword: str = None, + mqttTopic: str = None, + mqttSuccessMessage: str = None, # SQLSERVER - sqlserver_connection_string: str = "Server=,;" - "Database=;" - "User Id=;" - "Password=;" - "Encrypt=;" - "TrustServerCertificate=;" - "Connection Timeout=", - sqlserver_query: str = None + databaseConnectionString: str = "Server=,;" + "Database=;" + "User Id=;" + "Password=;" + "Encrypt=;" + "TrustServerCertificate=;" + "Connection Timeout=", + databaseQuery: str = None ): - if not accepted_status_codes: - accepted_status_codes = ["200-299"] + if not accepted_statuscodes: + accepted_statuscodes = ["200-299"] dict_notification_ids = {} - if notification_ids: - for notification_id in notification_ids: + if notificationIDList: + for notification_id in notificationIDList: dict_notification_ids[notification_id] = True - notification_ids = dict_notification_ids + notificationIDList = dict_notification_ids data = { - "type_": type_, + "type": type, "name": name, - "heartbeat_interval": heartbeat_interval, - "heartbeat_retry_interval": heartbeat_retry_interval, - "retries": retries, - "notification_ids": notification_ids, - "upside_down_mode": upside_down_mode, + "interval": interval, + "retryInterval": retryInterval, + "maxretries": maxretries, + "notificationIDList": notificationIDList, + "upsideDown": upsideDown, } if tags: @@ -110,7 +100,7 @@ def _build_monitor_data( "tags": tags }) - if type_ == MonitorType.KEYWORD: + if type == MonitorType.KEYWORD: data.update({ "keyword": keyword, }) @@ -118,27 +108,27 @@ def _build_monitor_data( # HTTP, KEYWORD data.update({ "url": url, - "certificate_expiry_notification": certificate_expiry_notification, - "ignore_tls_error": ignore_tls_error, - "max_redirects": max_redirects, - "accepted_status_codes": accepted_status_codes, - "proxy_id": proxy_id, - "http_method": http_method, - "http_body": http_body, - "http_headers": http_headers, - "auth_method": auth_method, + "expiryNotification": expiryNotification, + "ignoreTls": ignoreTls, + "maxredirects": maxredirects, + "accepted_statuscodes": accepted_statuscodes, + "proxyId": proxyId, + "method": method, + "body": body, + "headers": headers, + "authMethod": authMethod, }) - if auth_method in [AuthMethod.HTTP_BASIC, AuthMethod.NTLM]: + if authMethod in [AuthMethod.HTTP_BASIC, AuthMethod.NTLM]: data.update({ - "auth_user": auth_user, - "auth_pass": auth_pass, + "basic_auth_user": basic_auth_user, + "basic_auth_pass": basic_auth_pass, }) - if auth_method == AuthMethod.NTLM: + if authMethod == AuthMethod.NTLM: data.update({ - "auth_domain": auth_domain, - "auth_workstation": auth_workstation, + "authDomain": authDomain, + "authWorkstation": authWorkstation, }) # DNS, PING, STEAM, MQTT @@ -159,42 +149,38 @@ def _build_monitor_data( # MQTT data.update({ - "mqtt_username": mqtt_username, - "mqtt_password": mqtt_password, - "mqtt_topic": mqtt_topic, - "mqtt_success_message": mqtt_success_message, + "mqttUsername": mqttUsername, + "mqttPassword": mqttPassword, + "mqttTopic": mqttTopic, + "mqttSuccessMessage": mqttSuccessMessage, }) # SQLSERVER data.update({ - "sqlserver_connection_string": sqlserver_connection_string + "databaseConnectionString": databaseConnectionString }) - if type_ == MonitorType.SQLSERVER: + if type == MonitorType.SQLSERVER: data.update({ - "sqlserver_query": sqlserver_query, + "databaseQuery": databaseQuery, }) - data = convert_to_socket(params_map_monitor, data) return data def _build_notification_data( name: str, - type_: NotificationType, - default: bool = False, - apply_existing: bool = False, + type: NotificationType, + isDefault: bool = False, + applyExisting: bool = False, **kwargs ): - params_map = get_params_map_notification(type_) - type_ = convert_to_socket(params_map, type_) data = { "name": name, - "type_": type_, - "default": default, - "apply_existing": apply_existing, + "type": type, + "isDefault": isDefault, + "applyExisting": applyExisting, **kwargs } - data = convert_to_socket(params_map, data) return data @@ -207,7 +193,7 @@ def _build_proxy_data( password: str = None, active: bool = True, default: bool = False, - apply_existing: bool = False, + applyExisting: bool = False, ): data = { "protocol": protocol, @@ -218,9 +204,8 @@ def _build_proxy_data( "password": password, "active": active, "default": default, - "apply_existing": apply_existing + "applyExisting": applyExisting } - data = convert_to_socket(params_map_proxy, data) return data @@ -228,24 +213,24 @@ def _build_status_page_data( slug: str, # config - id_: int, + id: int, title: str, description: str = None, theme: str = "light", published: bool = True, - show_tags: bool = False, - domain_name_list: list = None, - custom_css: str = "", - footer_text: str = None, - show_powered_by: bool = True, + showTags: bool = False, + domainNameList: list = None, + customCSS: str = "", + footerText: str = None, + showPoweredBy: bool = True, - img_data_url: str = "/icon.svg", + icon: str = "/icon.svg", monitors: list = None ): if theme not in ["light", "dark"]: raise ValueError - if not domain_name_list: - domain_name_list = [] + if not domainNameList: + domainNameList = [] public_group_list = [] if monitors: public_group_list.append({ @@ -253,39 +238,36 @@ def _build_status_page_data( "monitorList": monitors }) config = { - "id_": id_, + "id": id, "slug": slug, "title": title, "description": description, - "img_data_url": img_data_url, + "icon": icon, "theme": theme, "published": published, - "show_tags": show_tags, - "domain_name_list": domain_name_list, - "custom_css": custom_css, - "footer_text": footer_text, - "show_powered_by": show_powered_by + "showTags": showTags, + "domainNameList": domainNameList, + "customCSS": customCSS, + "footerText": footerText, + "showPoweredBy": showPoweredBy } - config = convert_to_socket(params_map_status_page, config) - return slug, config, img_data_url, public_group_list + return slug, config, icon, public_group_list -def _check_missing_arguments(required_params, kwargs, params_map): +def _check_missing_arguments(required_params, kwargs): missing_arguments = [] for required_param in required_params: - required_param_sock = convert_to_socket(params_map, required_param) - if kwargs.get(required_param_sock) is None: + if kwargs.get(required_param) is None: missing_arguments.append(required_param) if missing_arguments: missing_arguments_str = ", ".join([f"'{i}'" for i in missing_arguments]) raise TypeError(f"missing {len(missing_arguments)} required argument: {missing_arguments_str}") -def _check_argument_conditions(valid_params, kwargs, params_map): +def _check_argument_conditions(valid_params, kwargs): for valid_param in valid_params: - valid_param_sock = convert_to_socket(params_map, valid_param) - if valid_param_sock in kwargs: - value = kwargs[valid_param_sock] + if valid_param in kwargs: + value = kwargs[valid_param] conditions = valid_params[valid_param] min_ = conditions.get("min") max_ = conditions.get("max") @@ -297,40 +279,40 @@ def _check_argument_conditions(valid_params, kwargs, params_map): def _check_arguments_monitor(kwargs): required_args = [ - "type_", + "type", "name", - "heartbeat_interval", - "retries", - "heartbeat_retry_interval" + "interval", + "maxretries", + "retryInterval" ] - _check_missing_arguments(required_args, kwargs, params_map_monitor) + _check_missing_arguments(required_args, kwargs) required_args_by_type = { - MonitorType.HTTP: ["url", "max_redirects"], + MonitorType.HTTP: ["url", "maxredirects"], MonitorType.PORT: ["hostname", "port"], MonitorType.PING: ["hostname"], - MonitorType.KEYWORD: ["url", "keyword", "max_redirects"], + MonitorType.KEYWORD: ["url", "keyword", "maxredirects"], MonitorType.DNS: ["hostname", "dns_resolve_server", "port"], MonitorType.PUSH: [], MonitorType.STEAM: ["hostname", "port"], - MonitorType.MQTT: ["hostname", "port", "mqtt_topic"], + MonitorType.MQTT: ["hostname", "port", "mqttTopic"], MonitorType.SQLSERVER: [], } - type_ = kwargs[convert_to_socket(params_map_monitor, "type")] + type_ = kwargs["type"] required_args = required_args_by_type[type_] - _check_missing_arguments(required_args, kwargs, params_map_monitor) + _check_missing_arguments(required_args, kwargs) conditions = { - "heartbeat_interval": { + "interval": { "min": 20 }, - "retries": { + "maxretries": { "min": 0 }, - "heartbeat_retry_interval": { + "retryInterval": { "min": 20 }, - "max_redirects": { + "maxredirects": { "min": 0 }, "port": { @@ -338,41 +320,39 @@ def _check_arguments_monitor(kwargs): "max": 65535 } } - _check_argument_conditions(conditions, kwargs, params_map_monitor) + _check_argument_conditions(conditions, kwargs) def _check_arguments_notification(kwargs): - required_args = ["type_", "name"] - _check_missing_arguments(required_args, kwargs, params_map_notification) + required_args = ["type", "name"] + _check_missing_arguments(required_args, kwargs) - type_ = kwargs[convert_to_socket(params_map_notification, "type")] - required_args_sock = notification_provider_options[type_] - params_map = get_params_map_notification(type_sock=type_) - required_args = convert_from_socket(params_map, required_args_sock) - _check_missing_arguments(required_args, kwargs, params_map) + type_ = kwargs["type"] + required_args = notification_provider_options[type_] + _check_missing_arguments(required_args, kwargs) provider_conditions = { - 'gotify_priority': { + 'gotifyPriority': { 'max': 10, 'min': 0 }, - 'ntfy_priority': { + 'ntfyPriority': { 'max': 5, 'min': 1 }, - 'smtp_smtp_port': { + 'smtpPort': { 'max': 65535, 'min': 0 } } - _check_argument_conditions(provider_conditions, kwargs, params_map) + _check_argument_conditions(provider_conditions, kwargs) def _check_arguments_proxy(kwargs): required_args = ["protocol", "host", "port"] if kwargs.get("auth"): required_args.extend(["username", "password"]) - _check_missing_arguments(required_args, kwargs, params_map_proxy) + _check_missing_arguments(required_args, kwargs) conditions = { "port": { @@ -380,7 +360,7 @@ def _check_arguments_proxy(kwargs): "max": 65535 } } - _check_argument_conditions(conditions, kwargs, params_map_proxy) + _check_argument_conditions(conditions, kwargs) class UptimeKumaApi(object): @@ -505,13 +485,11 @@ class UptimeKumaApi(object): def get_monitors(self): r = list(self._get_event_data("monitorList").values()) - r = convert_from_socket(params_map_monitor, r) int_to_bool(r, ["active"]) return r def get_monitor(self, id_: int): r = self._call('getMonitor', id_)["monitor"] - r = convert_from_socket(params_map_monitor, r) int_to_bool(r, ["active"]) return r @@ -531,22 +509,15 @@ class UptimeKumaApi(object): def add_monitor(self, **kwargs): data = _build_monitor_data(**kwargs) - _check_arguments_monitor(data) r = self._call('add', data) - - r = convert_from_socket(params_map_monitor, r) return r def edit_monitor(self, id_: int, **kwargs): data = self.get_monitor(id_) data.update(kwargs) - data = convert_to_socket(params_map_monitor, data) - _check_arguments_monitor(data) r = self._call('editMonitor', data) - - r = convert_from_socket(params_map_monitor, r) return r # monitor tags @@ -571,10 +542,6 @@ class UptimeKumaApi(object): config = json.loads(notification["config"]) del notification["config"] notification.update(config) - - notification["type"] = convert_from_socket(params_map_notification_providers, notification["type"]) - params_map = get_params_map_notification(notification["type"]) - notification = convert_from_socket(params_map, notification) r.append(notification) return r @@ -600,24 +567,16 @@ class UptimeKumaApi(object): def edit_notification(self, id_: int, **kwargs): notification = self.get_notification(id_) - if "type_" in kwargs and kwargs["type_"] != notification["type_"]: - # remove old notification provider options from notification object + # remove old notification provider options from notification object + if "type" in kwargs and kwargs["type"] != notification["type"]: for provider in notification_provider_options: provider_options = notification_provider_options[provider] - params_map = get_params_map_notification(type_sock=provider) - provider_options = convert_from_socket(params_map, provider_options) - if provider != kwargs["type_"]: + if provider != kwargs["type"]: for option in provider_options: if option in notification: del notification[option] - # convert type from py to sock - kwargs["type_"] = convert_to_socket(params_map_notification_providers, kwargs["type_"]) - notification.update(kwargs) - params_map = get_params_map_notification(type_sock=kwargs["type_"]) - notification = convert_to_socket(params_map, notification) - _check_arguments_notification(notification) return self._call('addNotification', (notification, id_)) @@ -631,8 +590,7 @@ class UptimeKumaApi(object): def get_proxies(self): r = self._get_event_data("proxyList") - r = convert_from_socket(params_map_proxy, r) - int_to_bool(r, ["auth", "active", "default", "apply_existing"]) + int_to_bool(r, ["auth", "active", "default", "applyExisting"]) return r def get_proxy(self, id_: int): @@ -651,8 +609,6 @@ class UptimeKumaApi(object): def edit_proxy(self, id_: int, **kwargs): proxy = self.get_proxy(id_) proxy.update(kwargs) - proxy = convert_to_socket(params_map_proxy, proxy) - _check_arguments_proxy(proxy) return self._call('addProxy', (proxy, id_)) @@ -663,7 +619,6 @@ class UptimeKumaApi(object): def get_status_pages(self): r = list(self._get_event_data("statusPageList").values()) - r = convert_from_socket(params_map_status_page, r) return r def get_status_page(self, slug: str): @@ -671,7 +626,6 @@ class UptimeKumaApi(object): config = r["config"] del r["config"] r.update(config) - r = convert_from_socket(params_map_status_page, r) return r def add_status_page(self, slug: str, title: str): @@ -699,7 +653,6 @@ class UptimeKumaApi(object): "style": style } r = self._call('postIncident', (slug, incident))["incident"] - r = convert_from_socket(params_map_status_page, r) self.save_status_page(slug) return r @@ -741,7 +694,6 @@ class UptimeKumaApi(object): def info(self): r = self._get_event_data("info") - r = convert_from_socket(params_map_info, r) return r # clear @@ -789,7 +741,6 @@ class UptimeKumaApi(object): def get_settings(self): r = self._call('getSettings')["data"] - r = convert_from_socket(params_map_settings, r) return r def set_settings( @@ -797,39 +748,38 @@ class UptimeKumaApi(object): password: str, # about - check_update: bool = True, - check_beta: bool = False, + checkUpdate: bool = True, + checkBeta: bool = False, # monitor history - keep_data_period_days: int = 180, + keepDataPeriodDays: int = 180, # general - entry_page: str = "dashboard", - search_engine_index: bool = False, - primary_base_url: str = "", - steam_api_key: str = "", + entryPage: str = "dashboard", + searchEngineIndex: bool = False, + primaryBaseURL: str = "", + steamAPIKey: str = "", # notifications - tls_expiry_notify_days: list = None, + tlsExpiryNotifyDays: list = None, # security - disable_auth: bool = False + disableAuth: bool = False ): - if not tls_expiry_notify_days: - tls_expiry_notify_days = [7, 14, 21] + if not tlsExpiryNotifyDays: + tlsExpiryNotifyDays = [7, 14, 21] data = { - "check_update": check_update, - "check_beta": check_beta, - "keep_data_period_days": keep_data_period_days, - "entry_page": entry_page, - "search_engine_index": search_engine_index, - "primary_base_url": primary_base_url, - "steam_api_key": steam_api_key, - "tls_expiry_notify_days": tls_expiry_notify_days, - "disable_auth": disable_auth + "checkUpdate": checkUpdate, + "checkBeta": checkBeta, + "keepDataPeriodDays": keepDataPeriodDays, + "entryPage": entryPage, + "searchEngineIndex": searchEngineIndex, + "primaryBaseURL": primaryBaseURL, + "steamAPIKey": steamAPIKey, + "tlsExpiryNotifyDays": tlsExpiryNotifyDays, + "disableAuth": disableAuth } - data = convert_to_socket(params_map_settings, data) return self._call('setSettings', (data, password)) def change_password(self, old_password: str, new_password: str): diff --git a/uptime_kuma_api/converter.py b/uptime_kuma_api/converter.py deleted file mode 100644 index 6efedaa..0000000 --- a/uptime_kuma_api/converter.py +++ /dev/null @@ -1,355 +0,0 @@ -# socket -> python -params_map_monitor = { - "type": "type_", - "interval": "heartbeat_interval", - "retryInterval": "heartbeat_retry_interval", - "maxretries": "retries", - "notificationIDList": "notification_ids", - "upsideDown": "upside_down_mode", - "expiryNotification": "certificate_expiry_notification", - "ignoreTls": "ignore_tls_error", - "maxredirects": "max_redirects", - "accepted_statuscodes": "accepted_status_codes", - "proxyId": "proxy_id", - "method": "http_method", - "body": "http_body", - "headers": "http_headers", - "authMethod": "auth_method", - "basicauth-user": "auth_user", - "basicauth-pass": "auth_pass", - "basicauth-domain": "auth_domain", - "basicauth-workstation": "auth_workstation", - "mqttUsername": "mqtt_username", - "mqttPassword": "mqtt_password", - "mqttTopic": "mqtt_topic", - "mqttSuccessMessage": "mqtt_success_message", - "databaseConnectionString": "sqlserver_connection_string", - "sqlserverQuery": "sqlserver_query", - "authDomain": "auth_domain", - "authWorkstation": "auth_workstation", - "databaseQuery": "database_query", - "monitorID": "monitor_id" -} - -params_map_notification = { - "type": "type_", - "isDefault": "default", - "userId": "user_id", - "applyExisting": "apply_existing", -} - -params_map_notification_providers = { - 'alerta': 'alerta', - 'AliyunSMS': 'aliyun_sms', - 'apprise': 'apprise', - 'Bark': 'bark', - 'clicksendsms': 'clicksendsms', - 'DingDing': 'ding_ding', - 'discord': 'discord', - 'Feishu': 'feishu', - 'GoogleChat': 'google_chat', - 'gorush': 'gorush', - 'gotify': 'gotify', - 'line': 'line', - 'lunasea': 'lunasea', - 'matrix': 'matrix', - 'mattermost': 'mattermost', - 'ntfy': 'ntfy', - 'octopush': 'octopush', - 'OneBot': 'one_bot', - 'PagerDuty': 'pager_duty', - 'promosms': 'promosms', - 'pushbullet': 'pushbullet', - 'PushDeer': 'push_deer', - 'pushover': 'pushover', - 'pushy': 'pushy', - 'rocket.chat': 'rocket_chat', - 'serwersms': 'serwersms', - 'signal': 'signal', - 'slack': 'slack', - 'smtp': 'smtp', - 'stackfield': 'stackfield', - 'teams': 'teams', - 'PushByTechulus': 'push_by_techulus', - 'telegram': 'telegram', - 'webhook': 'webhook', - 'WeCom': 'we_com' -} - -params_map_notification_provider_options = { - 'alerta': { - 'alertaApiEndpoint': 'alerta_api_endpoint', - 'alertaApiKey': 'alerta_api_key', - 'alertaEnvironment': 'alerta_environment', - 'alertaAlertState': 'alerta_alert_state', - 'alertaRecoverState': 'alerta_recover_state', - }, - 'aliyun_sms': { - 'phonenumber': 'aliyun_sms_phonenumber', - 'templateCode': 'aliyun_sms_template_code', - 'signName': 'aliyun_sms_sign_name', - 'accessKeyId': 'aliyun_sms_access_key_id', - 'secretAccessKey': 'aliyun_sms_secret_access_key', - }, - 'apprise': { - 'appriseURL': 'apprise_url', - 'title': 'apprise_title', - }, - 'bark': { - 'barkEndpoint': 'bark_endpoint', - }, - 'clicksendsms': { - 'clicksendsmsLogin': 'clicksendsms_login', - 'clicksendsmsPassword': 'clicksendsms_password', - 'clicksendsmsToNumber': 'clicksendsms_to_number', - 'clicksendsmsSenderName': 'clicksendsms_sender_name', - }, - 'ding_ding': { - 'webHookUrl': 'ding_ding_web_hook_url', - 'secretKey': 'ding_ding_secret_key', - }, - 'discord': { - 'discordUsername': 'discord_username', - 'discordWebhookUrl': 'discord_webhook_url', - 'discordPrefixMessage': 'discord_prefix_message', - }, - 'feishu': { - 'feishuWebHookUrl': 'feishu_web_hook_url', - }, - 'google_chat': { - 'googleChatWebhookURL': 'google_chat_chat_webhook_url', - }, - 'gorush': { - 'gorushDeviceToken': 'gorush_device_token', - 'gorushPlatform': 'gorush_platform', - 'gorushTitle': 'gorush_title', - 'gorushPriority': 'gorush_priority', - 'gorushRetry': 'gorush_retry', - 'gorushTopic': 'gorush_topic', - 'gorushServerURL': 'gorush_server_url', - }, - 'gotify': { - 'gotifyserverurl': 'gotify_serverurl', - 'gotifyapplicationToken': 'gotify_application_token', - 'gotifyPriority': 'gotify_priority', - }, - 'line': { - 'lineChannelAccessToken': 'line_channel_access_token', - 'lineUserID': 'line_user_id', - }, - 'lunasea': { - 'lunaseaDevice': 'lunasea_device', - }, - 'matrix': { - 'internalRoomId': 'matrix_internal_room_id', - 'accessToken': 'matrix_access_token', - 'homeserverUrl': 'matrix_homeserver_url', - }, - 'mattermost': { - 'mattermostusername': 'mattermost_username', - 'mattermostWebhookUrl': 'mattermost_webhook_url', - 'mattermostchannel': 'mattermost_channel', - 'mattermosticonemo': 'mattermost_iconemo', - 'mattermosticonurl': 'mattermost_iconurl', - }, - 'ntfy': { - 'ntfyserverurl': 'ntfy_serverurl', - 'ntfytopic': 'ntfy_topic', - 'ntfyPriority': 'ntfy_priority', - }, - 'octopush': { - 'octopushVersion': 'octopush_version', - 'octopushAPIKey': 'octopush_apikey', - 'octopushLogin': 'octopush_login', - 'octopushPhoneNumber': 'octopush_phone_number', - 'octopushSMSType': 'octopush_smstype', - 'octopushSenderName': 'octopush_sender_name', - 'octopushDMLogin': 'octopush_dmlogin', - 'octopushDMAPIKey': 'octopush_dmapikey', - 'octopushDMPhoneNumber': 'octopush_dmphone_number', - 'octopushDMSenderName': 'octopush_dmsender_name', - 'octopushDMSMSType': 'octopush_dmsmstype', - }, - 'one_bot': { - 'httpAddr': 'one_bot_http_addr', - 'accessToken': 'one_bot_access_token', - 'msgType': 'one_bot_msg_type', - 'recieverId': 'one_bot_reciever_id', - }, - 'pager_duty': { - 'pagerdutyAutoResolve': 'pager_duty_duty_auto_resolve', - 'pagerdutyIntegrationUrl': 'pager_duty_duty_integration_url', - 'pagerdutyPriority': 'pager_duty_duty_priority', - 'pagerdutyIntegrationKey': 'pager_duty_duty_integration_key', - }, - 'promosms': { - 'promosmsLogin': 'promosms_login', - 'promosmsPassword': 'promosms_password', - 'promosmsPhoneNumber': 'promosms_phone_number', - 'promosmsSMSType': 'promosms_smstype', - 'promosmsSenderName': 'promosms_sender_name', - }, - 'pushbullet': { - 'pushbulletAccessToken': 'pushbullet_access_token', - }, - 'push_deer': { - 'pushdeerKey': 'push_deer_deer_key', - }, - 'pushover': { - 'pushoveruserkey': 'pushover_userkey', - 'pushoverapptoken': 'pushover_apptoken', - 'pushoversounds': 'pushover_sounds', - 'pushoverpriority': 'pushover_priority', - 'pushovertitle': 'pushover_title', - 'pushoverdevice': 'pushover_device', - }, - 'pushy': { - 'pushyAPIKey': 'pushy_apikey', - 'pushyToken': 'pushy_token', - }, - 'rocket_chat': { - 'rocketchannel': 'rocket_chat_channel', - 'rocketusername': 'rocket_chat_username', - 'rocketiconemo': 'rocket_chat_iconemo', - 'rocketwebhookURL': 'rocket_chat_webhook_url', - 'rocketbutton': 'rocket_chat_button', - }, - 'serwersms': { - 'serwersmsUsername': 'serwersms_username', - 'serwersmsPassword': 'serwersms_password', - 'serwersmsPhoneNumber': 'serwersms_phone_number', - 'serwersmsSenderName': 'serwersms_sender_name', - }, - 'signal': { - 'signalNumber': 'signal_number', - 'signalRecipients': 'signal_recipients', - 'signalURL': 'signal_url', - }, - 'slack': { - 'slackbutton': 'slack_button', - 'slackchannel': 'slack_channel', - 'slackusername': 'slack_username', - 'slackiconemo': 'slack_iconemo', - 'slackwebhookURL': 'slack_webhook_url', - }, - 'smtp': { - 'smtpHost': 'smtp_host', - 'smtpPort': 'smtp_port', - 'smtpSecure': 'smtp_secure', - 'smtpIgnoreTLSError': 'smtp_ignore_tlserror', - 'smtpDkimDomain': 'smtp_dkim_domain', - 'smtpDkimKeySelector': 'smtp_dkim_key_selector', - 'smtpDkimPrivateKey': 'smtp_dkim_private_key', - 'smtpDkimHashAlgo': 'smtp_dkim_hash_algo', - 'smtpDkimheaderFieldNames': 'smtp_dkimheader_field_names', - 'smtpDkimskipFields': 'smtp_dkimskip_fields', - 'smtpUsername': 'smtp_username', - 'smtpPassword': 'smtp_password', - 'customSubject': 'smtp_custom_subject', - 'smtpFrom': 'smtp_from', - 'smtpCC': 'smtp_cc', - 'smtpBCC': 'smtp_bcc', - 'smtpTo': 'smtp_to', - }, - 'stackfield': { - 'stackfieldwebhookURL': 'stackfield_webhook_url', - }, - 'teams': { - 'webhookUrl': 'teams_webhook_url', - }, - 'push_by_techulus': { - 'pushAPIKey': 'push_by_techulus_apikey', - }, - 'telegram': { - 'telegramBotToken': 'telegram_bot_token', - 'telegramChatID': 'telegram_chat_id', - }, - 'webhook': { - 'webhookContentType': 'webhook_content_type', - 'webhookURL': 'webhook_url', - }, - 'we_com': { - 'weComBotKey': 'we_com_com_bot_key', - }, -} - -params_map_proxy = { - "applyExisting": "apply_existing", - "createdDate": "created_date", - "userId": "user_id" -} - -params_map_status_page = { - "id": "id_", - "slug": "slug", - "title": "title", - "description": "description", - "icon": "img_data_url", - "published": "published", - "showTags": "show_tags", - "domainNameList": "domain_name_list", - "customCSS": "custom_css", - "footerText": "footer_text", - "showPoweredBy": "show_powered_by", - "createdDate": "created_date" -} - -params_map_info = { - "latestVersion": "latest_version", - "primaryBaseURL": "primary_base_url" -} - -params_map_settings = { - # about - "checkUpdate": "check_update", - "checkBeta": "check_beta", - # monitor history - "keepDataPeriodDays": "keep_data_period_days", - # general - "entryPage": "entry_page", - "searchEngineIndex": "search_engine_index", - "primaryBaseURL": "primary_base_url", - "steamAPIKey": "steam_api_key", - # notifications - "tlsExpiryNotifyDays": "tls_expiry_notify_days", - # security - "disableAuth": "disable_auth" -} - - -def _convert_to_from_socket(params_map: dict, params, to_socket=False): - if type(params) == list: - out = [] - params_list = params - for params in params_list: - params_py = _convert_to_from_socket(params_map, params, to_socket) - out.append(params_py) - else: - if to_socket: - params_map = {v: k for k, v in params_map.items()} - if type(params) == dict: - out = {} - for key, value in params.items(): - key = params_map.get(key, key) - out[key] = value - else: - return params_map.get(params, params) - return out - - -def convert_from_socket(params_map, params): - return _convert_to_from_socket(params_map, params) - - -def convert_to_socket(params_map, params): - return _convert_to_from_socket(params_map, params, to_socket=True) - - -def get_params_map_notification(type_py=None, type_sock=None): - if not type_py: - type_py = convert_from_socket(params_map_notification_providers, type_sock) - return { - **params_map_notification, - **params_map_notification_providers, - **params_map_notification_provider_options[type_py] - }