prepare pypi release and fix add monitor

This commit is contained in:
lucasheld 2022-07-07 22:17:47 +02:00
parent b87a1bdaf0
commit 140784c4e0
13 changed files with 96 additions and 52 deletions

View file

@ -0,0 +1,11 @@
from .__version__ import __title__, __version__, __author__, __copyright__
from .auth_method import AuthMethod
from .monitor_type import MonitorType
from .notification_providers import NotificationType, notification_provider_options
from .proxy_protocol import ProxyProtocol
from .incident_style import IncidentStyle
from .converter import convert_from_socket, convert_to_socket, params_map_monitor, params_map_notification, \
params_map_notification_provider, params_map_proxy, params_map_status_page, params_map_info, \
params_map_settings
from .exceptions import UptimeKumaException
from .api import UptimeKumaApi

View file

@ -0,0 +1,5 @@
__title__ = "uptime_kuma_api"
__version__ = "0.0.3"
__author__ = "Lucas Held"
# __license__ = ""
__copyright__ = "Copyright 2022 Lucas Held"

733
uptime_kuma_api/api.py Normal file
View file

@ -0,0 +1,733 @@
import json
import time
import socketio
from . import AuthMethod
from . import MonitorType
from . import NotificationType, notification_provider_options
from . import ProxyProtocol
from . import IncidentStyle
from . import convert_from_socket, convert_to_socket, params_map_monitor, params_map_notification, \
params_map_notification_provider, params_map_proxy, params_map_status_page, params_map_info, \
params_map_settings
from . import UptimeKumaException
def int_to_bool(data, keys):
if type(data) == list:
for d in data:
int_to_bool(d, keys)
else:
for key in keys:
if key in data:
data[key] = True if data[key] == 1 else False
def _build_monitor_data(
type_: MonitorType,
name: str,
heartbeat_interval: int = 60,
heartbeat_retry_interval: int = 60,
retries: int = 0,
upside_down_mode: bool = False,
tags: list = None,
notification_ids: list[int] = None,
# HTTP, KEYWORD
url: str = None,
certificate_expiry_notification: bool = False,
ignore_tls_error: bool = False,
max_redirects: int = 10,
accepted_status_codes: list[str] = None,
proxy_id: int = None,
http_method: str = "GET",
http_body: str = None,
http_headers: str = None,
auth_method: AuthMethod = AuthMethod.NONE,
auth_user: str = None,
auth_pass: str = None,
auth_domain: str = None,
auth_workstation: str = None,
# KEYWORD
keyword: str = None,
# DNS, PING, STEAM, MQTT
hostname: str = None,
# DNS, STEAM, MQTT
port: int = 53,
# DNS
dns_resolve_server: str = "1.1.1.1",
dns_resolve_type: str = "A",
# MQTT
mqtt_username: str = None,
mqtt_password: str = None,
mqtt_topic: str = None,
mqtt_success_message: str = None,
# SQLSERVER
sqlserver_connection_string: str = "Server=<hostname>,<port>;Database=<your database>;User Id=<your user id>;Password=<your password>;Encrypt=<true/false>;TrustServerCertificate=<Yes/No>;Connection Timeout=<int>",
sqlserver_query: str = None,
):
if not accepted_status_codes:
accepted_status_codes = ["200-299"]
dict_notification_ids = {}
if notification_ids:
for notification_id in notification_ids:
dict_notification_ids[notification_id] = True
notification_ids = dict_notification_ids
data = {
"type_": type_,
"name": name,
"heartbeat_interval": heartbeat_interval,
"heartbeat_retry_interval": heartbeat_retry_interval,
"retries": retries,
"notification_ids": notification_ids,
"upside_down_mode": upside_down_mode,
}
if tags:
data.update({
"tags": tags
})
if type_ == MonitorType.KEYWORD:
data.update({
"keyword": keyword,
})
# if type_ in [MonitorType.HTTP, MonitorType.KEYWORD]:
data.update({
"url": url,
"certificate_expiry_notification": certificate_expiry_notification,
"ignore_tls_error": ignore_tls_error,
"max_redirects": max_redirects,
"accepted_status_codes": accepted_status_codes,
"proxy_id": proxy_id,
"http_method": http_method,
"http_body": http_body,
"http_headers": http_headers,
"auth_method": auth_method,
})
if auth_method in [AuthMethod.HTTP_BASIC, AuthMethod.NTLM]:
data.update({
"auth_user": auth_user,
"auth_pass": auth_pass,
})
if auth_method == AuthMethod.NTLM:
data.update({
"auth_domain": auth_domain,
"auth_workstation": auth_workstation,
})
# if type_ in [MonitorType.DNS, MonitorType.PING, MonitorType.STEAM, MonitorType.MQTT]:
data.update({
"hostname": hostname,
})
if type_ in [MonitorType.DNS, MonitorType.STEAM, MonitorType.MQTT]:
data.update({
"port": port,
})
# if type_ == MonitorType.DNS:
data.update({
"dns_resolve_server": dns_resolve_server,
"dns_resolve_type": dns_resolve_type,
})
# if type_ == MonitorType.MQTT:
data.update({
"mqtt_username": mqtt_username,
"mqtt_password": mqtt_password,
"mqtt_topic": mqtt_topic,
"mqtt_success_message": mqtt_success_message,
})
if type_ == MonitorType.SQLSERVER:
data.update({
"sqlserver_connection_string": sqlserver_connection_string,
"sqlserver_query": sqlserver_query,
})
data = convert_to_socket(params_map_monitor, data)
return data
def _build_notification_data(name: str, type_: NotificationType, default: bool, **kwargs):
data = {
"name": name,
"type_": type_,
"default": default,
**kwargs
}
data = convert_to_socket(params_map_notification, data)
data = convert_to_socket(params_map_notification_provider, data)
return data
def _build_proxy_data(
protocol: ProxyProtocol,
host: str,
port: str,
auth: bool = False,
username: str = None,
password: str = None,
active: bool = True,
default: bool = False,
apply_existing: bool = False,
):
data = {
"protocol": protocol,
"host": host,
"port": port,
"auth": auth,
"username": username,
"password": password,
"active": active,
"default": default,
"apply_existing": apply_existing
}
data = convert_to_socket(params_map_proxy, data)
return data
def _build_status_page_data(
slug: str,
# config
id_: int,
title: str,
description: str = None,
theme: str = "light",
published: bool = True,
show_tags: bool = False,
domain_name_list: list[str] = None,
custom_css: str = "",
footer_text: str = None,
show_powered_by: bool = True,
img_data_url: str = "/icon.svg",
monitors: list = None
):
if theme not in ["light", "dark"]:
raise ValueError
if not domain_name_list:
domain_name_list = []
public_group_list = []
if monitors:
public_group_list.append({
"name": "Services",
"monitorList": monitors
})
config = {
"id_": id_,
"slug": slug,
"title": title,
"description": description,
"img_data_url": img_data_url,
"theme": theme,
"published": published,
"show_tags": show_tags,
"domain_name_list": domain_name_list,
"custom_css": custom_css,
"footer_text": footer_text,
"show_powered_by": show_powered_by
}
config = convert_to_socket(params_map_status_page, config)
return slug, config, img_data_url, public_group_list
class UptimeKumaApi(object):
def __init__(self, url):
self.sio = socketio.Client()
self._event_data: dict[str, any] = {
"monitorList": None,
"notificationList": None,
"proxyList": None,
"statusPageList": None,
"heartbeatList": None,
"importantHeartbeatList": None,
"avgPing": None,
"uptime": None,
"heartbeat": None,
"info": None,
}
self.sio.on("connect", self._event_connect)
self.sio.on("disconnect", self._event_disconnect)
self.sio.on("monitorList", self._event_monitor_list)
self.sio.on("notificationList", self._event_notification_list)
self.sio.on("proxyList", self._event_proxy_list)
self.sio.on("statusPageList", self._event_status_page_list)
self.sio.on("heartbeatList", self._event_heartbeat_list)
self.sio.on("importantHeartbeatList", self._event_important_heartbeat_list)
self.sio.on("avgPing", self._event_avg_ping)
self.sio.on("uptime", self._event_uptime)
self.sio.on("heartbeat", self._event_heartbeat)
self.sio.on("info", self._event_info)
self.connect(url)
def _get_event_data(self, event):
while self._event_data[event] is None:
time.sleep(0.01)
time.sleep(0.01) # wait for multiple messages
return self._event_data[event]
def _call(self, event, data=None):
r = self.sio.call(event, data)
if type(r) == dict and "ok" in r:
if not r["ok"]:
raise UptimeKumaException(r["msg"])
r.pop("ok")
return r
# event handlers
def _event_connect(self):
pass
def _event_disconnect(self):
pass
def _event_monitor_list(self, data):
self._event_data["monitorList"] = data
def _event_notification_list(self, data):
self._event_data["notificationList"] = data
def _event_proxy_list(self, data):
self._event_data["proxyList"] = data
def _event_status_page_list(self, data):
self._event_data["statusPageList"] = data
def _event_heartbeat_list(self, id_, data, bool_):
if self._event_data["heartbeatList"] is None:
self._event_data["heartbeatList"] = []
self._event_data["heartbeatList"].append({
"id": id_,
"data": data,
"bool": bool_,
})
def _event_important_heartbeat_list(self, id_, data, bool_):
if self._event_data["importantHeartbeatList"] is None:
self._event_data["importantHeartbeatList"] = []
self._event_data["importantHeartbeatList"].append({
"id": id_,
"data": data,
"bool": bool_,
})
def _event_avg_ping(self, id_, data):
if self._event_data["avgPing"] is None:
self._event_data["avgPing"] = []
self._event_data["avgPing"].append({
"id": id_,
"data": data,
})
def _event_uptime(self, id_, hours_24, days_30):
if self._event_data["uptime"] is None:
self._event_data["uptime"] = []
self._event_data["uptime"].append({
"id": id_,
"hours_24": hours_24,
"days_30": days_30,
})
def _event_heartbeat(self, data):
if self._event_data["heartbeat"] is None:
self._event_data["heartbeat"] = []
self._event_data["heartbeat"].append(data)
def _event_info(self, data):
self._event_data["info"] = data
# connection
def connect(self, url: str):
url = url.rstrip("/")
self.sio.connect(f'{url}/socket.io/')
def disconnect(self):
self.sio.disconnect()
# monitors
def get_monitors(self):
r = list(self._get_event_data("monitorList").values())
r = convert_from_socket(params_map_monitor, r)
int_to_bool(r, ["active"])
return r
def get_monitor(self, id_: int):
r = self._call('getMonitor', id_)["monitor"]
r = convert_from_socket(params_map_monitor, r)
int_to_bool(r, ["active"])
return r
def pause_monitor(self, id_: int):
r = self._call('pauseMonitor', id_)
return r
def resume_monitor(self, id_: int):
r = self._call('resumeMonitor', id_)
return r
def delete_monitor(self, id_: int):
return self._call('deleteMonitor', id_)
def get_monitor_beats(self, id_: int, hours: int):
r = self._call('getMonitorBeats', (id_, hours))["data"]
int_to_bool(r, ["important", "status"])
return r
def add_monitor(self, *args, **kwargs):
data = _build_monitor_data(*args, **kwargs)
r = self._call('add', data)
r = convert_from_socket(params_map_monitor, r)
return r
def edit_monitor(self, id_: int, **kwargs):
data = self.get_monitor(id_)
kwargs_sock = convert_to_socket(params_map_monitor, kwargs)
data.update(kwargs_sock)
r = self._call('editMonitor', data)
r = convert_from_socket(params_map_monitor, r)
return r
# monitor tags
def add_monitor_tag(self, tag_id: int, monitor_id: int, value=""):
return self._call('addMonitorTag', (tag_id, monitor_id, value))
# editMonitorTag is unused in uptime-kuma
# def edit_monitor_tag(self, tag_id: int, monitor_id: int, value=""):
# return self._call('editMonitorTag', (tag_id, monitor_id, value))
def delete_monitor_tag(self, tag_id: int, monitor_id: int, value=""):
return self._call('deleteMonitorTag', (tag_id, monitor_id, value))
# notifications
def get_notifications(self):
notifications = self._get_event_data("notificationList")
r = []
for notification_raw in notifications:
notification = notification_raw.copy()
config = json.loads(notification["config"])
del notification["config"]
notification.update(config)
r.append(notification)
r = convert_from_socket(params_map_notification, r)
r = convert_from_socket(params_map_notification_provider, r)
return r
def get_notification(self, id_: int):
notifications = self.get_notifications()
for notification in notifications:
if notification["id"] == id_:
return notification
raise UptimeKumaException("notification does not exist")
def test_notification(self, *args, **kwargs):
data = _build_notification_data(*args, **kwargs)
return self._call('testNotification', data)
def add_notification(self, *args, **kwargs):
data = _build_notification_data(*args, **kwargs)
return self._call('addNotification', (data, None))
def edit_notification(self, id_: int, **kwargs):
notification = self.get_notification(id_)
# remove old notification provider options from notification object
if "type_" in kwargs and kwargs != notification["type_"]:
for provider in notification_provider_options:
provider_options = notification_provider_options[provider]
if provider != kwargs:
for option in provider_options:
if option in notification:
del notification[option]
notification.update(kwargs)
notification = convert_to_socket(params_map_notification, notification)
notification = convert_to_socket(params_map_notification_provider, notification)
return self._call('addNotification', (notification, id_))
def delete_notification(self, id_: int):
return self._call('deleteNotification', id_)
def check_apprise(self):
return self._call('checkApprise')
# proxy
def get_proxies(self):
r = self._get_event_data("proxyList")
r = convert_from_socket(params_map_proxy, r)
int_to_bool(r, ["auth", "active", "default", "apply_existing"])
return r
def get_proxy(self, id_: int):
proxies = self.get_proxies()
for proxy in proxies:
if proxy["id"] == id_:
return proxy
raise UptimeKumaException("proxy does not exist")
def add_proxy(self, *args, **kwargs):
data = _build_proxy_data(*args, **kwargs)
return self._call('addProxy', (data, None))
def edit_proxy(self, id_: int, **kwargs):
proxy = self.get_proxy(id_)
proxy.update(kwargs)
return self._call('addProxy', (proxy, id_))
def delete_proxy(self, id_: int):
return self._call('deleteProxy', id_)
# status page
def get_status_pages(self):
r = list(self._get_event_data("statusPageList").values())
r = convert_from_socket(params_map_status_page, r)
return r
def get_status_page(self, slug: str):
r = self._call('getStatusPage', slug)
config = r["config"]
del r["config"]
r.update(config)
r = convert_from_socket(params_map_status_page, r)
return r
def add_status_page(self, slug: str, title: str):
return self._call('addStatusPage', (title, slug))
def delete_status_page(self, slug: str):
return self._call('deleteStatusPage', slug)
def save_status_page(self, slug: str, **kwargs):
status_page = self.get_status_page(slug)
status_page.update(kwargs)
data = _build_status_page_data(**status_page)
return self._call('saveStatusPage', data)
def post_incident(
self,
slug: str,
title: str,
content: str,
style: IncidentStyle = IncidentStyle.PRIMARY
):
incident = {
"title": title,
"content": content,
"style": style
}
r = self._call('postIncident', (slug, incident))["incident"]
r = convert_from_socket(params_map_status_page, r)
self.save_status_page(slug)
return r
def unpin_incident(self, slug: str):
r = self._call('unpinIncident', slug)
self.save_status_page(slug)
return r
# heartbeat
def get_heartbeats(self):
r = self._get_event_data("heartbeatList")
for i in r:
int_to_bool(i["data"], ["important", "status"])
return r
def get_important_heartbeats(self):
r = self._get_event_data("importantHeartbeatList")
for i in r:
int_to_bool(i["data"], ["important", "status"])
return r
def get_heartbeat(self):
r = self._get_event_data("heartbeat")
int_to_bool(r, ["important", "status"])
return r
# avg ping
def avg_ping(self):
return self._get_event_data("avgPing")
# uptime
def uptime(self):
return self._get_event_data("uptime")
# info
def info(self):
r = self._get_event_data("info")
r = convert_from_socket(params_map_info, r)
return r
# clear
def clear_events(self, monitor_id: int):
return self._call('clearEvents', monitor_id)
def clear_heartbeats(self, monitor_id: int):
return self._call('clearHeartbeats', monitor_id)
def clear_statistics(self):
return self._call('clearStatistics')
# tags
def get_tags(self):
return self._call('getTags')["tags"]
def get_tag(self, id_: int):
tags = self.get_tags()
for tag in tags:
if tag["id"] == id_:
return tag
raise UptimeKumaException("tag does not exist")
# not working, monitor id required?
# def edit_tag(self, id_: int, name: str, color: str):
# return self._call('editTag', {
# "id": id_,
# "name": name,
# "color": color
# })
def delete_tag(self, id_: int):
return self._call('deleteTag', id_)
def add_tag(self, name: str, color: str):
return self._call('addTag', {
"name": name,
"color": color,
"new": True
})["tag"]
# settings
def get_settings(self):
r = self._call('getSettings')["data"]
r = convert_from_socket(params_map_settings, r)
return r
def set_settings(
self,
password: str,
# about
check_update: bool = True,
check_beta: bool = False,
# monitor history
keep_data_period_days: int = 180,
# general
entry_page: str = "dashboard",
search_engine_index: bool = False,
primary_base_url: str = "",
steam_api_key: str = "",
# notifications
tls_expiry_notify_days: list[int] = None,
# security
disable_auth: bool = False
):
if not tls_expiry_notify_days:
tls_expiry_notify_days = [7, 14, 21]
data = {
"check_update": check_update,
"check_beta": check_beta,
"keep_data_period_days": keep_data_period_days,
"entry_page": entry_page,
"search_engine_index": search_engine_index,
"primary_base_url": primary_base_url,
"steam_api_key": steam_api_key,
"tls_expiry_notify_days": tls_expiry_notify_days,
"disable_auth": disable_auth
}
data = convert_to_socket(params_map_settings, data)
return self._call('setSettings', (data, password))
def change_password(self, old_password: str, new_password: str):
return self._call('changePassword', {
"currentPassword": old_password,
"newPassword": new_password,
})
def upload_backup(self, json_data, import_handle: str):
if import_handle not in ["overwrite", "skip", "keep"]:
raise ValueError()
return self._call('uploadBackup', (json_data, import_handle))
# 2FA
def twofa_status(self):
return self._call('twoFAStatus')
def prepare_2fa(self, password: str):
return self._call('prepare2FA', password)
def save_2fa(self, password: str):
return self._call('save2FA', password)
def disable_2fa(self, password: str):
return self._call('disable2FA', password)
# login
def login(self, username: str, password: str):
return self._call('login', {
"username": username,
"password": password,
"token": ""
})
def login_by_token(self, token: str):
return self._call('loginByToken', token)
def verify_token(self, token: str, password: str):
return self._call('verifyToken', (token, password))
def logout(self):
return self._call('logout')
# setup
def need_setup(self):
return self._call('needSetup')
def setup(self, username: str, password: str):
return self._call("setup", (username, password))
# database
def get_database_size(self):
return self._call('getDatabaseSize')
def shrink_database(self):
return self._call('shrinkDatabase')

