forked from DGNum/uptime-kuma-api
380 lines
13 KiB
Python
380 lines
13 KiB
Python
import unittest
|
|
from packaging.version import parse as parse_version
|
|
|
|
from uptime_kuma_api import UptimeKumaException, MonitorType, AuthMethod, MonitorStatus
|
|
from uptime_kuma_test_case import UptimeKumaTestCase
|
|
|
|
|
|
class TestMonitor(UptimeKumaTestCase):
|
|
def test_monitor(self):
|
|
# get empty list to make sure that future accesses will also work
|
|
self.api.get_monitors()
|
|
|
|
notification_id_1 = self.add_notification()
|
|
notification_id_2 = self.add_notification()
|
|
|
|
expected_monitor = {
|
|
"type": MonitorType.HTTP,
|
|
"name": "monitor 1",
|
|
"interval": 60,
|
|
"retryInterval": 60,
|
|
"maxretries": 0,
|
|
"notificationIDList": [notification_id_1, notification_id_2],
|
|
"upsideDown": False,
|
|
"url": "http://127.0.0.1",
|
|
"resendInterval": 0
|
|
}
|
|
|
|
# add monitor
|
|
r = self.api.add_monitor(**expected_monitor)
|
|
self.assertEqual(r["msg"], "Added Successfully.")
|
|
monitor_id = r["monitorID"]
|
|
|
|
# get monitor
|
|
monitor = self.api.get_monitor(monitor_id)
|
|
self.compare(monitor, expected_monitor)
|
|
|
|
# get monitors
|
|
monitors = self.api.get_monitors()
|
|
self.assertTrue(type(monitors[0]["type"]) == MonitorType)
|
|
self.assertTrue(type(monitors[0]["authMethod"]) == AuthMethod)
|
|
monitor = self.find_by_id(monitors, monitor_id)
|
|
self.assertTrue(type(monitor["type"]) == MonitorType)
|
|
self.assertTrue(type(monitor["authMethod"]) == AuthMethod)
|
|
self.assertIsNotNone(monitor)
|
|
self.compare(monitor, expected_monitor)
|
|
|
|
# edit monitor
|
|
expected_monitor["type"] = MonitorType.PING
|
|
expected_monitor["name"] = "monitor 1 new"
|
|
expected_monitor["hostname"] = "127.0.0.10"
|
|
del expected_monitor["url"]
|
|
r = self.api.edit_monitor(monitor_id, **expected_monitor)
|
|
self.assertEqual(r["msg"], "Saved.")
|
|
monitor = self.api.get_monitor(monitor_id)
|
|
self.compare(monitor, expected_monitor)
|
|
|
|
# pause monitor
|
|
r = self.api.pause_monitor(monitor_id)
|
|
self.assertEqual(r["msg"], "Paused Successfully.")
|
|
|
|
# resume monitor
|
|
r = self.api.resume_monitor(monitor_id)
|
|
self.assertEqual(r["msg"], "Resumed Successfully.")
|
|
|
|
# get monitor beats
|
|
r = self.api.get_monitor_beats(monitor_id, 6)
|
|
self.assertTrue(type(r[0]["status"]) == MonitorStatus)
|
|
|
|
# delete monitor
|
|
r = self.api.delete_monitor(monitor_id)
|
|
self.assertEqual(r["msg"], "Deleted Successfully.")
|
|
with self.assertRaises(UptimeKumaException):
|
|
self.api.get_monitor(monitor_id)
|
|
|
|
def do_test_monitor_type(self, expected_monitor):
|
|
r = self.api.add_monitor(**expected_monitor)
|
|
self.assertEqual(r["msg"], "Added Successfully.")
|
|
monitor_id = r["monitorID"]
|
|
|
|
monitor = self.api.get_monitor(monitor_id)
|
|
self.compare(monitor, expected_monitor)
|
|
|
|
expected_monitor.update({
|
|
"name": "monitor 2"
|
|
})
|
|
r = self.api.edit_monitor(monitor_id, **expected_monitor)
|
|
self.assertEqual(r["msg"], "Saved.")
|
|
monitor = self.api.get_monitor(monitor_id)
|
|
self.compare(monitor, expected_monitor)
|
|
|
|
monitor = self.api.get_monitor(monitor_id)
|
|
self.compare(monitor, expected_monitor)
|
|
return monitor
|
|
|
|
def test_monitor_type_http(self):
|
|
proxy_id = self.add_proxy()
|
|
|
|
json_data = '{"key": "value"}'
|
|
expected_monitor = {
|
|
"type": MonitorType.HTTP,
|
|
"name": "monitor 1",
|
|
"url": "http://127.0.0.1",
|
|
"expiryNotification": False,
|
|
"ignoreTls": False,
|
|
"maxredirects": 10,
|
|
"accepted_statuscodes": ["200-299"],
|
|
"proxyId": proxy_id,
|
|
"method": "GET",
|
|
"body": json_data,
|
|
"headers": json_data,
|
|
"authMethod": AuthMethod.NONE
|
|
}
|
|
self.do_test_monitor_type(expected_monitor)
|
|
|
|
def test_monitor_auth_method(self):
|
|
for auth_method in [AuthMethod.HTTP_BASIC, AuthMethod.NTLM]:
|
|
expected_monitor = {
|
|
"type": MonitorType.HTTP,
|
|
"name": "monitor 1",
|
|
"url": "http://127.0.0.1",
|
|
"authMethod": auth_method,
|
|
"basic_auth_user": "auth user",
|
|
"basic_auth_pass": "auth pass",
|
|
}
|
|
self.do_test_monitor_type(expected_monitor)
|
|
|
|
expected_monitor = {
|
|
"type": MonitorType.HTTP,
|
|
"name": "monitor 1",
|
|
"url": "http://127.0.0.1",
|
|
"authMethod": AuthMethod.NTLM,
|
|
"authDomain": "auth domain",
|
|
"authWorkstation": "auth workstation",
|
|
}
|
|
self.do_test_monitor_type(expected_monitor)
|
|
|
|
expected_monitor = {
|
|
"type": MonitorType.HTTP,
|
|
"name": "monitor 1",
|
|
"url": "http://127.0.0.1",
|
|
"authMethod": AuthMethod.MTLS,
|
|
"tlsCert": "cert",
|
|
"tlsKey": "key",
|
|
"tlsCa": "ca",
|
|
}
|
|
self.do_test_monitor_type(expected_monitor)
|
|
|
|
def test_monitor_type_port(self):
|
|
expected_monitor = {
|
|
"type": MonitorType.PORT,
|
|
"name": "monitor 1",
|
|
"hostname": "127.0.0.1",
|
|
"port": 8888
|
|
}
|
|
self.do_test_monitor_type(expected_monitor)
|
|
|
|
def test_monitor_type_ping(self):
|
|
expected_monitor = {
|
|
"type": MonitorType.PING,
|
|
"name": "monitor 1",
|
|
"hostname": "127.0.0.1",
|
|
"packetSize": 56
|
|
}
|
|
self.do_test_monitor_type(expected_monitor)
|
|
|
|
def test_monitor_type_keyword(self):
|
|
expected_monitor = {
|
|
"type": MonitorType.KEYWORD,
|
|
"name": "monitor 1",
|
|
"url": "http://127.0.0.1",
|
|
"keyword": "healthy"
|
|
}
|
|
self.do_test_monitor_type(expected_monitor)
|
|
|
|
def test_monitor_type_grpc_keyword(self):
|
|
expected_monitor = {
|
|
"type": MonitorType.GRPC_KEYWORD,
|
|
"name": "monitor 1",
|
|
"grpcUrl": "127.0.0.1",
|
|
"keyword": "healthy",
|
|
"grpcServiceName": "health",
|
|
"grpcMethod": "check",
|
|
}
|
|
self.do_test_monitor_type(expected_monitor)
|
|
|
|
def test_monitor_type_dns(self):
|
|
expected_monitor = {
|
|
"type": MonitorType.DNS,
|
|
"name": "monitor 1",
|
|
"hostname": "127.0.0.1",
|
|
"port": 8888,
|
|
"dns_resolve_server": "1.1.1.1",
|
|
"dns_resolve_type": "A"
|
|
}
|
|
self.do_test_monitor_type(expected_monitor)
|
|
|
|
def test_monitor_type_docker(self):
|
|
docker_host_id = self.add_docker_host()
|
|
expected_monitor = {
|
|
"type": MonitorType.DOCKER,
|
|
"name": "monitor 1",
|
|
"docker_container": "test",
|
|
"docker_host": docker_host_id
|
|
}
|
|
self.do_test_monitor_type(expected_monitor)
|
|
|
|
def test_monitor_type_push(self):
|
|
expected_monitor = {
|
|
"type": MonitorType.PUSH,
|
|
"name": "monitor 1"
|
|
}
|
|
monitor = self.do_test_monitor_type(expected_monitor)
|
|
|
|
# https://github.com/lucasheld/ansible-uptime-kuma/issues/5
|
|
self.assertIsNotNone(monitor["pushToken"])
|
|
|
|
def test_monitor_type_steam(self):
|
|
expected_monitor = {
|
|
"type": MonitorType.STEAM,
|
|
"name": "monitor 1",
|
|
"hostname": "127.0.0.1",
|
|
"port": 8888
|
|
}
|
|
self.do_test_monitor_type(expected_monitor)
|
|
|
|
def test_monitor_type_gamedig(self):
|
|
game_list = self.api.get_game_list()
|
|
game = game_list[0]["keys"][0]
|
|
expected_monitor = {
|
|
"type": MonitorType.GAMEDIG,
|
|
"name": "monitor 1",
|
|
"hostname": "127.0.0.1",
|
|
"port": 8888,
|
|
"game": game
|
|
}
|
|
self.do_test_monitor_type(expected_monitor)
|
|
|
|
def test_monitor_type_mqtt(self):
|
|
expected_monitor = {
|
|
"type": MonitorType.MQTT,
|
|
"name": "monitor 1",
|
|
"hostname": "127.0.0.1",
|
|
"port": 8888,
|
|
"mqttUsername": "mqtt username",
|
|
"mqttPassword": "mqtt password",
|
|
"mqttTopic": "mqtt topic",
|
|
"mqttSuccessMessage": "mqtt success message"
|
|
}
|
|
self.do_test_monitor_type(expected_monitor)
|
|
|
|
def test_monitor_type_sqlserver(self):
|
|
expected_monitor = {
|
|
"type": MonitorType.SQLSERVER,
|
|
"name": "monitor 1",
|
|
"databaseConnectionString": "Server=127.0.0.1,8888;Database=test;User Id=1;Password=secret123;Encrypt=true;"
|
|
"TrustServerCertificate=Yes;Connection Timeout=5",
|
|
"databaseQuery": "select getdate()"
|
|
}
|
|
self.do_test_monitor_type(expected_monitor)
|
|
|
|
def test_monitor_type_postgres(self):
|
|
expected_monitor = {
|
|
"type": MonitorType.POSTGRES,
|
|
"name": "monitor 1",
|
|
"databaseConnectionString": "postgres://username:password@host:port/database",
|
|
"databaseQuery": "select getdate()"
|
|
}
|
|
self.do_test_monitor_type(expected_monitor)
|
|
|
|
def test_monitor_type_mysql(self):
|
|
expected_monitor = {
|
|
"type": MonitorType.MYSQL,
|
|
"name": "monitor 1",
|
|
"databaseConnectionString": "mysql://username:password@host:port/database",
|
|
"databaseQuery": "select getdate()"
|
|
}
|
|
self.do_test_monitor_type(expected_monitor)
|
|
|
|
def test_monitor_type_mongodb(self):
|
|
expected_monitor = {
|
|
"type": MonitorType.MONGODB,
|
|
"name": "monitor 1",
|
|
"databaseConnectionString": "mongodb://username:password@host:port/database"
|
|
}
|
|
self.do_test_monitor_type(expected_monitor)
|
|
|
|
def test_monitor_type_radius(self):
|
|
expected_monitor = {
|
|
"type": MonitorType.RADIUS,
|
|
"name": "monitor 1",
|
|
"radiusUsername": "123",
|
|
"radiusPassword": "456",
|
|
"radiusSecret": "789",
|
|
"radiusCalledStationId": "1",
|
|
"radiusCallingStationId": "2"
|
|
}
|
|
self.do_test_monitor_type(expected_monitor)
|
|
|
|
def test_monitor_type_redis(self):
|
|
expected_monitor = {
|
|
"type": MonitorType.REDIS,
|
|
"name": "monitor 1",
|
|
"databaseConnectionString": "redis://user:password@host:port"
|
|
}
|
|
self.do_test_monitor_type(expected_monitor)
|
|
|
|
def test_monitor_type_group(self):
|
|
if parse_version(self.api.version) < parse_version("1.22"):
|
|
self.skipTest("Unsupported in this Uptime Kuma version")
|
|
|
|
# create monitor group
|
|
expected_monitor = {
|
|
"type": MonitorType.GROUP,
|
|
"name": "monitor 1"
|
|
}
|
|
group_monitor = self.do_test_monitor_type(expected_monitor)
|
|
group_monitor_id = group_monitor["id"]
|
|
|
|
# use monitor group as parent for another monitor
|
|
expected_monitor = {
|
|
"type": MonitorType.PUSH,
|
|
"name": "monitor 1",
|
|
"parent": group_monitor_id
|
|
}
|
|
self.do_test_monitor_type(expected_monitor)
|
|
|
|
def test_monitor_type_json_query(self):
|
|
if parse_version(self.api.version) < parse_version("1.23"):
|
|
self.skipTest("Unsupported in this Uptime Kuma version")
|
|
|
|
expected_monitor = {
|
|
"type": MonitorType.JSON_QUERY,
|
|
"name": "monitor 1",
|
|
"url": "http://127.0.0.1",
|
|
"jsonPath": "address.country",
|
|
"expectedValue": "germany",
|
|
}
|
|
self.do_test_monitor_type(expected_monitor)
|
|
|
|
def test_monitor_type_real_browser(self):
|
|
if parse_version(self.api.version) < parse_version("1.23"):
|
|
self.skipTest("Unsupported in this Uptime Kuma version")
|
|
|
|
expected_monitor = {
|
|
"type": MonitorType.REAL_BROWSER,
|
|
"name": "monitor 1",
|
|
"url": "http://127.0.0.1",
|
|
}
|
|
self.do_test_monitor_type(expected_monitor)
|
|
|
|
def test_monitor_type_kafka_producer(self):
|
|
if parse_version(self.api.version) < parse_version("1.23"):
|
|
self.skipTest("Unsupported in this Uptime Kuma version")
|
|
|
|
expected_monitor = {
|
|
"type": MonitorType.KAFKA_PRODUCER,
|
|
"name": "monitor 1",
|
|
"kafkaProducerTopic": "topic",
|
|
"kafkaProducerMessage": "message",
|
|
}
|
|
self.do_test_monitor_type(expected_monitor)
|
|
|
|
def test_monitor_type_tailscale_ping(self):
|
|
if parse_version(self.api.version) < parse_version("1.23"):
|
|
self.skipTest("Unsupported in this Uptime Kuma version")
|
|
|
|
expected_monitor = {
|
|
"type": MonitorType.TAILSCALE_PING,
|
|
"name": "monitor 1",
|
|
"hostname": "127.0.0.1"
|
|
}
|
|
self.do_test_monitor_type(expected_monitor)
|
|
|
|
def test_delete_not_existing_monitor(self):
|
|
with self.assertRaises(UptimeKumaException):
|
|
self.api.delete_monitor(42)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|