add support for uptime kuma 1.18.0

This commit is contained in:
lucasheld 2022-09-07 13:03:10 +02:00
parent 45b8b88166
commit a7f571f508
12 changed files with 425 additions and 178 deletions

View file

@ -8,6 +8,8 @@ This package was developed to configure Uptime Kuma with Ansible. The Ansible co
Python version 3.6+ is required. Python version 3.6+ is required.
Supported Uptime Kuma versions: 1.18, 1.17
Installation Installation
--- ---
uptime-kuma-api is available on the [Python Package Index (PyPI)](https://pypi.org/project/uptime-kuma-api/). uptime-kuma-api is available on the [Python Package Index (PyPI)](https://pypi.org/project/uptime-kuma-api/).

View file

@ -1 +1,2 @@
python-socketio[client]>=5.0.0 python-socketio[client]>=5.0.0
packaging

View file

@ -30,7 +30,10 @@ setup(
license=info["__license__"], license=info["__license__"],
packages=["uptime_kuma_api"], packages=["uptime_kuma_api"],
python_requires=">=3.6, <4", python_requires=">=3.6, <4",
install_requires=["python-socketio[client]>=5.0.0"], install_requires=[
"python-socketio[client]>=5.0.0",
"packaging"
],
classifiers=[ classifiers=[
"Development Status :: 1 - Planning", "Development Status :: 1 - Planning",
"Environment :: Web Environment", "Environment :: Web Environment",

56
tests/test_docker_host.py Normal file
View file

@ -0,0 +1,56 @@
import unittest
from packaging.version import parse as parse_version
from uptime_kuma_api import DockerType, UptimeKumaException
from uptime_kuma_test_case import UptimeKumaTestCase
class TestDockerHost(UptimeKumaTestCase):
def setUp(self):
super(TestDockerHost, self).setUp()
if parse_version(self.api.version) < parse_version("1.18"):
super(TestDockerHost, self).tearDown()
self.skipTest("Unsupported in this Uptime Kuma version")
def test_docker_host(self):
expected_docker_host = {
"name": "name 1",
"dockerType": DockerType.SOCKET,
"dockerDaemon": "/var/run/docker.sock"
}
# test docker host
with self.assertRaisesRegex(UptimeKumaException, r'connect ENOENT /var/run/docker.sock'):
self.api.test_docker_host(**expected_docker_host)
# add docker host
r = self.api.add_docker_host(**expected_docker_host)
self.assertEqual(r["msg"], "Saved")
docker_host_id = r["id"]
# get docker host
docker_host = self.api.get_docker_host(docker_host_id)
self.compare(docker_host, expected_docker_host)
# get docker hosts
docker_hosts = self.api.get_docker_hosts()
docker_host = self.find_by_id(docker_hosts, docker_host_id)
self.assertIsNotNone(docker_host)
self.compare(docker_host, expected_docker_host)
# edit docker host
r = self.api.edit_docker_host(docker_host_id, name="name 2")
self.assertEqual(r["msg"], "Saved")
docker_host = self.api.get_docker_host(docker_host_id)
expected_docker_host["name"] = "name 2"
self.compare(docker_host, expected_docker_host)
# delete docker host
r = self.api.delete_docker_host(docker_host_id)
self.assertEqual(r["msg"], "Deleted")
with self.assertRaises(UptimeKumaException):
self.api.get_docker_host(docker_host_id)
if __name__ == '__main__':
unittest.main()

View file

@ -1,4 +1,5 @@
import unittest import unittest
from packaging.version import parse as parse_version
from uptime_kuma_api import UptimeKumaException, MonitorType from uptime_kuma_api import UptimeKumaException, MonitorType
from uptime_kuma_test_case import UptimeKumaTestCase from uptime_kuma_test_case import UptimeKumaTestCase
@ -63,10 +64,13 @@ class TestMonitor(UptimeKumaTestCase):
self.compare(monitor, expected_monitor) self.compare(monitor, expected_monitor)
def test_monitor_type_http(self): def test_monitor_type_http(self):
json_data = '{"key": "value"}'
expected_monitor = { expected_monitor = {
"type": MonitorType.HTTP, "type": MonitorType.HTTP,
"name": "monitor 1", "name": "monitor 1",
"url": "http://127.0.0.1" "url": "http://127.0.0.1",
"body": json_data,
"headers": json_data
} }
self.do_test_monitor_type(expected_monitor) self.do_test_monitor_type(expected_monitor)
@ -83,7 +87,7 @@ class TestMonitor(UptimeKumaTestCase):
expected_monitor = { expected_monitor = {
"type": MonitorType.PING, "type": MonitorType.PING,
"name": "monitor 1", "name": "monitor 1",
"hostname": "127.0.0.1", "hostname": "127.0.0.1"
} }
self.do_test_monitor_type(expected_monitor) self.do_test_monitor_type(expected_monitor)
@ -100,18 +104,16 @@ class TestMonitor(UptimeKumaTestCase):
expected_monitor = { expected_monitor = {
"type": MonitorType.DNS, "type": MonitorType.DNS,
"name": "monitor 1", "name": "monitor 1",
"url": "http://127.0.0.1",
"hostname": "127.0.0.1", "hostname": "127.0.0.1",
"port": 8888, "port": 8888,
"dns_resolve_server": "1.1.1.1", "dns_resolve_server": "1.1.1.1"
} }
self.do_test_monitor_type(expected_monitor) self.do_test_monitor_type(expected_monitor)
def test_monitor_type_push(self): def test_monitor_type_push(self):
expected_monitor = { expected_monitor = {
"type": MonitorType.PUSH, "type": MonitorType.PUSH,
"name": "monitor 1", "name": "monitor 1"
"url": "http://127.0.0.1"
} }
self.do_test_monitor_type(expected_monitor) self.do_test_monitor_type(expected_monitor)
@ -119,9 +121,8 @@ class TestMonitor(UptimeKumaTestCase):
expected_monitor = { expected_monitor = {
"type": MonitorType.STEAM, "type": MonitorType.STEAM,
"name": "monitor 1", "name": "monitor 1",
"url": "http://127.0.0.1",
"hostname": "127.0.0.1", "hostname": "127.0.0.1",
"port": 8888, "port": 8888
} }
self.do_test_monitor_type(expected_monitor) self.do_test_monitor_type(expected_monitor)
@ -129,7 +130,6 @@ class TestMonitor(UptimeKumaTestCase):
expected_monitor = { expected_monitor = {
"type": MonitorType.MQTT, "type": MonitorType.MQTT,
"name": "monitor 1", "name": "monitor 1",
"url": "http://127.0.0.1",
"hostname": "127.0.0.1", "hostname": "127.0.0.1",
"port": 8888, "port": 8888,
"mqttTopic": "test" "mqttTopic": "test"
@ -140,13 +140,52 @@ class TestMonitor(UptimeKumaTestCase):
expected_monitor = { expected_monitor = {
"type": MonitorType.SQLSERVER, "type": MonitorType.SQLSERVER,
"name": "monitor 1", "name": "monitor 1",
"url": "http://127.0.0.1",
"databaseConnectionString": "Server=127.0.0.1,8888;Database=test;User Id=1;Password=secret123;Encrypt=true;" "databaseConnectionString": "Server=127.0.0.1,8888;Database=test;User Id=1;Password=secret123;Encrypt=true;"
"TrustServerCertificate=Yes;Connection Timeout=5", "TrustServerCertificate=Yes;Connection Timeout=5",
"databaseQuery": "select getdate()" "databaseQuery": "select getdate()"
} }
self.do_test_monitor_type(expected_monitor) self.do_test_monitor_type(expected_monitor)
def test_monitor_type_postgres(self):
if parse_version(self.api.version) < parse_version("1.18"):
self.skipTest("Unsupported in this Uptime Kuma version")
expected_monitor = {
"type": MonitorType.POSTGRES,
"name": "monitor 1",
"databaseConnectionString": "postgres://username:password@host:port/database",
"databaseQuery": "select getdate()"
}
self.do_test_monitor_type(expected_monitor)
def test_monitor_type_docker(self):
if parse_version(self.api.version) < parse_version("1.18"):
self.skipTest("Unsupported in this Uptime Kuma version")
docker_host_id = self.add_docker_host()
expected_monitor = {
"type": MonitorType.DOCKER,
"name": "monitor 1",
"docker_container": "test",
"docker_host": docker_host_id
}
self.do_test_monitor_type(expected_monitor)
def test_monitor_type_radius(self):
if parse_version(self.api.version) < parse_version("1.18"):
self.skipTest("Unsupported in this Uptime Kuma version")
expected_monitor = {
"type": MonitorType.RADIUS,
"name": "monitor 1",
"radiusUsername": "123",
"radiusPassword": "456",
"radiusSecret": "789",
"radiusCalledStationId": "1",
"radiusCallingStationId": "2"
}
self.do_test_monitor_type(expected_monitor)
def test_edit_notification_id_list(self): def test_edit_notification_id_list(self):
# https://github.com/lucasheld/uptime-kuma-api/issues/3 # https://github.com/lucasheld/uptime-kuma-api/issues/3

View file

@ -1,7 +1,7 @@
import json import json
import unittest import unittest
from uptime_kuma_api import UptimeKumaApi, Event, MonitorType from uptime_kuma_api import UptimeKumaApi, Event, MonitorType, DockerType
token = None token = None
@ -83,3 +83,12 @@ class UptimeKumaTestCase(unittest.TestCase):
r = self.api.add_notification(name="notification 1", type="PushByTechulus", pushAPIKey="123456789") r = self.api.add_notification(name="notification 1", type="PushByTechulus", pushAPIKey="123456789")
notification_id = r["id"] notification_id = r["id"]
return notification_id return notification_id
def add_docker_host(self):
expected_docker_host = {
"name": "docker host 1",
"dockerType": DockerType.SOCKET
}
r = self.api.add_docker_host(**expected_docker_host)
docker_host_id = r["id"]
return docker_host_id

View file

@ -1,9 +1,10 @@
from .__version__ import __title__, __version__, __author__, __copyright__ from .__version__ import __title__, __version__, __author__, __copyright__
from .auth_method import AuthMethod from .auth_method import AuthMethod
from .monitor_type import MonitorType from .monitor_type import MonitorType
from .notification_providers import NotificationType, notification_provider_options from .notification_providers import NotificationType, notification_provider_options, notification_provider_conditions
from .proxy_protocol import ProxyProtocol from .proxy_protocol import ProxyProtocol
from .incident_style import IncidentStyle from .incident_style import IncidentStyle
from .docker_type import DockerType
from .exceptions import UptimeKumaException from .exceptions import UptimeKumaException
from .event import Event from .event import Event
from .api import UptimeKumaApi from .api import UptimeKumaApi

View file

@ -3,12 +3,14 @@ import time
import requests import requests
import socketio import socketio
from packaging.version import parse as parse_version
from . import AuthMethod from . import AuthMethod
from . import MonitorType from . import MonitorType
from . import NotificationType, notification_provider_options from . import NotificationType, notification_provider_options, notification_provider_conditions
from . import ProxyProtocol from . import ProxyProtocol
from . import IncidentStyle from . import IncidentStyle
from . import DockerType
from . import Event from . import Event
from . import UptimeKumaException from . import UptimeKumaException
@ -23,7 +25,7 @@ def int_to_bool(data, keys):
data[key] = True if data[key] == 1 else False data[key] = True if data[key] == 1 else False
def _convert_monitor_data(**kwargs): def _convert_monitor_data(kwargs):
if not kwargs["accepted_statuscodes"]: if not kwargs["accepted_statuscodes"]:
kwargs["accepted_statuscodes"] = ["200-299"] kwargs["accepted_statuscodes"] = ["200-299"]
@ -32,146 +34,15 @@ def _convert_monitor_data(**kwargs):
for notification_id in kwargs["notificationIDList"]: for notification_id in kwargs["notificationIDList"]:
dict_notification_ids[notification_id] = True dict_notification_ids[notification_id] = True
kwargs["notificationIDList"] = dict_notification_ids kwargs["notificationIDList"] = dict_notification_ids
if not kwargs["databaseConnectionString"]:
if kwargs["type"] == MonitorType.SQLSERVER:
kwargs["databaseConnectionString"] = "Server=<hostname>,<port>;Database=<your database>;User Id=<your user id>;Password=<your password>;Encrypt=<true/false>;TrustServerCertificate=<Yes/No>;Connection Timeout=<int>"
elif kwargs["type"] == MonitorType.POSTGRES:
kwargs["databaseConnectionString"] = "postgres://username:password@host:port/database"
return kwargs return kwargs
def _build_monitor_data(
type: MonitorType,
name: str,
interval: int = 60,
retryInterval: int = 60,
maxretries: int = 0,
upsideDown: bool = False,
tags: list = None,
notificationIDList: list = None,
# HTTP, KEYWORD
url: str = None,
expiryNotification: bool = False,
ignoreTls: bool = False,
maxredirects: int = 10,
accepted_statuscodes: list = None,
proxyId: int = None,
method: str = "GET",
body: str = None,
headers: str = None,
authMethod: AuthMethod = AuthMethod.NONE,
basic_auth_user: str = None,
basic_auth_pass: str = None,
authDomain: str = None,
authWorkstation: str = None,
# KEYWORD
keyword: str = None,
# DNS, PING, STEAM, MQTT
hostname: str = None,
# DNS, STEAM, MQTT
port: int = 53,
# DNS
dns_resolve_server: str = "1.1.1.1",
dns_resolve_type: str = "A",
# MQTT
mqttUsername: str = None,
mqttPassword: str = None,
mqttTopic: str = None,
mqttSuccessMessage: str = None,
# SQLSERVER
databaseConnectionString: str = "Server=<hostname>,<port>;"
"Database=<your database>;"
"User Id=<your user id>;"
"Password=<your password>;"
"Encrypt=<true/false>;"
"TrustServerCertificate=<Yes/No>;"
"Connection Timeout=<int>",
databaseQuery: str = None
):
data = {
"type": type,
"name": name,
"interval": interval,
"retryInterval": retryInterval,
"maxretries": maxretries,
"notificationIDList": notificationIDList,
"upsideDown": upsideDown,
}
if tags:
data.update({
"tags": tags
})
if type == MonitorType.KEYWORD:
data.update({
"keyword": keyword,
})
# HTTP, KEYWORD
data.update({
"url": url,
"expiryNotification": expiryNotification,
"ignoreTls": ignoreTls,
"maxredirects": maxredirects,
"accepted_statuscodes": accepted_statuscodes,
"proxyId": proxyId,
"method": method,
"body": body,
"headers": headers,
"authMethod": authMethod,
})
if authMethod in [AuthMethod.HTTP_BASIC, AuthMethod.NTLM]:
data.update({
"basic_auth_user": basic_auth_user,
"basic_auth_pass": basic_auth_pass,
})
if authMethod == AuthMethod.NTLM:
data.update({
"authDomain": authDomain,
"authWorkstation": authWorkstation,
})
# DNS, PING, STEAM, MQTT
data.update({
"hostname": hostname,
})
# DNS, STEAM, MQTT
data.update({
"port": port,
})
# DNS
data.update({
"dns_resolve_server": dns_resolve_server,
"dns_resolve_type": dns_resolve_type,
})
# MQTT
data.update({
"mqttUsername": mqttUsername,
"mqttPassword": mqttPassword,
"mqttTopic": mqttTopic,
"mqttSuccessMessage": mqttSuccessMessage,
})
# SQLSERVER
data.update({
"databaseConnectionString": databaseConnectionString
})
if type == MonitorType.SQLSERVER:
data.update({
"databaseQuery": databaseQuery,
})
return data
def _build_notification_data( def _build_notification_data(
name: str, name: str,
type: NotificationType, type: NotificationType,
@ -255,6 +126,28 @@ def _build_status_page_data(
return slug, config, icon, publicGroupList return slug, config, icon, publicGroupList
def _convert_docker_host_data(kwargs):
if not kwargs["dockerDaemon"]:
if kwargs["dockerType"] == DockerType.SOCKET:
kwargs["dockerDaemon"] = "/var/run/docker.sock"
elif kwargs["dockerType"] == DockerType.TCP:
kwargs["dockerDaemon"] = "tcp://localhost:2375"
return kwargs
def _build_docker_host_data(
name: str,
dockerType: DockerType,
dockerDaemon: str = None
):
data = {
"name": name,
"dockerType": dockerType,
"dockerDaemon": dockerDaemon
}
return data
def _check_missing_arguments(required_params, kwargs): def _check_missing_arguments(required_params, kwargs):
missing_arguments = [] missing_arguments = []
for required_param in required_params: for required_param in required_params:
@ -298,6 +191,9 @@ def _check_arguments_monitor(kwargs):
MonitorType.STEAM: ["hostname", "port"], MonitorType.STEAM: ["hostname", "port"],
MonitorType.MQTT: ["hostname", "port", "mqttTopic"], MonitorType.MQTT: ["hostname", "port", "mqttTopic"],
MonitorType.SQLSERVER: [], MonitorType.SQLSERVER: [],
MonitorType.POSTGRES: [],
MonitorType.DOCKER: ["docker_container", "docker_host"],
MonitorType.RADIUS: ["radiusUsername", "radiusPassword", "radiusSecret", "radiusCalledStationId", "radiusCallingStationId"]
} }
type_ = kwargs["type"] type_ = kwargs["type"]
required_args = required_args_by_type[type_] required_args = required_args_by_type[type_]
@ -331,22 +227,7 @@ def _check_arguments_notification(kwargs):
type_ = kwargs["type"] type_ = kwargs["type"]
required_args = notification_provider_options[type_] required_args = notification_provider_options[type_]
_check_missing_arguments(required_args, kwargs) _check_missing_arguments(required_args, kwargs)
_check_argument_conditions(notification_provider_conditions, kwargs)
provider_conditions = {
'gotifyPriority': {
'max': 10,
'min': 0
},
'ntfyPriority': {
'max': 5,
'min': 1
},
'smtpPort': {
'max': 65535,
'min': 0
}
}
_check_argument_conditions(provider_conditions, kwargs)
def _check_arguments_proxy(kwargs): def _check_arguments_proxy(kwargs):
@ -380,7 +261,8 @@ class UptimeKumaApi(object):
Event.UPTIME: None, Event.UPTIME: None,
Event.HEARTBEAT: None, Event.HEARTBEAT: None,
Event.INFO: None, Event.INFO: None,
Event.CERT_INFO: None Event.CERT_INFO: None,
Event.DOCKER_HOST_LIST: None
} }
self.sio.on(Event.CONNECT, self._event_connect) self.sio.on(Event.CONNECT, self._event_connect)
@ -396,6 +278,7 @@ class UptimeKumaApi(object):
self.sio.on(Event.HEARTBEAT, self._event_heartbeat) self.sio.on(Event.HEARTBEAT, self._event_heartbeat)
self.sio.on(Event.INFO, self._event_info) self.sio.on(Event.INFO, self._event_info)
self.sio.on(Event.CERT_INFO, self._event_cert_info) self.sio.on(Event.CERT_INFO, self._event_cert_info)
self.sio.on(Event.DOCKER_HOST_LIST, self._event_docker_host_list)
self.connect() self.connect()
@ -488,6 +371,9 @@ class UptimeKumaApi(object):
"data": data, "data": data,
}) })
def _event_docker_host_list(self, data):
self._event_data[Event.DOCKER_HOST_LIST] = data
# connection # connection
def connect(self): def connect(self):
@ -500,6 +386,178 @@ class UptimeKumaApi(object):
def disconnect(self): def disconnect(self):
self.sio.disconnect() self.sio.disconnect()
# builder
@property
def version(self):
info = self.info()
return info["version"]
def _build_monitor_data(
self,
type: MonitorType,
name: str,
interval: int = 60,
retryInterval: int = 60,
resendInterval: int = 0,
maxretries: int = 0,
upsideDown: bool = False,
tags: list = None,
notificationIDList: list = None,
# HTTP, KEYWORD
url: str = None,
expiryNotification: bool = False,
ignoreTls: bool = False,
maxredirects: int = 10,
accepted_statuscodes: list = None,
proxyId: int = None,
method: str = "GET",
body: str = None,
headers: str = None,
authMethod: AuthMethod = AuthMethod.NONE,
basic_auth_user: str = None,
basic_auth_pass: str = None,
authDomain: str = None,
authWorkstation: str = None,
# KEYWORD
keyword: str = None,
# DNS, PING, STEAM, MQTT
hostname: str = None,
# DNS, STEAM, MQTT
port: int = 53,
# DNS
dns_resolve_server: str = "1.1.1.1",
dns_resolve_type: str = "A",
# MQTT
mqttUsername: str = None,
mqttPassword: str = None,
mqttTopic: str = None,
mqttSuccessMessage: str = None,
# SQLSERVER, POSTGRES
databaseConnectionString: str = None,
databaseQuery: str = None,
# DOCKER
docker_container: str = "",
docker_host: int = None,
# RADIUS
radiusUsername: str = None,
radiusPassword: str = None,
radiusSecret: str = None,
radiusCalledStationId: str = None,
radiusCallingStationId: str = None
):
data = {
"type": type,
"name": name,
"interval": interval,
"retryInterval": retryInterval,
"maxretries": maxretries,
"notificationIDList": notificationIDList,
"upsideDown": upsideDown,
}
if parse_version(self.version) >= parse_version("1.18"):
data.update({
"resendInterval": resendInterval
})
if tags:
data.update({
"tags": tags
})
if type == MonitorType.KEYWORD:
data.update({
"keyword": keyword,
})
# HTTP, KEYWORD
data.update({
"url": url,
"expiryNotification": expiryNotification,
"ignoreTls": ignoreTls,
"maxredirects": maxredirects,
"accepted_statuscodes": accepted_statuscodes,
"proxyId": proxyId,
"method": method,
"body": body,
"headers": headers,
"authMethod": authMethod,
})
if authMethod in [AuthMethod.HTTP_BASIC, AuthMethod.NTLM]:
data.update({
"basic_auth_user": basic_auth_user,
"basic_auth_pass": basic_auth_pass,
})
if authMethod == AuthMethod.NTLM:
data.update({
"authDomain": authDomain,
"authWorkstation": authWorkstation,
})
# DNS, PING, STEAM, MQTT
data.update({
"hostname": hostname,
})
# DNS, STEAM, MQTT
data.update({
"port": port,
})
# DNS
data.update({
"dns_resolve_server": dns_resolve_server,
"dns_resolve_type": dns_resolve_type,
})
# MQTT
data.update({
"mqttUsername": mqttUsername,
"mqttPassword": mqttPassword,
"mqttTopic": mqttTopic,
"mqttSuccessMessage": mqttSuccessMessage,
})
# SQLSERVER, POSTGRES
data.update({
"databaseConnectionString": databaseConnectionString
})
if type in [MonitorType.SQLSERVER, MonitorType.POSTGRES]:
data.update({
"databaseQuery": databaseQuery,
})
# DOCKER
if type == MonitorType.DOCKER:
data.update({
"docker_container": docker_container,
"docker_host": docker_host
})
# RADIUS
if type == MonitorType.RADIUS:
data.update({
"radiusUsername": radiusUsername,
"radiusPassword": radiusPassword,
"radiusSecret": radiusSecret,
"radiusCalledStationId": radiusCalledStationId,
"radiusCallingStationId": radiusCallingStationId
})
return data
# monitors # monitors
def get_monitors(self): def get_monitors(self):
@ -527,8 +585,8 @@ class UptimeKumaApi(object):
return r return r
def add_monitor(self, **kwargs): def add_monitor(self, **kwargs):
data = _build_monitor_data(**kwargs) data = self._build_monitor_data(**kwargs)
data = _convert_monitor_data(**data) data = _convert_monitor_data(data)
_check_arguments_monitor(data) _check_arguments_monitor(data)
r = self._call('add', data) r = self._call('add', data)
return r return r
@ -536,7 +594,7 @@ class UptimeKumaApi(object):
def edit_monitor(self, id_: int, **kwargs): def edit_monitor(self, id_: int, **kwargs):
data = self.get_monitor(id_) data = self.get_monitor(id_)
data.update(kwargs) data.update(kwargs)
data = _convert_monitor_data(**data) data = _convert_monitor_data(data)
_check_arguments_monitor(data) _check_arguments_monitor(data)
r = self._call('editMonitor', data) r = self._call('editMonitor', data)
return r return r
@ -724,7 +782,7 @@ class UptimeKumaApi(object):
# info # info
def info(self): def info(self) -> dict:
r = self._get_event_data(Event.INFO) r = self._get_event_data(Event.INFO)
return r return r
@ -796,7 +854,10 @@ class UptimeKumaApi(object):
tlsExpiryNotifyDays: list = None, tlsExpiryNotifyDays: list = None,
# security # security
disableAuth: bool = False disableAuth: bool = False,
# reverse proxy
trustProxy: bool = False
): ):
if not tlsExpiryNotifyDays: if not tlsExpiryNotifyDays:
tlsExpiryNotifyDays = [7, 14, 21] tlsExpiryNotifyDays = [7, 14, 21]
@ -812,6 +873,10 @@ class UptimeKumaApi(object):
"tlsExpiryNotifyDays": tlsExpiryNotifyDays, "tlsExpiryNotifyDays": tlsExpiryNotifyDays,
"disableAuth": disableAuth "disableAuth": disableAuth
} }
if parse_version(self.version) >= parse_version("1.18"):
data.update({
"trustProxy": trustProxy
})
return self._call('setSettings', (data, password)) return self._call('setSettings', (data, password))
def change_password(self, old_password: str, new_password: str): def change_password(self, old_password: str, new_password: str):
@ -872,3 +937,34 @@ class UptimeKumaApi(object):
def shrink_database(self): def shrink_database(self):
return self._call('shrinkDatabase') return self._call('shrinkDatabase')
# docker host
def get_docker_hosts(self):
r = self._get_event_data(Event.DOCKER_HOST_LIST)
return r
def get_docker_host(self, id_: int):
docker_hosts = self.get_docker_hosts()
for docker_host in docker_hosts:
if docker_host["id"] == id_:
return docker_host
raise UptimeKumaException("docker host does not exist")
def test_docker_host(self, **kwargs):
data = _build_docker_host_data(**kwargs)
return self._call('testDockerHost', data)
def add_docker_host(self, **kwargs):
data = _build_docker_host_data(**kwargs)
data = _convert_docker_host_data(data)
return self._call('addDockerHost', (data, None))
def edit_docker_host(self, id_: int, **kwargs):
data = self.get_docker_host(id_)
data.update(kwargs)
data = _convert_docker_host_data(data)
return self._call('addDockerHost', (data, id_))
def delete_docker_host(self, id_: int):
return self._call('deleteDockerHost', id_)

