From 71d6f5cdbafe3f78b71eb4e7ac2ee361fe5578cc Mon Sep 17 00:00:00 2001 From: lucasheld Date: Thu, 7 Jul 2022 13:29:06 +0200 Subject: [PATCH] convert data from socket to python format --- uptimekumaapi/__init__.py | 4 +- uptimekumaapi/api.py | 262 ++++++++++++++++++++++-------------- uptimekumaapi/converter.py | 38 +++++- uptimekumaapi/exceptions.py | 2 + 4 files changed, 202 insertions(+), 104 deletions(-) create mode 100644 uptimekumaapi/exceptions.py diff --git a/uptimekumaapi/__init__.py b/uptimekumaapi/__init__.py index 7f501f1..2d94c86 100644 --- a/uptimekumaapi/__init__.py +++ b/uptimekumaapi/__init__.py @@ -4,5 +4,7 @@ from .notification_providers import NotificationType, notification_provider_opti from .proxy_protocol import ProxyProtocol from .incident_style import IncidentStyle from .converter import convert_from_socket, convert_to_socket, params_map_monitor, params_map_notification, \ - params_map_notification_provider, params_map_proxy, params_map_status_page + params_map_notification_provider, params_map_proxy, params_map_status_page, params_map_info, \ + params_map_settings +from .exceptions import UptimeKumaException from .api import UptimeKumaApi diff --git a/uptimekumaapi/api.py b/uptimekumaapi/api.py index 250bb18..57bdd09 100644 --- a/uptimekumaapi/api.py +++ b/uptimekumaapi/api.py @@ -8,7 +8,10 @@ from . import MonitorType from . import NotificationType, notification_provider_options from . import ProxyProtocol from . import IncidentStyle -from . import convert_from_socket, convert_to_socket, params_map_monitor, params_map_notification, params_map_notification_provider, params_map_proxy, params_map_status_page +from . import convert_from_socket, convert_to_socket, params_map_monitor, params_map_notification, \ + params_map_notification_provider, params_map_proxy, params_map_status_page, params_map_info, \ + params_map_settings +from . import UptimeKumaException def int_to_bool(data: list[dict] | dict, keys): @@ -273,6 +276,13 @@ class UptimeKumaApi(object): time.sleep(0.01) # wait for multiple messages return self._event_data[event] + def _call(self, event, data=None): + r = self.sio.call(event, data) + if type(r) == dict and not r["ok"]: + raise UptimeKumaException(r["msg"]) + r.pop("ok") + return r + # event handlers def _event_connect(self): @@ -348,160 +358,163 @@ class UptimeKumaApi(object): # monitors def get_monitors(self): - monitors = list(self._get_event_data("monitorList").values()) - int_to_bool(monitors, ["active"]) - return monitors + r = list(self._get_event_data("monitorList").values()) + r = convert_from_socket(params_map_monitor, r) + int_to_bool(r, ["active"]) + return r def get_monitor(self, id_: int): - return self.sio.call('getMonitor', id_) + r = self._call('getMonitor', id_)["monitor"] + r = convert_from_socket(params_map_monitor, r) + int_to_bool(r, ["active"]) + return r def pause_monitor(self, id_: int): - return self.sio.call('pauseMonitor', id_) + r = self._call('pauseMonitor', id_) + return r def resume_monitor(self, id_: int): - return self.sio.call('resumeMonitor', id_) + r = self._call('resumeMonitor', id_) + return r def delete_monitor(self, id_: int): - return self.sio.call('deleteMonitor', id_) + return self._call('deleteMonitor', id_) - def get_monitor_beats(self, id_: int, period): - return self.sio.call('getMonitorBeats', (id_, period)) + def get_monitor_beats(self, id_: int, hours: int): + r = self._call('getMonitorBeats', (id_, hours))["data"] + int_to_bool(r, ["important", "status"]) + return r def add_monitor(self, *args, **kwargs): data = _build_monitor_data(*args, **kwargs) - return self.sio.call('add', data) + r = self._call('add', data) + r = convert_from_socket(params_map_monitor, r) + return r def edit_monitor(self, id_: int, **kwargs): - monitor = self.get_monitor(id_)["monitor"] + data = self.get_monitor(id_) kwargs_sock = convert_to_socket(params_map_monitor, kwargs) - monitor.update(kwargs_sock) - return self.sio.call('editMonitor', monitor) + data.update(kwargs_sock) + r = self._call('editMonitor', data) + r = convert_from_socket(params_map_monitor, r) + return r # monitor tags def add_monitor_tag(self, tag_id: int, monitor_id: int, value=""): - return self.sio.call('addMonitorTag', (tag_id, monitor_id, value)) + return self._call('addMonitorTag', (tag_id, monitor_id, value)) - # TODO: check! + # editMonitorTag is unused in uptime-kuma # def edit_monitor_tag(self, tag_id: int, monitor_id: int, value=""): - # return self.sio.call('editMonitorTag', (tag_id, monitor_id, value)) + # return self._call('editMonitorTag', (tag_id, monitor_id, value)) def delete_monitor_tag(self, tag_id: int, monitor_id: int, value=""): - return self.sio.call('deleteMonitorTag', (tag_id, monitor_id, value)) + return self._call('deleteMonitorTag', (tag_id, monitor_id, value)) # notifications def get_notifications(self): - notifications_raw = self._get_event_data("notificationList") - notifications = [] - for notification_raw in notifications_raw: + notifications = self._get_event_data("notificationList") + r = [] + for notification_raw in notifications: notification = notification_raw.copy() config = json.loads(notification["config"]) del notification["config"] notification.update(config) - notifications.append(notification) - return notifications + r.append(notification) + r = convert_from_socket(params_map_notification, r) + r = convert_from_socket(params_map_notification_provider, r) + return r def get_notification(self, id_: int): notifications = self.get_notifications() for notification in notifications: if notification["id"] == id_: return notification + raise UptimeKumaException("notification does not exist") def test_notification(self, *args, **kwargs): data = _build_notification_data(*args, **kwargs) - return self.sio.call('testNotification', data) + return self._call('testNotification', data) def add_notification(self, *args, **kwargs): data = _build_notification_data(*args, **kwargs) - return self.sio.call('addNotification', (data, None)) + return self._call('addNotification', (data, None)) def edit_notification(self, id_: int, **kwargs): notification = self.get_notification(id_) - kwargs_sock = convert_to_socket(params_map_notification, kwargs) - kwargs_sock = convert_to_socket(params_map_notification_provider, kwargs_sock) # remove old notification provider options from notification object - old_type = notification["type"] - new_type = kwargs_sock["type"] - if "type" in kwargs_sock and new_type != old_type: + if "type_" in kwargs and kwargs != notification["type_"]: for provider in notification_provider_options: provider_options = notification_provider_options[provider] - if provider != new_type: + if provider != kwargs: for option in provider_options: if option in notification: del notification[option] - notification.update(kwargs_sock) - return self.sio.call('addNotification', (notification, id_)) + notification.update(kwargs) + return self._call('addNotification', (notification, id_)) def delete_notification(self, id_: int): - return self.sio.call('deleteNotification', id_) + return self._call('deleteNotification', id_) def check_apprise(self): - return self.sio.call('checkApprise') + return self._call('checkApprise') # proxy def get_proxies(self): - proxies = self._get_event_data("proxyList") - proxies = convert_from_socket(params_map_proxy, proxies) - int_to_bool(proxies, ["auth", "active", "default", "apply_existing"]) - proxies = convert_to_socket(params_map_proxy, proxies) - return proxies + r = self._get_event_data("proxyList") + r = convert_from_socket(params_map_proxy, r) + int_to_bool(r, ["auth", "active", "default", "apply_existing"]) + return r def get_proxy(self, id_: int): proxies = self.get_proxies() for proxy in proxies: if proxy["id"] == id_: return proxy + raise UptimeKumaException("proxy does not exist") def add_proxy(self, *args, **kwargs): data = _build_proxy_data(*args, **kwargs) - return self.sio.call('addProxy', (data, None)) + return self._call('addProxy', (data, None)) def edit_proxy(self, id_: int, **kwargs): - proxy_sock = self.get_proxy(id_) - proxy = convert_from_socket(params_map_proxy, proxy_sock) - for key in ["auth", "active", "default", "apply_existing"]: - if key in proxy: - proxy[key] = True if proxy[key] == 1 else False - kwargs_sock = convert_to_socket(params_map_proxy, kwargs) - proxy_sock = convert_to_socket(params_map_proxy, proxy) - proxy_sock.update(kwargs_sock) - return self.sio.call('addProxy', (proxy_sock, id_)) - # return proxy_sock, id_ + proxy = self.get_proxy(id_) + proxy.update(kwargs) + return self._call('addProxy', (proxy, id_)) def delete_proxy(self, id_: int): - return self.sio.call('deleteProxy', id_) + return self._call('deleteProxy', id_) # status page def get_status_pages(self): - return list(self._get_event_data("statusPageList").values()) + r = list(self._get_event_data("statusPageList").values()) + r = convert_from_socket(params_map_status_page, r) + return r def get_status_page(self, slug: str): - r = self.sio.call('getStatusPage', slug) - if r["ok"]: - config = r["config"] - del r["config"] - r.update(config) + r = self._call('getStatusPage', slug) + config = r["config"] + del r["config"] + r.update(config) + r = convert_from_socket(params_map_status_page, r) return r def add_status_page(self, slug: str, title: str): - return self.sio.call('addStatusPage', (title, slug)) + return self._call('addStatusPage', (title, slug)) def delete_status_page(self, slug: str): - return self.sio.call('deleteStatusPage', slug) + return self._call('deleteStatusPage', slug) def save_status_page(self, slug: str, **kwargs): status_page = self.get_status_page(slug) - status_page.pop("ok") - kwargs_sock = convert_to_socket(params_map_status_page, kwargs) - status_page.update(kwargs_sock) - status_page = convert_from_socket(params_map_status_page, status_page) + status_page.update(kwargs) data = _build_status_page_data(**status_page) - return self.sio.call('saveStatusPage', data) + return self._call('saveStatusPage', data) def post_incident( self, @@ -515,25 +528,34 @@ class UptimeKumaApi(object): "content": content, "style": style } - r = self.sio.call('postIncident', (slug, incident)) + r = self._call('postIncident', (slug, incident))["incident"] + r = convert_from_socket(params_map_status_page, r) self.save_status_page(slug) return r def unpin_incident(self, slug: str): - r = self.sio.call('unpinIncident', slug) + r = self._call('unpinIncident', slug) self.save_status_page(slug) return r # heartbeat def get_heartbeats(self): - return self._get_event_data("heartbeatList") + r = self._get_event_data("heartbeatList") + for i in r: + int_to_bool(i["data"], ["important", "status"]) + return r def get_important_heartbeats(self): - return self._get_event_data("importantHeartbeatList") + r = self._get_event_data("importantHeartbeatList") + for i in r: + int_to_bool(i["data"], ["important", "status"]) + return r def get_heartbeat(self): - return self._get_event_data("heartbeat") + r = self._get_event_data("heartbeat") + int_to_bool(r, ["important", "status"]) + return r # avg ping @@ -548,58 +570,100 @@ class UptimeKumaApi(object): # info def info(self): - return self._get_event_data("info") + r = self._get_event_data("info") + r = convert_from_socket(params_map_info, r) + return r # clear - def clear_events(self): - return self.sio.call('clearEvents') + def clear_events(self, monitor_id: int): + return self._call('clearEvents', monitor_id) - def clear_heartbeats(self): - return self.sio.call('clearHeartbeats') + def clear_heartbeats(self, monitor_id: int): + return self._call('clearHeartbeats', monitor_id) def clear_statistics(self): - return self.sio.call('clearStatistics') + return self._call('clearStatistics') # tags def get_tags(self): - return self.sio.call('getTags') + return self._call('getTags')["tags"] def get_tag(self, id_: int): tags = self.get_tags() for tag in tags: if tag["id"] == id_: return tag + raise UptimeKumaException("tag does not exist") - # TODO: not working, monitor id required? + # not working, monitor id required? # def edit_tag(self, id_: int, name: str, color: str): - # return self.sio.call('editTag', { + # return self._call('editTag', { # "id": id_, # "name": name, # "color": color # }) def delete_tag(self, id_: int): - return self.sio.call('deleteTag', id_) + return self._call('deleteTag', id_) def add_tag(self, name: str, color: str): - return self.sio.call('addTag', { + return self._call('addTag', { "name": name, "color": color, "new": True - }) + })["tag"] # settings def get_settings(self): - return self.sio.call('getSettings') + r = self._call('getSettings')["data"] + r = convert_from_socket(params_map_settings, r) + return r - def set_settings(self, data, password: str): - return self.sio.call('setSettings', (data, password)) + def set_settings( + self, + password: str, + + # about + check_update: bool = True, + check_beta: bool = False, + + # monitor history + keep_data_period_days: int = 180, + + # general + entry_page: str = "dashboard", + search_engine_index: bool = False, + primary_base_url: str = "", + steam_api_key: str = "", + + # notifications + tls_expiry_notify_days: list[int] = None, + + # security + disable_auth: bool = False + ): + if not tls_expiry_notify_days: + tls_expiry_notify_days = [7, 14, 21] + + data = { + "check_update": check_update, + "check_beta": check_beta, + "keep_data_period_days": keep_data_period_days, + "entry_page": entry_page, + "search_engine_index": search_engine_index, + "primary_base_url": primary_base_url, + "steam_api_key": steam_api_key, + "tls_expiry_notify_days": tls_expiry_notify_days, + "disable_auth": disable_auth + } + data = convert_to_socket(params_map_settings, data) + return self._call('setSettings', (data, password)) def change_password(self, old_password: str, new_password: str): - return self.sio.call('changePassword', { + return self._call('changePassword', { "currentPassword": old_password, "newPassword": new_password, }) @@ -607,52 +671,52 @@ class UptimeKumaApi(object): def upload_backup(self, json_data, import_handle: str): if import_handle not in ["overwrite", "skip", "keep"]: raise ValueError() - return self.sio.call('uploadBackup', (json_data, import_handle)) + return self._call('uploadBackup', (json_data, import_handle)) # 2FA def twofa_status(self): - return self.sio.call('twoFAStatus') + return self._call('twoFAStatus') def prepare_2fa(self, password: str): - return self.sio.call('prepare2FA', password) + return self._call('prepare2FA', password) def save_2fa(self, password: str): - return self.sio.call('save2FA', password) + return self._call('save2FA', password) def disable_2fa(self, password: str): - return self.sio.call('disable2FA', password) + return self._call('disable2FA', password) # login def login(self, username: str, password: str): - return self.sio.call('login', { + return self._call('login', { "username": username, "password": password, "token": "" }) def login_by_token(self, token: str): - return self.sio.call('loginByToken', token) + return self._call('loginByToken', token) def verify_token(self, token: str, password: str): - return self.sio.call('verifyToken', (token, password)) + return self._call('verifyToken', (token, password)) def logout(self): - return self.sio.call('logout') + return self._call('logout') # setup def need_setup(self): - return self.sio.call('needSetup') + return self._call('needSetup') def setup(self, username: str, password: str): - return self.sio.call("setup", (username, password)) + return self._call("setup", (username, password)) # database def get_database_size(self): - return self.sio.call('getDatabaseSize') + return self._call('getDatabaseSize') def shrink_database(self): - return self.sio.call('shrinkDatabase') + return self._call('shrinkDatabase') diff --git a/uptimekumaapi/converter.py b/uptimekumaapi/converter.py index 8aee2dc..ad14c96 100644 --- a/uptimekumaapi/converter.py +++ b/uptimekumaapi/converter.py @@ -24,12 +24,17 @@ params_map_monitor = { "mqttTopic": "mqtt_topic", "mqttSuccessMessage": "mqtt_success_message", "databaseConnectionString": "sqlserver_connection_string", - "sqlserverQuery": "sqlserver_query" + "sqlserverQuery": "sqlserver_query", + "authDomain": "auth_domain", + "authWorkstation": "auth_workstation", + "databaseQuery": "database_query", + "monitorID": "monitor_id" } params_map_notification = { "type": "type_", - "isDefault": "default" + "isDefault": "default", + "userId": "user_id" } params_map_notification_provider = { @@ -159,7 +164,9 @@ params_map_notification_provider = { } params_map_proxy = { - "applyExisting": "apply_existing" + "applyExisting": "apply_existing", + "createdDate": "created_date", + "userId": "user_id" } params_map_status_page = { @@ -174,7 +181,30 @@ params_map_status_page = { "domainNameList": "domain_name_list", "customCSS": "custom_css", "footerText": "footer_text", - "showPoweredBy": "show_powered_by" + "showPoweredBy": "show_powered_by", + "createdDate": "created_date" +} + +params_map_info = { + "latestVersion": "latest_version", + "primaryBaseURL": "primary_base_url" +} + +params_map_settings = { + # about + "checkUpdate": "check_update", + "checkBeta": "check_beta", + # monitor history + "keepDataPeriodDays": "keep_data_period_days", + # general + "entryPage": "entry_page", + "searchEngineIndex": "search_engine_index", + "primaryBaseURL": "primary_base_url", + "steamAPIKey": "steam_api_key", + # notifications + "tlsExpiryNotifyDays": "tls_expiry_notify_days", + # security + "disableAuth": "disable_auth" } diff --git a/uptimekumaapi/exceptions.py b/uptimekumaapi/exceptions.py new file mode 100644 index 0000000..b817fbc --- /dev/null +++ b/uptimekumaapi/exceptions.py @@ -0,0 +1,2 @@ +class UptimeKumaException(Exception): + pass