fix: update event list data after changes

This commit is contained in:
lucasheld 2022-10-04 18:38:17 +02:00
parent fee0a1dd8e
commit 06fa29cd41
7 changed files with 115 additions and 29 deletions

View file

@ -13,6 +13,9 @@ class TestDockerHost(UptimeKumaTestCase):
self.skipTest("Unsupported in this Uptime Kuma version")
def test_docker_host(self):
# get empty list to make sure that future accesses will also work
self.api.get_docker_hosts()
expected_docker_host = {
"name": "name 1",
"dockerType": DockerType.SOCKET,

View file

@ -7,6 +7,9 @@ from uptime_kuma_test_case import UptimeKumaTestCase
class TestMonitor(UptimeKumaTestCase):
def test_monitor(self):
# get empty list to make sure that future accesses will also work
self.api.get_monitors()
notification_id_1 = self.add_notification()
notification_id_2 = self.add_notification()
@ -75,6 +78,9 @@ class TestMonitor(UptimeKumaTestCase):
monitor = self.api.get_monitor(monitor_id)
self.compare(monitor, expected_monitor)
expected_monitor.update({
"name": "monitor 2"
})
r = self.api.edit_monitor(monitor_id, **expected_monitor)
self.assertEqual(r["msg"], "Saved.")
monitor = self.api.get_monitor(monitor_id)

View file

@ -18,10 +18,20 @@ class TestMonitorTag(UptimeKumaTestCase):
r = self.api.add_monitor_tag(**expected_monitor_tag)
self.assertEqual(r["msg"], "Added Successfully.")
# check if tag is listed in monitor tags
monitors = self.api.get_monitors()
monitor = self.find_by_id(monitors, monitor_id)
self.assertEqual(monitor["tags"][0]["tag_id"], tag_id)
# delete monitor tag
r = self.api.delete_monitor_tag(**expected_monitor_tag)
self.assertEqual(r["msg"], "Deleted Successfully.")
# check if tag is not listed in monitor tags
monitors = self.api.get_monitors()
monitor = self.find_by_id(monitors, monitor_id)
self.assertEqual(monitor["tags"], [])
if __name__ == '__main__':
unittest.main()

View file

@ -6,6 +6,9 @@ from uptime_kuma_test_case import UptimeKumaTestCase
class TestNotification(UptimeKumaTestCase):
def test_notification(self):
# get empty list to make sure that future accesses will also work
self.api.get_notifications()
expected_notification = {
"name": "notification 1",
"isDefault": True,

View file

@ -6,6 +6,9 @@ from uptime_kuma_test_case import UptimeKumaTestCase
class TestProxy(UptimeKumaTestCase):
def test_proxy(self):
# get empty list to make sure that future accesses will also work
self.api.get_proxies()
expected_proxy = {
"protocol": ProxyProtocol.HTTP,
"host": "127.0.0.1",

View file

@ -6,6 +6,9 @@ from uptime_kuma_test_case import UptimeKumaTestCase
class TestStatusPage(UptimeKumaTestCase):
def test_status_page(self):
# get empty list to make sure that future accesses will also work
self.api.get_status_pages()
monitor_id = self.add_monitor()
slug = "slug1"
@ -81,6 +84,10 @@ class TestStatusPage(UptimeKumaTestCase):
with self.assertRaises(UptimeKumaException):
self.api.get_status_page(slug)
status_pages = self.api.get_status_pages()
status_page = self.find_by_id(status_pages, slug, "slug")
self.assertIsNone(status_page)
if __name__ == '__main__':
unittest.main()

View file

@ -1,19 +1,21 @@
import json
import time
import string
import random
import string
import time
from contextlib import contextmanager
from copy import deepcopy
import requests
import socketio
from packaging.version import parse as parse_version
from . import AuthMethod
from . import DockerType
from . import Event
from . import IncidentStyle
from . import MonitorType
from . import NotificationType, notification_provider_options, notification_provider_conditions
from . import ProxyProtocol
from . import IncidentStyle
from . import DockerType
from . import Event
from . import UptimeKumaException
@ -305,6 +307,24 @@ class UptimeKumaApi(object):
self.connect()
@contextmanager
def wait_for_event(self, event: Event):
retries = 200
event_data_before = deepcopy(self._event_data)
try:
yield
except:
raise
else:
counter = 0
while event_data_before[event] == self._event_data[event]:
time.sleep(0.01)
counter += 1
if counter >= retries:
print("wait_for_event timeout")
break
def _get_event_data(self, event):
monitor_events = [Event.AVG_PING, Event.UPTIME, Event.HEARTBEAT_LIST, Event.IMPORTANT_HEARTBEAT_LIST, Event.CERT_INFO, Event.HEARTBEAT]
while self._event_data[event] is None:
@ -313,7 +333,7 @@ class UptimeKumaApi(object):
return []
time.sleep(0.01)
time.sleep(0.05) # wait for multiple messages
return self._event_data[event]
return deepcopy(self._event_data[event])
def _call(self, event, data=None):
r = self.sio.call(event, data)
@ -578,7 +598,7 @@ class UptimeKumaApi(object):
return data
# monitors
# monitor
def get_monitors(self):
r = list(self._get_event_data(Event.MONITOR_LIST).values())
@ -600,7 +620,8 @@ class UptimeKumaApi(object):
return self._call('resumeMonitor', id_)
def delete_monitor(self, id_: int):
return self._call('deleteMonitor', id_)
with self.wait_for_event(Event.MONITOR_LIST):
return self._call('deleteMonitor', id_)
def get_monitor_beats(self, id_: int, hours: int):
r = self._call('getMonitorBeats', (id_, hours))["data"]
@ -611,30 +632,36 @@ class UptimeKumaApi(object):
data = self._build_monitor_data(**kwargs)
_convert_monitor_input(data)
_check_arguments_monitor(data)
r = self._call('add', data)
return r
with self.wait_for_event(Event.MONITOR_LIST):
return self._call('add', data)
def edit_monitor(self, id_: int, **kwargs):
data = self.get_monitor(id_)
data.update(kwargs)
_convert_monitor_input(data)
_check_arguments_monitor(data)
r = self._call('editMonitor', data)
return r
with self.wait_for_event(Event.MONITOR_LIST):
return self._call('editMonitor', data)
# monitor tags
def add_monitor_tag(self, tag_id: int, monitor_id: int, value=""):
return self._call('addMonitorTag', (tag_id, monitor_id, value))
r = self._call('addMonitorTag', (tag_id, monitor_id, value))
# the monitor list event does not send the updated tags
self._event_data[Event.MONITOR_LIST][str(monitor_id)] = self.get_monitor(monitor_id)
return r
# editMonitorTag is unused in uptime-kuma
# def edit_monitor_tag(self, tag_id: int, monitor_id: int, value=""):
# return self._call('editMonitorTag', (tag_id, monitor_id, value))
def delete_monitor_tag(self, tag_id: int, monitor_id: int, value=""):
return self._call('deleteMonitorTag', (tag_id, monitor_id, value))
r = self._call('deleteMonitorTag', (tag_id, monitor_id, value))
# the monitor list event does not send the updated tags
self._event_data[Event.MONITOR_LIST][str(monitor_id)] = self.get_monitor(monitor_id)
return r
# notifications
# notification
def get_notifications(self):
notifications = self._get_event_data(Event.NOTIFICATION_LIST)
@ -664,7 +691,8 @@ class UptimeKumaApi(object):
data = _build_notification_data(**kwargs)
_check_arguments_notification(data)
return self._call('addNotification', (data, None))
with self.wait_for_event(Event.NOTIFICATION_LIST):
return self._call('addNotification', (data, None))
def edit_notification(self, id_: int, **kwargs):
notification = self.get_notification(id_)
@ -680,10 +708,12 @@ class UptimeKumaApi(object):
notification.update(kwargs)
_check_arguments_notification(notification)
return self._call('addNotification', (notification, id_))
with self.wait_for_event(Event.NOTIFICATION_LIST):
return self._call('addNotification', (notification, id_))
def delete_notification(self, id_: int):
return self._call('deleteNotification', id_)
with self.wait_for_event(Event.NOTIFICATION_LIST):
return self._call('deleteNotification', id_)
def check_apprise(self):
return self._call('checkApprise')
@ -706,22 +736,24 @@ class UptimeKumaApi(object):
data = _build_proxy_data(**kwargs)
_check_arguments_proxy(data)
return self._call('addProxy', (data, None))
with self.wait_for_event(Event.PROXY_LIST):
return self._call('addProxy', (data, None))
def edit_proxy(self, id_: int, **kwargs):
proxy = self.get_proxy(id_)
proxy.update(kwargs)
_check_arguments_proxy(proxy)
return self._call('addProxy', (proxy, id_))
with self.wait_for_event(Event.PROXY_LIST):
return self._call('addProxy', (proxy, id_))
def delete_proxy(self, id_: int):
return self._call('deleteProxy', id_)
with self.wait_for_event(Event.PROXY_LIST):
return self._call('deleteProxy', id_)
# status page
def get_status_pages(self):
r = list(self._get_event_data(Event.STATUS_PAGE_LIST).values())
return r
return list(self._get_event_data(Event.STATUS_PAGE_LIST).values())
def get_status_page(self, slug: str):
r1 = self._call('getStatusPage', slug)
@ -736,17 +768,36 @@ class UptimeKumaApi(object):
}
def add_status_page(self, slug: str, title: str):
return self._call('addStatusPage', (title, slug))
with self.wait_for_event(Event.STATUS_PAGE_LIST):
return self._call('addStatusPage', (title, slug))
def delete_status_page(self, slug: str):
return self._call('deleteStatusPage', slug)
r = self._call('deleteStatusPage', slug)
# uptime kuma does not send the status page list event when a status page is deleted
for status_page in self._event_data[Event.STATUS_PAGE_LIST].values():
if status_page["slug"] == slug:
status_page_id = status_page["id"]
del self._event_data[Event.STATUS_PAGE_LIST][str(status_page_id)]
break
return r
def save_status_page(self, slug: str, **kwargs):
status_page = self.get_status_page(slug)
status_page.pop("incident")
status_page.update(kwargs)
data = _build_status_page_data(**status_page)
return self._call('saveStatusPage', data)
r = self._call('saveStatusPage', data)
# uptime kuma does not send the status page list event when a status page is saved
status_page = self._call('getStatusPage', slug)["config"]
status_page_id = status_page["id"]
if self._event_data[Event.STATUS_PAGE_LIST] is None:
self._event_data[Event.STATUS_PAGE_LIST] = {}
self._event_data[Event.STATUS_PAGE_LIST][str(status_page_id)] = status_page
return r
def post_incident(
self,
@ -990,13 +1041,16 @@ class UptimeKumaApi(object):
def add_docker_host(self, **kwargs):
data = _build_docker_host_data(**kwargs)
_convert_docker_host_input(data)
return self._call('addDockerHost', (data, None))
with self.wait_for_event(Event.DOCKER_HOST_LIST):
return self._call('addDockerHost', (data, None))
def edit_docker_host(self, id_: int, **kwargs):
data = self.get_docker_host(id_)
data.update(kwargs)
_convert_docker_host_input(data)
return self._call('addDockerHost', (data, id_))
with self.wait_for_event(Event.DOCKER_HOST_LIST):
return self._call('addDockerHost', (data, id_))
def delete_docker_host(self, id_: int):
return self._call('deleteDockerHost', id_)
with self.wait_for_event(Event.DOCKER_HOST_LIST):
return self._call('deleteDockerHost', id_)