convert data from socket to python format

This commit is contained in:
lucasheld 2022-07-07 13:29:06 +02:00
parent 2aa6ac25a7
commit 71d6f5cdba
4 changed files with 202 additions and 104 deletions

View file

@ -4,5 +4,7 @@ from .notification_providers import NotificationType, notification_provider_opti
from .proxy_protocol import ProxyProtocol
from .incident_style import IncidentStyle
from .converter import convert_from_socket, convert_to_socket, params_map_monitor, params_map_notification, \
params_map_notification_provider, params_map_proxy, params_map_status_page
params_map_notification_provider, params_map_proxy, params_map_status_page, params_map_info, \
params_map_settings
from .exceptions import UptimeKumaException
from .api import UptimeKumaApi

View file

@ -8,7 +8,10 @@ from . import MonitorType
from . import NotificationType, notification_provider_options
from . import ProxyProtocol
from . import IncidentStyle
from . import convert_from_socket, convert_to_socket, params_map_monitor, params_map_notification, params_map_notification_provider, params_map_proxy, params_map_status_page
from . import convert_from_socket, convert_to_socket, params_map_monitor, params_map_notification, \
params_map_notification_provider, params_map_proxy, params_map_status_page, params_map_info, \
params_map_settings
from . import UptimeKumaException
def int_to_bool(data: list[dict] | dict, keys):
@ -273,6 +276,13 @@ class UptimeKumaApi(object):
time.sleep(0.01) # wait for multiple messages
return self._event_data[event]
def _call(self, event, data=None):
r = self.sio.call(event, data)
if type(r) == dict and not r["ok"]:
raise UptimeKumaException(r["msg"])
r.pop("ok")
return r
# event handlers
def _event_connect(self):
@ -348,160 +358,163 @@ class UptimeKumaApi(object):
# monitors
def get_monitors(self):
monitors = list(self._get_event_data("monitorList").values())
int_to_bool(monitors, ["active"])
return monitors
r = list(self._get_event_data("monitorList").values())
r = convert_from_socket(params_map_monitor, r)
int_to_bool(r, ["active"])
return r
def get_monitor(self, id_: int):
return self.sio.call('getMonitor', id_)
r = self._call('getMonitor', id_)["monitor"]
r = convert_from_socket(params_map_monitor, r)
int_to_bool(r, ["active"])
return r
def pause_monitor(self, id_: int):
return self.sio.call('pauseMonitor', id_)
r = self._call('pauseMonitor', id_)
return r
def resume_monitor(self, id_: int):
return self.sio.call('resumeMonitor', id_)
r = self._call('resumeMonitor', id_)
return r
def delete_monitor(self, id_: int):
return self.sio.call('deleteMonitor', id_)
return self._call('deleteMonitor', id_)
def get_monitor_beats(self, id_: int, period):
return self.sio.call('getMonitorBeats', (id_, period))
def get_monitor_beats(self, id_: int, hours: int):
r = self._call('getMonitorBeats', (id_, hours))["data"]
int_to_bool(r, ["important", "status"])
return r
def add_monitor(self, *args, **kwargs):
data = _build_monitor_data(*args, **kwargs)
return self.sio.call('add', data)
r = self._call('add', data)
r = convert_from_socket(params_map_monitor, r)
return r
def edit_monitor(self, id_: int, **kwargs):
monitor = self.get_monitor(id_)["monitor"]
data = self.get_monitor(id_)
kwargs_sock = convert_to_socket(params_map_monitor, kwargs)
monitor.update(kwargs_sock)
return self.sio.call('editMonitor', monitor)
data.update(kwargs_sock)
r = self._call('editMonitor', data)
r = convert_from_socket(params_map_monitor, r)
return r
# monitor tags
def add_monitor_tag(self, tag_id: int, monitor_id: int, value=""):
return self.sio.call('addMonitorTag', (tag_id, monitor_id, value))
return self._call('addMonitorTag', (tag_id, monitor_id, value))
# TODO: check!
# editMonitorTag is unused in uptime-kuma
# def edit_monitor_tag(self, tag_id: int, monitor_id: int, value=""):
# return self.sio.call('editMonitorTag', (tag_id, monitor_id, value))
# return self._call('editMonitorTag', (tag_id, monitor_id, value))
def delete_monitor_tag(self, tag_id: int, monitor_id: int, value=""):
return self.sio.call('deleteMonitorTag', (tag_id, monitor_id, value))
return self._call('deleteMonitorTag', (tag_id, monitor_id, value))
# notifications
def get_notifications(self):
notifications_raw = self._get_event_data("notificationList")
notifications = []
for notification_raw in notifications_raw:
notifications = self._get_event_data("notificationList")
r = []
for notification_raw in notifications:
notification = notification_raw.copy()
config = json.loads(notification["config"])
del notification["config"]
notification.update(config)
notifications.append(notification)
return notifications
r.append(notification)
r = convert_from_socket(params_map_notification, r)
r = convert_from_socket(params_map_notification_provider, r)
return r
def get_notification(self, id_: int):
notifications = self.get_notifications()
for notification in notifications:
if notification["id"] == id_:
return notification
raise UptimeKumaException("notification does not exist")
def test_notification(self, *args, **kwargs):
data = _build_notification_data(*args, **kwargs)
return self.sio.call('testNotification', data)
return self._call('testNotification', data)
def add_notification(self, *args, **kwargs):
data = _build_notification_data(*args, **kwargs)
return self.sio.call('addNotification', (data, None))
return self._call('addNotification', (data, None))
def edit_notification(self, id_: int, **kwargs):
notification = self.get_notification(id_)
kwargs_sock = convert_to_socket(params_map_notification, kwargs)
kwargs_sock = convert_to_socket(params_map_notification_provider, kwargs_sock)
# remove old notification provider options from notification object
old_type = notification["type"]
new_type = kwargs_sock["type"]
if "type" in kwargs_sock and new_type != old_type:
if "type_" in kwargs and kwargs != notification["type_"]:
for provider in notification_provider_options:
provider_options = notification_provider_options[provider]
if provider != new_type:
if provider != kwargs:
for option in provider_options:
if option in notification:
del notification[option]
notification.update(kwargs_sock)
return self.sio.call('addNotification', (notification, id_))
notification.update(kwargs)
return self._call('addNotification', (notification, id_))
def delete_notification(self, id_: int):
return self.sio.call('deleteNotification', id_)
return self._call('deleteNotification', id_)
def check_apprise(self):
return self.sio.call('checkApprise')
return self._call('checkApprise')
# proxy
def get_proxies(self):
proxies = self._get_event_data("proxyList")
proxies = convert_from_socket(params_map_proxy, proxies)
int_to_bool(proxies, ["auth", "active", "default", "apply_existing"])
proxies = convert_to_socket(params_map_proxy, proxies)
return proxies
r = self._get_event_data("proxyList")
r = convert_from_socket(params_map_proxy, r)
int_to_bool(r, ["auth", "active", "default", "apply_existing"])
return r
def get_proxy(self, id_: int):
proxies = self.get_proxies()
for proxy in proxies:
if proxy["id"] == id_:
return proxy
raise UptimeKumaException("proxy does not exist")
def add_proxy(self, *args, **kwargs):
data = _build_proxy_data(*args, **kwargs)
return self.sio.call('addProxy', (data, None))
return self._call('addProxy', (data, None))
def edit_proxy(self, id_: int, **kwargs):
proxy_sock = self.get_proxy(id_)
proxy = convert_from_socket(params_map_proxy, proxy_sock)
for key in ["auth", "active", "default", "apply_existing"]:
if key in proxy:
proxy[key] = True if proxy[key] == 1 else False
kwargs_sock = convert_to_socket(params_map_proxy, kwargs)
proxy_sock = convert_to_socket(params_map_proxy, proxy)
proxy_sock.update(kwargs_sock)
return self.sio.call('addProxy', (proxy_sock, id_))
# return proxy_sock, id_
proxy = self.get_proxy(id_)
proxy.update(kwargs)
return self._call('addProxy', (proxy, id_))
def delete_proxy(self, id_: int):
return self.sio.call('deleteProxy', id_)
return self._call('deleteProxy', id_)
# status page
def get_status_pages(self):
return list(self._get_event_data("statusPageList").values())
r = list(self._get_event_data("statusPageList").values())
r = convert_from_socket(params_map_status_page, r)
return r
def get_status_page(self, slug: str):
r = self.sio.call('getStatusPage', slug)
if r["ok"]:
config = r["config"]
del r["config"]
r.update(config)
r = self._call('getStatusPage', slug)
config = r["config"]
del r["config"]
r.update(config)
r = convert_from_socket(params_map_status_page, r)
return r
def add_status_page(self, slug: str, title: str):
return self.sio.call('addStatusPage', (title, slug))
return self._call('addStatusPage', (title, slug))
def delete_status_page(self, slug: str):
return self.sio.call('deleteStatusPage', slug)
return self._call('deleteStatusPage', slug)
def save_status_page(self, slug: str, **kwargs):
status_page = self.get_status_page(slug)
status_page.pop("ok")
kwargs_sock = convert_to_socket(params_map_status_page, kwargs)
status_page.update(kwargs_sock)
status_page = convert_from_socket(params_map_status_page, status_page)
status_page.update(kwargs)
data = _build_status_page_data(**status_page)
return self.sio.call('saveStatusPage', data)
return self._call('saveStatusPage', data)
def post_incident(
self,
@ -515,25 +528,34 @@ class UptimeKumaApi(object):
"content": content,
"style": style
}
r = self.sio.call('postIncident', (slug, incident))
r = self._call('postIncident', (slug, incident))["incident"]
r = convert_from_socket(params_map_status_page, r)
self.save_status_page(slug)
return r
def unpin_incident(self, slug: str):
r = self.sio.call('unpinIncident', slug)
r = self._call('unpinIncident', slug)
self.save_status_page(slug)
return r
# heartbeat
def get_heartbeats(self):
return self._get_event_data("heartbeatList")
r = self._get_event_data("heartbeatList")
for i in r:
int_to_bool(i["data"], ["important", "status"])
return r
def get_important_heartbeats(self):
return self._get_event_data("importantHeartbeatList")
r = self._get_event_data("importantHeartbeatList")
for i in r:
int_to_bool(i["data"], ["important", "status"])
return r
def get_heartbeat(self):
return self._get_event_data("heartbeat")
r = self._get_event_data("heartbeat")
int_to_bool(r, ["important", "status"])
return r
# avg ping
@ -548,58 +570,100 @@ class UptimeKumaApi(object):
# info
def info(self):
return self._get_event_data("info")
r = self._get_event_data("info")
r = convert_from_socket(params_map_info, r)
return r
# clear
def clear_events(self):
return self.sio.call('clearEvents')
def clear_events(self, monitor_id: int):
return self._call('clearEvents', monitor_id)
def clear_heartbeats(self):
return self.sio.call('clearHeartbeats')
def clear_heartbeats(self, monitor_id: int):
return self._call('clearHeartbeats', monitor_id)
def clear_statistics(self):
return self.sio.call('clearStatistics')
return self._call('clearStatistics')
# tags
def get_tags(self):
return self.sio.call('getTags')
return self._call('getTags')["tags"]
def get_tag(self, id_: int):
tags = self.get_tags()
for tag in tags:
if tag["id"] == id_:
return tag
raise UptimeKumaException("tag does not exist")
# TODO: not working, monitor id required?
# not working, monitor id required?
# def edit_tag(self, id_: int, name: str, color: str):
# return self.sio.call('editTag', {
# return self._call('editTag', {
# "id": id_,
# "name": name,
# "color": color
# })
def delete_tag(self, id_: int):
return self.sio.call('deleteTag', id_)
return self._call('deleteTag', id_)
def add_tag(self, name: str, color: str):
return self.sio.call('addTag', {
return self._call('addTag', {
"name": name,
"color": color,
"new": True
})
})["tag"]
# settings
def get_settings(self):
return self.sio.call('getSettings')
r = self._call('getSettings')["data"]
r = convert_from_socket(params_map_settings, r)
return r
def set_settings(self, data, password: str):
return self.sio.call('setSettings', (data, password))
def set_settings(
self,
password: str,
# about
check_update: bool = True,
check_beta: bool = False,
# monitor history
keep_data_period_days: int = 180,
# general
entry_page: str = "dashboard",
search_engine_index: bool = False,
primary_base_url: str = "",
steam_api_key: str = "",
# notifications
tls_expiry_notify_days: list[int] = None,
# security
disable_auth: bool = False
):
if not tls_expiry_notify_days:
tls_expiry_notify_days = [7, 14, 21]
data = {
"check_update": check_update,
"check_beta": check_beta,
"keep_data_period_days": keep_data_period_days,
"entry_page": entry_page,
"search_engine_index": search_engine_index,
"primary_base_url": primary_base_url,
"steam_api_key": steam_api_key,
"tls_expiry_notify_days": tls_expiry_notify_days,
"disable_auth": disable_auth
}
data = convert_to_socket(params_map_settings, data)
return self._call('setSettings', (data, password))
def change_password(self, old_password: str, new_password: str):
return self.sio.call('changePassword', {
return self._call('changePassword', {
"currentPassword": old_password,
"newPassword": new_password,
})
@ -607,52 +671,52 @@ class UptimeKumaApi(object):
def upload_backup(self, json_data, import_handle: str):
if import_handle not in ["overwrite", "skip", "keep"]:
raise ValueError()
return self.sio.call('uploadBackup', (json_data, import_handle))
return self._call('uploadBackup', (json_data, import_handle))
# 2FA
def twofa_status(self):
return self.sio.call('twoFAStatus')
return self._call('twoFAStatus')
def prepare_2fa(self, password: str):
return self.sio.call('prepare2FA', password)
return self._call('prepare2FA', password)
def save_2fa(self, password: str):
return self.sio.call('save2FA', password)
return self._call('save2FA', password)
def disable_2fa(self, password: str):
return self.sio.call('disable2FA', password)
return self._call('disable2FA', password)
# login
def login(self, username: str, password: str):
return self.sio.call('login', {
return self._call('login', {
"username": username,
"password": password,
"token": ""
})
def login_by_token(self, token: str):
return self.sio.call('loginByToken', token)
return self._call('loginByToken', token)
def verify_token(self, token: str, password: str):
return self.sio.call('verifyToken', (token, password))
return self._call('verifyToken', (token, password))
def logout(self):
return self.sio.call('logout')
return self._call('logout')
# setup
def need_setup(self):
return self.sio.call('needSetup')
return self._call('needSetup')
def setup(self, username: str, password: str):
return self.sio.call("setup", (username, password))
return self._call("setup", (username, password))
# database
def get_database_size(self):
return self.sio.call('getDatabaseSize')
return self._call('getDatabaseSize')
def shrink_database(self):
return self.sio.call('shrinkDatabase')
return self._call('shrinkDatabase')

View file

@ -24,12 +24,17 @@ params_map_monitor = {
"mqttTopic": "mqtt_topic",
"mqttSuccessMessage": "mqtt_success_message",
"databaseConnectionString": "sqlserver_connection_string",
"sqlserverQuery": "sqlserver_query"
"sqlserverQuery": "sqlserver_query",
"authDomain": "auth_domain",
"authWorkstation": "auth_workstation",
"databaseQuery": "database_query",
"monitorID": "monitor_id"
}
params_map_notification = {
"type": "type_",
"isDefault": "default"
"isDefault": "default",
"userId": "user_id"
}
params_map_notification_provider = {
@ -159,7 +164,9 @@ params_map_notification_provider = {
}
params_map_proxy = {
"applyExisting": "apply_existing"
"applyExisting": "apply_existing",
"createdDate": "created_date",
"userId": "user_id"
}
params_map_status_page = {
@ -174,7 +181,30 @@ params_map_status_page = {
"domainNameList": "domain_name_list",
"customCSS": "custom_css",
"footerText": "footer_text",
"showPoweredBy": "show_powered_by"
"showPoweredBy": "show_powered_by",
"createdDate": "created_date"
}
params_map_info = {
"latestVersion": "latest_version",
"primaryBaseURL": "primary_base_url"
}
params_map_settings = {
# about
"checkUpdate": "check_update",
"checkBeta": "check_beta",
# monitor history
"keepDataPeriodDays": "keep_data_period_days",
# general
"entryPage": "entry_page",
"searchEngineIndex": "search_engine_index",
"primaryBaseURL": "primary_base_url",
"steamAPIKey": "steam_api_key",
# notifications
"tlsExpiryNotifyDays": "tls_expiry_notify_days",
# security
"disableAuth": "disable_auth"
}

View file

@ -0,0 +1,2 @@
class UptimeKumaException(Exception):
pass