forked from DGNum/uptime-kuma-api
add tests
This commit is contained in:
parent
6b5b54afe5
commit
a1a483efce
9 changed files with 320 additions and 1 deletions
4
.coveragerc
Normal file
4
.coveragerc
Normal file
|
@ -0,0 +1,4 @@
|
|||
[run]
|
||||
omit =
|
||||
*tests*
|
||||
*scripts*
|
59
tests/test_monitor.py
Normal file
59
tests/test_monitor.py
Normal file
|
@ -0,0 +1,59 @@
|
|||
import unittest
|
||||
from uptime_kuma_test_case import UptimeKumaTestCase
|
||||
from uptime_kuma_api import UptimeKumaException
|
||||
|
||||
|
||||
class TestMonitor(UptimeKumaTestCase):
|
||||
def test_monitor(self):
|
||||
expected_monitor = {
|
||||
"type_": "http",
|
||||
"name": "monitor 1",
|
||||
"url": "http://192.168.20.135"
|
||||
}
|
||||
|
||||
# add monitor
|
||||
r = self.api.add_monitor(
|
||||
type_=expected_monitor["type_"],
|
||||
name=expected_monitor["name"],
|
||||
url=expected_monitor["url"]
|
||||
)
|
||||
self.assertEqual(r["msg"], "Added Successfully.")
|
||||
monitor_id = r["monitor_id"]
|
||||
|
||||
# get monitor
|
||||
monitor = self.api.get_monitor(monitor_id)
|
||||
self.compare(monitor, expected_monitor)
|
||||
|
||||
# get monitors
|
||||
monitors = self.api.get_monitors()
|
||||
monitor = self.find_by_id(monitors, monitor_id)
|
||||
self.assertIsNotNone(monitor)
|
||||
self.compare(monitor, expected_monitor)
|
||||
|
||||
# edit monitor
|
||||
expected_monitor["type_"] = "ping"
|
||||
expected_monitor["name"] = "monitor 1 new"
|
||||
expected_monitor["hostname"] = "127.0.0.1"
|
||||
del expected_monitor["url"]
|
||||
r = self.api.edit_monitor(monitor_id, **expected_monitor)
|
||||
self.assertEqual(r["msg"], "Saved.")
|
||||
monitor = self.api.get_monitor(monitor_id)
|
||||
self.compare(monitor, expected_monitor)
|
||||
|
||||
# pause monitor
|
||||
r = self.api.pause_monitor(monitor_id)
|
||||
self.assertEqual(r["msg"], "Paused Successfully.")
|
||||
|
||||
# resume monitor
|
||||
r = self.api.resume_monitor(monitor_id)
|
||||
self.assertEqual(r["msg"], "Resumed Successfully.")
|
||||
|
||||
# delete monitor
|
||||
r = self.api.delete_monitor(monitor_id)
|
||||
self.assertEqual(r["msg"], "Deleted Successfully.")
|
||||
with self.assertRaises(UptimeKumaException):
|
||||
self.api.get_monitor(monitor_id)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
28
tests/test_monitor_tag.py
Normal file
28
tests/test_monitor_tag.py
Normal file
|
@ -0,0 +1,28 @@
|
|||
import unittest
|
||||
from uptime_kuma_test_case import UptimeKumaTestCase
|
||||
|
||||
|
||||
class TestMonitorTag(UptimeKumaTestCase):
|
||||
def test_monitor_tag(self):
|
||||
r = self.api.add_tag(name="tag 1", color="#ffffff")
|
||||
tag_id = r["id"]
|
||||
r = self.api.add_monitor(type_="http", name="monitor 1", url="http://127.0.0.1")
|
||||
monitor_id = r["monitor_id"]
|
||||
|
||||
expected_monitor_tag = {
|
||||
"tag_id": tag_id,
|
||||
"monitor_id": monitor_id,
|
||||
"value": "value 1"
|
||||
}
|
||||
|
||||
# add monitor tag
|
||||
r = self.api.add_monitor_tag(**expected_monitor_tag)
|
||||
self.assertEqual(r["msg"], "Added Successfully.")
|
||||
|
||||
# delete monitor tag
|
||||
r = self.api.delete_monitor_tag(**expected_monitor_tag)
|
||||
self.assertEqual(r["msg"], "Deleted Successfully.")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
55
tests/test_notification.py
Normal file
55
tests/test_notification.py
Normal file
|
@ -0,0 +1,55 @@
|
|||
import unittest
|
||||
from uptime_kuma_test_case import UptimeKumaTestCase
|
||||
from uptime_kuma_api import UptimeKumaException
|
||||
|
||||
|
||||
class TestNotification(UptimeKumaTestCase):
|
||||
def test_notification(self):
|
||||
expected_notification = {
|
||||
"name": "notification 1",
|
||||
"default": True,
|
||||
"apply_existing": True,
|
||||
"type_": "push_by_techulus",
|
||||
"push_by_techulus_apikey": "123456789"
|
||||
}
|
||||
|
||||
# test notification
|
||||
with self.assertRaisesRegex(UptimeKumaException, r'Invalid API key'):
|
||||
self.api.test_notification(**expected_notification)
|
||||
|
||||
# add notification
|
||||
r = self.api.add_notification(**expected_notification)
|
||||
self.assertEqual(r["msg"], "Saved")
|
||||
notification_id = r["id"]
|
||||
|
||||
# get notification
|
||||
notification = self.api.get_notification(notification_id)
|
||||
self.compare(notification, expected_notification)
|
||||
|
||||
# get notifications
|
||||
notifications = self.api.get_notifications()
|
||||
notification = self.find_by_id(notifications, notification_id)
|
||||
self.assertIsNotNone(notification)
|
||||
self.compare(notification, expected_notification)
|
||||
|
||||
# edit notification
|
||||
expected_notification["name"] = "notification 1 new"
|
||||
expected_notification["default"] = False
|
||||
expected_notification["apply_existing"] = False
|
||||
expected_notification["type_"] = "push_deer"
|
||||
expected_notification["push_deer_deer_key"] = "987654321"
|
||||
del expected_notification["push_by_techulus_apikey"]
|
||||
r = self.api.edit_notification(notification_id, **expected_notification)
|
||||
self.assertEqual(r["msg"], "Saved")
|
||||
notification = self.api.get_notification(notification_id)
|
||||
self.compare(notification, expected_notification)
|
||||
|
||||
# delete notification
|
||||
r = self.api.delete_notification(notification_id)
|
||||
self.assertEqual(r["msg"], "Deleted")
|
||||
with self.assertRaises(UptimeKumaException):
|
||||
self.api.delete_notification(notification_id)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
43
tests/test_proxy.py
Normal file
43
tests/test_proxy.py
Normal file
|
@ -0,0 +1,43 @@
|
|||
import unittest
|
||||
from uptime_kuma_test_case import UptimeKumaTestCase
|
||||
from uptime_kuma_api import UptimeKumaException
|
||||
|
||||
|
||||
class TestProxy(UptimeKumaTestCase):
|
||||
def test_proxy(self):
|
||||
expected_proxy = {
|
||||
"protocol": "http",
|
||||
"host": "127.0.0.1",
|
||||
"port": 8080,
|
||||
"active": True
|
||||
}
|
||||
|
||||
# add proxy
|
||||
r = self.api.add_proxy(**expected_proxy)
|
||||
self.assertEqual(r["msg"], "Saved")
|
||||
proxy_id = r["id"]
|
||||
|
||||
# get proxy
|
||||
proxy = self.api.get_proxy(proxy_id)
|
||||
self.compare(proxy, expected_proxy)
|
||||
|
||||
# edit proxy
|
||||
expected_proxy["protocol"] = "https"
|
||||
expected_proxy["host"] = "127.0.0.2"
|
||||
expected_proxy["port"] = 8888
|
||||
expected_proxy["active"] = False
|
||||
r = self.api.edit_proxy(proxy_id, **expected_proxy)
|
||||
self.assertEqual(r["msg"], "Saved")
|
||||
proxy = self.api.get_proxy(proxy_id)
|
||||
self.compare(proxy, expected_proxy)
|
||||
|
||||
# delete proxy
|
||||
r = self.api.delete_proxy(proxy_id)
|
||||
self.assertEqual(r["msg"], "Deleted")
|
||||
print(r)
|
||||
with self.assertRaises(UptimeKumaException):
|
||||
self.api.get_proxy(proxy_id)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
65
tests/test_status_page.py
Normal file
65
tests/test_status_page.py
Normal file
|
@ -0,0 +1,65 @@
|
|||
import unittest
|
||||
from uptime_kuma_test_case import UptimeKumaTestCase
|
||||
from uptime_kuma_api import UptimeKumaException, IncidentStyle
|
||||
|
||||
|
||||
class TestStatusPage(UptimeKumaTestCase):
|
||||
def test_status_page(self):
|
||||
slug = "slug1"
|
||||
expected_status_page = {
|
||||
"slug": slug,
|
||||
"title": "status page 1",
|
||||
"description": "description 1",
|
||||
"show_powered_by": False
|
||||
}
|
||||
|
||||
# slug must be unique
|
||||
try:
|
||||
self.api.delete_status_page(slug)
|
||||
except UptimeKumaException:
|
||||
pass
|
||||
|
||||
# add status page
|
||||
r = self.api.add_status_page(slug, expected_status_page["title"])
|
||||
self.assertEqual(r["msg"], "OK!")
|
||||
|
||||
# save status page
|
||||
self.api.save_status_page(**expected_status_page)
|
||||
|
||||
# get status page
|
||||
status_page = self.api.get_status_page(slug)
|
||||
self.compare(status_page, expected_status_page)
|
||||
|
||||
# get status pages
|
||||
status_pages = self.api.get_status_pages()
|
||||
status_page = self.find_by_id(status_pages, slug, "slug")
|
||||
self.assertIsNotNone(status_page)
|
||||
self.compare(status_page, expected_status_page)
|
||||
|
||||
# edit status page
|
||||
expected_status_page["title"] = "status page 1 new"
|
||||
expected_status_page["theme"] = "dark"
|
||||
self.api.save_status_page(**expected_status_page)
|
||||
status_page = self.api.get_status_page(slug)
|
||||
self.compare(status_page, expected_status_page)
|
||||
|
||||
# pin incident
|
||||
incident_expected = {
|
||||
"title": "title 1",
|
||||
"content": "content 1",
|
||||
"style": IncidentStyle.DANGER
|
||||
}
|
||||
incident = self.api.post_incident(slug, **incident_expected)
|
||||
self.compare(incident, incident_expected)
|
||||
|
||||
# unpin incident
|
||||
self.api.unpin_incident(slug)
|
||||
|
||||
# delete status page
|
||||
self.api.delete_status_page(slug)
|
||||
with self.assertRaises(UptimeKumaException):
|
||||
self.api.get_status_page(slug)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
37
tests/test_tag.py
Normal file
37
tests/test_tag.py
Normal file
|
@ -0,0 +1,37 @@
|
|||
import unittest
|
||||
from uptime_kuma_test_case import UptimeKumaTestCase
|
||||
from uptime_kuma_api import UptimeKumaException
|
||||
|
||||
|
||||
class TestTag(UptimeKumaTestCase):
|
||||
def test_tag(self):
|
||||
expected_tag = {
|
||||
"name": "tag 1",
|
||||
"color": "#ffffff"
|
||||
}
|
||||
|
||||
# add tag
|
||||
tag = self.api.add_tag(**expected_tag)
|
||||
self.compare(tag, expected_tag)
|
||||
tag_id = tag["id"]
|
||||
|
||||
# get tag
|
||||
tag = self.api.get_tag(tag_id)
|
||||
self.compare(tag, expected_tag)
|
||||
|
||||
# get tags
|
||||
tags = self.api.get_tags()
|
||||
tag = self.find_by_id(tags, tag_id)
|
||||
self.assertIsNotNone(tag)
|
||||
self.compare(tag, expected_tag)
|
||||
|
||||
# delete tag
|
||||
r = self.api.delete_tag(tag_id)
|
||||
self.assertEqual(r["msg"], "Deleted Successfully.")
|
||||
print(r)
|
||||
with self.assertRaises(UptimeKumaException):
|
||||
self.api.get_proxy(tag_id)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
28
tests/uptime_kuma_test_case.py
Normal file
28
tests/uptime_kuma_test_case.py
Normal file
|
@ -0,0 +1,28 @@
|
|||
import unittest
|
||||
from uptime_kuma_api import UptimeKumaApi
|
||||
|
||||
|
||||
class UptimeKumaTestCase(unittest.TestCase):
|
||||
api = None
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.api = UptimeKumaApi("http://127.0.0.1:3001")
|
||||
username = "testuser"
|
||||
password = "zS7zhQSc"
|
||||
if cls.api.need_setup():
|
||||
cls.api.setup(username, password)
|
||||
cls.api.login(username, password)
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
cls.api.logout()
|
||||
cls.api.disconnect()
|
||||
|
||||
def compare(self, superset, subset):
|
||||
return subset.items() <= superset.items()
|
||||
|
||||
def find_by_id(self, objects, value, key="id"):
|
||||
for obj in objects:
|
||||
if obj[key] == value:
|
||||
return obj
|
|
@ -370,7 +370,7 @@ def _check_arguments_notification(kwargs):
|
|||
|
||||
def _check_arguments_proxy(kwargs):
|
||||
required_args = ["protocol", "host", "port"]
|
||||
if "auth" in kwargs:
|
||||
if kwargs.get("auth"):
|
||||
required_args.extend(["username", "password"])
|
||||
_check_missing_arguments(required_args, kwargs, params_map_proxy)
|
||||
|
||||
|
|
Loading…
Reference in a new issue