View file

@ -0,0 +1,7 @@
from enum import Enum
class AuthMethod(str, Enum):
NONE = ""
HTTP_BASIC = "basic"
NTLM = "ntlm"

View file

@ -0,0 +1,235 @@
# socket -> python
params_map_monitor = {
"type": "type_",
"interval": "heartbeat_interval",
"retryInterval": "heartbeat_retry_interval",
"maxretries": "retries",
"notificationIDList": "notification_ids",
"upsideDown": "upside_down_mode",
"expiryNotification": "certificate_expiry_notification",
"ignoreTls": "ignore_tls_error",
"maxredirects": "max_redirects",
"accepted_statuscodes": "accepted_status_codes",
"proxyId": "proxy_id",
"method": "http_method",
"body": "http_body",
"headers": "http_headers",
"authMethod": "auth_method",
"basicauth-user": "auth_user",
"basicauth-pass": "auth_pass",
"basicauth-domain": "auth_domain",
"basicauth-workstation": "auth_workstation",
"mqttUsername": "mqtt_username",
"mqttPassword": "mqtt_password",
"mqttTopic": "mqtt_topic",
"mqttSuccessMessage": "mqtt_success_message",
"databaseConnectionString": "sqlserver_connection_string",
"sqlserverQuery": "sqlserver_query",
"authDomain": "auth_domain",
"authWorkstation": "auth_workstation",
"databaseQuery": "database_query",
"monitorID": "monitor_id"
}
params_map_notification = {
"type": "type_",
"isDefault": "default",
"userId": "user_id"
}
params_map_notification_provider = {
'alertaApiEndpoint': 'alerta_api_endpoint',
'alertaApiKey': 'alerta_api_key',
'alertaEnvironment': 'alerta_environment',
'alertaAlertState': 'alerta_alert_state',
'alertaRecoverState': 'alerta_recover_state',
'phonenumber': 'aliyunsms_phonenumber',
'templateCode': 'aliyunsms_template_code',
'signName': 'aliyunsms_sign_name',
'accessKeyId': 'aliyunsms_access_key_id',
'secretAccessKey': 'aliyunsms_secret_access_key',
'appriseURL': 'apprise_apprise_url',
'title': 'apprise_title',
'barkEndpoint': 'bark_endpoint',
'clicksendsmsLogin': 'clicksendsms_login',
'clicksendsmsPassword': 'clicksendsms_password',
'clicksendsmsToNumber': 'clicksendsms_to_number',
'clicksendsmsSenderName': 'clicksendsms_sender_name',
'webHookUrl': 'dingding_web_hook_url',
'secretKey': 'dingding_secret_key',
'discordUsername': 'discord_username',
'discordWebhookUrl': 'discord_webhook_url',
'discordPrefixMessage': 'discord_prefix_message',
'feishuWebHookUrl': 'feishu_web_hook_url',
'googleChatWebhookURL': 'googlechat_webhook_url',
'gorushDeviceToken': 'gorush_device_token',
'gorushPlatform': 'gorush_platform',
'gorushTitle': 'gorush_title',
'gorushPriority': 'gorush_priority',
'gorushRetry': 'gorush_retry',
'gorushTopic': 'gorush_topic',
'gorushServerURL': 'gorush_server_url',
'gotifyserverurl': 'gotify_serverurl',
'gotifyapplicationToken': 'gotify_application_token',
'gotifyPriority': 'gotify_priority',
'lineChannelAccessToken': 'line_channel_access_token',
'lineUserID': 'line_user_id',
'lunaseaDevice': 'lunasea_device',
'internalRoomId': 'matrix_internal_room_id',
'accessToken': 'onebot_access_token',
'homeserverUrl': 'matrix_homeserver_url',
'mattermostusername': 'mattermost_username',
'mattermostWebhookUrl': 'mattermost_webhook_url',
'mattermostchannel': 'mattermost_channel',
'mattermosticonemo': 'mattermost_iconemo',
'mattermosticonurl': 'mattermost_iconurl',
'ntfyserverurl': 'ntfy_serverurl',
'ntfytopic': 'ntfy_topic',
'ntfyPriority': 'ntfy_priority',
'octopushVersion': 'octopush_version',
'octopushAPIKey': 'octopush_apikey',
'octopushLogin': 'octopush_login',
'octopushPhoneNumber': 'octopush_phone_number',
'octopushSMSType': 'octopush_smstype',
'octopushSenderName': 'octopush_sender_name',
'octopushDMLogin': 'octopush_dmlogin',
'octopushDMAPIKey': 'octopush_dmapikey',
'octopushDMPhoneNumber': 'octopush_dmphone_number',
'octopushDMSenderName': 'octopush_dmsender_name',
'octopushDMSMSType': 'octopush_dmsmstype',
'httpAddr': 'onebot_http_addr',
'msgType': 'onebot_msg_type',
'recieverId': 'onebot_reciever_id',
'pagerdutyAutoResolve': 'pagerduty_auto_resolve',
'pagerdutyIntegrationUrl': 'pagerduty_integration_url',
'pagerdutyPriority': 'pagerduty_priority',
'pagerdutyIntegrationKey': 'pagerduty_integration_key',
'promosmsLogin': 'promosms_login',
'promosmsPassword': 'promosms_password',
'promosmsPhoneNumber': 'promosms_phone_number',
'promosmsSMSType': 'promosms_smstype',
'promosmsSenderName': 'promosms_sender_name',
'pushbulletAccessToken': 'pushbullet_access_token',
'pushdeerKey': 'pushdeer_key',
'pushoveruserkey': 'pushover_userkey',
'pushoverapptoken': 'pushover_apptoken',
'pushoversounds': 'pushover_sounds',
'pushoverpriority': 'pushover_priority',
'pushovertitle': 'pushover_title',
'pushoverdevice': 'pushover_device',
'pushyAPIKey': 'pushy_apikey',
'pushyToken': 'pushy_token',
'rocketchannel': 'rocketchat_channel',
'rocketusername': 'rocketchat_username',
'rocketiconemo': 'rocketchat_iconemo',
'rocketwebhookURL': 'rocketchat_webhook_url',
'rocketbutton': 'rocketchat_button',
'serwersmsUsername': 'serwersms_username',
'serwersmsPassword': 'serwersms_password',
'serwersmsPhoneNumber': 'serwersms_phone_number',
'serwersmsSenderName': 'serwersms_sender_name',
'signalNumber': 'signal_number',
'signalRecipients': 'signal_recipients',
'signalURL': 'signal_url',
'slackbutton': 'slack_button',
'slackchannel': 'slack_channel',
'slackusername': 'slack_username',
'slackiconemo': 'slack_iconemo',
'slackwebhookURL': 'slack_webhook_url',
'smtpHost': 'smtp_smtp_host',
'smtpPort': 'smtp_smtp_port',
'smtpSecure': 'smtp_smtp_secure',
'smtpIgnoreTLSError': 'smtp_smtp_ignore_tlserror',
'smtpDkimDomain': 'smtp_smtp_dkim_domain',
'smtpDkimKeySelector': 'smtp_smtp_dkim_key_selector',
'smtpDkimPrivateKey': 'smtp_smtp_dkim_private_key',
'smtpDkimHashAlgo': 'smtp_smtp_dkim_hash_algo',
'smtpDkimheaderFieldNames': 'smtp_smtp_dkimheader_field_names',
'smtpDkimskipFields': 'smtp_smtp_dkimskip_fields',
'smtpUsername': 'smtp_smtp_username',
'smtpPassword': 'smtp_smtp_password',
'customSubject': 'smtp_custom_subject',
'smtpFrom': 'smtp_smtp_from',
'smtpCC': 'smtp_smtp_cc',
'smtpBCC': 'smtp_smtp_bcc',
'smtpTo': 'smtp_smtp_to',
'stackfieldwebhookURL': 'stackfield_webhook_url',
'webhookUrl': 'teams_webhook_url',
'pushAPIKey': 'pushbytechulus_apikey',
'telegramBotToken': 'telegram_bot_token',
'telegramChatID': 'telegram_chat_id',
'webhookContentType': 'webhook_content_type',
'webhookURL': 'webhook_url',
'weComBotKey': 'wecom_bot_key'
}
params_map_proxy = {
"applyExisting": "apply_existing",
"createdDate": "created_date",
"userId": "user_id"
}
params_map_status_page = {
"id": "id_",
"slug": "slug",
"title": "title",
"description": "description",
"icon": "img_data_url",
"published": "published",
"showTags": "show_tags",
"domainNameList": "domain_name_list",
"customCSS": "custom_css",
"footerText": "footer_text",
"showPoweredBy": "show_powered_by",
"createdDate": "created_date"
}
params_map_info = {
"latestVersion": "latest_version",
"primaryBaseURL": "primary_base_url"
}
params_map_settings = {
# about
"checkUpdate": "check_update",
"checkBeta": "check_beta",
# monitor history
"keepDataPeriodDays": "keep_data_period_days",
# general
"entryPage": "entry_page",
"searchEngineIndex": "search_engine_index",
"primaryBaseURL": "primary_base_url",
"steamAPIKey": "steam_api_key",
# notifications
"tlsExpiryNotifyDays": "tls_expiry_notify_days",
# security
"disableAuth": "disable_auth"
}
def _convert_to_from_socket(params_map: dict[str, str], params, to_socket=False):
if type(params) == list:
out = []
params_list = params
for params in params_list:
params_py = _convert_to_from_socket(params_map, params, to_socket)
out.append(params_py)
else:
if to_socket:
params_map = {v: k for k, v in params_map.items()}
if type(params) == dict:
out = {}
for key, value in params.items():
key = params_map.get(key, key)
out[key] = value
else:
return params_map.get(params, params)
return out
def convert_from_socket(params_map, params):
return _convert_to_from_socket(params_map, params)
def convert_to_socket(params_map, params):
return _convert_to_from_socket(params_map, params, to_socket=True)

