Merge branch 'dev'

This commit is contained in:
lucasheld 2022-08-26 17:07:55 +02:00
commit def2143cf2
13 changed files with 418 additions and 103 deletions

View file

@ -15,10 +15,15 @@ here = os.path.abspath(os.path.dirname(__file__))
with open(os.path.join(here, "uptime_kuma_api", "__version__.py"), "r", "utf-8") as f: with open(os.path.join(here, "uptime_kuma_api", "__version__.py"), "r", "utf-8") as f:
exec(f.read(), info) exec(f.read(), info)
with open("README.md", "r", "utf-8") as f:
readme = f.read()
setup( setup(
name=info["__title__"], name=info["__title__"],
version=info["__version__"], version=info["__version__"],
description="A python wrapper for the Uptime Kuma WebSocket API", description="A python wrapper for the Uptime Kuma WebSocket API",
long_description=readme,
long_description_content_type="text/markdown",
url="https://github.com/lucasheld/uptime-kuma-api", url="https://github.com/lucasheld/uptime-kuma-api",
author=info["__author__"], author=info["__author__"],
author_email="lucasheld@hotmail.de", author_email="lucasheld@hotmail.de",

1
tests/requirements.txt Normal file
View file

@ -0,0 +1 @@
pyotp==2.6.0

56
tests/test_2fa.py Normal file
View file

@ -0,0 +1,56 @@
import unittest
from urllib import parse
import pyotp
from uptime_kuma_test_case import UptimeKumaTestCase
def parse_secret(uri):
query = parse.urlsplit(uri).query
params = dict(parse.parse_qsl(query))
return params["secret"]
def generate_token(secret):
totp = pyotp.TOTP(secret)
return totp.now()
class Test2FA(UptimeKumaTestCase):
def test_2fa(self):
# check 2fa is disabled
r = self.api.twofa_status()
self.assertEqual(r["status"], False)
# prepare 2fa
r = self.api.prepare_2fa(self.password)
uri = r["uri"]
self.assertTrue(uri.startswith("otpauth://totp/"))
secret = parse_secret(uri)
# verify token
token = generate_token(secret)
r = self.api.verify_token(token, self.password)
self.assertEqual(r["valid"], True)
# save 2fa
r = self.api.save_2fa(self.password)
self.assertEqual(r["msg"], "2FA Enabled.")
# check 2fa is enabled
r = self.api.twofa_status()
self.assertEqual(r["status"], True)
# relogin using the totp token
self.api.logout()
token = generate_token(secret)
self.api.login(self.username, self.password, token)
# disable 2fa
r = self.api.disable_2fa(self.password)
self.assertEqual(r["msg"], "2FA Disabled.")
if __name__ == '__main__':
unittest.main()

13
tests/test_avg_ping.py Normal file
View file

@ -0,0 +1,13 @@
import unittest
from uptime_kuma_test_case import UptimeKumaTestCase
class TestAvgPing(UptimeKumaTestCase):
def test_avg_ping(self):
self.add_monitor()
self.api.avg_ping()
if __name__ == '__main__':
unittest.main()

16
tests/test_database.py Normal file
View file

@ -0,0 +1,16 @@
import unittest
from uptime_kuma_test_case import UptimeKumaTestCase
class TestDatabase(UptimeKumaTestCase):
def test_get_database_size(self):
r = self.api.get_database_size()
self.assertIn("size", r)
def test_shrink_database(self):
self.api.shrink_database()
if __name__ == '__main__':
unittest.main()

View file

@ -1,23 +1,19 @@
import unittest import unittest
from uptime_kuma_api import UptimeKumaException from uptime_kuma_api import UptimeKumaException, MonitorType
from uptime_kuma_test_case import UptimeKumaTestCase from uptime_kuma_test_case import UptimeKumaTestCase
class TestMonitor(UptimeKumaTestCase): class TestMonitor(UptimeKumaTestCase):
def test_monitor(self): def test_monitor(self):
expected_monitor = { expected_monitor = {
"type": "http", "type": MonitorType.HTTP,
"name": "monitor 1", "name": "monitor 1",
"url": "http://192.168.20.135" "url": "http://127.0.0.1"
} }
# add monitor # add monitor
r = self.api.add_monitor( r = self.api.add_monitor(**expected_monitor)
type=expected_monitor["type"],
name=expected_monitor["name"],
url=expected_monitor["url"]
)
self.assertEqual(r["msg"], "Added Successfully.") self.assertEqual(r["msg"], "Added Successfully.")
monitor_id = r["monitorID"] monitor_id = r["monitorID"]
@ -34,7 +30,7 @@ class TestMonitor(UptimeKumaTestCase):
# edit monitor # edit monitor
expected_monitor["type"] = "ping" expected_monitor["type"] = "ping"
expected_monitor["name"] = "monitor 1 new" expected_monitor["name"] = "monitor 1 new"
expected_monitor["hostname"] = "127.0.0.1" expected_monitor["hostname"] = "127.0.0.10"
del expected_monitor["url"] del expected_monitor["url"]
r = self.api.edit_monitor(monitor_id, **expected_monitor) r = self.api.edit_monitor(monitor_id, **expected_monitor)
self.assertEqual(r["msg"], "Saved.") self.assertEqual(r["msg"], "Saved.")
@ -49,12 +45,108 @@ class TestMonitor(UptimeKumaTestCase):
r = self.api.resume_monitor(monitor_id) r = self.api.resume_monitor(monitor_id)
self.assertEqual(r["msg"], "Resumed Successfully.") self.assertEqual(r["msg"], "Resumed Successfully.")
# get monitor beats
self.api.get_monitor_beats(monitor_id, 6)
# delete monitor # delete monitor
r = self.api.delete_monitor(monitor_id) r = self.api.delete_monitor(monitor_id)
self.assertEqual(r["msg"], "Deleted Successfully.") self.assertEqual(r["msg"], "Deleted Successfully.")
with self.assertRaises(UptimeKumaException): with self.assertRaises(UptimeKumaException):
self.api.get_monitor(monitor_id) self.api.get_monitor(monitor_id)
def do_test_monitor_type(self, expected_monitor):
r = self.api.add_monitor(**expected_monitor)
self.assertEqual(r["msg"], "Added Successfully.")
monitor_id = r["monitorID"]
monitor = self.api.get_monitor(monitor_id)
self.compare(monitor, expected_monitor)
def test_monitor_type_http(self):
expected_monitor = {
"type": MonitorType.HTTP,
"name": "monitor 1",
"url": "http://127.0.0.1"
}
self.do_test_monitor_type(expected_monitor)
def test_monitor_type_port(self):
expected_monitor = {
"type": MonitorType.PORT,
"name": "monitor 1",
"hostname": "127.0.0.1",
"port": 8888
}
self.do_test_monitor_type(expected_monitor)
def test_monitor_type_ping(self):
expected_monitor = {
"type": MonitorType.PING,
"name": "monitor 1",
"hostname": "127.0.0.1",
}
self.do_test_monitor_type(expected_monitor)
def test_monitor_type_keyword(self):
expected_monitor = {
"type": MonitorType.KEYWORD,
"name": "monitor 1",
"url": "http://127.0.0.1",
"keyword": "healthy"
}
self.do_test_monitor_type(expected_monitor)
def test_monitor_type_dns(self):
expected_monitor = {
"type": MonitorType.DNS,
"name": "monitor 1",
"url": "http://127.0.0.1",
"hostname": "127.0.0.1",
"port": 8888,
"dns_resolve_server": "1.1.1.1",
}
self.do_test_monitor_type(expected_monitor)
def test_monitor_type_push(self):
expected_monitor = {
"type": MonitorType.PUSH,
"name": "monitor 1",
"url": "http://127.0.0.1"
}
self.do_test_monitor_type(expected_monitor)
def test_monitor_type_steam(self):
expected_monitor = {
"type": MonitorType.STEAM,
"name": "monitor 1",
"url": "http://127.0.0.1",
"hostname": "127.0.0.1",
"port": 8888,
}
self.do_test_monitor_type(expected_monitor)
def test_monitor_type_mqtt(self):
expected_monitor = {
"type": MonitorType.MQTT,
"name": "monitor 1",
"url": "http://127.0.0.1",
"hostname": "127.0.0.1",
"port": 8888,
"mqttTopic": "test"
}
self.do_test_monitor_type(expected_monitor)
def test_monitor_type_sqlserver(self):
expected_monitor = {
"type": MonitorType.SQLSERVER,
"name": "monitor 1",
"url": "http://127.0.0.1",
"databaseConnectionString": "Server=127.0.0.1,8888;Database=test;User Id=1;Password=secret123;Encrypt=true;"
"TrustServerCertificate=Yes;Connection Timeout=5",
"databaseQuery": "select getdate()"
}
self.do_test_monitor_type(expected_monitor)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View file

@ -1,4 +1,5 @@
import unittest import unittest
import json
from uptime_kuma_test_case import UptimeKumaTestCase from uptime_kuma_test_case import UptimeKumaTestCase
@ -13,6 +14,32 @@ class TestSettings(UptimeKumaTestCase):
settings = self.api.get_settings() settings = self.api.get_settings()
self.assertEqual(settings["checkUpdate"], expected_check_update) self.assertEqual(settings["checkUpdate"], expected_check_update)
def test_change_password(self):
new_password = "321terces"
# change password
r = self.api.change_password(self.password, new_password)
self.assertEqual(r["msg"], "Password has been updated successfully.")
# check login
r = self.api.login(self.username, new_password)
self.assertIn("token", r)
# restore password
r = self.api.change_password(new_password, self.password)
self.assertEqual(r["msg"], "Password has been updated successfully.")
def test_upload_backup(self):
data = {
"version": "1.17.1",
"notificationList": [],
"monitorList": [],
"proxyList": []
}
data_str = json.dumps(data)
r = self.api.upload_backup(data_str, "overwrite")
self.assertEqual(r["msg"], "Backup successfully restored.")
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View file

@ -6,12 +6,25 @@ from uptime_kuma_test_case import UptimeKumaTestCase
class TestStatusPage(UptimeKumaTestCase): class TestStatusPage(UptimeKumaTestCase):
def test_status_page(self): def test_status_page(self):
monitor_id = self.add_monitor()
slug = "slug1" slug = "slug1"
expected_status_page = { expected_status_page = {
"slug": slug, "slug": slug,
"title": "status page 1", "title": "status page 1",
"description": "description 1", "description": "description 1",
"showPoweredBy": False "showPoweredBy": False,
"publicGroupList": [
{
'name': 'Services',
'weight': 1,
'monitorList': [
{
"id": monitor_id
}
]
}
]
} }
# slug must be unique # slug must be unique
@ -35,7 +48,9 @@ class TestStatusPage(UptimeKumaTestCase):
status_pages = self.api.get_status_pages() status_pages = self.api.get_status_pages()
status_page = self.find_by_id(status_pages, slug, "slug") status_page = self.find_by_id(status_pages, slug, "slug")
self.assertIsNotNone(status_page) self.assertIsNotNone(status_page)
self.compare(status_page, expected_status_page) # publicGroupList and incident is not available in status pages
expected_status_page_config = {i: expected_status_page[i] for i in expected_status_page if i != "publicGroupList"}
self.compare(status_page, expected_status_page_config)
# edit status page # edit status page
expected_status_page["title"] = "status page 1 new" expected_status_page["title"] = "status page 1 new"
@ -52,9 +67,13 @@ class TestStatusPage(UptimeKumaTestCase):
} }
incident = self.api.post_incident(slug, **incident_expected) incident = self.api.post_incident(slug, **incident_expected)
self.compare(incident, incident_expected) self.compare(incident, incident_expected)
status_page = self.api.get_status_page(slug)
self.compare(status_page["incident"], incident)
# unpin incident # unpin incident
self.api.unpin_incident(slug) self.api.unpin_incident(slug)
status_page = self.api.get_status_page(slug)
self.assertIsNone(status_page["incident"])
# delete status page # delete status page
self.api.delete_status_page(slug) self.api.delete_status_page(slug)