View file

@ -0,0 +1,6 @@
from enum import Enum
class DockerType(str, Enum):
SOCKET = "socket"
TCP = "tcp"

View file

@ -15,3 +15,4 @@ class Event(str, Enum):
HEARTBEAT = "heartbeat" HEARTBEAT = "heartbeat"
INFO = "info" INFO = "info"
CERT_INFO = "certInfo" CERT_INFO = "certInfo"
DOCKER_HOST_LIST = "dockerHostList"

View file

@ -7,7 +7,10 @@ class MonitorType(str, Enum):
PING = "ping" PING = "ping"
KEYWORD = "keyword" KEYWORD = "keyword"
DNS = "dns" DNS = "dns"
DOCKER = "docker"
PUSH = "push" PUSH = "push"
STEAM = "steam" STEAM = "steam"
MQTT = "mqtt" MQTT = "mqtt"
SQLSERVER = "sqlserver" SQLSERVER = "sqlserver"
POSTGRES = "postgres"
RADIUS = "radius"

View file

@ -5,7 +5,6 @@ class NotificationType(str, Enum):
ALERTA = "alerta" ALERTA = "alerta"
ALIYUNSMS = "AliyunSMS" ALIYUNSMS = "AliyunSMS"
APPRISE = "apprise" APPRISE = "apprise"
BARK = "Bark"
CLICKSENDSMS = "clicksendsms" CLICKSENDSMS = "clicksendsms"
DINGDING = "DingDing" DINGDING = "DingDing"
DISCORD = "discord" DISCORD = "discord"
@ -37,6 +36,10 @@ class NotificationType(str, Enum):
TELEGRAM = "telegram" TELEGRAM = "telegram"
WEBHOOK = "webhook" WEBHOOK = "webhook"
WECOM = "WeCom" WECOM = "WeCom"
ALERTNOW = "AlertNow"
BARK = "Bark"
HOMEASSISTANT = "HomeAssistant"
LINENOTIFY = "LineNotify"
notification_provider_options = { notification_provider_options = {
@ -58,9 +61,6 @@ notification_provider_options = {
"appriseURL", "appriseURL",
"title", "title",
], ],
NotificationType.BARK: [
"barkEndpoint",
],
NotificationType.CLICKSENDSMS: [ NotificationType.CLICKSENDSMS: [
"clicksendsmsLogin", "clicksendsmsLogin",
"clicksendsmsPassword", "clicksendsmsPassword",
@ -235,4 +235,34 @@ notification_provider_options = {
NotificationType.WECOM: [ NotificationType.WECOM: [
"weComBotKey", "weComBotKey",
], ],
NotificationType.ALERTNOW: [
"alertNowWebhookURL",
],
NotificationType.BARK: [
"barkEndpoint",
"barkGroup",
"barkSound",
],
NotificationType.HOMEASSISTANT: [
"homeAssistantUrl",
"longLivedAccessToken",
],
NotificationType.LINENOTIFY: [
"lineNotifyAccessToken",
],
}
notification_provider_conditions = {
"gotifyPriority": {
"min": 0,
"max": 10,
},
"ntfyPriority": {
"min": 1,
"max": 5,
},
"smtpPort": {
"min": 0,
"max": 65535,
},
} }