diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..9b90258 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,4 @@ +[run] +omit = + *tests* + *scripts* diff --git a/tests/test_monitor.py b/tests/test_monitor.py new file mode 100644 index 0000000..f92bfeb --- /dev/null +++ b/tests/test_monitor.py @@ -0,0 +1,59 @@ +import unittest +from uptime_kuma_test_case import UptimeKumaTestCase +from uptime_kuma_api import UptimeKumaException + + +class TestMonitor(UptimeKumaTestCase): + def test_monitor(self): + expected_monitor = { + "type_": "http", + "name": "monitor 1", + "url": "http://192.168.20.135" + } + + # add monitor + r = self.api.add_monitor( + type_=expected_monitor["type_"], + name=expected_monitor["name"], + url=expected_monitor["url"] + ) + self.assertEqual(r["msg"], "Added Successfully.") + monitor_id = r["monitor_id"] + + # get monitor + monitor = self.api.get_monitor(monitor_id) + self.compare(monitor, expected_monitor) + + # get monitors + monitors = self.api.get_monitors() + monitor = self.find_by_id(monitors, monitor_id) + self.assertIsNotNone(monitor) + self.compare(monitor, expected_monitor) + + # edit monitor + expected_monitor["type_"] = "ping" + expected_monitor["name"] = "monitor 1 new" + expected_monitor["hostname"] = "127.0.0.1" + del expected_monitor["url"] + r = self.api.edit_monitor(monitor_id, **expected_monitor) + self.assertEqual(r["msg"], "Saved.") + monitor = self.api.get_monitor(monitor_id) + self.compare(monitor, expected_monitor) + + # pause monitor + r = self.api.pause_monitor(monitor_id) + self.assertEqual(r["msg"], "Paused Successfully.") + + # resume monitor + r = self.api.resume_monitor(monitor_id) + self.assertEqual(r["msg"], "Resumed Successfully.") + + # delete monitor + r = self.api.delete_monitor(monitor_id) + self.assertEqual(r["msg"], "Deleted Successfully.") + with self.assertRaises(UptimeKumaException): + self.api.get_monitor(monitor_id) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_monitor_tag.py b/tests/test_monitor_tag.py new file mode 100644 index 0000000..c946a04 --- /dev/null +++ b/tests/test_monitor_tag.py @@ -0,0 +1,28 @@ +import unittest +from uptime_kuma_test_case import UptimeKumaTestCase + + +class TestMonitorTag(UptimeKumaTestCase): + def test_monitor_tag(self): + r = self.api.add_tag(name="tag 1", color="#ffffff") + tag_id = r["id"] + r = self.api.add_monitor(type_="http", name="monitor 1", url="http://127.0.0.1") + monitor_id = r["monitor_id"] + + expected_monitor_tag = { + "tag_id": tag_id, + "monitor_id": monitor_id, + "value": "value 1" + } + + # add monitor tag + r = self.api.add_monitor_tag(**expected_monitor_tag) + self.assertEqual(r["msg"], "Added Successfully.") + + # delete monitor tag + r = self.api.delete_monitor_tag(**expected_monitor_tag) + self.assertEqual(r["msg"], "Deleted Successfully.") + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_notification.py b/tests/test_notification.py new file mode 100644 index 0000000..dd62d82 --- /dev/null +++ b/tests/test_notification.py @@ -0,0 +1,55 @@ +import unittest +from uptime_kuma_test_case import UptimeKumaTestCase +from uptime_kuma_api import UptimeKumaException + + +class TestNotification(UptimeKumaTestCase): + def test_notification(self): + expected_notification = { + "name": "notification 1", + "default": True, + "apply_existing": True, + "type_": "push_by_techulus", + "push_by_techulus_apikey": "123456789" + } + + # test notification + with self.assertRaisesRegex(UptimeKumaException, r'Invalid API key'): + self.api.test_notification(**expected_notification) + + # add notification + r = self.api.add_notification(**expected_notification) + self.assertEqual(r["msg"], "Saved") + notification_id = r["id"] + + # get notification + notification = self.api.get_notification(notification_id) + self.compare(notification, expected_notification) + + # get notifications + notifications = self.api.get_notifications() + notification = self.find_by_id(notifications, notification_id) + self.assertIsNotNone(notification) + self.compare(notification, expected_notification) + + # edit notification + expected_notification["name"] = "notification 1 new" + expected_notification["default"] = False + expected_notification["apply_existing"] = False + expected_notification["type_"] = "push_deer" + expected_notification["push_deer_deer_key"] = "987654321" + del expected_notification["push_by_techulus_apikey"] + r = self.api.edit_notification(notification_id, **expected_notification) + self.assertEqual(r["msg"], "Saved") + notification = self.api.get_notification(notification_id) + self.compare(notification, expected_notification) + + # delete notification + r = self.api.delete_notification(notification_id) + self.assertEqual(r["msg"], "Deleted") + with self.assertRaises(UptimeKumaException): + self.api.delete_notification(notification_id) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_proxy.py b/tests/test_proxy.py new file mode 100644 index 0000000..295d8e6 --- /dev/null +++ b/tests/test_proxy.py @@ -0,0 +1,43 @@ +import unittest +from uptime_kuma_test_case import UptimeKumaTestCase +from uptime_kuma_api import UptimeKumaException + + +class TestProxy(UptimeKumaTestCase): + def test_proxy(self): + expected_proxy = { + "protocol": "http", + "host": "127.0.0.1", + "port": 8080, + "active": True + } + + # add proxy + r = self.api.add_proxy(**expected_proxy) + self.assertEqual(r["msg"], "Saved") + proxy_id = r["id"] + + # get proxy + proxy = self.api.get_proxy(proxy_id) + self.compare(proxy, expected_proxy) + + # edit proxy + expected_proxy["protocol"] = "https" + expected_proxy["host"] = "127.0.0.2" + expected_proxy["port"] = 8888 + expected_proxy["active"] = False + r = self.api.edit_proxy(proxy_id, **expected_proxy) + self.assertEqual(r["msg"], "Saved") + proxy = self.api.get_proxy(proxy_id) + self.compare(proxy, expected_proxy) + + # delete proxy + r = self.api.delete_proxy(proxy_id) + self.assertEqual(r["msg"], "Deleted") + print(r) + with self.assertRaises(UptimeKumaException): + self.api.get_proxy(proxy_id) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_status_page.py b/tests/test_status_page.py new file mode 100644 index 0000000..2d986a5 --- /dev/null +++ b/tests/test_status_page.py @@ -0,0 +1,65 @@ +import unittest +from uptime_kuma_test_case import UptimeKumaTestCase +from uptime_kuma_api import UptimeKumaException, IncidentStyle + + +class TestStatusPage(UptimeKumaTestCase): + def test_status_page(self): + slug = "slug1" + expected_status_page = { + "slug": slug, + "title": "status page 1", + "description": "description 1", + "show_powered_by": False + } + + # slug must be unique + try: + self.api.delete_status_page(slug) + except UptimeKumaException: + pass + + # add status page + r = self.api.add_status_page(slug, expected_status_page["title"]) + self.assertEqual(r["msg"], "OK!") + + # save status page + self.api.save_status_page(**expected_status_page) + + # get status page + status_page = self.api.get_status_page(slug) + self.compare(status_page, expected_status_page) + + # get status pages + status_pages = self.api.get_status_pages() + status_page = self.find_by_id(status_pages, slug, "slug") + self.assertIsNotNone(status_page) + self.compare(status_page, expected_status_page) + + # edit status page + expected_status_page["title"] = "status page 1 new" + expected_status_page["theme"] = "dark" + self.api.save_status_page(**expected_status_page) + status_page = self.api.get_status_page(slug) + self.compare(status_page, expected_status_page) + + # pin incident + incident_expected = { + "title": "title 1", + "content": "content 1", + "style": IncidentStyle.DANGER + } + incident = self.api.post_incident(slug, **incident_expected) + self.compare(incident, incident_expected) + + # unpin incident + self.api.unpin_incident(slug) + + # delete status page + self.api.delete_status_page(slug) + with self.assertRaises(UptimeKumaException): + self.api.get_status_page(slug) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_tag.py b/tests/test_tag.py new file mode 100644 index 0000000..39df788 --- /dev/null +++ b/tests/test_tag.py @@ -0,0 +1,37 @@ +import unittest +from uptime_kuma_test_case import UptimeKumaTestCase +from uptime_kuma_api import UptimeKumaException + + +class TestTag(UptimeKumaTestCase): + def test_tag(self): + expected_tag = { + "name": "tag 1", + "color": "#ffffff" + } + + # add tag + tag = self.api.add_tag(**expected_tag) + self.compare(tag, expected_tag) + tag_id = tag["id"] + + # get tag + tag = self.api.get_tag(tag_id) + self.compare(tag, expected_tag) + + # get tags + tags = self.api.get_tags() + tag = self.find_by_id(tags, tag_id) + self.assertIsNotNone(tag) + self.compare(tag, expected_tag) + + # delete tag + r = self.api.delete_tag(tag_id) + self.assertEqual(r["msg"], "Deleted Successfully.") + print(r) + with self.assertRaises(UptimeKumaException): + self.api.get_proxy(tag_id) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/uptime_kuma_test_case.py b/tests/uptime_kuma_test_case.py new file mode 100644 index 0000000..a0d4bfc --- /dev/null +++ b/tests/uptime_kuma_test_case.py @@ -0,0 +1,28 @@ +import unittest +from uptime_kuma_api import UptimeKumaApi + + +class UptimeKumaTestCase(unittest.TestCase): + api = None + + @classmethod + def setUpClass(cls): + cls.api = UptimeKumaApi("http://127.0.0.1:3001") + username = "testuser" + password = "zS7zhQSc" + if cls.api.need_setup(): + cls.api.setup(username, password) + cls.api.login(username, password) + + @classmethod + def tearDownClass(cls): + cls.api.logout() + cls.api.disconnect() + + def compare(self, superset, subset): + return subset.items() <= superset.items() + + def find_by_id(self, objects, value, key="id"): + for obj in objects: + if obj[key] == value: + return obj diff --git a/uptime_kuma_api/api.py b/uptime_kuma_api/api.py index 50cfb34..144e62b 100644 --- a/uptime_kuma_api/api.py +++ b/uptime_kuma_api/api.py @@ -370,7 +370,7 @@ def _check_arguments_notification(kwargs): def _check_arguments_proxy(kwargs): required_args = ["protocol", "host", "port"] - if "auth" in kwargs: + if kwargs.get("auth"): required_args.extend(["username", "password"]) _check_missing_arguments(required_args, kwargs, params_map_proxy)