View file

@ -0,0 +1,2 @@
class UptimeKumaException(Exception):
pass

View file

@ -0,0 +1,10 @@
from enum import Enum
class IncidentStyle(str, Enum):
INFO = "info"
WARNING = "warning"
DANGER = "danger"
PRIMARY = "primary"
LIGHT = "light"
DARK = "dark"

View file

@ -0,0 +1,13 @@
from enum import Enum
class MonitorType(str, Enum):
HTTP = "http"
PORT = "port"
PING = "ping"
KEYWORD = "keyword"
DNS = "dns"
PUSH = "push"
STEAM = "steam"
MQTT = "mqtt"
SQLSERVER = "sqlserver"

View file

@ -0,0 +1,238 @@
from enum import Enum
class NotificationType(str, Enum):
ALERTA = "alerta"
ALIYUNSMS = "AliyunSMS"
APPRISE = "apprise"
BARK = "Bark"
CLICKSENDSMS = "clicksendsms"
DINGDING = "DingDing"
DISCORD = "discord"
FEISHU = "Feishu"
GOOGLECHAT = "GoogleChat"
GORUSH = "gorush"
GOTIFY = "gotify"
LINE = "line"
LUNASEA = "lunasea"
MATRIX = "matrix"
MATTERMOST = "mattermost"
NTFY = "ntfy"
OCTOPUSH = "octopush"
ONEBOT = "OneBot"
PAGERDUTY = "PagerDuty"
PROMOSMS = "promosms"
PUSHBULLET = "pushbullet"
PUSHDEER = "PushDeer"
PUSHOVER = "pushover"
PUSHY = "pushy"
ROCKET_CHAT = "rocket.chat"
SERWERSMS = "serwersms"
SIGNAL = "signal"
SLACK = "slack"
SMTP = "smtp"
STACKFIELD = "stackfield"
TEAMS = "teams"
PUSHBYTECHULUS = "PushByTechulus"
TELEGRAM = "telegram"
WEBHOOK = "webhook"
WECOM = "WeCom"
notification_provider_options = {
NotificationType.ALERTA: [
"alertaApiEndpoint",
"alertaApiKey",
"alertaEnvironment",
"alertaAlertState",
"alertaRecoverState",
],
NotificationType.ALIYUNSMS: [
"phonenumber",
"templateCode",
"signName",
"accessKeyId",
"secretAccessKey",
],
NotificationType.APPRISE: [
"appriseURL",
"title",
],
NotificationType.BARK: [
"barkEndpoint",
],
NotificationType.CLICKSENDSMS: [
"clicksendsmsLogin",
"clicksendsmsPassword",
"clicksendsmsToNumber",
"clicksendsmsSenderName",
],
NotificationType.DINGDING: [
"webHookUrl",
"secretKey",
],
NotificationType.DISCORD: [
"discordUsername",
"discordWebhookUrl",
"discordPrefixMessage",
],
NotificationType.FEISHU: [
"feishuWebHookUrl",
],
NotificationType.GOOGLECHAT: [
"googleChatWebhookURL",
],
NotificationType.GORUSH: [
"gorushDeviceToken",
"gorushPlatform",
"gorushTitle",
"gorushPriority",
"gorushRetry",
"gorushTopic",
"gorushServerURL",
],
NotificationType.GOTIFY: [
"gotifyserverurl",
"gotifyapplicationToken",
"gotifyPriority",
],
NotificationType.LINE: [
"lineChannelAccessToken",
"lineUserID",
],
NotificationType.LUNASEA: [
"lunaseaDevice",
],
NotificationType.MATRIX: [
"internalRoomId",
"accessToken",
"homeserverUrl",
],
NotificationType.MATTERMOST: [
"mattermostusername",
"mattermostWebhookUrl",
"mattermostchannel",
"mattermosticonemo",
"mattermosticonurl",
],
NotificationType.NTFY: [
"ntfyserverurl",
"ntfytopic",
"ntfyPriority",
],
NotificationType.OCTOPUSH: [
"octopushVersion",
"octopushAPIKey",
"octopushLogin",
"octopushPhoneNumber",
"octopushSMSType",
"octopushSenderName",
"octopushDMLogin",
"octopushDMAPIKey",
"octopushDMPhoneNumber",
"octopushDMSenderName",
"octopushDMSMSType",
],
NotificationType.ONEBOT: [
"httpAddr",
"accessToken",
"msgType",
"recieverId",
],
NotificationType.PAGERDUTY: [
"pagerdutyAutoResolve",
"pagerdutyIntegrationUrl",
"pagerdutyPriority",
"pagerdutyIntegrationKey",
],
NotificationType.PROMOSMS: [
"promosmsLogin",
"promosmsPassword",
"promosmsPhoneNumber",
"promosmsSMSType",
"promosmsSenderName",
],
NotificationType.PUSHBULLET: [
"pushbulletAccessToken",
],
NotificationType.PUSHDEER: [
"pushdeerKey",
],
NotificationType.PUSHOVER: [
"pushoveruserkey",
"pushoverapptoken",
"pushoversounds",
"pushoverpriority",
"pushovertitle",
"pushoverdevice",
],
NotificationType.PUSHY: [
"pushyAPIKey",
"pushyToken",
],
NotificationType.ROCKET_CHAT: [
"rocketchannel",
"rocketusername",
"rocketiconemo",
"rocketwebhookURL",
"rocketbutton",
],
NotificationType.SERWERSMS: [
"serwersmsUsername",
"serwersmsPassword",
"serwersmsPhoneNumber",
"serwersmsSenderName",
],
NotificationType.SIGNAL: [
"signalNumber",
"signalRecipients",
"signalURL",
],
NotificationType.SLACK: [
"slackbutton",
"slackchannel",
"slackusername",
"slackiconemo",
"slackwebhookURL",
"slackbutton",
],
NotificationType.SMTP: [
"smtpHost",
"smtpPort",
"smtpSecure",
"smtpIgnoreTLSError",
"smtpDkimDomain",
"smtpDkimKeySelector",
"smtpDkimPrivateKey",
"smtpDkimHashAlgo",
"smtpDkimheaderFieldNames",
"smtpDkimskipFields",
"smtpUsername",
"smtpPassword",
"customSubject",
"smtpFrom",
"smtpCC",
"smtpBCC",
"smtpTo",
],
NotificationType.STACKFIELD: [
"stackfieldwebhookURL",
],
NotificationType.TEAMS: [
"webhookUrl",
],
NotificationType.PUSHBYTECHULUS: [
"pushAPIKey",
],
NotificationType.TELEGRAM: [
"telegramBotToken",
"telegramChatID",
],
NotificationType.WEBHOOK: [
"webhookContentType",
"webhookURL",
],
NotificationType.WECOM: [
"weComBotKey",
],
}

View file

@ -0,0 +1,9 @@
from enum import Enum
class ProxyProtocol(str, Enum):
HTTPS = "https"
HTTP = "http"
SOCKS = "socks"
SOCKS5 = "socks5"
SOCKS4 = "socks4"