check argument conditions

This commit is contained in:
lucasheld 2022-07-10 18:07:11 +02:00
parent 69f1942bc8
commit a08d3b4613
4 changed files with 140 additions and 48 deletions

View file

@ -1,9 +1,11 @@
import glob import glob
import re import re
from pprint import pprint
import jinja2 import jinja2
from bs4 import BeautifulSoup
notification_providers = [] from uptime_kuma_api import convert_from_socket, params_map_notification_provider
def deduplicate_list(l): def deduplicate_list(l):
@ -14,6 +16,8 @@ def deduplicate_list(l):
return out return out
def build_notification_providers():
providers = []
for path in glob.glob('uptime-kuma/server/notification-providers/*'): for path in glob.glob('uptime-kuma/server/notification-providers/*'):
with open(path) as f: with open(path) as f:
content = f.read() content = f.read()
@ -26,12 +30,39 @@ for path in glob.glob('uptime-kuma/server/notification-providers/*'):
inputs = deduplicate_list(inputs) inputs = deduplicate_list(inputs)
inputs = [i.strip() for i in inputs] inputs = [i.strip() for i in inputs]
notification_providers.append({ providers.append({
"name": name, "name": name,
"inputs": inputs, "inputs": inputs,
}) })
return providers
print(notification_providers)
def build_notification_provider_conditions():
conditions = {}
for path in glob.glob('uptime-kuma/src/components/notifications/*'):
if path.endswith("index.js"):
continue
with open(path) as f:
content = f.read()
match = re.search(r'<template>[\s\S]+</template>', content, re.MULTILINE)
html = match.group(0)
soup = BeautifulSoup(html, "html.parser")
inputs = soup.find_all("input")
for input in inputs:
condition = {}
attrs = input.attrs
v_model = attrs.get("v-model")
min_ = attrs.get("min")
max_ = attrs.get("max")
if min_:
condition["min"] = int(min_)
if max_:
condition["max"] = int(max_)
param_name = re.match(r'\$parent.notification.(.*)$', v_model).group(1)
if condition:
conditions[param_name] = condition
conditions = convert_from_socket(params_map_notification_provider, conditions)
return conditions
def write_to_file(template, destination, **kwargs): def write_to_file(template, destination, **kwargs):
@ -41,5 +72,7 @@ def write_to_file(template, destination, **kwargs):
with open(destination, "w") as f: with open(destination, "w") as f:
f.write(rendered) f.write(rendered)
conditions = build_notification_provider_conditions()
write_to_file("notification_providers.py.j2", "./../uptimekumaapi/notification_providers.py", notification_providers=notification_providers) pprint(conditions)
# notification_providers = build_notification_providers()
# write_to_file("notification_providers.py.j2", "./../uptimekumaapi/notification_providers.py", notification_providers=notification_providers)

View file

@ -1,5 +1,5 @@
__title__ = "uptime_kuma_api" __title__ = "uptime_kuma_api"
__version__ = "0.0.5" __version__ = "0.0.6"
__author__ = "Lucas Held" __author__ = "Lucas Held"
# __license__ = "" # __license__ = ""
__copyright__ = "Copyright 2022 Lucas Held" __copyright__ = "Copyright 2022 Lucas Held"

View file

