forked from DGNum/uptime-kuma-api
test: create objects with all available arguments
This commit is contained in:
parent
de38586bf5
commit
928462c6b2
7 changed files with 136 additions and 36 deletions
|
@ -1,17 +1,29 @@
|
||||||
import unittest
|
import unittest
|
||||||
from packaging.version import parse as parse_version
|
from packaging.version import parse as parse_version
|
||||||
|
|
||||||
from uptime_kuma_api import UptimeKumaException, MonitorType
|
from uptime_kuma_api import UptimeKumaException, MonitorType, AuthMethod
|
||||||
from uptime_kuma_test_case import UptimeKumaTestCase
|
from uptime_kuma_test_case import UptimeKumaTestCase
|
||||||
|
|
||||||
|
|
||||||
class TestMonitor(UptimeKumaTestCase):
|
class TestMonitor(UptimeKumaTestCase):
|
||||||
def test_monitor(self):
|
def test_monitor(self):
|
||||||
|
notification_id_1 = self.add_notification()
|
||||||
|
notification_id_2 = self.add_notification()
|
||||||
|
|
||||||
expected_monitor = {
|
expected_monitor = {
|
||||||
"type": MonitorType.HTTP,
|
"type": MonitorType.HTTP,
|
||||||
"name": "monitor 1",
|
"name": "monitor 1",
|
||||||
|
"interval": 60,
|
||||||
|
"retryInterval": 60,
|
||||||
|
"maxretries": 0,
|
||||||
|
"notificationIDList": [notification_id_1, notification_id_2],
|
||||||
|
"upsideDown": False,
|
||||||
"url": "http://127.0.0.1"
|
"url": "http://127.0.0.1"
|
||||||
}
|
}
|
||||||
|
if parse_version(self.api.version) >= parse_version("1.18"):
|
||||||
|
expected_monitor.update({
|
||||||
|
"resendInterval": 0
|
||||||
|
})
|
||||||
|
|
||||||
# add monitor
|
# add monitor
|
||||||
r = self.api.add_monitor(**expected_monitor)
|
r = self.api.add_monitor(**expected_monitor)
|
||||||
|
@ -60,18 +72,57 @@ class TestMonitor(UptimeKumaTestCase):
|
||||||
self.assertEqual(r["msg"], "Added Successfully.")
|
self.assertEqual(r["msg"], "Added Successfully.")
|
||||||
monitor_id = r["monitorID"]
|
monitor_id = r["monitorID"]
|
||||||
|
|
||||||
|
monitor = self.api.get_monitor(monitor_id)
|
||||||
|
self.compare(monitor, expected_monitor)
|
||||||
|
|
||||||
|
r = self.api.edit_monitor(monitor_id, **expected_monitor)
|
||||||
|
self.assertEqual(r["msg"], "Saved.")
|
||||||
|
monitor = self.api.get_monitor(monitor_id)
|
||||||
|
self.compare(monitor, expected_monitor)
|
||||||
|
|
||||||
monitor = self.api.get_monitor(monitor_id)
|
monitor = self.api.get_monitor(monitor_id)
|
||||||
self.compare(monitor, expected_monitor)
|
self.compare(monitor, expected_monitor)
|
||||||
return monitor
|
return monitor
|
||||||
|
|
||||||
def test_monitor_type_http(self):
|
def test_monitor_type_http(self):
|
||||||
|
proxy_id = self.add_proxy()
|
||||||
|
|
||||||
json_data = '{"key": "value"}'
|
json_data = '{"key": "value"}'
|
||||||
expected_monitor = {
|
expected_monitor = {
|
||||||
"type": MonitorType.HTTP,
|
"type": MonitorType.HTTP,
|
||||||
"name": "monitor 1",
|
"name": "monitor 1",
|
||||||
"url": "http://127.0.0.1",
|
"url": "http://127.0.0.1",
|
||||||
|
"expiryNotification": False,
|
||||||
|
"ignoreTls": False,
|
||||||
|
"maxredirects": 10,
|
||||||
|
"accepted_statuscodes": ["200-299"],
|
||||||
|
"proxyId": proxy_id,
|
||||||
|
"method": "GET",
|
||||||
"body": json_data,
|
"body": json_data,
|
||||||
"headers": json_data
|
"headers": json_data,
|
||||||
|
"authMethod": AuthMethod.NONE
|
||||||
|
}
|
||||||
|
self.do_test_monitor_type(expected_monitor)
|
||||||
|
|
||||||
|
def test_monitor_auth_method(self):
|
||||||
|
for auth_method in [AuthMethod.HTTP_BASIC, AuthMethod.NTLM]:
|
||||||
|
expected_monitor = {
|
||||||
|
"type": MonitorType.HTTP,
|
||||||
|
"name": "monitor 1",
|
||||||
|
"url": "http://127.0.0.1",
|
||||||
|
"authMethod": auth_method,
|
||||||
|
"basic_auth_user": "auth user",
|
||||||
|
"basic_auth_pass": "auth pass",
|
||||||
|
}
|
||||||
|
self.do_test_monitor_type(expected_monitor)
|
||||||
|
|
||||||
|
expected_monitor = {
|
||||||
|
"type": MonitorType.HTTP,
|
||||||
|
"name": "monitor 1",
|
||||||
|
"url": "http://127.0.0.1",
|
||||||
|
"authMethod": AuthMethod.NTLM,
|
||||||
|
"authDomain": "auth domain",
|
||||||
|
"authWorkstation": "auth workstation",
|
||||||
}
|
}
|
||||||
self.do_test_monitor_type(expected_monitor)
|
self.do_test_monitor_type(expected_monitor)
|
||||||
|
|
||||||
|
@ -107,7 +158,8 @@ class TestMonitor(UptimeKumaTestCase):
|
||||||
"name": "monitor 1",
|
"name": "monitor 1",
|
||||||
"hostname": "127.0.0.1",
|
"hostname": "127.0.0.1",
|
||||||
"port": 8888,
|
"port": 8888,
|
||||||
"dns_resolve_server": "1.1.1.1"
|
"dns_resolve_server": "1.1.1.1",
|
||||||
|
"dns_resolve_type": "A"
|
||||||
}
|
}
|
||||||
self.do_test_monitor_type(expected_monitor)
|
self.do_test_monitor_type(expected_monitor)
|
||||||
|
|
||||||
|
@ -136,7 +188,10 @@ class TestMonitor(UptimeKumaTestCase):
|
||||||
"name": "monitor 1",
|
"name": "monitor 1",
|
||||||
"hostname": "127.0.0.1",
|
"hostname": "127.0.0.1",
|
||||||
"port": 8888,
|
"port": 8888,
|
||||||
"mqttTopic": "test"
|
"mqttUsername": "mqtt username",
|
||||||
|
"mqttPassword": "mqtt password",
|
||||||
|
"mqttTopic": "mqtt topic",
|
||||||
|
"mqttSuccessMessage": "mqtt success message"
|
||||||
}
|
}
|
||||||
self.do_test_monitor_type(expected_monitor)
|
self.do_test_monitor_type(expected_monitor)
|
||||||
|
|
||||||
|
@ -190,18 +245,6 @@ class TestMonitor(UptimeKumaTestCase):
|
||||||
}
|
}
|
||||||
self.do_test_monitor_type(expected_monitor)
|
self.do_test_monitor_type(expected_monitor)
|
||||||
|
|
||||||
def test_notification_id_list(self):
|
|
||||||
monitor_id = self.add_monitor()
|
|
||||||
notification_id = self.add_notification()
|
|
||||||
|
|
||||||
expected_monitor = self.api.get_monitor(monitor_id)
|
|
||||||
expected_monitor["notificationIDList"] = [notification_id]
|
|
||||||
|
|
||||||
r = self.api.edit_monitor(id_=monitor_id, **expected_monitor)
|
|
||||||
self.assertEqual(r["msg"], "Saved.")
|
|
||||||
monitor = self.api.get_monitor(monitor_id)
|
|
||||||
self.compare(monitor, expected_monitor)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|
|
@ -8,7 +8,7 @@ class TestNotification(UptimeKumaTestCase):
|
||||||
def test_notification(self):
|
def test_notification(self):
|
||||||
expected_notification = {
|
expected_notification = {
|
||||||
"name": "notification 1",
|
"name": "notification 1",
|
||||||
"default": True,
|
"isDefault": True,
|
||||||
"applyExisting": True,
|
"applyExisting": True,
|
||||||
"type": "PushByTechulus",
|
"type": "PushByTechulus",
|
||||||
"pushAPIKey": "123456789"
|
"pushAPIKey": "123456789"
|
||||||
|
|
|
@ -1,20 +1,24 @@
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from uptime_kuma_api import UptimeKumaException
|
from uptime_kuma_api import UptimeKumaException, ProxyProtocol
|
||||||
from uptime_kuma_test_case import UptimeKumaTestCase
|
from uptime_kuma_test_case import UptimeKumaTestCase
|
||||||
|
|
||||||
|
|
||||||
class TestProxy(UptimeKumaTestCase):
|
class TestProxy(UptimeKumaTestCase):
|
||||||
def test_proxy(self):
|
def test_proxy(self):
|
||||||
expected_proxy = {
|
expected_proxy = {
|
||||||
"protocol": "http",
|
"protocol": ProxyProtocol.HTTP,
|
||||||
"host": "127.0.0.1",
|
"host": "127.0.0.1",
|
||||||
"port": 8080,
|
"port": 8080,
|
||||||
"active": True
|
"auth": True,
|
||||||
|
"username": "username",
|
||||||
|
"password": "password",
|
||||||
|
"active": True,
|
||||||
|
"default": False
|
||||||
}
|
}
|
||||||
|
|
||||||
# add proxy
|
# add proxy
|
||||||
r = self.api.add_proxy(**expected_proxy)
|
r = self.api.add_proxy(applyExisting=False, **expected_proxy)
|
||||||
self.assertEqual(r["msg"], "Saved")
|
self.assertEqual(r["msg"], "Saved")
|
||||||
proxy_id = r["id"]
|
proxy_id = r["id"]
|
||||||
|
|
||||||
|
|
|
@ -1,18 +1,36 @@
|
||||||
import unittest
|
|
||||||
import json
|
import json
|
||||||
|
import unittest
|
||||||
|
from packaging.version import parse as parse_version
|
||||||
|
|
||||||
from uptime_kuma_test_case import UptimeKumaTestCase
|
from uptime_kuma_test_case import UptimeKumaTestCase
|
||||||
|
|
||||||
|
|
||||||
class TestSettings(UptimeKumaTestCase):
|
class TestSettings(UptimeKumaTestCase):
|
||||||
def test_settings(self):
|
def test_settings(self):
|
||||||
settings_before = self.api.get_settings()
|
expected_settings = {
|
||||||
|
"checkUpdate": False,
|
||||||
|
"checkBeta": False,
|
||||||
|
"keepDataPeriodDays": 180,
|
||||||
|
"entryPage": "dashboard",
|
||||||
|
"searchEngineIndex": False,
|
||||||
|
"primaryBaseURL": "",
|
||||||
|
"steamAPIKey": "",
|
||||||
|
"tlsExpiryNotifyDays": [7, 14, 21],
|
||||||
|
"disableAuth": False
|
||||||
|
}
|
||||||
|
|
||||||
expected_check_update = not settings_before.get("checkUpdate", True)
|
if parse_version(self.api.version) >= parse_version("1.18"):
|
||||||
self.api.set_settings(self.password, checkUpdate=expected_check_update)
|
expected_settings.update({
|
||||||
|
"trustProxy": False
|
||||||
|
})
|
||||||
|
|
||||||
|
# set settings
|
||||||
|
r = self.api.set_settings(self.password, **expected_settings)
|
||||||
|
self.assertEqual(r["msg"], "Saved")
|
||||||
|
|
||||||
|
# get settings
|
||||||
settings = self.api.get_settings()
|
settings = self.api.get_settings()
|
||||||
self.assertEqual(settings["checkUpdate"], expected_check_update)
|
self.compare(settings, expected_settings)
|
||||||
|
|
||||||
def test_change_password(self):
|
def test_change_password(self):
|
||||||
new_password = "321terces"
|
new_password = "321terces"
|
||||||
|
|
|
@ -13,7 +13,14 @@ class TestStatusPage(UptimeKumaTestCase):
|
||||||
"slug": slug,
|
"slug": slug,
|
||||||
"title": "status page 1",
|
"title": "status page 1",
|
||||||
"description": "description 1",
|
"description": "description 1",
|
||||||
|
"theme": "light",
|
||||||
|
"published": True,
|
||||||
|
"showTags": False,
|
||||||
|
"domainNameList": [],
|
||||||
|
"customCSS": "",
|
||||||
|
"footerText": None,
|
||||||
"showPoweredBy": False,
|
"showPoweredBy": False,
|
||||||
|
"icon": "/icon.svg",
|
||||||
"publicGroupList": [
|
"publicGroupList": [
|
||||||
{
|
{
|
||||||
'name': 'Services',
|
'name': 'Services',
|
||||||
|
|
|
@ -70,25 +70,45 @@ class UptimeKumaTestCase(unittest.TestCase):
|
||||||
return obj
|
return obj
|
||||||
|
|
||||||
def add_monitor(self):
|
def add_monitor(self):
|
||||||
r = self.api.add_monitor(type=MonitorType.HTTP, name="monitor 1", url="http://127.0.0.1")
|
r = self.api.add_monitor(
|
||||||
|
type=MonitorType.HTTP,
|
||||||
|
name="monitor 1",
|
||||||
|
url="http://127.0.0.1"
|
||||||
|
)
|
||||||
monitor_id = r["monitorID"]
|
monitor_id = r["monitorID"]
|
||||||
return monitor_id
|
return monitor_id
|
||||||
|
|
||||||
def add_tag(self):
|
def add_tag(self):
|
||||||
r = self.api.add_tag(name="tag 1", color="#ffffff")
|
r = self.api.add_tag(
|
||||||
|
name="tag 1",
|
||||||
|
color="#ffffff"
|
||||||
|
)
|
||||||
tag_id = r["id"]
|
tag_id = r["id"]
|
||||||
return tag_id
|
return tag_id
|
||||||
|
|
||||||
def add_notification(self):
|
def add_notification(self):
|
||||||
r = self.api.add_notification(name="notification 1", type="PushByTechulus", pushAPIKey="123456789")
|
r = self.api.add_notification(
|
||||||
|
name="notification 1",
|
||||||
|
type="PushByTechulus",
|
||||||
|
pushAPIKey="123456789"
|
||||||
|
)
|
||||||
notification_id = r["id"]
|
notification_id = r["id"]
|
||||||
return notification_id
|
return notification_id
|
||||||
|
|
||||||
|
def add_proxy(self):
|
||||||
|
r = self.api.add_proxy(
|
||||||
|
protocol="http",
|
||||||
|
host="127.0.0.1",
|
||||||
|
port=8080,
|
||||||
|
active=True
|
||||||
|
)
|
||||||
|
proxy_id = r["id"]
|
||||||
|
return proxy_id
|
||||||
|
|
||||||
def add_docker_host(self):
|
def add_docker_host(self):
|
||||||
expected_docker_host = {
|
r = self.api.add_docker_host(
|
||||||
"name": "docker host 1",
|
name="docker host 1",
|
||||||
"dockerType": DockerType.SOCKET
|
dockerType=DockerType.SOCKET
|
||||||
}
|
)
|
||||||
r = self.api.add_docker_host(**expected_docker_host)
|
|
||||||
docker_host_id = r["id"]
|
docker_host_id = r["id"]
|
||||||
return docker_host_id
|
return docker_host_id
|
||||||
|
|
|
@ -63,6 +63,14 @@ def _build_notification_data(
|
||||||
applyExisting: bool = False,
|
applyExisting: bool = False,
|
||||||
**kwargs
|
**kwargs
|
||||||
):
|
):
|
||||||
|
allowed_kwargs = []
|
||||||
|
for keys in notification_provider_options.values():
|
||||||
|
allowed_kwargs.extend(keys)
|
||||||
|
|
||||||
|
for key in kwargs.keys():
|
||||||
|
if key not in allowed_kwargs:
|
||||||
|
raise TypeError(f"unknown argument '{key}'")
|
||||||
|
|
||||||
data = {
|
data = {
|
||||||
"name": name,
|
"name": name,
|
||||||
"type": type,
|
"type": type,
|
||||||
|
@ -512,12 +520,12 @@ class UptimeKumaApi(object):
|
||||||
"authWorkstation": authWorkstation,
|
"authWorkstation": authWorkstation,
|
||||||
})
|
})
|
||||||
|
|
||||||
# DNS, PING, STEAM, MQTT
|
# PORT, PING, DNS, STEAM, MQTT
|
||||||
data.update({
|
data.update({
|
||||||
"hostname": hostname,
|
"hostname": hostname,
|
||||||
})
|
})
|
||||||
|
|
||||||
# DNS, STEAM, MQTT
|
# PORT, DNS, STEAM, MQTT
|
||||||
data.update({
|
data.update({
|
||||||
"port": port,
|
"port": port,
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in a new issue