16
tests/test_uptime.py Normal file
View file

@ -0,0 +1,16 @@
import unittest
from uptime_kuma_test_case import UptimeKumaTestCase
class TestUptime(UptimeKumaTestCase):
def test_uptime_without_monitor(self):
self.api.uptime()
def test_uptime_with_monitor(self):
self.add_monitor()
self.api.uptime()
if __name__ == '__main__':
unittest.main()

View file

@ -1,37 +1,62 @@
import json
import unittest import unittest
from uptime_kuma_api import UptimeKumaApi from uptime_kuma_api import UptimeKumaApi, Event, MonitorType
token = None token = None
def compare(subset, superset):
for key, value in subset.items():
value2 = superset.get(key)
if type(value) == list:
for i in range(len(value)):
if not value2 or not compare(value[i], value2[i]):
return False
elif type(value) == dict:
if not compare(value, value2):
return False
else:
if value != value2:
return False
return True
class UptimeKumaTestCase(unittest.TestCase): class UptimeKumaTestCase(unittest.TestCase):
api = None api = None
url = "http://127.0.0.1:3001" url = "http://127.0.0.1:3001"
username = "testuser" username = "admin"
password = "zS7zhQSc" password = "secret123"
@classmethod def setUp(self):
def setUpClass(cls): self.api = UptimeKumaApi(self.url)
cls.api = UptimeKumaApi(cls.url)
global token global token
if not token: if not token:
if cls.api.need_setup(): if self.api.need_setup():
cls.api.setup(cls.username, cls.password) self.api.setup(self.username, self.password)
r = cls.api.login(cls.username, cls.password) r = self.api.login(self.username, self.password)
token = r["token"] token = r["token"]
cls.api.login_by_token(token) self.api.login_by_token(token)
@classmethod data = {
def tearDownClass(cls): "version": "1.17.1",
cls.api.logout() "notificationList": [],
cls.api.disconnect() "monitorList": [],
"proxyList": []
}
data_str = json.dumps(data)
r = self.api.upload_backup(data_str, "overwrite")
self.assertEqual(r["msg"], "Backup successfully restored.")
self.api._event_data[Event.MONITOR_LIST] = {}
def tearDown(self):
self.api.disconnect()
def compare(self, superset, subset): def compare(self, superset, subset):
self.assertTrue(subset.items() <= superset.items()) self.assertTrue(compare(subset, superset))
def find_by_id(self, objects, value, key="id"): def find_by_id(self, objects, value, key="id"):
for obj in objects: for obj in objects:
@ -39,7 +64,7 @@ class UptimeKumaTestCase(unittest.TestCase):
return obj return obj
def add_monitor(self): def add_monitor(self):
r = self.api.add_monitor(type="http", name="monitor 1", url="http://127.0.0.1") r = self.api.add_monitor(type=MonitorType.HTTP, name="monitor 1", url="http://127.0.0.1")
monitor_id = r["monitorID"] monitor_id = r["monitorID"]
return monitor_id return monitor_id