@ -13,7 +13,6 @@ from . import convert_from_socket, convert_to_socket, params_map_monitor, params
params_map_settings params_map_settings
from . import UptimeKumaException from . import UptimeKumaException
params_map_notification_and_provider = {**params_map_notification, **params_map_notification_provider} params_map_notification_and_provider = {**params_map_notification, **params_map_notification_provider}
@ -168,7 +167,7 @@ def _build_monitor_data(
return data return data
def _build_notification_data(name: str, type_: NotificationType, default: bool, **kwargs): def _build_notification_data(name: str, type_: NotificationType, default: bool = False, apply_existing: bool = False, **kwargs):
data = { data = {
"name": name, "name": name,
"type_": type_, "type_": type_,
@ -251,7 +250,7 @@ def _build_status_page_data(
return slug, config, img_data_url, public_group_list return slug, config, img_data_url, public_group_list
def _raise_missing_arguments(required_params, kwargs, params_map): def _check_missing_arguments(required_params, kwargs, params_map):
missing_arguments = [] missing_arguments = []
for required_param in required_params: for required_param in required_params:
required_param_sock = convert_to_socket(params_map, required_param) required_param_sock = convert_to_socket(params_map, required_param)
@ -262,17 +261,31 @@ def _raise_missing_arguments(required_params, kwargs, params_map):
raise TypeError(f"missing {len(missing_arguments)} required argument: {missing_arguments_str}") raise TypeError(f"missing {len(missing_arguments)} required argument: {missing_arguments_str}")
def _check_required_arguments_monitor(kwargs): def _check_argument_conditions(valid_params, kwargs, params_map):
required_params = [ for valid_param in valid_params:
valid_param_sock = convert_to_socket(params_map, valid_param)
if valid_param_sock in kwargs:
value = kwargs[valid_param_sock]
conditions = valid_params[valid_param]
min_ = conditions.get("min")
max_ = conditions.get("max")
if min_ is not None and value < min_:
raise ValueError(f"the value of {valid_param} must not be less than {min_}")
if max_ is not None and value > max_:
raise ValueError(f"the value of {valid_param} must not be larger than {max_}")
def _check_arguments_monitor(kwargs):
required_args = [
"type_", "type_",
"name", "name",
"heartbeat_interval", "heartbeat_interval",
"retries", "retries",
"heartbeat_retry_interval" "heartbeat_retry_interval"
] ]
_raise_missing_arguments(required_params, kwargs, params_map_monitor) _check_missing_arguments(required_args, kwargs, params_map_monitor)
required_params_type_specific = { required_args_by_type = {
MonitorType.HTTP: ["url", "max_redirects"], MonitorType.HTTP: ["url", "max_redirects"],
MonitorType.PORT: ["hostname", "port"], MonitorType.PORT: ["hostname", "port"],
MonitorType.PING: ["hostname"], MonitorType.PING: ["hostname"],
@ -283,25 +296,70 @@ def _check_required_arguments_monitor(kwargs):
MonitorType.MQTT: ["hostname", "port", "mqtt_topic"], MonitorType.MQTT: ["hostname", "port", "mqtt_topic"],
MonitorType.SQLSERVER: [], MonitorType.SQLSERVER: [],
} }
type_key = convert_to_socket(params_map_monitor, "type") type_ = kwargs[convert_to_socket(params_map_monitor, "type")]
required_params = required_params_type_specific[kwargs[type_key]] required_args = required_args_by_type[type_]
_raise_missing_arguments(required_params, kwargs, params_map_monitor) _check_missing_arguments(required_args, kwargs, params_map_monitor)
conditions = {
"heartbeat_interval": {
"min": 20
},
"retries": {
"min": 0
},
"heartbeat_retry_interval": {
"min": 20
},
"max_redirects": {
"min": 0
},
"port": {
"min": 0,
"max": 65535
}
}
_check_argument_conditions(conditions, kwargs, params_map_monitor)
def _check_required_arguments_notification(kwargs): def _check_arguments_notification(kwargs):
required_params = ["type_", "name"] required_args = ["type_", "name"]
type_key = convert_to_socket(params_map_notification, "type") _check_missing_arguments(required_args, kwargs, params_map_notification_and_provider)
additional_params_sock = notification_provider_options[kwargs[type_key]]
additional_params = convert_from_socket(params_map_notification_provider, additional_params_sock) type_ = kwargs[convert_to_socket(params_map_notification, "type")]
required_params.extend(additional_params) required_args_sock = notification_provider_options[type_]
_raise_missing_arguments(required_params, kwargs, params_map_notification_and_provider) required_args = convert_from_socket(params_map_notification_provider, required_args_sock)
_check_missing_arguments(required_args, kwargs, params_map_notification_and_provider)
provider_conditions = {
'gotify_priority': {
'max': 10,
'min': 0
},
'ntfy_priority': {
'max': 5,
'min': 1
},
'smtp_smtp_port': {
'max': 65535,
'min': 0
}
}
_check_argument_conditions(provider_conditions, kwargs, params_map_notification_and_provider)
def _check_required_arguments_proxy(kwargs): def _check_arguments_proxy(kwargs):
required_params = ["protocol", "host", "port"] required_args = ["protocol", "host", "port"]
if "auth" in kwargs: if "auth" in kwargs:
required_params.extend(["username", "password"]) required_args.extend(["username", "password"])
_raise_missing_arguments(required_params, kwargs, params_map_proxy) _check_missing_arguments(required_args, kwargs, params_map_proxy)
conditions = {
"port": {
"min": 0,
"max": 65535
}
}
_check_argument_conditions(conditions, kwargs, params_map_proxy)
class UptimeKumaApi(object): class UptimeKumaApi(object):
@ -453,7 +511,7 @@ class UptimeKumaApi(object):
def add_monitor(self, **kwargs): def add_monitor(self, **kwargs):
data = _build_monitor_data(**kwargs) data = _build_monitor_data(**kwargs)
_check_required_arguments_monitor(data) _check_arguments_monitor(data)
r = self._call('add', data) r = self._call('add', data)
r = convert_from_socket(params_map_monitor, r) r = convert_from_socket(params_map_monitor, r)
@ -464,7 +522,7 @@ class UptimeKumaApi(object):
data.update(kwargs) data.update(kwargs)
data = convert_to_socket(params_map_monitor, data) data = convert_to_socket(params_map_monitor, data)
_check_required_arguments_monitor(data) _check_arguments_monitor(data)
r = self._call('editMonitor', data) r = self._call('editMonitor', data)
r = convert_from_socket(params_map_monitor, r) r = convert_from_socket(params_map_monitor, r)
@ -506,13 +564,13 @@ class UptimeKumaApi(object):
def test_notification(self, **kwargs): def test_notification(self, **kwargs):
data = _build_notification_data(**kwargs) data = _build_notification_data(**kwargs)
_check_required_arguments_notification(data) _check_arguments_notification(data)
return self._call('testNotification', data) return self._call('testNotification', data)
def add_notification(self, **kwargs): def add_notification(self, **kwargs):
data = _build_notification_data(**kwargs) data = _build_notification_data(**kwargs)
_check_required_arguments_notification(data) _check_arguments_notification(data)
return self._call('addNotification', (data, None)) return self._call('addNotification', (data, None))
def edit_notification(self, id_: int, **kwargs): def edit_notification(self, id_: int, **kwargs):
@ -530,7 +588,7 @@ class UptimeKumaApi(object):
notification.update(kwargs) notification.update(kwargs)
notification = convert_to_socket(params_map_notification_and_provider, notification) notification = convert_to_socket(params_map_notification_and_provider, notification)
_check_required_arguments_notification(notification) _check_arguments_notification(notification)
return self._call('addNotification', (notification, id_)) return self._call('addNotification', (notification, id_))
def delete_notification(self, id_: int): def delete_notification(self, id_: int):
@ -557,7 +615,7 @@ class UptimeKumaApi(object):
def add_proxy(self, **kwargs): def add_proxy(self, **kwargs):
data = _build_proxy_data(**kwargs) data = _build_proxy_data(**kwargs)
_check_required_arguments_proxy(data) _check_arguments_proxy(data)
return self._call('addProxy', (data, None)) return self._call('addProxy', (data, None))
def edit_proxy(self, id_: int, **kwargs): def edit_proxy(self, id_: int, **kwargs):
@ -565,7 +623,7 @@ class UptimeKumaApi(object):
proxy.update(kwargs) proxy.update(kwargs)
proxy = convert_to_socket(params_map_proxy, proxy) proxy = convert_to_socket(params_map_proxy, proxy)
_check_required_arguments_proxy(proxy) _check_arguments_proxy(proxy)
return self._call('addProxy', (proxy, id_)) return self._call('addProxy', (proxy, id_))
def delete_proxy(self, id_: int): def delete_proxy(self, id_: int):

View file

@ -34,7 +34,8 @@ params_map_monitor = {
params_map_notification = { params_map_notification = {
"type": "type_", "type": "type_",
"isDefault": "default", "isDefault": "default",
"userId": "user_id" "userId": "user_id",
"applyExisting": "apply_existing",
} }
params_map_notification_provider = { params_map_notification_provider = {