View file

@ -5,4 +5,5 @@ from .notification_providers import NotificationType, notification_provider_opti
from .proxy_protocol import ProxyProtocol from .proxy_protocol import ProxyProtocol
from .incident_style import IncidentStyle from .incident_style import IncidentStyle
from .exceptions import UptimeKumaException from .exceptions import UptimeKumaException
from .event import Event
from .api import UptimeKumaApi from .api import UptimeKumaApi

View file

@ -1,6 +1,7 @@
import json import json
import time import time
import requests
import socketio import socketio
from . import AuthMethod from . import AuthMethod
@ -8,6 +9,7 @@ from . import MonitorType
from . import NotificationType, notification_provider_options from . import NotificationType, notification_provider_options
from . import ProxyProtocol from . import ProxyProtocol
from . import IncidentStyle from . import IncidentStyle
from . import Event
from . import UptimeKumaException from . import UptimeKumaException
@ -225,18 +227,14 @@ def _build_status_page_data(
showPoweredBy: bool = True, showPoweredBy: bool = True,
icon: str = "/icon.svg", icon: str = "/icon.svg",
monitors: list = None publicGroupList: list = None
): ):
if theme not in ["light", "dark"]: if theme not in ["light", "dark"]:
raise ValueError raise ValueError
if not domainNameList: if not domainNameList:
domainNameList = [] domainNameList = []
public_group_list = [] if not publicGroupList:
if monitors: publicGroupList = []
public_group_list.append({
"name": "Services",
"monitorList": monitors
})
config = { config = {
"id": id, "id": id,
"slug": slug, "slug": slug,
@ -251,7 +249,7 @@ def _build_status_page_data(
"footerText": footerText, "footerText": footerText,
"showPoweredBy": showPoweredBy "showPoweredBy": showPoweredBy
} }
return slug, config, icon, public_group_list return slug, config, icon, publicGroupList
def _check_missing_arguments(required_params, kwargs): def _check_missing_arguments(required_params, kwargs):
@ -365,38 +363,45 @@ def _check_arguments_proxy(kwargs):
class UptimeKumaApi(object): class UptimeKumaApi(object):
def __init__(self, url): def __init__(self, url):
self.url = url
self.sio = socketio.Client() self.sio = socketio.Client()
self._event_data: dict = { self._event_data: dict = {
"monitorList": None, Event.MONITOR_LIST: None,
"notificationList": None, Event.NOTIFICATION_LIST: None,
"proxyList": None, Event.PROXY_LIST: None,
"statusPageList": None, Event.STATUS_PAGE_LIST: None,
"heartbeatList": None, Event.HEARTBEAT_LIST: None,
"importantHeartbeatList": None, Event.IMPORTANT_HEARTBEAT_LIST: None,
"avgPing": None, Event.AVG_PING: None,
"uptime": None, Event.UPTIME: None,
"heartbeat": None, Event.HEARTBEAT: None,
"info": None, Event.INFO: None,
Event.CERT_INFO: None
} }
self.sio.on("connect", self._event_connect) self.sio.on(Event.CONNECT, self._event_connect)
self.sio.on("disconnect", self._event_disconnect) self.sio.on(Event.DISCONNECT, self._event_disconnect)
self.sio.on("monitorList", self._event_monitor_list) self.sio.on(Event.MONITOR_LIST, self._event_monitor_list)
self.sio.on("notificationList", self._event_notification_list) self.sio.on(Event.NOTIFICATION_LIST, self._event_notification_list)
self.sio.on("proxyList", self._event_proxy_list) self.sio.on(Event.PROXY_LIST, self._event_proxy_list)
self.sio.on("statusPageList", self._event_status_page_list) self.sio.on(Event.STATUS_PAGE_LIST, self._event_status_page_list)
self.sio.on("heartbeatList", self._event_heartbeat_list) self.sio.on(Event.HEARTBEAT_LIST, self._event_heartbeat_list)
self.sio.on("importantHeartbeatList", self._event_important_heartbeat_list) self.sio.on(Event.IMPORTANT_HEARTBEAT_LIST, self._event_important_heartbeat_list)
self.sio.on("avgPing", self._event_avg_ping) self.sio.on(Event.AVG_PING, self._event_avg_ping)
self.sio.on("uptime", self._event_uptime) self.sio.on(Event.UPTIME, self._event_uptime)
self.sio.on("heartbeat", self._event_heartbeat) self.sio.on(Event.HEARTBEAT, self._event_heartbeat)
self.sio.on("info", self._event_info) self.sio.on(Event.INFO, self._event_info)
self.sio.on(Event.CERT_INFO, self._event_cert_info)
self.connect(url) self.connect()
def _get_event_data(self, event): def _get_event_data(self, event):
monitor_events = [Event.AVG_PING, Event.UPTIME, Event.HEARTBEAT_LIST, Event.IMPORTANT_HEARTBEAT_LIST, Event.CERT_INFO, Event.HEARTBEAT]
while self._event_data[event] is None: while self._event_data[event] is None:
# do not wait for events that are not sent
if self._event_data[Event.MONITOR_LIST] == {} and event in monitor_events:
return []
time.sleep(0.01) time.sleep(0.01)
time.sleep(0.01) # wait for multiple messages time.sleep(0.01) # wait for multiple messages
return self._event_data[event] return self._event_data[event]
@ -418,65 +423,76 @@ class UptimeKumaApi(object):
pass pass
def _event_monitor_list(self, data): def _event_monitor_list(self, data):
self._event_data["monitorList"] = data self._event_data[Event.MONITOR_LIST] = data
def _event_notification_list(self, data): def _event_notification_list(self, data):
self._event_data["notificationList"] = data self._event_data[Event.NOTIFICATION_LIST] = data
def _event_proxy_list(self, data): def _event_proxy_list(self, data):
self._event_data["proxyList"] = data self._event_data[Event.PROXY_LIST] = data
def _event_status_page_list(self, data): def _event_status_page_list(self, data):
self._event_data["statusPageList"] = data self._event_data[Event.STATUS_PAGE_LIST] = data
def _event_heartbeat_list(self, id_, data, bool_): def _event_heartbeat_list(self, id_, data, bool_):
if self._event_data["heartbeatList"] is None: if self._event_data[Event.HEARTBEAT_LIST] is None:
self._event_data["heartbeatList"] = [] self._event_data[Event.HEARTBEAT_LIST] = []
self._event_data["heartbeatList"].append({ self._event_data[Event.HEARTBEAT_LIST].append({
"id": id_, "id": id_,
"data": data, "data": data,
"bool": bool_, "bool": bool_,
}) })
def _event_important_heartbeat_list(self, id_, data, bool_): def _event_important_heartbeat_list(self, id_, data, bool_):
if self._event_data["importantHeartbeatList"] is None: if self._event_data[Event.IMPORTANT_HEARTBEAT_LIST] is None:
self._event_data["importantHeartbeatList"] = [] self._event_data[Event.IMPORTANT_HEARTBEAT_LIST] = []
self._event_data["importantHeartbeatList"].append({ self._event_data[Event.IMPORTANT_HEARTBEAT_LIST].append({
"id": id_, "id": id_,
"data": data, "data": data,
"bool": bool_, "bool": bool_,
}) })
def _event_avg_ping(self, id_, data): def _event_avg_ping(self, id_, data):
if self._event_data["avgPing"] is None: if self._event_data[Event.AVG_PING] is None:
self._event_data["avgPing"] = [] self._event_data[Event.AVG_PING] = []
self._event_data["avgPing"].append({ self._event_data[Event.AVG_PING].append({
"id": id_, "id": id_,
"data": data, "data": data,
}) })
def _event_uptime(self, id_, hours_24, days_30): def _event_uptime(self, id_, hours_24, days_30):
if self._event_data["uptime"] is None: if self._event_data[Event.UPTIME] is None:
self._event_data["uptime"] = [] self._event_data[Event.UPTIME] = []
self._event_data["uptime"].append({ self._event_data[Event.UPTIME].append({
"id": id_, "id": id_,
"hours_24": hours_24, "hours_24": hours_24,
"days_30": days_30, "days_30": days_30,
}) })
def _event_heartbeat(self, data): def _event_heartbeat(self, data):
if self._event_data["heartbeat"] is None: if self._event_data[Event.HEARTBEAT] is None:
self._event_data["heartbeat"] = [] self._event_data[Event.HEARTBEAT] = []
self._event_data["heartbeat"].append(data) self._event_data[Event.HEARTBEAT].append(data)
def _event_info(self, data): def _event_info(self, data):
self._event_data["info"] = data self._event_data[Event.INFO] = data
def _event_cert_info(self, id_, data):
if self._event_data[Event.CERT_INFO] is None:
self._event_data[Event.CERT_INFO] = []
self._event_data[Event.CERT_INFO].append({
"id": id_,
"data": data,
})
# connection # connection
def connect(self, url: str): def connect(self):
url = url.rstrip("/") url = self.url.rstrip("/")
self.sio.connect(f'{url}/socket.io/') try:
self.sio.connect(f'{url}/socket.io/')
except:
print("")
def disconnect(self): def disconnect(self):
self.sio.disconnect() self.sio.disconnect()
@ -484,7 +500,7 @@ class UptimeKumaApi(object):
# monitors # monitors
def get_monitors(self): def get_monitors(self):
r = list(self._get_event_data("monitorList").values()) r = list(self._get_event_data(Event.MONITOR_LIST).values())
int_to_bool(r, ["active"]) int_to_bool(r, ["active"])
return r return r
@ -535,7 +551,7 @@ class UptimeKumaApi(object):
# notifications # notifications
def get_notifications(self): def get_notifications(self):
notifications = self._get_event_data("notificationList") notifications = self._get_event_data(Event.NOTIFICATION_LIST)
r = [] r = []
for notification_raw in notifications: for notification_raw in notifications:
notification = notification_raw.copy() notification = notification_raw.copy()
@ -589,7 +605,7 @@ class UptimeKumaApi(object):
# proxy # proxy
def get_proxies(self): def get_proxies(self):
r = self._get_event_data("proxyList") r = self._get_event_data(Event.PROXY_LIST)
int_to_bool(r, ["auth", "active", "default", "applyExisting"]) int_to_bool(r, ["auth", "active", "default", "applyExisting"])
return r return r
@ -618,15 +634,20 @@ class UptimeKumaApi(object):
# status page # status page
def get_status_pages(self): def get_status_pages(self):
r = list(self._get_event_data("statusPageList").values()) r = list(self._get_event_data(Event.STATUS_PAGE_LIST).values())
return r return r
def get_status_page(self, slug: str): def get_status_page(self, slug: str):
r = self._call('getStatusPage', slug) r1 = self._call('getStatusPage', slug)
config = r["config"] r2 = requests.get(f"{self.url}/api/status-page/{slug}").json()
del r["config"]
r.update(config) config = r1["config"]
return r config.update(r2["config"])
return {
**config,
"incident": r2["incident"],
"publicGroupList": r2["publicGroupList"]
}
def add_status_page(self, slug: str, title: str): def add_status_page(self, slug: str, title: str):
return self._call('addStatusPage', (title, slug)) return self._call('addStatusPage', (title, slug))
@ -636,6 +657,7 @@ class UptimeKumaApi(object):
def save_status_page(self, slug: str, **kwargs): def save_status_page(self, slug: str, **kwargs):
status_page = self.get_status_page(slug) status_page = self.get_status_page(slug)
status_page.pop("incident")
status_page.update(kwargs) status_page.update(kwargs)
data = _build_status_page_data(**status_page) data = _build_status_page_data(**status_page)
return self._call('saveStatusPage', data) return self._call('saveStatusPage', data)
@ -664,36 +686,41 @@ class UptimeKumaApi(object):
# heartbeat # heartbeat
def get_heartbeats(self): def get_heartbeats(self):
r = self._get_event_data("heartbeatList") r = self._get_event_data(Event.HEARTBEAT_LIST)
for i in r: for i in r:
int_to_bool(i["data"], ["important", "status"]) int_to_bool(i["data"], ["important", "status"])
return r return r
def get_important_heartbeats(self): def get_important_heartbeats(self):
r = self._get_event_data("importantHeartbeatList") r = self._get_event_data(Event.IMPORTANT_HEARTBEAT_LIST)
for i in r: for i in r:
int_to_bool(i["data"], ["important", "status"]) int_to_bool(i["data"], ["important", "status"])
return r return r
def get_heartbeat(self): def get_heartbeat(self):
r = self._get_event_data("heartbeat") r = self._get_event_data(Event.HEARTBEAT)
int_to_bool(r, ["important", "status"]) int_to_bool(r, ["important", "status"])
return r return r
# avg ping # avg ping
def avg_ping(self): def avg_ping(self):
return self._get_event_data("avgPing") return self._get_event_data(Event.AVG_PING)
# cert info
def cert_info(self):
return self._get_event_data(Event.CERT_INFO)
# uptime # uptime
def uptime(self): def uptime(self):
return self._get_event_data("uptime") return self._get_event_data(Event.UPTIME)
# info # info
def info(self): def info(self):
r = self._get_event_data("info") r = self._get_event_data(Event.INFO)
return r return r
# clear # clear
@ -788,7 +815,7 @@ class UptimeKumaApi(object):
"newPassword": new_password, "newPassword": new_password,
}) })
def upload_backup(self, json_data, import_handle: str): def upload_backup(self, json_data, import_handle: str = "skip"):
if import_handle not in ["overwrite", "skip", "keep"]: if import_handle not in ["overwrite", "skip", "keep"]:
raise ValueError() raise ValueError()
return self._call('uploadBackup', (json_data, import_handle)) return self._call('uploadBackup', (json_data, import_handle))
@ -801,6 +828,9 @@ class UptimeKumaApi(object):
def prepare_2fa(self, password: str): def prepare_2fa(self, password: str):
return self._call('prepare2FA', password) return self._call('prepare2FA', password)
def verify_token(self, token: str, password: str):
return self._call('verifyToken', (token, password))
def save_2fa(self, password: str): def save_2fa(self, password: str):
return self._call('save2FA', password) return self._call('save2FA', password)
@ -809,19 +839,16 @@ class UptimeKumaApi(object):
# login # login
def login(self, username: str, password: str): def login(self, username: str, password: str, token: str = ""):
return self._call('login', { return self._call('login', {
"username": username, "username": username,
"password": password, "password": password,
"token": "" "token": token
}) })
def login_by_token(self, token: str): def login_by_token(self, token: str):
return self._call('loginByToken', token) return self._call('loginByToken', token)
def verify_token(self, token: str, password: str):
return self._call('verifyToken', (token, password))
def logout(self): def logout(self):
return self._call('logout') return self._call('logout')

17
uptime_kuma_api/event.py Normal file
View file

@ -0,0 +1,17 @@
from enum import Enum
class Event(str, Enum):
CONNECT = "connect"
DISCONNECT = "disconnect"
MONITOR_LIST = "monitorList"
NOTIFICATION_LIST = "notificationList"
PROXY_LIST = "proxyList"
STATUS_PAGE_LIST = "statusPageList"
HEARTBEAT_LIST = "heartbeatList"
IMPORTANT_HEARTBEAT_LIST = "importantHeartbeatList"
AVG_PING = "avgPing"
UPTIME = "uptime"
HEARTBEAT = "heartbeat"
INFO = "info"
CERT_INFO = "certInfo"