Compare commits

...

109 commits

Author SHA1 Message Date
lucasheld
22ca1813ff bump version to 1.2.1 2023-09-26 22:32:04 +02:00
lucasheld
370b7e3e18 fix: drop first info event without a version
closes #55
2023-09-26 22:29:33 +02:00
Lucas Held
75a0b57eea
Update README.md 2023-09-23 13:59:27 +02:00
lucasheld
ba047114c9 bump version to 1.2.0 2023-08-29 18:41:23 +02:00
lucasheld
7902213ddb feat: add support for uptime kuma 1.23.0 and 1.23.1 2023-08-29 18:37:27 +02:00
lucasheld
0d49e97fe5 fix: validate accepted status codes types
closes #42
2023-08-12 18:20:59 +02:00
lucasheld
18107848f8 fix: convert sendUrl from bool to int 2023-08-12 16:40:25 +02:00
lucasheld
3543f09a5f fix: rstip url globally 2023-08-12 16:39:44 +02:00
lucasheld
2611b344f1 fix: remove name from maintenance monitors and status pages 2023-08-12 16:37:28 +02:00
lucasheld
be97a4fd8f bump version to 1.1.0 2023-07-07 22:28:25 +02:00
lucasheld
06f1173569 feat: add support for uptime kuma 1.22.0 and 1.22.1 2023-07-07 22:28:20 +02:00
lucasheld
934ab15457 bump version to 1.0.1 2023-05-25 23:53:19 +02:00
lucasheld
ce6f25d604 fix: ValueError if monitor authMethod is None 2023-05-25 23:51:09 +02:00
Lucas Held
144e426ed9
Update CHANGELOG.md 2023-05-25 21:39:12 +02:00
lucasheld
a5a00aec9f bump version to 1.0.0 2023-05-25 21:35:10 +02:00
lucasheld
84d4009d6a feat: replace raw return values with enum values
BREAKING CHANGE:
Types of return values changed to enum values:
  - monitor: `type` (str -> MonitorType), `authMethod` (str -> AuthMethod)
  - notification: `type` (str -> NotificationType)
  - docker host: `dockerType` (str -> DockerType)
  - status page: `style` (str -> IncidentStyle)
  - maintenance: `strategy` (str -> MaintenanceStrategy)
  - proxy: `protocol` (str -> ProxyProtocol)
2023-05-25 21:26:54 +02:00
lucasheld
33b8ffc476 refactor: reformat imports 2023-05-25 18:37:49 +02:00
lucasheld
712cd85aae test: fix notification test 2023-05-25 18:35:56 +02:00
lucasheld
8a0ad53753 fix: check only for required notification arguments 2023-05-20 22:06:05 +02:00
lucasheld
ce1cc12740 fix: adjust get_monitor_status method to previous changes 2023-05-20 20:35:04 +02:00
lucasheld
762dd4a657 fix: process the HEARTBEAT event correctly
BREAKING CHANGE: Removed `get_heartbeat` method. This method was never intended to retrieve information. Use `get_heartbeats` or `get_important_heartbeats` instead.
2023-05-20 20:31:39 +02:00
lucasheld
24b6d367de docs: update notification docstring 2023-05-20 15:42:40 +02:00
lucasheld
27d8e63f7b test: decrease timeout and wait_events 2023-05-20 14:23:01 +02:00
lucasheld
a34b45df8e chore: create dev-requirements.txt 2023-05-20 14:20:38 +02:00
Lucas Held
1359576413
feat: raise exception when deleting an element that does not exist (#37) 2023-05-20 14:09:09 +02:00
Lucas Held
e7693e6081
feat: check for required notification arguments (#36) 2023-05-20 13:10:12 +02:00
lucasheld
f0c5f2ba9d feat: drop support for Uptime Kuma versions < 1.21.3
BREAKING CHANGE: Uptime Kuma versions < 1.21.3 are not supported in uptime-kuma-api 1.x.x
2023-05-20 12:43:57 +02:00
Lucas Held
77630e96b7
fix: memory leak (#29)
* fix: dictionary changed size during iteration during deepcopy

* fix: memory leak in event data

BREAKING CHANGE: changed return values of get_heartbeats, get_important_heartbeats, avg_ping, uptime, get_heartbeat, cert_info

* fix: int to bool conversion

* remove todos

* update examples and adjust types
2023-05-19 14:07:34 +02:00
Lucas Held
6c4342a39d
Merge pull request #32 from lucasheld/feature/monitor-status
fix monitor status values and get current monitor status
2023-05-19 13:51:23 +02:00
Lucas Held
9728cfdb34
feat: implement timeouts for all methods (#34)
BREAKING CHANGE: Removed the `wait_timeout` parameter. Use the new `timeout` parameter instead. The `timeout` parameter specifies how many seconds the client should wait for the connection, an expected event or a server response.
2023-05-19 13:50:39 +02:00
lucasheld
8e841cd324 feat: add support for uptime kuma 1.21.3
BREAKING CHANGE: maintenance parameter `timezone` renamed to `timezoneOption`
2023-05-19 13:49:36 +02:00
lucasheld
d2cfc6652d refactor: use square brackets for tuple type hint 2023-05-06 13:36:26 +02:00
Lucas Held
a576ed9f3a
Update README.md 2023-05-02 20:43:52 +02:00
lucasheld
a9f2b6d894 feat: implement get_monitor_status helper method 2023-05-02 20:36:49 +02:00
lucasheld
b87eed2597 fix: adjust monitor status type to allow all used values
BREAKING CHANGE: monitor `status` type changed from `bool` to `MonitorStatus`
2023-05-02 20:34:26 +02:00
lucasheld
7ef61f8ce1 feat: drop python 3.6 support
BREAKING CHANGE: Python 3.7+ required
2023-05-02 17:57:32 +02:00
lucasheld
50ff8f1219 feat: drop python 3.6 support
BREAKING CHANGE: Python 3.7+ required
2023-05-01 19:06:48 +02:00
Vinalti
19bd8aecfa
Clean up code and implement best practices (#27)
* clean up code and implement best practices: 
 - `type(a) == list` replace with `isinstance(a, list)`
 - `adict['key']` replaced with `adict.get('key')`
 - annotation `-> list` replace by more accurate `-> list[dict]`

* improve compatibility with previous python versions with Typing

* little fix
2023-05-01 18:57:55 +02:00
lucasheld
e42f6461c0 feat: implement context manager for UptimeKumaApi class 2023-05-01 18:41:45 +02:00
lucasheld
391e5a3077 feat: add param wait_events 2023-04-30 16:48:36 +02:00
Lucas Held
31fe4466a2
Merge pull request #26 from Vinalti/add-ssl_verify-param
Add ssl_verify param
2023-04-30 16:03:11 +02:00
Vinalti
341d320549
Add ssl_verify param 2023-04-28 00:07:01 +02:00
lucasheld
e176b6b613 bump version to 0.13.0 2023-04-07 21:14:13 +02:00
Lucas Held
bdd441dd84
Merge pull request #22 from lucasheld/feature/custom-headers
Feature: Implement custom SocketIO Headers
2023-04-07 21:08:13 +02:00
lucasheld
d7f033030e feat: add support for uptime kuma 1.21.2 2023-04-07 21:03:36 +02:00
lucasheld
be537a14d2 fix: do not wait for events that have already arrived 2023-04-07 20:07:52 +02:00
lucasheld
307b25249c test: delete maintenances and api keys in setUp 2023-04-07 20:03:30 +02:00
lucasheld
e1fd3b7f03 feat: implement custom socketio headers 2023-03-30 12:19:31 +02:00
lucasheld
cc3588cf4d bump version to 0.12.0 2023-03-30 11:50:02 +02:00
lucasheld
2773d02ee6 feat: add support for uptime kuma 1.21.1 2023-03-30 11:47:47 +02:00
Lucas Held
b33d2323ca
Update README.md 2023-03-20 15:30:22 +01:00
lucasheld
0492018e6d bump version to 0.11.0 2023-03-20 15:16:06 +01:00
lucasheld
42c040f451 feat: add support for uptime kuma 1.21.0 2023-03-20 15:14:39 +01:00
Lucas Held
550be17817
Update README.md 2023-02-27 15:28:28 +01:00
Lucas Held
481eb67ff7
Update README.md 2023-02-15 13:06:38 +01:00
lucasheld
5bfd1044f6 Update README.md 2023-02-13 23:26:01 +01:00
lucasheld
5ce6cf2068 bump version to 0.10.0 2023-02-13 22:51:50 +01:00
lucasheld
14e9f47406 feat: add support for uptime kuma 1.20.0 2023-02-13 22:51:21 +01:00
Lucas Held
8427deffe0
Update README.md 2023-01-20 12:33:50 +01:00
lucasheld
03781e2399 bump version to 0.9.0 2023-01-17 21:29:51 +01:00
lucasheld
5e5c327b20 chore: update year 2023-01-17 21:06:18 +01:00
lucasheld
3e56459fb2 feat: add support for uptime kuma 1.19.5 2023-01-17 21:01:55 +01:00
Lucas Held
cc48b688da
Update README.md 2023-01-09 14:30:14 +01:00
lucasheld
a909c75542 bump version to 0.8.0 2023-01-04 22:49:01 +01:00
lucasheld
982c37045a feat: add support for uptime kuma 1.19.3 2023-01-04 22:46:34 +01:00
lucasheld
4c9d017f62 bump version to 0.7.1 2023-01-01 13:33:05 +01:00
lucasheld
c1d941a200 docs: complete values for uptime kuma 1.19.2 2022-12-30 21:37:13 +01:00
lucasheld
4538901cea docs: fix list type hints 2022-12-29 01:17:48 +01:00
lucasheld
9e3cbe7d59 docs: replace list[dict] with list type hint and add missing type hints 2022-12-29 01:09:44 +01:00
Lucas Held
10a22c8bc2
Update README.md 2022-12-29 00:44:28 +01:00
lucasheld
fd9411c57e bump version to 0.7.0 2022-12-29 00:25:54 +01:00
lucasheld
d01ff6d80e feat: add support for uptime kuma 1.19.2 2022-12-29 00:22:53 +01:00
lucasheld
1e4be04ad7 fix: skip condition check for None values 2022-12-29 00:17:57 +01:00
lucasheld
bc53116d9f bump version to 0.6.0 2022-12-23 14:09:13 +01:00
lucasheld
e0c4207984 feat: add parameter wait_timeout to adjust connection timeout 2022-12-23 14:07:46 +01:00
lucasheld
d68168b769 docs: add return types and exceptions 2022-12-17 15:31:47 +01:00
lucasheld
748d2b191a docs: write param and type in the same line and add optional to type 2022-12-17 15:31:36 +01:00
Lucas Held
6021e9ebc4
Update README.md 2022-12-16 22:10:52 +01:00
lucasheld
ce5ba2d943 docs: add docstrings and sphinx, readthedocs configuration 2022-12-16 21:43:01 +01:00
lucasheld
face6c731d bump version to 0.5.2 2022-11-05 18:50:57 +01:00
lucasheld
cb62b6c814 test: adjust uptime kuma versions 2022-11-05 18:48:59 +01:00
lucasheld
dac368e2a5 fix: add type to notification provider options 2022-11-05 18:48:31 +01:00
Lucas Held
3c17b2bf16
Update README.md 2022-10-11 11:05:45 +02:00
lucasheld
2c9bf42be5 bump version to 0.5.1 2022-10-07 14:34:04 +02:00
lucasheld
52d33d8751 fix: remove required notification provider args check 2022-10-07 14:32:52 +02:00
lucasheld
3d9a2a40b0 bump version to 0.5.0 2022-10-06 19:03:06 +02:00
lucasheld
c8529d65c3 feat: add support for uptime kuma 1.18.3 2022-10-06 19:01:51 +02:00
lucasheld
b4aa04bec5 bump version to 0.4.0 2022-10-04 19:12:33 +02:00
lucasheld
0ea7865be0 test: add script to run tests for all supported uptime kuma versions 2022-10-04 19:10:06 +02:00
lucasheld
d13621cd9e test: ignore ResourceWarning 2022-10-04 19:08:59 +02:00
lucasheld
06fa29cd41 fix: update event list data after changes 2022-10-04 18:38:17 +02:00
lucasheld
fee0a1dd8e feat: add support for uptime kuma 1.18.1 / 1.18.2 2022-10-04 18:36:32 +02:00
lucasheld
37e1fd6d6f bump version to 0.3.0
x
2022-09-23 19:18:35 +02:00
lucasheld
661c06b15f feat: support autoLogin for enabled disableAuth
Call the login method without parameters to log in when disableAuth is enabled.
2022-09-23 18:24:00 +02:00
lucasheld
ebadfb73e6 fix: set_settings password is only required if disableAuth is enabled 2022-09-23 17:19:00 +02:00
lucasheld
0821f38faa fix: increase event wait time
the statusPageList event is slow
2022-09-18 22:31:38 +02:00
lucasheld
e041d66cdc test: delete objects instead of uploading a backup 2022-09-18 19:54:25 +02:00
lucasheld
a409c2dbaa refactor: use enums 2022-09-18 19:46:20 +02:00
lucasheld
88e4ccc57e bump version to 0.2.2 2022-09-18 14:54:27 +02:00
lucasheld
54d221cdfe fix: convert monitor notificationIDList only once 2022-09-18 14:49:10 +02:00
lucasheld
928462c6b2 test: create objects with all available arguments 2022-09-17 12:24:08 +02:00
lucasheld
de38586bf5 fix: remove tags from monitor input
add_monitor_tag must be used instead
2022-09-17 12:19:36 +02:00
lucasheld
3772d1dc26 bump version to 0.2.1 2022-09-12 22:46:33 +02:00
lucasheld
314f07c93d fix: convert monitor notificationIDList return value
https://github.com/lucasheld/ansible-uptime-kuma/issues/3
2022-09-12 22:45:43 +02:00
lucasheld
12cd8067e4 fix: generate pushToken on push monitor save
https://github.com/lucasheld/ansible-uptime-kuma/issues/5
2022-09-12 20:48:36 +02:00
lucasheld
9b0f0c1e7f bump version to 0.2.0 2022-09-07 13:04:41 +02:00
lucasheld
384fd21726 update scripts 2022-09-07 13:04:16 +02:00
lucasheld
a7f571f508 add support for uptime kuma 1.18.0 2022-09-07 13:03:13 +02:00
lucasheld
45b8b88166 convert monitor data on edit
closes #3
2022-09-06 13:41:51 +02:00
61 changed files with 6426 additions and 846 deletions

1
.gitignore vendored
View file

@ -4,3 +4,4 @@ venv
__pycache__ __pycache__
*.egg-info *.egg-info
dist dist
docs/_build

6
.readthedocs.yaml Normal file
View file

@ -0,0 +1,6 @@
version: 2
python:
install:
- method: pip
path: .

View file

@ -1,7 +1,172 @@
## Changelog ## Changelog
### Release 1.2.1
#### Bugfixes
- drop first info event without a version
### Release 1.2.0
#### Features
- add support for uptime kuma 1.23.0 and 1.23.1
#### Bugfixes
- remove `name` from maintenance monitors and status pages
- rstip url globally
- convert sendUrl from bool to int
- validate accepted status codes types
### Release 1.1.0
#### Features
- add support for uptime kuma 1.22.0 and 1.22.1
### Release 1.0.1
#### Bugfixes
- fix ValueError if monitor authMethod is None
### Release 1.0.0
#### Features
- add `ssl_verify` parameter
- add `wait_events` parameter
- implement context manager for UptimeKumaApi class
- drop Python 3.6 support
- implement `get_monitor_status` helper method
- implement timeouts for all methods (`timeout` parameter)
- add support for uptime kuma 1.21.3
- drop support for Uptime Kuma versions < 1.21.3
- check for required notification arguments
- raise exception when deleting an element that does not exist
- replace raw return values with enum values
#### Bugfixes
- adjust monitor `status` type to allow all used values
- fix memory leak
#### BREAKING CHANGES
- Python 3.7+ required
- maintenance parameter `timezone` renamed to `timezoneOption`
- Removed the `wait_timeout` parameter. Use the new `timeout` parameter instead. The `timeout` parameter specifies how many seconds the client should wait for the connection, an expected event or a server response.
- changed return values of methods `get_heartbeats`, `get_important_heartbeats`, `avg_ping`, `uptime`, `cert_info`
- Uptime Kuma versions < 1.21.3 are not supported in uptime-kuma-api 1.0.0+
- Removed the `get_heartbeat` method. This method was never intended to retrieve information. Use `get_heartbeats` or `get_important_heartbeats` instead.
- Types of return values changed to enum values:
- monitor: `type` (str -> MonitorType), `status` (bool -> MonitorStatus), `authMethod` (str -> AuthMethod)
- notification: `type` (str -> NotificationType)
- docker host: `dockerType` (str -> DockerType)
- status page: `style` (str -> IncidentStyle)
- maintenance: `strategy` (str -> MaintenanceStrategy)
- proxy: `protocol` (str -> ProxyProtocol)
### Release 0.13.0
#### Feature
- add support for uptime kuma 1.21.2
- implement custom socketio headers
#### Bugfix
- do not wait for events that have already arrived
### Release 0.12.0
#### Feature
- add support for uptime kuma 1.21.1
### Release 0.11.0
#### Feature
- add support for uptime kuma 1.21.0
### Release 0.10.0
#### Feature
- add support for uptime kuma 1.20.0
### Release 0.9.0
#### Feature
- add support for uptime kuma 1.19.5
### Release 0.8.0
#### Feature
- add support for uptime kuma 1.19.3
### Release 0.7.1
#### Bugfix
- remove unsupported type hints on old python versions
### Release 0.7.0
#### Feature
- add support for uptime kuma 1.19.2
#### Bugfix
- skip condition check for None values
### Release 0.6.0
#### Feature
- add parameter `wait_timeout` to adjust connection timeout
### Release 0.5.2
#### Bugfix
- add type to notification provider options
### Release 0.5.1
#### Bugfix
- remove required notification provider args check
### Release 0.5.0
#### Feature
- support for uptime kuma 1.18.3
### Release 0.4.0
#### Feature
- support for uptime kuma 1.18.1 / 1.18.2
#### Bugfix
- update event list data after changes
### Release 0.3.0
#### Feature
- support autoLogin for enabled disableAuth
#### Bugfix
- set_settings password is only required if disableAuth is enabled
- increase event wait time to receive the slow statusPageList event
### Release 0.2.2
#### Bugfix
- remove `tags` from monitor input
- convert monitor notificationIDList only once
### Release 0.2.1
#### Bugfix
- generate pushToken on push monitor save
- convert monitor notificationIDList return value
### Release 0.2.0
#### Feature
- support for uptime kuma 1.18.0
#### Bugfix
- convert values on monitor edit
### Release 0.1.1 ### Release 0.1.1
#### Bugfix
- implement 2FA login - implement 2FA login
- allow to add monitors to status pages - allow to add monitors to status pages
- do not block certain methods - do not block certain methods

View file

@ -1,6 +1,6 @@
MIT License MIT License
Copyright (c) 2022 Lucas Held Copyright (c) 2023 Lucas Held
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal of this software and associated documentation files (the "Software"), to deal

View file

@ -6,7 +6,14 @@ uptime-kuma-api is a Python wrapper for the [Uptime Kuma](https://github.com/lou
This package was developed to configure Uptime Kuma with Ansible. The Ansible collection can be found at https://github.com/lucasheld/ansible-uptime-kuma. This package was developed to configure Uptime Kuma with Ansible. The Ansible collection can be found at https://github.com/lucasheld/ansible-uptime-kuma.
Python version 3.6+ is required. Python version 3.7+ is required.
Supported Uptime Kuma versions:
| Uptime Kuma | uptime-kuma-api |
|-----------------|-----------------|
| 1.21.3 - 1.23.2 | 1.0.0 - 1.2.1 |
| 1.17.0 - 1.21.2 | 0.1.0 - 0.13.0 |
Installation Installation
--- ---
@ -18,7 +25,11 @@ You can install it using pip:
pip install uptime-kuma-api pip install uptime-kuma-api
``` ```
Examples Documentation
---
The API Reference is available on [Read the Docs](https://uptime-kuma-api.readthedocs.io).
Example
--- ---
Once you have installed the python package, you can use it to communicate with an Uptime Kuma instance. Once you have installed the python package, you can use it to communicate with an Uptime Kuma instance.
@ -43,3 +54,17 @@ At the end, the connection to the API must be disconnected so that the program d
```python ```python
>>> api.disconnect() >>> api.disconnect()
``` ```
With a context manager, the disconnect method is called automatically:
```python
from uptime_kuma_api import UptimeKumaApi
with UptimeKumaApi('INSERT_URL') as api:
api.login('INSERT_USERNAME', 'INSERT_PASSWORD')
api.add_monitor(
type=MonitorType.HTTP,
name="Google",
url="https://google.com"
)
```

4
dev-requirements.txt Normal file
View file

@ -0,0 +1,4 @@
Sphinx==5.3.0
pyotp==2.8.0
Jinja2==3.1.2
BeautifulSoup4==4.12.2

20
docs/Makefile Normal file
View file

@ -0,0 +1,20 @@
# Minimal makefile for Sphinx documentation
#
# You can set these variables from the command line, and also
# from the environment for the first two.
SPHINXOPTS ?=
SPHINXBUILD ?= sphinx-build
SOURCEDIR = .
BUILDDIR = _build
# Put it first so that "make" without argument is like "make help".
help:
@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
.PHONY: help Makefile
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)

46
docs/api.rst Normal file
View file

@ -0,0 +1,46 @@
.. _api:
Main Interface
--------------
.. module:: uptime_kuma_api
.. autoclass:: UptimeKumaApi
:inherited-members:
Enums
-----
.. autoclass:: AuthMethod
:members:
.. autoclass:: MonitorType
:members:
.. autoclass:: MonitorStatus
:members:
.. autoclass:: NotificationType
:members:
.. autoclass:: ProxyProtocol
:members:
.. autoclass:: IncidentStyle
:members:
.. autoclass:: DockerType
:members:
.. autoclass:: MaintenanceStrategy
:members:
Exceptions
----------
.. autoexception:: UptimeKumaException
.. autoexception:: Timeout

53
docs/conf.py Normal file
View file

@ -0,0 +1,53 @@
# Configuration file for the Sphinx documentation builder.
#
# For the full list of built-in configuration values, see the documentation:
# https://www.sphinx-doc.org/en/master/usage/configuration.html
import os
import sys
sys.path.insert(0, os.path.abspath(".."))
import uptime_kuma_api
# -- Project information -----------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information
project = 'uptime-kuma-api'
copyright = '2023, Lucas Held'
author = 'Lucas Held'
version = uptime_kuma_api.__version__
release = uptime_kuma_api.__version__
# -- General configuration ---------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration
extensions = [
"sphinx.ext.autodoc"
]
templates_path = ['_templates']
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
toc_object_entries_show_parents = "hide"
autodoc_member_order = "bysource"
add_module_names = False
# -- Options for HTML output -------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output
html_theme = 'alabaster'
html_static_path = ['_static']
html_theme_options = {
"show_powered_by": False,
"github_user": "lucasheld",
"github_repo": "uptime-kuma-api",
"github_banner": True,
"show_related": False,
"note_bg": "#FFF59C",
"github_button": True,
"github_type": "star"
}

32
docs/index.rst Normal file
View file

@ -0,0 +1,32 @@
.. uptime-kuma-api documentation master file, created by
sphinx-quickstart on Thu Dec 15 11:58:11 2022.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
uptime-kuma-api
===============
Release v\ |version|. (:ref:`Installation <install>`)
A Python wrapper for the Uptime Kuma Socket.IO API
-------------------
Indices and tables
------------------
* :ref:`genindex`
* :ref:`search`
API Reference
-------------
If you are looking for information on a specific function, class, or method,
this part of the documentation is for you.
.. toctree::
:maxdepth: 3
api

8
docs/install.rst Normal file
View file

@ -0,0 +1,8 @@
.. _install:
Installation
------------
To install uptime-kuma-api, run this command in your terminal::
$ pip install uptime-kuma-api

35
docs/make.bat Normal file
View file

@ -0,0 +1,35 @@
@ECHO OFF
pushd %~dp0
REM Command file for Sphinx documentation
if "%SPHINXBUILD%" == "" (
set SPHINXBUILD=sphinx-build
)
set SOURCEDIR=.
set BUILDDIR=_build
%SPHINXBUILD% >NUL 2>NUL
if errorlevel 9009 (
echo.
echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
echo.installed, then set the SPHINXBUILD environment variable to point
echo.to the full path of the 'sphinx-build' executable. Alternatively you
echo.may add the Sphinx directory to PATH.
echo.
echo.If you don't have Sphinx installed, grab it from
echo.https://www.sphinx-doc.org/
exit /b 1
)
if "%1" == "" goto help
%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
goto end
:help
%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
:end
popd

View file

@ -1 +1,2 @@
python-socketio[client]>=5.0.0 python-socketio[client]>=5.0.0
packaging

29
run_tests.sh Executable file
View file

@ -0,0 +1,29 @@
#!/bin/sh
version="$1"
if [ $version ]
then
versions=("$version")
else
versions=(1.23.2 1.23.0 1.22.1 1.22.0 1.21.3)
fi
for version in ${versions[*]}
do
echo "Starting uptime kuma $version..."
docker run -d -it --rm -p 3001:3001 --name uptimekuma "louislam/uptime-kuma:$version" > /dev/null
while [[ "$(curl -s -L -o /dev/null -w ''%{http_code}'' 127.0.0.1:3001)" != "200" ]]
do
sleep 0.5
done
echo "Running tests..."
python -m unittest discover -s tests
echo "Stopping uptime kuma..."
docker stop uptimekuma > /dev/null
sleep 1
echo ''
done

1
scripts/.gitignore vendored
View file

@ -1 +1,2 @@
uptime-kuma uptime-kuma
uptime-kuma-old

108
scripts/build_inputs.py Normal file
View file

@ -0,0 +1,108 @@
import glob
import re
from bs4 import BeautifulSoup
from utils import deduplicate_list, parse_vue_template, diff, type_html_to_py
input_ignores = {
"settings": [
"$root.styleElapsedTime"
]
}
def parse_model_keys(content, object_name):
soup = BeautifulSoup(content, "html.parser")
soup_inputs = soup.find_all(attrs={"v-model": True})
inputs = []
for soup_input in soup_inputs:
key = soup_input["v-model"]
if key in input_ignores.get(object_name, []):
continue
else:
key = re.sub(r'^' + object_name + r'\.', "", key)
type_ = soup_input.get("type", "text")
type_ = type_html_to_py(type_)
if type_ == "bool":
value = True if soup_input.get("checked") else False
else:
value = soup_input.get("value")
inputs.append({
"key": key,
"type": type_,
"default": value,
})
return inputs
def parse_proxy_keys(root):
content = parse_vue_template(f"{root}/src/components/ProxyDialog.vue")
keys = parse_model_keys(content, "proxy")
return keys
def parse_notification_keys(root):
content = parse_vue_template(f"{root}/src/components/NotificationDialog.vue")
keys = parse_model_keys(content, "notification")
return keys
def parse_settings_keys(root):
all_keys = []
for path in glob.glob(f'{root}/src/components/settings/*'):
content = parse_vue_template(path)
keys = parse_model_keys(content, "settings")
all_keys.extend(keys)
all_keys = deduplicate_list(all_keys)
return all_keys
def parse_monitor_keys(root):
content = parse_vue_template(f"{root}/src/pages/EditMonitor.vue")
keys = parse_model_keys(content, "monitor")
return keys
def parse_status_page_keys(root):
all_keys = ["id"]
content = parse_vue_template(f"{root}/src/pages/StatusPage.vue")
keys = parse_model_keys(content, "config")
all_keys.extend(keys)
content = parse_vue_template(f"{root}/src/pages/ManageStatusPage.vue")
keys = parse_model_keys(content, "statusPage")
all_keys.extend(keys)
all_keys = deduplicate_list(all_keys)
return all_keys
def parse_maintenance_keys(root):
content = parse_vue_template(f"{root}/src/pages/EditMaintenance.vue")
keys = parse_model_keys(content, "maintenance")
return keys
def main():
root_old = "uptime-kuma-old"
root_new = "uptime-kuma"
for name, func in [
["proxy", parse_proxy_keys],
["notification", parse_notification_keys],
["settings", parse_settings_keys],
["monitor", parse_monitor_keys],
["status_page", parse_status_page_keys],
["maintenance", parse_maintenance_keys],
]:
keys_old = func(root_old)
keys_new = func(root_new)
print(f"{name}:")
diff(keys_old, keys_new)
if __name__ == "__main__":
main()

View file

@ -1,8 +1,9 @@
import re import re
from pprint import pprint
from utils import deduplicate_list, diff
def parse_data_keys(data): def parse_json_keys(data):
keys = [] keys = []
for line in data.split("\n"): for line in data.split("\n"):
line = line.strip() line = line.strip()
@ -17,84 +18,126 @@ def parse_data_keys(data):
return keys return keys
def parse_heartbeat(): def parse_heartbeat(root):
with open('uptime-kuma/server/model/heartbeat.js') as f: with open(f'{root}/server/model/heartbeat.js') as f:
content = f.read() content = f.read()
all_keys = [] all_keys = []
match = re.search(r'toJSON\(\) {\s+return.*{([^}]+)}', content) match = re.search(r'toJSON\(\) {\s+return.*{([^}]+)}', content)
data = match.group(1) data = match.group(1)
keys = parse_data_keys(data) keys = parse_json_keys(data)
all_keys.extend(keys) all_keys.extend(keys)
match = re.search(r'toPublicJSON\(\) {\s+return.*{([^}]+)}', content) match = re.search(r'toPublicJSON\(\) {\s+return.*{([^}]+)}', content)
data = match.group(1) data = match.group(1)
keys = parse_data_keys(data) keys = parse_json_keys(data)
all_keys.extend(keys) all_keys.extend(keys)
all_keys = list(set(all_keys)) all_keys = deduplicate_list(all_keys)
return all_keys return all_keys
def parse_incident(): def parse_incident(root):
with open('uptime-kuma/server/model/incident.js') as f: with open(f'{root}/server/model/incident.js') as f:
content = f.read() content = f.read()
match = re.search(r'toPublicJSON\(\) {\s+return.*{([^}]+)}', content) match = re.search(r'toPublicJSON\(\) {\s+return.*{([^}]+)}', content)
data = match.group(1) data = match.group(1)
keys = parse_data_keys(data) keys = parse_json_keys(data)
return keys return keys
def parse_monitor(): def parse_monitor(root):
# todo: toPublicJSON ??? # todo: toPublicJSON ???
with open('uptime-kuma/server/model/monitor.js') as f: with open(f'{root}/server/model/monitor.js') as f:
content = f.read() content = f.read()
matches = re.findall(r'data = {([^}]+)}', content) matches = re.findall(r'data = {([^}]+)}', content)
all_keys = [] all_keys = []
for match in matches: for match in matches:
keys = parse_data_keys(match) keys = parse_json_keys(match)
keys = [i for i in keys if i != "...data"] keys = [i for i in keys if i != "...data"]
all_keys.extend(keys) all_keys.extend(keys)
all_keys = deduplicate_list(all_keys)
return all_keys return all_keys
def parse_proxy(): def parse_proxy(root):
with open('uptime-kuma/server/model/proxy.js') as f: with open(f'{root}/server/model/proxy.js') as f:
content = f.read() content = f.read()
match = re.search(r'toJSON\(\) {\s+return.*{([^}]+)}', content) match = re.search(r'toJSON\(\) {\s+return.*{([^}]+)}', content)
data = match.group(1) data = match.group(1)
keys = parse_data_keys(data) keys = parse_json_keys(data)
return keys return keys
def parse_status_page(): # def parse_function(regex_name, content):
with open('uptime-kuma/server/model/status_page.js') as f: # match = re.search(regex_name, content)
# name = match.group(0)
# rest = "".join(content.split(name)[1:])
#
# brackets = 0
# opening_bracket_found = False
# code = ""
# for i in rest:
# code += i
# if i == "{":
# opening_bracket_found = True
# brackets += 1
# if i == "}":
# opening_bracket_found = True
# brackets -= 1
# if opening_bracket_found and brackets == 0:
# break
# return code
# # input (add, edit proxy)
# def parse_proxy2():
# with open(f'{root}/server/proxy.js') as f:
# content = f.read()
#
# code = parse_function(r'async save\([^)]+\) ', content)
# keys = parse_object_keys(code, "proxy")
# return keys
def parse_status_page(root):
with open(f'{root}/server/model/status_page.js') as f:
content = f.read() content = f.read()
all_keys = [] all_keys = []
match = re.search(r'toJSON\(\) {\s+return.*{([^}]+)}', content) match = re.search(r'toJSON\(\) {\s+return.*{([^}]+)}', content)
data = match.group(1) data = match.group(1)
keys = parse_data_keys(data) keys = parse_json_keys(data)
all_keys.extend(keys) all_keys.extend(keys)
match = re.search(r'toPublicJSON\(\) {\s+return.*{([^}]+)}', content) match = re.search(r'toPublicJSON\(\) {\s+return.*{([^}]+)}', content)
data = match.group(1) data = match.group(1)
keys = parse_data_keys(data) keys = parse_json_keys(data)
all_keys.extend(keys) all_keys.extend(keys)
all_keys = list(set(all_keys)) all_keys = deduplicate_list(all_keys)
return all_keys return all_keys
def parse_tag(): def parse_tag(root):
with open('uptime-kuma/server/model/tag.js') as f: with open(f'{root}/server/model/tag.js') as f:
content = f.read() content = f.read()
match = re.search(r'toJSON\(\) {\s+return.*{([^}]+)}', content) match = re.search(r'toJSON\(\) {\s+return.*{([^}]+)}', content)
data = match.group(1) data = match.group(1)
keys = parse_data_keys(data) keys = parse_json_keys(data)
return keys return keys
pprint(parse_heartbeat()) if __name__ == "__main__":
pprint(parse_incident()) root_old = "uptime-kuma-old"
pprint(parse_monitor()) root_new = "uptime-kuma"
pprint(parse_proxy())
pprint(parse_status_page()) for name, func in [
pprint(parse_tag()) ["heartbeat", parse_heartbeat],
["incident", parse_incident],
["monitor", parse_monitor],
["proxy", parse_proxy],
["status page", parse_status_page],
["tag", parse_tag],
]:
keys_old = func(root_old)
keys_new = func(root_new)
print(f"{name}:")
diff(keys_old, keys_new)
# TODO: # TODO:

View file

@ -0,0 +1,54 @@
from bs4 import BeautifulSoup
from utils import parse_vue_template, write_to_file
titles = {
"http": "HTTP(s)",
"port": "TCP Port",
"ping": "Ping",
"keyword": "HTTP(s) - Keyword",
"grpc-keyword": "gRPC(s) - Keyword",
"dns": "DNS",
"docker": "Docker Container",
"push": "Push",
"steam": "Steam Game Server",
"gamedig": "GameDig",
"mqtt": "MQTT",
"sqlserver": "Microsoft SQL Server",
"postgres": "PostgreSQL",
"mysql": "MySQL/MariaDB",
"mongodb": "MongoDB",
"radius": "Radius",
"redis": "Redis",
"group": "Group",
"json-query": "HTTP(s) - Json Query",
"real-browser": "HTTP(s) - Browser Engine (Chrome/Chromium)",
"kafka-producer": "Kafka Producer",
"tailscale-ping": "Tailscale Ping"
}
def parse_monitor_types():
content = parse_vue_template("uptime-kuma/src/pages/EditMonitor.vue")
soup = BeautifulSoup(content, "html.parser")
select = soup.find("select", id="type")
options = select.find_all("option")
types = {}
for o in options:
type_ = o.attrs["value"]
types[type_] = {
"value": type_,
"title": titles[type_]
}
return types
monitor_types = parse_monitor_types()
write_to_file(
"monitor_type.py.j2", "./../uptime_kuma_api/monitor_type.py",
monitor_types=monitor_types
)

View file

@ -0,0 +1,109 @@
from uptime_kuma_api import notification_provider_options
data = {
"lunaseaTarget": """Allowed values: "device", "user".""",
"lunaseaUserID": """User ID.""",
"lunaseaDevice": """Device ID.""",
"pagertreeAutoResolve": """
Available values are:
- ``0``: Do Nothing
- ``resolve``: Auto Resolve""",
"pagertreeUrgency": """
Available values are:
- ``silent``: Silent
- ``low``: Low
- ``medium``: Medium
- ``high``: High
- ``critical``: Critical""",
"promosmsAllowLongSMS": "Allow long SMS.",
"promosmsPhoneNumber": "Phone number (for Polish recipient You can skip area codes).",
"promosmsSMSType": """
Available values are:
- ``0``: SMS FLASH - Message will automatically show on recipient device. Limited only to Polish recipients.
- ``1``: SMS ECO - cheap but slow and often overloaded. Limited only to Polish recipients.
- ``3``: SMS FULL - Premium tier of SMS, You can use your Sender Name (You need to register name first). Reliable for alerts.
- ``4``: SMS SPEED - Highest priority in system. Very quick and reliable but costly (about twice of SMS FULL price).""",
"smseagleEncoding": "True to send messages in unicode.",
"smseaglePriority": "Message priority (0-9, default = 0).",
"smseagleRecipientType": """Recipient type.
Available values are:
- ``smseagle-to``: Phone number(s)
- ``smseagle-group``: Phonebook group name(s)
- ``smseagle-contact``: Phonebook contact name(s)""",
"smseagleToken": "API Access token.",
"smseagleRecipient": "Recipient(s) (multiple must be separated with comma).",
"smseagleUrl": "Your SMSEagle device URL.",
"splunkAutoResolve": """Auto resolve or acknowledged.
Available values are:
- ``0``: do nothing
- ``ACKNOWLEDGEMENT``: auto acknowledged
- ``RECOVERY``: auto resolve""",
"splunkSeverity": """Severity.
Available values are:
- ``INFO``
- ``WARNING``
- ``CRITICAL``""",
"splunkRestURL": "Splunk Rest URL.",
"opsgeniePriority": "Priority. Available values are numbers between ``1`` and ``5``.",
"opsgenieRegion": """Region. Available values are:
- ``us``: US (Default)
- ``eu``: EU""",
"opsgenieApiKey": "API Key.",
"twilioAccountSID": "Account SID.",
"twilioAuthToken": "Auth Token.",
"twilioToNumber": "To Number.",
"twilioFromNumber": "From Number.",
"pushoverttl": "Message TTL (Seconds).",
"ntfyaccesstoken": "Access Token.",
"ntfyAuthenticationMethod": "Authentication Method.",
}
for provider in notification_provider_options:
provider_options = notification_provider_options[provider]
for option in provider_options:
type_ = provider_options[option]["type"]
required = provider_options[option]["required"]
text = data.get(option)
line = f":param {type_}{', optional' if required else ''} {option}: Notification option for ``type`` :attr:`~.NotificationType.{provider.name}`."
if text:
line += f" {text}"
print(line)

View file

@ -1,18 +0,0 @@
from uptime_kuma_api import notification_provider_options
def build_providers():
providers = []
for provider_enum in notification_provider_options:
provider = provider_enum.__dict__["_value_"]
providers.append(provider)
return providers
for provider in build_providers():
options = notification_provider_options[provider]
for option in options:
print(f'{option}:')
print(f' description: {provider} provider option.')
print(f' returned: if type is {provider}')
print(' type: str')

View file

@ -1,24 +1,98 @@
import glob import glob
import re import re
from pprint import pprint
import jinja2
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from uptime_kuma_api import convert_from_socket, params_map_notification_provider_options from utils import deduplicate_list, write_to_file, type_html_to_py
def deduplicate_list(l): # deprecated or wrong inputs
out = [] ignored_inputs = {
for i in l: "slack": [
if i not in out: "slackbutton"
out.append(i) ],
return out "rocket.chat": [
"rocketbutton"
],
"octopush": [
"octopushDMLogin",
"octopushDMAPIKey",
"octopushDMPhoneNumber",
"octopushDMSenderName",
"octopushDMSMSType"
],
"Splunk": [
"pagerdutyIntegrationKey"
]
}
input_overwrites = {
"showAdditionalHeadersField": "webhookAdditionalHeaders"
}
titles = {
"alerta": "Alerta",
"AlertNow": "AlertNow",
"apprise": "Apprise (Support 50+ Notification services)",
"Bark": "Bark",
"clicksendsms": "ClickSend SMS",
"discord": "Discord",
"GoogleChat": "Google Chat (Google Workspace)",
"gorush": "Gorush",
"gotify": "Gotify",
"HomeAssistant": "Home Assistant",
"Kook": "Kook",
"line": "LINE Messenger",
"LineNotify": "LINE Notify",
"lunasea": "LunaSea",
"matrix": "Matrix",
"mattermost": "Mattermost",
"ntfy": "Ntfy",
"octopush": "Octopush",
"OneBot": "OneBot",
"Opsgenie": "Opsgenie",
"PagerDuty": "PagerDuty",
"PagerTree": "PagerTree",
"pushbullet": "Pushbullet",
"PushByTechulus": "Push by Techulus",
"pushover": "Pushover",
"pushy": "Pushy",
"rocket.chat": "Rocket.Chat",
"signal": "Signal",
"slack": "Slack",
"squadcast": "SquadCast",
"SMSEagle": "SMSEagle",
"smtp": "Email (SMTP)",
"stackfield": "Stackfield",
"teams": "Microsoft Teams",
"telegram": "Telegram",
"twilio": "Twilio",
"Splunk": "Splunk",
"webhook": "Webhook",
"GoAlert": "GoAlert",
"ZohoCliq": "ZohoCliq",
"AliyunSMS": "AliyunSMS",
"DingDing": "DingDing",
"Feishu": "Feishu",
"FreeMobile": "FreeMobile (mobile.free.fr)",
"PushDeer": "PushDeer",
"promosms": "PromoSMS",
"serwersms": "SerwerSMS.pl",
"SMSManager": "SmsManager (smsmanager.cz)",
"WeCom": "WeCom",
"ServerChan": "ServerChan",
"nostr": "Nostr",
"FlashDuty": "FlashDuty",
"smsc": "SMSC",
}
def build_notification_providers(): def build_notification_providers():
providers = [] root = "uptime-kuma"
for path in glob.glob('uptime-kuma/server/notification-providers/*'): providers = {}
# get providers and input names
for path in sorted(glob.glob(f'{root}/server/notification-providers/*')):
with open(path) as f: with open(path) as f:
content = f.read() content = f.read()
match = re.search(r'class [^ ]+ extends NotificationProvider {', content) match = re.search(r'class [^ ]+ extends NotificationProvider {', content)
@ -26,20 +100,20 @@ def build_notification_providers():
match = re.search(r'name = "([^"]+)";', content) match = re.search(r'name = "([^"]+)";', content)
name = match.group(1) name = match.group(1)
inputs = re.findall(r'notification\.([^ ,.;})\]]+)', content) inputs = re.findall(r'notification\??\.([^ ,.;})\]]+)', content)
inputs = deduplicate_list(inputs) inputs = deduplicate_list(inputs)
inputs = [i.strip() for i in inputs] inputs = [i.strip() for i in inputs]
providers.append({ providers[name] = {
"name": name, "title": titles[name],
"inputs": inputs, "inputs": {},
}) }
return providers for input_ in inputs:
if input_ not in ignored_inputs.get(name, []):
providers[name]["inputs"][input_] = {}
# get inputs
def build_notification_provider_conditions(): for path in glob.glob(f'{root}/src/components/notifications/*'):
conditions = {}
for path in glob.glob('uptime-kuma/src/components/notifications/*'):
if path.endswith("index.js"): if path.endswith("index.js"):
continue continue
with open(path) as f: with open(path) as f:
@ -47,32 +121,62 @@ def build_notification_provider_conditions():
match = re.search(r'<template>[\s\S]+</template>', content, re.MULTILINE) match = re.search(r'<template>[\s\S]+</template>', content, re.MULTILINE)
html = match.group(0) html = match.group(0)
soup = BeautifulSoup(html, "html.parser") soup = BeautifulSoup(html, "html.parser")
inputs = soup.find_all("input") inputs = soup.find_all(attrs={"v-model": True})
for input in inputs: for input_ in inputs:
condition = {} conditions = {}
attrs = input.attrs attrs = input_.attrs
v_model = attrs.get("v-model") v_model = attrs.get("v-model")
min_ = attrs.get("min")
max_ = attrs.get("max") v_model_overwrite = input_overwrites.get(v_model)
if min_: if v_model_overwrite:
condition["min"] = int(min_) param_name = v_model_overwrite
if max_: else:
condition["max"] = int(max_)
param_name = re.match(r'\$parent.notification.(.*)$', v_model).group(1) param_name = re.match(r'\$parent.notification.(.*)$', v_model).group(1)
if condition:
conditions[param_name] = condition type_ = attrs.get("type")
conditions = convert_from_socket(params_map_notification_provider_options, conditions) type_ = type_html_to_py(type_)
return conditions
required_true_values = ['', 'true']
if attrs.get("required") in required_true_values or attrs.get(":required") in required_true_values:
required = True
else:
required = False
min_ = attrs.get("min")
if min_:
conditions["min"] = int(min_)
max_ = attrs.get("max")
if max_:
conditions["max"] = int(max_)
# find provider inputs dict
input_found = False
for name in list(providers.keys()):
inputs = providers[name]["inputs"]
for provider_input in inputs:
if provider_input == param_name:
input_found = True
providers[name]["inputs"][provider_input] = {
"conditions": conditions,
"type": type_,
"required": required
}
assert input_found
return providers
def write_to_file(template, destination, **kwargs): notification_providers = build_notification_providers()
env = jinja2.Environment(loader=jinja2.FileSystemLoader("./"))
template = env.get_template(template)
rendered = template.render(**kwargs)
with open(destination, "w") as f:
f.write(rendered)
conditions = build_notification_provider_conditions() notification_provider_conditions = {}
pprint(conditions) for notification_provider in notification_providers:
# notification_providers = build_notification_providers() for notification_provider_input_name in notification_providers[notification_provider]["inputs"]:
# write_to_file("notification_providers.py.j2", "./../uptimekumaapi/notification_providers.py", notification_providers=notification_providers) notification_provider_input = notification_providers[notification_provider]["inputs"][notification_provider_input_name]
if notification_provider_input["conditions"]:
notification_provider_conditions[notification_provider_input_name] = notification_provider_input["conditions"]
write_to_file(
"notification_providers.py.j2", "./../uptime_kuma_api/notification_providers.py",
notification_providers=notification_providers,
notification_provider_conditions=notification_provider_conditions
)

View file

@ -1,10 +0,0 @@
params_map_notification_provider_options = {
{%- for provider in notification_provider_map %}
{%- set options = notification_provider_map[provider] %}
'{{ provider }}': {
{%- for key, value in options.items() %}
'{{ key }}': '{{ value }}',
{%- endfor %}
},
{%- endfor %}
}

View file

@ -1,40 +0,0 @@
import re
import os
import jinja2
from uptime_kuma_api import params_map_notification_providers, notification_provider_options
def build_notification_provider_map():
params_map_notification_provider_options = {}
for provider_sock, provider_py in params_map_notification_providers.items():
options_sock = notification_provider_options[provider_sock]
params_map_notification_provider_options[provider_py] = {}
for option in options_sock:
option_orig = option
# for example for rocket_chat
prefix = os.path.commonprefix([o.lower() for o in options_sock] + [provider_py])
option = option[len(prefix):]
option = re.sub('([A-Z]+)', r'_\1', option).lower()
# for example for smtp
if option.startswith(provider_py):
option = option[len(provider_py):]
option = provider_py + "_" + option
option = option.replace("__", "_")
params_map_notification_provider_options[provider_py][option_orig] = option
return params_map_notification_provider_options
notification_provider_map = build_notification_provider_map()
env = jinja2.Environment(loader=jinja2.FileSystemLoader("./"))
template = env.get_template("build_params_map_notification_provider_options.j2")
rendered = template.render(notification_provider_map=notification_provider_map)
print(rendered)

View file

@ -1,12 +0,0 @@
import re
from uptime_kuma_api import notification_provider_options
params_map_notification_providers = {}
for notification_provider in notification_provider_options:
provider_name_orig = notification_provider.__dict__["_value_"]
provider_name = re.sub('([A-Z]+)', r'_\1', provider_name_orig).lower().replace(".", "_").strip("_")
params_map_notification_providers[provider_name_orig] = provider_name
print(params_map_notification_providers)

View file

@ -1,6 +0,0 @@
notification = [
"type",
"isDefault",
"userId",
"applyExisting",
]

View file

@ -0,0 +1,11 @@
from enum import Enum
class MonitorType(str, Enum):
"""Enumerate monitor types."""
{{""}}
{%- for type_ in monitor_types.values() %}
{{ type_["value"].upper().replace("-", "_") }} = "{{ type_["value"] }}"
"""{{ type_["title"] }}"""
{% endfor -%}

View file

@ -2,20 +2,30 @@ from enum import Enum
class NotificationType(str, Enum): class NotificationType(str, Enum):
{%- for notification_provider in notification_providers %} """Enumerate notification types."""
{%- set name = notification_provider["name"] %} {% for provider in notification_providers %}
{{ name.upper().replace(".", "_") }} = "{{ name }}" {{ provider.upper().replace(".", "_") }} = "{{ provider }}"
{%- endfor %} """{{ notification_providers[provider]["title"] }}"""
{% endfor %}
notification_provider_options = { notification_provider_options = {
{%- for notification_provider in notification_providers %} {%- for provider in notification_providers %}
{%- set name = notification_provider["name"] %} NotificationType.{{ provider.upper().replace(".", "_") }}: dict(
NotificationType.{{ name.upper().replace(".", "_") }}: [ {%- for input_name in notification_providers[provider]["inputs"] %}
{%- for input in notification_provider["inputs"] %} {%- set input = notification_providers[provider]["inputs"][input_name] %}
"{{ input }}", {{ input_name }}=dict(type="{{ input["type"] }}", required={{ input["required"] }}),
{%- endfor %} {%- endfor %}
], ),
{%- endfor %} {%- endfor %}
} }
notification_provider_conditions = dict(
{%- for provider in notification_provider_conditions %}
{{ provider }}=dict(
{%- for key, value in notification_provider_conditions[provider].items() %}
{{ key }}={{ value }},
{%- endfor %}
),
{%- endfor %}
)

View file

@ -1 +0,0 @@
Jinja2==3.1.2

46
scripts/utils.py Normal file
View file

@ -0,0 +1,46 @@
import re
import jinja2
def deduplicate_list(l):
out = []
for i in l:
if i not in out:
out.append(i)
return out
def parse_vue_template(path):
with open(path) as f:
vue = f.read()
match = re.search(r'<template>[\s\S]+</template>', vue, re.MULTILINE)
template = match.group(0)
return template
def write_to_file(template, destination, **kwargs):
env = jinja2.Environment(loader=jinja2.FileSystemLoader("./"))
template = env.get_template(template)
rendered = template.render(**kwargs)
with open(destination, "w") as f:
f.write(rendered)
def diff(old, new):
for i in new:
if i not in old:
print("+", i)
for i in old:
if i not in new:
print("-", i)
print("")
def type_html_to_py(type_):
if type_ == "number":
type_ = "int"
elif type_ == "checkbox":
type_ = "bool"
else:
type_ = "str"
return type_

View file

@ -29,10 +29,13 @@ setup(
author_email="lucasheld@hotmail.de", author_email="lucasheld@hotmail.de",
license=info["__license__"], license=info["__license__"],
packages=["uptime_kuma_api"], packages=["uptime_kuma_api"],
python_requires=">=3.6, <4", python_requires=">=3.7, <4",
install_requires=["python-socketio[client]>=5.0.0"], install_requires=[
"python-socketio[client]>=5.0.0",
"packaging"
],
classifiers=[ classifiers=[
"Development Status :: 1 - Planning", "Development Status :: 5 - Production/Stable",
"Environment :: Web Environment", "Environment :: Web Environment",
"Intended Audience :: Developers", "Intended Audience :: Developers",
"License :: OSI Approved :: MIT License", "License :: OSI Approved :: MIT License",
@ -40,7 +43,6 @@ setup(
"Operating System :: OS Independent", "Operating System :: OS Independent",
"Programming Language :: Python", "Programming Language :: Python",
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.9",

View file

@ -1 +0,0 @@
pyotp==2.6.0

59
tests/test_api_key.py Normal file
View file

@ -0,0 +1,59 @@
import unittest
from uptime_kuma_api import UptimeKumaException
from uptime_kuma_test_case import UptimeKumaTestCase
class TestApiKey(UptimeKumaTestCase):
def test_api_key(self):
# get empty list to make sure that future accesses will also work
self.api.get_api_keys()
expected = {
"name": "name 1",
"expires": "2023-03-30 12:20:00",
"active": True
}
# add api key
r = self.api.add_api_key(**expected)
self.assertEqual(r["msg"], "Added Successfully.")
api_key_id = r["keyID"]
# get api key
api_key = self.api.get_api_key(api_key_id)
self.compare(api_key, expected)
# get api keys
api_keys = self.api.get_api_keys()
api_key = self.find_by_id(api_keys, api_key_id)
self.assertIsNotNone(api_key)
self.compare(api_key, expected)
# disable api key
r = self.api.disable_api_key(api_key_id)
self.assertEqual(r["msg"], "Disabled Successfully.")
api_key = self.api.get_api_key(api_key_id)
expected["active"] = False
self.compare(api_key, expected)
# enable api key
r = self.api.enable_api_key(api_key_id)
self.assertEqual(r["msg"], "Enabled Successfully")
api_key = self.api.get_api_key(api_key_id)
expected["active"] = True
self.compare(api_key, expected)
# delete api key
r = self.api.delete_api_key(api_key_id)
self.assertEqual(r["msg"], "Deleted Successfully.")
with self.assertRaises(UptimeKumaException):
self.api.get_api_key(api_key_id)
def test_delete_not_existing_api_key(self):
with self.assertRaises(UptimeKumaException):
self.api.delete_api_key(42)
if __name__ == '__main__':
unittest.main()

60
tests/test_docker_host.py Normal file
View file

@ -0,0 +1,60 @@
import unittest
from packaging.version import parse as parse_version
from uptime_kuma_api import DockerType, UptimeKumaException
from uptime_kuma_test_case import UptimeKumaTestCase
class TestDockerHost(UptimeKumaTestCase):
def test_docker_host(self):
# get empty list to make sure that future accesses will also work
self.api.get_docker_hosts()
expected_docker_host = {
"name": "name 1",
"dockerType": DockerType.SOCKET,
"dockerDaemon": "/var/run/docker.sock"
}
# test docker host
if parse_version(self.api.version) != parse_version("1.23.0"):
# test_docker_host does not work in 1.23.0 (https://github.com/louislam/uptime-kuma/issues/3605)
with self.assertRaisesRegex(UptimeKumaException, r'connect ENOENT /var/run/docker.sock'):
self.api.test_docker_host(**expected_docker_host)
# add docker host
r = self.api.add_docker_host(**expected_docker_host)
self.assertEqual(r["msg"], "Saved")
docker_host_id = r["id"]
# get docker host
docker_host = self.api.get_docker_host(docker_host_id)
self.compare(docker_host, expected_docker_host)
# get docker hosts
docker_hosts = self.api.get_docker_hosts()
self.assertTrue(type(docker_hosts[0]["dockerType"]) == DockerType)
docker_host = self.find_by_id(docker_hosts, docker_host_id)
self.assertIsNotNone(docker_host)
self.compare(docker_host, expected_docker_host)
# edit docker host
r = self.api.edit_docker_host(docker_host_id, name="name 2")
self.assertEqual(r["msg"], "Saved")
docker_host = self.api.get_docker_host(docker_host_id)
expected_docker_host["name"] = "name 2"
self.compare(docker_host, expected_docker_host)
# delete docker host
r = self.api.delete_docker_host(docker_host_id)
self.assertEqual(r["msg"], "Deleted")
with self.assertRaises(UptimeKumaException):
self.api.get_docker_host(docker_host_id)
def test_delete_not_existing_docker_host(self):
with self.assertRaises(UptimeKumaException):
self.api.delete_docker_host(42)
if __name__ == '__main__':
unittest.main()

13
tests/test_game_list.py Normal file
View file

@ -0,0 +1,13 @@
import unittest
from uptime_kuma_test_case import UptimeKumaTestCase
class TestGameList(UptimeKumaTestCase):
def test_game_list(self):
game_list = self.api.get_game_list()
self.assertTrue("keys" in game_list[0])
if __name__ == '__main__':
unittest.main()

View file

@ -1,14 +1,19 @@
import unittest import unittest
from uptime_kuma_api import MonitorStatus
from uptime_kuma_test_case import UptimeKumaTestCase from uptime_kuma_test_case import UptimeKumaTestCase
class TestHeartbeat(UptimeKumaTestCase): class TestHeartbeat(UptimeKumaTestCase):
def test_get_heartbeats(self): def test_get_heartbeats(self):
self.api.get_heartbeats() self.add_monitor()
r = self.api.get_heartbeats()
self.assertTrue(type(list(r.values())[0][0]["status"]) == MonitorStatus)
def test_get_important_heartbeats(self): def test_get_important_heartbeats(self):
self.api.get_important_heartbeats() self.add_monitor()
r = self.api.get_important_heartbeats()
self.assertTrue(type(list(r.values())[0][0]["status"]) == MonitorStatus)
if __name__ == '__main__': if __name__ == '__main__':

View file

@ -0,0 +1,15 @@
import unittest
from uptime_kuma_api import MonitorStatus
from uptime_kuma_test_case import UptimeKumaTestCase
class TestHelperMethods(UptimeKumaTestCase):
def test_monitor_status(self):
monitor_id = self.add_monitor()
status = self.api.get_monitor_status(monitor_id)
self.assertTrue(type(status) == MonitorStatus)
if __name__ == '__main__':
unittest.main()

View file

@ -1,5 +1,6 @@
import unittest import unittest
from uptime_kuma_api import UptimeKumaApi
from uptime_kuma_test_case import UptimeKumaTestCase from uptime_kuma_test_case import UptimeKumaTestCase
@ -9,6 +10,16 @@ class TestInfo(UptimeKumaTestCase):
self.assertIn("version", info) self.assertIn("version", info)
self.assertIn("latestVersion", info) self.assertIn("latestVersion", info)
def test_info_with_version(self):
# If wait_events is set to 0, the first info event is normally used.
# The info event handler needs to drop this first event without a version.
self.api.logout()
self.api.disconnect()
self.api = UptimeKumaApi(self.url, wait_events=0)
self.api.login(self.username, self.password)
info = self.api.info()
self.assertIn("version", info)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

31
tests/test_login.py Normal file
View file

@ -0,0 +1,31 @@
import unittest
from uptime_kuma_api import UptimeKumaApi
from uptime_kuma_test_case import UptimeKumaTestCase
class TestLogin(UptimeKumaTestCase):
def test_auto_login(self):
# disable auth
r = self.api.set_settings(self.password, disableAuth=True)
self.assertEqual(r["msg"], "Saved")
# login again without username and password
self.api.logout()
self.api.disconnect()
self.api = UptimeKumaApi(self.url)
self.api.login()
r = self.api.get_settings()
self.assertTrue(r["disableAuth"])
# enable auth again
r = self.api.set_settings(disableAuth=False)
self.assertEqual(r["msg"], "Saved")
r = self.api.get_settings()
self.assertFalse(r["disableAuth"])
if __name__ == '__main__':
unittest.main()

269
tests/test_maintenance.py Normal file
View file

@ -0,0 +1,269 @@
import unittest
from uptime_kuma_api import UptimeKumaException, MaintenanceStrategy
from uptime_kuma_test_case import UptimeKumaTestCase
class TestMaintenance(UptimeKumaTestCase):
def test_maintenance(self):
expected_maintenance = {
"title": "maintenance 1",
"description": "test",
"strategy": MaintenanceStrategy.SINGLE,
"active": True,
"intervalDay": 1,
"dateRange": [
"2022-12-27 22:36:00",
"2022-12-29 22:36:00"
],
"weekdays": [],
"daysOfMonth": [],
"timezoneOption": "Europe/Berlin"
}
# add maintenance
r = self.api.add_maintenance(**expected_maintenance)
self.assertEqual(r["msg"], "Added Successfully.")
maintenance_id = r["maintenanceID"]
# get maintenance
maintenance = self.api.get_maintenance(maintenance_id)
self.assertTrue(type(maintenance["strategy"]) == MaintenanceStrategy)
self.compare(maintenance, expected_maintenance)
# get maintenances
maintenances = self.api.get_maintenances()
self.assertTrue(type(maintenances[0]["strategy"]) == MaintenanceStrategy)
maintenance = self.find_by_id(maintenances, maintenance_id)
self.assertIsNotNone(maintenance)
self.compare(maintenance, expected_maintenance)
# edit maintenance
expected_maintenance["strategy"] = MaintenanceStrategy.RECURRING_INTERVAL
expected_maintenance["title"] = "maintenance 1 new"
r = self.api.edit_maintenance(maintenance_id, **expected_maintenance)
self.assertEqual(r["msg"], "Saved.")
maintenance = self.api.get_maintenance(maintenance_id)
self.compare(maintenance, expected_maintenance)
# pause maintenance
r = self.api.pause_maintenance(maintenance_id)
self.assertEqual(r["msg"], "Paused Successfully.")
# resume maintenance
r = self.api.resume_maintenance(maintenance_id)
self.assertEqual(r["msg"], "Resume Successfully")
# add monitor maintenance
monitor_name = "monitor 1"
monitor_id = self.add_monitor(monitor_name)
monitors = [
{
"id": monitor_id
},
]
r = self.api.add_monitor_maintenance(maintenance_id, monitors)
self.assertEqual(r["msg"], "Added Successfully.")
# get monitor maintenance
monitors = self.api.get_monitor_maintenance(maintenance_id)
monitor = self.find_by_id(monitors, monitor_id)
self.assertIsNotNone(monitor)
# add status page maintenance
status_page_title = "status page 1"
status_page_id = self.add_status_page(status_page_title)
status_pages = [
{
"id": status_page_id
}
]
r = self.api.add_status_page_maintenance(maintenance_id, status_pages)
self.assertEqual(r["msg"], "Added Successfully.")
# get status page maintenance
status_pages = self.api.get_status_page_maintenance(maintenance_id)
status_page = self.find_by_id(status_pages, status_page_id)
self.assertIsNotNone(status_page)
# delete maintenance
r = self.api.delete_maintenance(maintenance_id)
self.assertEqual(r["msg"], "Deleted Successfully.")
with self.assertRaises(UptimeKumaException):
self.api.get_maintenance(maintenance_id)
def test_maintenance_strategy_manual(self):
expected_maintenance = {
"title": "test",
"description": "test",
"strategy": MaintenanceStrategy.MANUAL,
"active": True,
"intervalDay": 1,
"dateRange": [
"2022-12-27 00:00:00"
],
"weekdays": [],
"daysOfMonth": []
}
self.do_test_maintenance_strategy(expected_maintenance)
def test_maintenance_strategy_single(self):
expected_maintenance = {
"title": "test",
"description": "test",
"strategy": MaintenanceStrategy.SINGLE,
"active": True,
"intervalDay": 1,
"dateRange": [
"2022-12-27 22:36:00",
"2022-12-29 22:36:00"
],
"weekdays": [],
"daysOfMonth": []
}
self.do_test_maintenance_strategy(expected_maintenance)
def test_maintenance_strategy_recurring_interval(self):
expected_maintenance = {
"title": "test",
"description": "test",
"strategy": MaintenanceStrategy.RECURRING_INTERVAL,
"active": True,
"intervalDay": 1,
"dateRange": [
"2022-12-27 22:37:00",
"2022-12-31 22:37:00"
],
"timeRange": [
{
"hours": 2,
"minutes": 0
},
{
"hours": 3,
"minutes": 0
}
],
"weekdays": [],
"daysOfMonth": []
}
self.do_test_maintenance_strategy(expected_maintenance)
def test_maintenance_strategy_recurring_weekday(self):
expected_maintenance = {
"title": "test",
"description": "test",
"strategy": MaintenanceStrategy.RECURRING_WEEKDAY,
"active": True,
"intervalDay": 1,
"dateRange": [
"2022-12-27 22:38:00",
"2022-12-31 22:38:00"
],
"timeRange": [
{
"hours": 2,
"minutes": 0
},
{
"hours": 3,
"minutes": 0
}
],
"weekdays": [
1,
3,
5,
0
],
"daysOfMonth": []
}
self.do_test_maintenance_strategy(expected_maintenance)
def test_maintenance_strategy_recurring_day_of_month(self):
expected_maintenance = {
"title": "test",
"description": "test",
"strategy": MaintenanceStrategy.RECURRING_DAY_OF_MONTH,
"active": True,
"intervalDay": 1,
"dateRange": [
"2022-12-27 22:39:00",
"2022-12-31 22:39:00"
],
"timeRange": [
{
"hours": 2,
"minutes": 0
},
{
"hours": 3,
"minutes": 0
}
],
"weekdays": [],
"daysOfMonth": [
1,
10,
20,
30,
"lastDay1"
]
}
self.do_test_maintenance_strategy(expected_maintenance)
def test_maintenance_strategy_cron(self):
expected_maintenance = {
"title": "test",
"description": "test",
"strategy": MaintenanceStrategy.CRON,
"active": True,
"intervalDay": 1,
"dateRange": [
"2022-12-27 22:37:00",
"2022-12-31 22:37:00"
],
"weekdays": [],
"daysOfMonth": [],
"cron": "50 5 * * *",
"durationMinutes": 120,
"timezoneOption": "Europe/Berlin"
}
self.do_test_maintenance_strategy(expected_maintenance)
def do_test_maintenance_strategy(self, expected_maintenance):
# add maintenance
r = self.api.add_maintenance(**expected_maintenance)
self.assertEqual(r["msg"], "Added Successfully.")
maintenance_id = r["maintenanceID"]
# get maintenance
maintenance = self.api.get_maintenance(maintenance_id)
self.compare(maintenance, expected_maintenance)
# get maintenances
maintenances = self.api.get_maintenances()
maintenance = self.find_by_id(maintenances, maintenance_id)
self.assertIsNotNone(maintenance)
self.compare(maintenance, expected_maintenance)
# edit maintenance
r = self.api.edit_maintenance(maintenance_id, title="name 2")
self.assertEqual(r["msg"], "Saved.")
maintenance = self.api.get_maintenance(maintenance_id)
expected_maintenance["title"] = "name 2"
self.compare(maintenance, expected_maintenance)
# delete maintenance
r = self.api.delete_maintenance(maintenance_id)
self.assertEqual(r["msg"], "Deleted Successfully.")
with self.assertRaises(UptimeKumaException):
self.api.get_maintenance(maintenance_id)
def test_delete_not_existing_maintenance(self):
with self.assertRaises(UptimeKumaException):
self.api.delete_maintenance(42)
if __name__ == '__main__':
unittest.main()

View file

@ -1,15 +1,28 @@
import unittest import unittest
from packaging.version import parse as parse_version
from uptime_kuma_api import UptimeKumaException, MonitorType from uptime_kuma_api import UptimeKumaException, MonitorType, AuthMethod, MonitorStatus
from uptime_kuma_test_case import UptimeKumaTestCase from uptime_kuma_test_case import UptimeKumaTestCase
class TestMonitor(UptimeKumaTestCase): class TestMonitor(UptimeKumaTestCase):
def test_monitor(self): def test_monitor(self):
# get empty list to make sure that future accesses will also work
self.api.get_monitors()
notification_id_1 = self.add_notification()
notification_id_2 = self.add_notification()
expected_monitor = { expected_monitor = {
"type": MonitorType.HTTP, "type": MonitorType.HTTP,
"name": "monitor 1", "name": "monitor 1",
"url": "http://127.0.0.1" "interval": 60,
"retryInterval": 60,
"maxretries": 0,
"notificationIDList": [notification_id_1, notification_id_2],
"upsideDown": False,
"url": "http://127.0.0.1",
"resendInterval": 0
} }
# add monitor # add monitor
@ -23,12 +36,16 @@ class TestMonitor(UptimeKumaTestCase):
# get monitors # get monitors
monitors = self.api.get_monitors() monitors = self.api.get_monitors()
self.assertTrue(type(monitors[0]["type"]) == MonitorType)
self.assertTrue(type(monitors[0]["authMethod"]) == AuthMethod)
monitor = self.find_by_id(monitors, monitor_id) monitor = self.find_by_id(monitors, monitor_id)
self.assertTrue(type(monitor["type"]) == MonitorType)
self.assertTrue(type(monitor["authMethod"]) == AuthMethod)
self.assertIsNotNone(monitor) self.assertIsNotNone(monitor)
self.compare(monitor, expected_monitor) self.compare(monitor, expected_monitor)
# edit monitor # edit monitor
expected_monitor["type"] = "ping" expected_monitor["type"] = MonitorType.PING
expected_monitor["name"] = "monitor 1 new" expected_monitor["name"] = "monitor 1 new"
expected_monitor["hostname"] = "127.0.0.10" expected_monitor["hostname"] = "127.0.0.10"
del expected_monitor["url"] del expected_monitor["url"]
@ -46,7 +63,8 @@ class TestMonitor(UptimeKumaTestCase):
self.assertEqual(r["msg"], "Resumed Successfully.") self.assertEqual(r["msg"], "Resumed Successfully.")
# get monitor beats # get monitor beats
self.api.get_monitor_beats(monitor_id, 6) r = self.api.get_monitor_beats(monitor_id, 6)
self.assertTrue(type(r[0]["status"]) == MonitorStatus)
# delete monitor # delete monitor
r = self.api.delete_monitor(monitor_id) r = self.api.delete_monitor(monitor_id)
@ -62,11 +80,68 @@ class TestMonitor(UptimeKumaTestCase):
monitor = self.api.get_monitor(monitor_id) monitor = self.api.get_monitor(monitor_id)
self.compare(monitor, expected_monitor) self.compare(monitor, expected_monitor)
expected_monitor.update({
"name": "monitor 2"
})
r = self.api.edit_monitor(monitor_id, **expected_monitor)
self.assertEqual(r["msg"], "Saved.")
monitor = self.api.get_monitor(monitor_id)
self.compare(monitor, expected_monitor)
monitor = self.api.get_monitor(monitor_id)
self.compare(monitor, expected_monitor)
return monitor
def test_monitor_type_http(self): def test_monitor_type_http(self):
proxy_id = self.add_proxy()
json_data = '{"key": "value"}'
expected_monitor = { expected_monitor = {
"type": MonitorType.HTTP, "type": MonitorType.HTTP,
"name": "monitor 1", "name": "monitor 1",
"url": "http://127.0.0.1" "url": "http://127.0.0.1",
"expiryNotification": False,
"ignoreTls": False,
"maxredirects": 10,
"accepted_statuscodes": ["200-299"],
"proxyId": proxy_id,
"method": "GET",
"body": json_data,
"headers": json_data,
"authMethod": AuthMethod.NONE
}
self.do_test_monitor_type(expected_monitor)
def test_monitor_auth_method(self):
for auth_method in [AuthMethod.HTTP_BASIC, AuthMethod.NTLM]:
expected_monitor = {
"type": MonitorType.HTTP,
"name": "monitor 1",
"url": "http://127.0.0.1",
"authMethod": auth_method,
"basic_auth_user": "auth user",
"basic_auth_pass": "auth pass",
}
self.do_test_monitor_type(expected_monitor)
expected_monitor = {
"type": MonitorType.HTTP,
"name": "monitor 1",
"url": "http://127.0.0.1",
"authMethod": AuthMethod.NTLM,
"authDomain": "auth domain",
"authWorkstation": "auth workstation",
}
self.do_test_monitor_type(expected_monitor)
expected_monitor = {
"type": MonitorType.HTTP,
"name": "monitor 1",
"url": "http://127.0.0.1",
"authMethod": AuthMethod.MTLS,
"tlsCert": "cert",
"tlsKey": "key",
"tlsCa": "ca",
} }
self.do_test_monitor_type(expected_monitor) self.do_test_monitor_type(expected_monitor)
@ -84,6 +159,7 @@ class TestMonitor(UptimeKumaTestCase):
"type": MonitorType.PING, "type": MonitorType.PING,
"name": "monitor 1", "name": "monitor 1",
"hostname": "127.0.0.1", "hostname": "127.0.0.1",
"packetSize": 56
} }
self.do_test_monitor_type(expected_monitor) self.do_test_monitor_type(expected_monitor)
@ -96,32 +172,66 @@ class TestMonitor(UptimeKumaTestCase):
} }
self.do_test_monitor_type(expected_monitor) self.do_test_monitor_type(expected_monitor)
def test_monitor_type_grpc_keyword(self):
expected_monitor = {
"type": MonitorType.GRPC_KEYWORD,
"name": "monitor 1",
"grpcUrl": "127.0.0.1",
"keyword": "healthy",
"grpcServiceName": "health",
"grpcMethod": "check",
}
self.do_test_monitor_type(expected_monitor)
def test_monitor_type_dns(self): def test_monitor_type_dns(self):
expected_monitor = { expected_monitor = {
"type": MonitorType.DNS, "type": MonitorType.DNS,
"name": "monitor 1", "name": "monitor 1",
"url": "http://127.0.0.1",
"hostname": "127.0.0.1", "hostname": "127.0.0.1",
"port": 8888, "port": 8888,
"dns_resolve_server": "1.1.1.1", "dns_resolve_server": "1.1.1.1",
"dns_resolve_type": "A"
}
self.do_test_monitor_type(expected_monitor)
def test_monitor_type_docker(self):
docker_host_id = self.add_docker_host()
expected_monitor = {
"type": MonitorType.DOCKER,
"name": "monitor 1",
"docker_container": "test",
"docker_host": docker_host_id
} }
self.do_test_monitor_type(expected_monitor) self.do_test_monitor_type(expected_monitor)
def test_monitor_type_push(self): def test_monitor_type_push(self):
expected_monitor = { expected_monitor = {
"type": MonitorType.PUSH, "type": MonitorType.PUSH,
"name": "monitor 1", "name": "monitor 1"
"url": "http://127.0.0.1"
} }
self.do_test_monitor_type(expected_monitor) monitor = self.do_test_monitor_type(expected_monitor)
# https://github.com/lucasheld/ansible-uptime-kuma/issues/5
self.assertIsNotNone(monitor["pushToken"])
def test_monitor_type_steam(self): def test_monitor_type_steam(self):
expected_monitor = { expected_monitor = {
"type": MonitorType.STEAM, "type": MonitorType.STEAM,
"name": "monitor 1", "name": "monitor 1",
"url": "http://127.0.0.1", "hostname": "127.0.0.1",
"port": 8888
}
self.do_test_monitor_type(expected_monitor)
def test_monitor_type_gamedig(self):
game_list = self.api.get_game_list()
game = game_list[0]["keys"][0]
expected_monitor = {
"type": MonitorType.GAMEDIG,
"name": "monitor 1",
"hostname": "127.0.0.1", "hostname": "127.0.0.1",
"port": 8888, "port": 8888,
"game": game
} }
self.do_test_monitor_type(expected_monitor) self.do_test_monitor_type(expected_monitor)
@ -129,10 +239,12 @@ class TestMonitor(UptimeKumaTestCase):
expected_monitor = { expected_monitor = {
"type": MonitorType.MQTT, "type": MonitorType.MQTT,
"name": "monitor 1", "name": "monitor 1",
"url": "http://127.0.0.1",
"hostname": "127.0.0.1", "hostname": "127.0.0.1",
"port": 8888, "port": 8888,
"mqttTopic": "test" "mqttUsername": "mqtt username",
"mqttPassword": "mqtt password",
"mqttTopic": "mqtt topic",
"mqttSuccessMessage": "mqtt success message"
} }
self.do_test_monitor_type(expected_monitor) self.do_test_monitor_type(expected_monitor)
@ -140,13 +252,129 @@ class TestMonitor(UptimeKumaTestCase):
expected_monitor = { expected_monitor = {
"type": MonitorType.SQLSERVER, "type": MonitorType.SQLSERVER,
"name": "monitor 1", "name": "monitor 1",
"url": "http://127.0.0.1",
"databaseConnectionString": "Server=127.0.0.1,8888;Database=test;User Id=1;Password=secret123;Encrypt=true;" "databaseConnectionString": "Server=127.0.0.1,8888;Database=test;User Id=1;Password=secret123;Encrypt=true;"
"TrustServerCertificate=Yes;Connection Timeout=5", "TrustServerCertificate=Yes;Connection Timeout=5",
"databaseQuery": "select getdate()" "databaseQuery": "select getdate()"
} }
self.do_test_monitor_type(expected_monitor) self.do_test_monitor_type(expected_monitor)
def test_monitor_type_postgres(self):
expected_monitor = {
"type": MonitorType.POSTGRES,
"name": "monitor 1",
"databaseConnectionString": "postgres://username:password@host:port/database",
"databaseQuery": "select getdate()"
}
self.do_test_monitor_type(expected_monitor)
def test_monitor_type_mysql(self):
expected_monitor = {
"type": MonitorType.MYSQL,
"name": "monitor 1",
"databaseConnectionString": "mysql://username:password@host:port/database",
"databaseQuery": "select getdate()"
}
self.do_test_monitor_type(expected_monitor)
def test_monitor_type_mongodb(self):
expected_monitor = {
"type": MonitorType.MONGODB,
"name": "monitor 1",
"databaseConnectionString": "mongodb://username:password@host:port/database"
}
self.do_test_monitor_type(expected_monitor)
def test_monitor_type_radius(self):
expected_monitor = {
"type": MonitorType.RADIUS,
"name": "monitor 1",
"radiusUsername": "123",
"radiusPassword": "456",
"radiusSecret": "789",
"radiusCalledStationId": "1",
"radiusCallingStationId": "2"
}
self.do_test_monitor_type(expected_monitor)
def test_monitor_type_redis(self):
expected_monitor = {
"type": MonitorType.REDIS,
"name": "monitor 1",
"databaseConnectionString": "redis://user:password@host:port"
}
self.do_test_monitor_type(expected_monitor)
def test_monitor_type_group(self):
if parse_version(self.api.version) < parse_version("1.22"):
self.skipTest("Unsupported in this Uptime Kuma version")
# create monitor group
expected_monitor = {
"type": MonitorType.GROUP,
"name": "monitor 1"
}
group_monitor = self.do_test_monitor_type(expected_monitor)
group_monitor_id = group_monitor["id"]
# use monitor group as parent for another monitor
expected_monitor = {
"type": MonitorType.PUSH,
"name": "monitor 1",
"parent": group_monitor_id
}
self.do_test_monitor_type(expected_monitor)
def test_monitor_type_json_query(self):
if parse_version(self.api.version) < parse_version("1.23"):
self.skipTest("Unsupported in this Uptime Kuma version")
expected_monitor = {
"type": MonitorType.JSON_QUERY,
"name": "monitor 1",
"url": "http://127.0.0.1",
"jsonPath": "address.country",
"expectedValue": "germany",
}
self.do_test_monitor_type(expected_monitor)
def test_monitor_type_real_browser(self):
if parse_version(self.api.version) < parse_version("1.23"):
self.skipTest("Unsupported in this Uptime Kuma version")
expected_monitor = {
"type": MonitorType.REAL_BROWSER,
"name": "monitor 1",
"url": "http://127.0.0.1",
}
self.do_test_monitor_type(expected_monitor)
def test_monitor_type_kafka_producer(self):
if parse_version(self.api.version) < parse_version("1.23"):
self.skipTest("Unsupported in this Uptime Kuma version")
expected_monitor = {
"type": MonitorType.KAFKA_PRODUCER,
"name": "monitor 1",
"kafkaProducerTopic": "topic",
"kafkaProducerMessage": "message",
}
self.do_test_monitor_type(expected_monitor)
def test_monitor_type_tailscale_ping(self):
if parse_version(self.api.version) < parse_version("1.23"):
self.skipTest("Unsupported in this Uptime Kuma version")
expected_monitor = {
"type": MonitorType.TAILSCALE_PING,
"name": "monitor 1",
"hostname": "127.0.0.1"
}
self.do_test_monitor_type(expected_monitor)
def test_delete_not_existing_monitor(self):
with self.assertRaises(UptimeKumaException):
self.api.delete_monitor(42)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View file

@ -1,5 +1,6 @@
import unittest import unittest
from uptime_kuma_api import UptimeKumaException
from uptime_kuma_test_case import UptimeKumaTestCase from uptime_kuma_test_case import UptimeKumaTestCase
@ -18,10 +19,24 @@ class TestMonitorTag(UptimeKumaTestCase):
r = self.api.add_monitor_tag(**expected_monitor_tag) r = self.api.add_monitor_tag(**expected_monitor_tag)
self.assertEqual(r["msg"], "Added Successfully.") self.assertEqual(r["msg"], "Added Successfully.")
# check if tag is listed in monitor tags
monitors = self.api.get_monitors()
monitor = self.find_by_id(monitors, monitor_id)
self.assertEqual(monitor["tags"][0]["tag_id"], tag_id)
# delete monitor tag # delete monitor tag
r = self.api.delete_monitor_tag(**expected_monitor_tag) r = self.api.delete_monitor_tag(**expected_monitor_tag)
self.assertEqual(r["msg"], "Deleted Successfully.") self.assertEqual(r["msg"], "Deleted Successfully.")
# check if tag is not listed in monitor tags
monitors = self.api.get_monitors()
monitor = self.find_by_id(monitors, monitor_id)
self.assertEqual(monitor["tags"], [])
def test_delete_not_existing_monitor_tag(self):
with self.assertRaises(UptimeKumaException):
self.api.delete_monitor_tag(42, 42, 42)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View file

@ -1,21 +1,25 @@
import unittest import unittest
from uptime_kuma_api import UptimeKumaException from uptime_kuma_api import UptimeKumaException, NotificationType
from uptime_kuma_test_case import UptimeKumaTestCase from uptime_kuma_test_case import UptimeKumaTestCase
class TestNotification(UptimeKumaTestCase): class TestNotification(UptimeKumaTestCase):
def test_notification(self): def test_notification(self):
# get empty list to make sure that future accesses will also work
self.api.get_notifications()
expected_notification = { expected_notification = {
"name": "notification 1", "name": "notification 1",
"default": True, "isDefault": True,
"applyExisting": True, "applyExisting": True,
"type": "PushByTechulus", "type": NotificationType.TELEGRAM,
"pushAPIKey": "123456789" "telegramChatID": "123456789",
"telegramBotToken": "987654321"
} }
# test notification # test notification
with self.assertRaisesRegex(UptimeKumaException, r'Invalid API key'): with self.assertRaisesRegex(UptimeKumaException, r'Not Found'):
self.api.test_notification(**expected_notification) self.api.test_notification(**expected_notification)
# add notification # add notification
@ -29,7 +33,9 @@ class TestNotification(UptimeKumaTestCase):
# get notifications # get notifications
notifications = self.api.get_notifications() notifications = self.api.get_notifications()
self.assertTrue(type(notifications[0]["type"]) == NotificationType)
notification = self.find_by_id(notifications, notification_id) notification = self.find_by_id(notifications, notification_id)
self.assertTrue(type(notification["type"]) == NotificationType)
self.assertIsNotNone(notification) self.assertIsNotNone(notification)
self.compare(notification, expected_notification) self.compare(notification, expected_notification)
@ -37,9 +43,10 @@ class TestNotification(UptimeKumaTestCase):
expected_notification["name"] = "notification 1 new" expected_notification["name"] = "notification 1 new"
expected_notification["default"] = False expected_notification["default"] = False
expected_notification["applyExisting"] = False expected_notification["applyExisting"] = False
expected_notification["type"] = "PushDeer" expected_notification["type"] = NotificationType.PUSHDEER
expected_notification["pushdeerKey"] = "987654321" expected_notification["pushdeerKey"] = "987654321"
del expected_notification["pushAPIKey"] del expected_notification["telegramChatID"]
del expected_notification["telegramBotToken"]
r = self.api.edit_notification(notification_id, **expected_notification) r = self.api.edit_notification(notification_id, **expected_notification)
self.assertEqual(r["msg"], "Saved") self.assertEqual(r["msg"], "Saved")
notification = self.api.get_notification(notification_id) notification = self.api.get_notification(notification_id)
@ -52,6 +59,10 @@ class TestNotification(UptimeKumaTestCase):
with self.assertRaises(UptimeKumaException): with self.assertRaises(UptimeKumaException):
self.api.delete_notification(notification_id) self.api.delete_notification(notification_id)
def test_delete_not_existing_notification(self):
with self.assertRaises(UptimeKumaException):
self.api.delete_notification(42)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View file

@ -1,20 +1,27 @@
import unittest import unittest
from uptime_kuma_api import UptimeKumaException from uptime_kuma_api import UptimeKumaException, ProxyProtocol
from uptime_kuma_test_case import UptimeKumaTestCase from uptime_kuma_test_case import UptimeKumaTestCase
class TestProxy(UptimeKumaTestCase): class TestProxy(UptimeKumaTestCase):
def test_proxy(self): def test_proxy(self):
# get empty list to make sure that future accesses will also work
self.api.get_proxies()
expected_proxy = { expected_proxy = {
"protocol": "http", "protocol": ProxyProtocol.HTTP,
"host": "127.0.0.1", "host": "127.0.0.1",
"port": 8080, "port": 8080,
"active": True "auth": True,
"username": "username",
"password": "password",
"active": True,
"default": False
} }
# add proxy # add proxy
r = self.api.add_proxy(**expected_proxy) r = self.api.add_proxy(applyExisting=False, **expected_proxy)
self.assertEqual(r["msg"], "Saved") self.assertEqual(r["msg"], "Saved")
proxy_id = r["id"] proxy_id = r["id"]
@ -24,12 +31,13 @@ class TestProxy(UptimeKumaTestCase):
# get proxies # get proxies
proxies = self.api.get_proxies() proxies = self.api.get_proxies()
self.assertTrue(type(proxies[0]["protocol"]) == ProxyProtocol)
proxy = self.find_by_id(proxies, proxy_id) proxy = self.find_by_id(proxies, proxy_id)
self.assertIsNotNone(proxy) self.assertIsNotNone(proxy)
self.compare(proxy, expected_proxy) self.compare(proxy, expected_proxy)
# edit proxy # edit proxy
expected_proxy["protocol"] = "https" expected_proxy["protocol"] = ProxyProtocol.HTTPS
expected_proxy["host"] = "127.0.0.2" expected_proxy["host"] = "127.0.0.2"
expected_proxy["port"] = 8888 expected_proxy["port"] = 8888
r = self.api.edit_proxy(proxy_id, **expected_proxy) r = self.api.edit_proxy(proxy_id, **expected_proxy)
@ -43,6 +51,10 @@ class TestProxy(UptimeKumaTestCase):
with self.assertRaises(UptimeKumaException): with self.assertRaises(UptimeKumaException):
self.api.get_proxy(proxy_id) self.api.get_proxy(proxy_id)
def test_delete_not_existing_proxy(self):
with self.assertRaises(UptimeKumaException):
self.api.delete_proxy(42)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View file

@ -1,18 +1,37 @@
import unittest
import json import json
import unittest
from uptime_kuma_test_case import UptimeKumaTestCase from uptime_kuma_test_case import UptimeKumaTestCase
class TestSettings(UptimeKumaTestCase): class TestSettings(UptimeKumaTestCase):
def test_settings(self): def test_settings(self):
settings_before = self.api.get_settings() expected_settings = {
"checkUpdate": False,
"checkBeta": False,
"keepDataPeriodDays": 180,
"entryPage": "dashboard",
"searchEngineIndex": False,
"primaryBaseURL": "",
"steamAPIKey": "",
"tlsExpiryNotifyDays": [7, 14, 21],
"disableAuth": False,
"trustProxy": False,
"serverTimezone": "Europe/Berlin",
"dnsCache": True
}
expected_check_update = not settings_before.get("checkUpdate", True) # set settings
self.api.set_settings(self.password, checkUpdate=expected_check_update) r = self.api.set_settings(self.password, **expected_settings)
self.assertEqual(r["msg"], "Saved")
# set settings without password
r = self.api.set_settings(**expected_settings)
self.assertEqual(r["msg"], "Saved")
# get settings
settings = self.api.get_settings() settings = self.api.get_settings()
self.assertEqual(settings["checkUpdate"], expected_check_update) self.compare(settings, expected_settings)
def test_change_password(self): def test_change_password(self):
new_password = "321terces" new_password = "321terces"

View file

@ -6,6 +6,9 @@ from uptime_kuma_test_case import UptimeKumaTestCase
class TestStatusPage(UptimeKumaTestCase): class TestStatusPage(UptimeKumaTestCase):
def test_status_page(self): def test_status_page(self):
# get empty list to make sure that future accesses will also work
self.api.get_status_pages()
monitor_id = self.add_monitor() monitor_id = self.add_monitor()
slug = "slug1" slug = "slug1"
@ -13,7 +16,14 @@ class TestStatusPage(UptimeKumaTestCase):
"slug": slug, "slug": slug,
"title": "status page 1", "title": "status page 1",
"description": "description 1", "description": "description 1",
"theme": "light",
"published": True,
"showTags": False,
"domainNameList": [],
"customCSS": "",
"footerText": None,
"showPoweredBy": False, "showPoweredBy": False,
"icon": "/icon.svg",
"publicGroupList": [ "publicGroupList": [
{ {
'name': 'Services', 'name': 'Services',
@ -24,15 +34,10 @@ class TestStatusPage(UptimeKumaTestCase):
} }
] ]
} }
] ],
"googleAnalyticsId": ""
} }
# slug must be unique
try:
self.api.delete_status_page(slug)
except UptimeKumaException:
pass
# add status page # add status page
r = self.api.add_status_page(slug, expected_status_page["title"]) r = self.api.add_status_page(slug, expected_status_page["title"])
self.assertEqual(r["msg"], "OK!") self.assertEqual(r["msg"], "OK!")
@ -66,9 +71,11 @@ class TestStatusPage(UptimeKumaTestCase):
"style": IncidentStyle.DANGER "style": IncidentStyle.DANGER
} }
incident = self.api.post_incident(slug, **incident_expected) incident = self.api.post_incident(slug, **incident_expected)
self.assertTrue(type(incident["style"]) == IncidentStyle)
self.compare(incident, incident_expected) self.compare(incident, incident_expected)
status_page = self.api.get_status_page(slug) status_page = self.api.get_status_page(slug)
self.compare(status_page["incident"], incident) self.compare(status_page["incident"], incident)
self.assertTrue(type(status_page["incident"]["style"]) == IncidentStyle)
# unpin incident # unpin incident
self.api.unpin_incident(slug) self.api.unpin_incident(slug)
@ -80,6 +87,14 @@ class TestStatusPage(UptimeKumaTestCase):
with self.assertRaises(UptimeKumaException): with self.assertRaises(UptimeKumaException):
self.api.get_status_page(slug) self.api.get_status_page(slug)
status_pages = self.api.get_status_pages()
status_page = self.find_by_id(status_pages, slug, "slug")
self.assertIsNone(status_page)
def test_delete_not_existing_status_page(self):
with self.assertRaises(UptimeKumaException):
self.api.delete_status_page("slug42")
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View file

@ -26,12 +26,24 @@ class TestTag(UptimeKumaTestCase):
self.assertIsNotNone(tag) self.assertIsNotNone(tag)
self.compare(tag, expected_tag) self.compare(tag, expected_tag)
# edit tag
expected_tag["name"] = "tag 1 new"
expected_tag["color"] = "#000000"
r = self.api.edit_tag(tag_id, **expected_tag)
self.assertEqual(r["msg"], "Saved")
tag = self.api.get_tag(tag_id)
self.compare(tag, expected_tag)
# delete tag # delete tag
r = self.api.delete_tag(tag_id) r = self.api.delete_tag(tag_id)
self.assertEqual(r["msg"], "Deleted Successfully.") self.assertEqual(r["msg"], "Deleted Successfully.")
with self.assertRaises(UptimeKumaException): with self.assertRaises(UptimeKumaException):
self.api.get_tag(tag_id) self.api.get_tag(tag_id)
def test_delete_not_existing_tag(self):
with self.assertRaises(UptimeKumaException):
self.api.delete_tag(42)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View file

@ -1,7 +1,7 @@
import json
import unittest import unittest
import warnings
from uptime_kuma_api import UptimeKumaApi, Event, MonitorType from uptime_kuma_api import UptimeKumaApi, MonitorType, DockerType
token = None token = None
@ -11,7 +11,13 @@ def compare(subset, superset):
value2 = superset.get(key) value2 = superset.get(key)
if type(value) == list: if type(value) == list:
for i in range(len(value)): for i in range(len(value)):
if not value2 or not compare(value[i], value2[i]): if not value2:
return False
elif type(value[i]) == list or type(value[i]) == dict:
if not compare(value[i], value2[i]):
return False
else:
if value[i] != value2[i]:
return False return False
elif type(value) == dict: elif type(value) == dict:
if not compare(value, value2): if not compare(value, value2):
@ -29,7 +35,9 @@ class UptimeKumaTestCase(unittest.TestCase):
password = "secret123" password = "secret123"
def setUp(self): def setUp(self):
self.api = UptimeKumaApi(self.url) warnings.simplefilter("ignore", ResourceWarning)
self.api = UptimeKumaApi(self.url, timeout=1, wait_events=0.01)
global token global token
if not token: if not token:
@ -40,17 +48,50 @@ class UptimeKumaTestCase(unittest.TestCase):
self.api.login_by_token(token) self.api.login_by_token(token)
data = { # delete monitors
"version": "1.17.1", monitors = self.api.get_monitors()
"notificationList": [], for monitor in monitors:
"monitorList": [], self.api.delete_monitor(monitor["id"])
"proxyList": []
}
data_str = json.dumps(data)
r = self.api.upload_backup(data_str, "overwrite")
self.assertEqual(r["msg"], "Backup successfully restored.")
self.api._event_data[Event.MONITOR_LIST] = {} # delete notifications
notifications = self.api.get_notifications()
for notification in notifications:
self.api.delete_notification(notification["id"])
# delete proxies
proxies = self.api.get_proxies()
for proxy in proxies:
self.api.delete_proxy(proxy["id"])
# delete tags
tags = self.api.get_tags()
for tag in tags:
self.api.delete_tag(tag["id"])
# delete status pages
status_pages = self.api.get_status_pages()
for status_page in status_pages:
self.api.delete_status_page(status_page["slug"])
# delete docker hosts
docker_hosts = self.api.get_docker_hosts()
for docker_host in docker_hosts:
self.api.delete_docker_host(docker_host["id"])
# delete maintenances
maintenances = self.api.get_maintenances()
for maintenance in maintenances:
self.api.delete_maintenance(maintenance["id"])
# delete api keys
api_keys = self.api.get_api_keys()
for api_key in api_keys:
self.api.delete_api_key(api_key["id"])
# login again to receive initial messages
self.api.disconnect()
self.api = UptimeKumaApi(self.url)
self.api.login_by_token(token)
def tearDown(self): def tearDown(self):
self.api.disconnect() self.api.disconnect()
@ -63,12 +104,52 @@ class UptimeKumaTestCase(unittest.TestCase):
if obj[key] == value: if obj[key] == value:
return obj return obj
def add_monitor(self): def add_monitor(self, name="monitor 1"):
r = self.api.add_monitor(type=MonitorType.HTTP, name="monitor 1", url="http://127.0.0.1") r = self.api.add_monitor(
type=MonitorType.HTTP,
name=name,
url="http://127.0.0.1"
)
monitor_id = r["monitorID"] monitor_id = r["monitorID"]
return monitor_id return monitor_id
def add_tag(self): def add_tag(self):
r = self.api.add_tag(name="tag 1", color="#ffffff") r = self.api.add_tag(
name="tag 1",
color="#ffffff"
)
tag_id = r["id"] tag_id = r["id"]
return tag_id return tag_id
def add_notification(self):
r = self.api.add_notification(
name="notification 1",
type="PushByTechulus",
pushAPIKey="123456789"
)
notification_id = r["id"]
return notification_id
def add_proxy(self):
r = self.api.add_proxy(
protocol="http",
host="127.0.0.1",
port=8080,
active=True
)
proxy_id = r["id"]
return proxy_id
def add_docker_host(self):
r = self.api.add_docker_host(
name="docker host 1",
dockerType=DockerType.SOCKET
)
docker_host_id = r["id"]
return docker_host_id
def add_status_page(self, title="status page 1"):
slug = "statuspage1"
self.api.add_status_page(slug, title)
r = self.api.get_status_page(slug)
return r["id"]

View file

@ -1,9 +1,12 @@
from .__version__ import __title__, __version__, __author__, __copyright__ from .__version__ import __title__, __version__, __author__, __copyright__
from .auth_method import AuthMethod from .auth_method import AuthMethod
from .monitor_status import MonitorStatus
from .monitor_type import MonitorType from .monitor_type import MonitorType
from .notification_providers import NotificationType, notification_provider_options from .notification_providers import NotificationType, notification_provider_options, notification_provider_conditions
from .proxy_protocol import ProxyProtocol from .proxy_protocol import ProxyProtocol
from .incident_style import IncidentStyle from .incident_style import IncidentStyle
from .exceptions import UptimeKumaException from .docker_type import DockerType
from .maintenance_strategy import MaintenanceStrategy
from .exceptions import UptimeKumaException, Timeout
from .event import Event from .event import Event
from .api import UptimeKumaApi from .api import UptimeKumaApi

View file

@ -1,5 +1,5 @@
__title__ = "uptime_kuma_api" __title__ = "uptime_kuma_api"
__version__ = "0.1.1" __version__ = "1.2.1"
__author__ = "Lucas Held" __author__ = "Lucas Held"
__license__ = "MIT" __license__ = "MIT"
__copyright__ = "Copyright 2022 Lucas Held" __copyright__ = "Copyright 2023 Lucas Held"

File diff suppressed because one or more lines are too long

View file

@ -2,6 +2,19 @@ from enum import Enum
class AuthMethod(str, Enum): class AuthMethod(str, Enum):
"""Enumerate authentication methods for monitors."""
NONE = "" NONE = ""
"""Authentication is disabled."""
HTTP_BASIC = "basic" HTTP_BASIC = "basic"
"""HTTP Basic Authentication."""
NTLM = "ntlm" NTLM = "ntlm"
"""NTLM Authentication."""
MTLS = "mtls"
"""mTLS Authentication."""
OAUTH2_CC = "oauth2-cc"
"""OAuth2: Client Credentials"""

View file

@ -0,0 +1,11 @@
from enum import Enum
class DockerType(str, Enum):
"""Enumerate docker connection types."""
SOCKET = "socket"
"""Socket"""
TCP = "tcp"
"""TCP"""

View file

@ -0,0 +1,387 @@
def append_docstring(value):
def _doc(func):
# inserts the value into the existing docstring before the :return: line
split_value = ":return:"
splitted = func.__doc__.split(split_value)
part1 = splitted[0]
line = [i for i in part1.split("\n") if i][0]
indent = len(line) - len(line.lstrip())
line_start = " " * indent
part2 = split_value + line_start.join(splitted[1:])
func.__doc__ = part1 + "\n" + line_start + value + "\n" + line_start + part2
return func
return _doc
def monitor_docstring(mode) -> str:
return f"""
:param MonitorType{", optional" if mode == "edit" else ""} type: Monitor Type
:param str{", optional" if mode == "edit" else ""} name: Friendly Name
:param str, optional parent: Id of the monitor group, defaults to None
:param str, optional description: Description, defaults to None
:param int, optional interval: Heartbeat Interval, defaults to 60
:param int, optional retryInterval: Retry every X seconds, defaults to 60
:param int, optional resendInterval: Resend every X times, defaults to 0
:param int, optional maxretries: Retries. Maximum retries before the service is marked as down and a notification is sent., defaults to 0
:param bool, optional upsideDown: Upside Down Mode. Flip the status upside down. If the service is reachable, it is DOWN., defaults to False
:param list, optional notificationIDList: Notifications, defaults to None
:param str, optional url: URL, defaults to None
:param bool, optional expiryNotification: Certificate Expiry Notification, defaults to False
:param bool, optional ignoreTls: Ignore TLS/SSL error for HTTPS websites, defaults to False
:param int, optional maxredirects: Max. Redirects. Maximum number of redirects to follow. Set to 0 to disable redirects., defaults to 10
:param list, optional accepted_statuscodes: Accepted Status Codes. Select status codes which are considered as a successful response., defaults to None
:param int, optional proxyId: Proxy, defaults to None
:param str, optional method: Method, defaults to "GET"
:param str, optional httpBodyEncoding: Body Encoding, defaults to "json". Allowed values: "json", "xml".
:param str, optional body: Body, defaults to None
:param str, optional headers: Headers, defaults to None
:param AuthMethod, optional authMethod: Method, defaults to :attr:`~.AuthMethod.NONE`
:param str, optional tlsCert: Cert for ``authMethod`` :attr:`~.AuthMethod.MTLS`, defaults to None.
:param str, optional tlsKey: Key for ``authMethod`` :attr:`~.AuthMethod.MTLS`, defaults to None.
:param str, optional tlsCa: Ca for ``authMethod`` :attr:`~.AuthMethod.MTLS`, defaults to None.
:param str, optional basic_auth_user: Username for ``authMethod`` :attr:`~.AuthMethod.HTTP_BASIC` and :attr:`~.AuthMethod.NTLM`, defaults to None
:param str, optional basic_auth_pass: Password for ``authMethod`` :attr:`~.AuthMethod.HTTP_BASIC` and :attr:`~.AuthMethod.NTLM`, defaults to None
:param str, optional authDomain: Domain for ``authMethod`` :attr:`~.AuthMethod.NTLM`, defaults to None
:param str, optional authWorkstation: Workstation for ``authMethod`` :attr:`~.AuthMethod.NTLM`, defaults to None
:param str, optional oauth_auth_method: Authentication Method, defaults to None
:param str, optional oauth_token_url: OAuth Token URL, defaults to None
:param str, optional oauth_client_id: Client ID, defaults to None
:param str, optional oauth_client_secret: Client Secret, defaults to None
:param str, optional oauth_scopes: OAuth Scope, defaults to None
:param int, optional timeout: Request Timeout, defaults to None
:param str, optional keyword: Keyword. Search keyword in plain HTML or JSON response. The search is case-sensitive., defaults to None
:param bool, optional invertKeyword: Invert Keyword. Look for the keyword to be absent rather than present., defaults to False
:param str, optional hostname: Hostname, defaults to None
:param int, optional packetSize: Packet Size, defaults to None
:param int, optional port: Port, ``type`` :attr:`~.MonitorType.DNS` defaults to ``53`` and ``type`` :attr:`~.MonitorType.RADIUS` defaults to ``1812``
:param str, optional dns_resolve_server: Resolver Server, defaults to "1.1.1.1"
:param str, optional dns_resolve_type: Resource Record Type, defaults to "A". Available values are:
- "A"
- "AAAA"
- "CAA"
- "CNAME"
- "MX"
- "NS"
- "PTR"
- "SOA"
- "SRV"
- "TXT"
:param str, optional mqttUsername: MQTT Username, defaults to None
:param str, optional mqttPassword: MQTT Password, defaults to None
:param str, optional mqttTopic: MQTT Topic, defaults to None
:param str, optional mqttSuccessMessage: MQTT Success Message, defaults to None
:param str, optional databaseConnectionString: Connection String, defaults to None
:param str, optional databaseQuery: Query, defaults to None
:param str, optional docker_container: Container Name / ID, defaults to ""
:param int, optional docker_host: Docker Host, defaults to None
:param str, optional radiusUsername: Radius Username, defaults to None
:param str, optional radiusPassword: Radius Password, defaults to None
:param str, optional radiusSecret: Radius Secret. Shared Secret between client and server., defaults to None
:param str, optional radiusCalledStationId: Called Station Id. Identifier of the called device., defaults to None
:param str, optional radiusCallingStationId: Calling Station Id. Identifier of the calling device., defaults to None
:param str, optional game: Game, defaults to None
:param bool, optional gamedigGivenPortOnly: Gamedig: Guess Port. The port used by Valve Server Query Protocol may be different from the client port. Try this if the monitor cannot connect to your server., defaults to False
:param str, optional jsonPath: Json Query, defaults to None
:param str, optional expectedValue: Expected Value, defaults to None
:param str, optional kafkaProducerBrokers: Kafka Broker list, defaults to None
:param str, optional kafkaProducerTopic: Kafka Topic Name, defaults to None
:param str, optional kafkaProducerMessage: Kafka Producer Message, defaults to None
:param bool, optional kafkaProducerSsl: Enable Kafka SSL, defaults to False
:param bool, optional kafkaProducerAllowAutoTopicCreation: Enable Kafka Producer Auto Topic Creation, defaults to False
:param dict, optional kafkaProducerSaslOptions: Kafka SASL Options
- **mechanism** (*str*, *optional*): Mechanism, defaults to "None". Available values are:
- "None"
- "plain"
- "scram-sha-256"
- "scram-sha-512"
- "aws"
- **username** (*str*, *optional*): Username, defaults to None
- **password** (*str*, *optional*): Password, defaults to None
- **authorizationIdentity** (*str*, *optional*): Authorization Identity, defaults to None
- **accessKeyId** (*str*, *optional*): AccessKey Id, defaults to None
- **secretAccessKey** (*str*, *optional*): Secret AccessKey, defaults to None
- **sessionToken** (*str*, *optional*): Session Token, defaults to None
"""
def notification_docstring(mode) -> str:
return f"""
:param str{", optional" if mode == "edit" else ""} name: Friendly Name
:param NotificationType{", optional" if mode == "edit" else ""} type: Notification Type
:param bool, optional isDefault: Default enabled. This notification will be enabled by default for new monitors. You can still disable the notification separately for each monitor., defaults to False
:param bool, optional applyExisting: Apply on all existing monitors, defaults to False
:param str, optional alertaApiEndpoint: Notification option for ``type`` :attr:`~.NotificationType.ALERTA`.
:param str, optional alertaApiKey: Notification option for ``type`` :attr:`~.NotificationType.ALERTA`.
:param str, optional alertaEnvironment: Notification option for ``type`` :attr:`~.NotificationType.ALERTA`.
:param str, optional alertaAlertState: Notification option for ``type`` :attr:`~.NotificationType.ALERTA`.
:param str, optional alertaRecoverState: Notification option for ``type`` :attr:`~.NotificationType.ALERTA`.
:param str, optional alertNowWebhookURL: Notification option for ``type`` :attr:`~.NotificationType.ALERTNOW`.
:param str, optional phonenumber: Notification option for ``type`` :attr:`~.NotificationType.ALIYUNSMS`.
:param str, optional templateCode: Notification option for ``type`` :attr:`~.NotificationType.ALIYUNSMS`.
:param str, optional signName: Notification option for ``type`` :attr:`~.NotificationType.ALIYUNSMS`.
:param str, optional accessKeyId: Notification option for ``type`` :attr:`~.NotificationType.ALIYUNSMS`.
:param str, optional secretAccessKey: Notification option for ``type`` :attr:`~.NotificationType.ALIYUNSMS`.
:param str, optional appriseURL: Notification option for ``type`` :attr:`~.NotificationType.APPRISE`.
:param str title: Notification option for ``type`` :attr:`~.NotificationType.APPRISE`.
:param str, optional barkEndpoint: Notification option for ``type`` :attr:`~.NotificationType.BARK`.
:param str, optional barkGroup: Notification option for ``type`` :attr:`~.NotificationType.BARK`.
:param str, optional barkSound: Notification option for ``type`` :attr:`~.NotificationType.BARK`.
:param str, optional clicksendsmsLogin: Notification option for ``type`` :attr:`~.NotificationType.CLICKSENDSMS`.
:param str, optional clicksendsmsPassword: Notification option for ``type`` :attr:`~.NotificationType.CLICKSENDSMS`.
:param str, optional clicksendsmsToNumber: Notification option for ``type`` :attr:`~.NotificationType.CLICKSENDSMS`.
:param str clicksendsmsSenderName: Notification option for ``type`` :attr:`~.NotificationType.CLICKSENDSMS`.
:param str, optional webHookUrl: Notification option for ``type`` :attr:`~.NotificationType.DINGDING`.
:param str, optional secretKey: Notification option for ``type`` :attr:`~.NotificationType.DINGDING`.
:param str discordUsername: Notification option for ``type`` :attr:`~.NotificationType.DISCORD`.
:param str, optional discordWebhookUrl: Notification option for ``type`` :attr:`~.NotificationType.DISCORD`.
:param str discordPrefixMessage: Notification option for ``type`` :attr:`~.NotificationType.DISCORD`.
:param str, optional feishuWebHookUrl: Notification option for ``type`` :attr:`~.NotificationType.FEISHU`.
:param str, optional flashdutySeverity: Notification option for ``type`` :attr:`~.NotificationType.FLASHDUTY`.
:param str flashdutyIntegrationKey: Notification option for ``type`` :attr:`~.NotificationType.FLASHDUTY`.
:param str, optional freemobileUser: Notification option for ``type`` :attr:`~.NotificationType.FREEMOBILE`.
:param str, optional freemobilePass: Notification option for ``type`` :attr:`~.NotificationType.FREEMOBILE`.
:param str, optional goAlertBaseURL: Notification option for ``type`` :attr:`~.NotificationType.GOALERT`.
:param str, optional goAlertToken: Notification option for ``type`` :attr:`~.NotificationType.GOALERT`.
:param str, optional googleChatWebhookURL: Notification option for ``type`` :attr:`~.NotificationType.GOOGLECHAT`.
:param str, optional gorushDeviceToken: Notification option for ``type`` :attr:`~.NotificationType.GORUSH`.
:param str gorushPlatform: Notification option for ``type`` :attr:`~.NotificationType.GORUSH`.
:param str gorushTitle: Notification option for ``type`` :attr:`~.NotificationType.GORUSH`.
:param str gorushPriority: Notification option for ``type`` :attr:`~.NotificationType.GORUSH`.
:param int gorushRetry: Notification option for ``type`` :attr:`~.NotificationType.GORUSH`.
:param str gorushTopic: Notification option for ``type`` :attr:`~.NotificationType.GORUSH`.
:param str, optional gorushServerURL: Notification option for ``type`` :attr:`~.NotificationType.GORUSH`.
:param str, optional gotifyserverurl: Notification option for ``type`` :attr:`~.NotificationType.GOTIFY`.
:param str, optional gotifyapplicationToken: Notification option for ``type`` :attr:`~.NotificationType.GOTIFY`.
:param int, optional gotifyPriority: Notification option for ``type`` :attr:`~.NotificationType.GOTIFY`.
:param str notificationService: Notification option for ``type`` :attr:`~.NotificationType.HOMEASSISTANT`.
:param str, optional homeAssistantUrl: Notification option for ``type`` :attr:`~.NotificationType.HOMEASSISTANT`.
:param str, optional longLivedAccessToken: Notification option for ``type`` :attr:`~.NotificationType.HOMEASSISTANT`.
:param str, optional kookGuildID: Notification option for ``type`` :attr:`~.NotificationType.KOOK`.
:param str, optional kookBotToken: Notification option for ``type`` :attr:`~.NotificationType.KOOK`.
:param str, optional lineChannelAccessToken: Notification option for ``type`` :attr:`~.NotificationType.LINE`.
:param str, optional lineUserID: Notification option for ``type`` :attr:`~.NotificationType.LINE`.
:param str, optional lineNotifyAccessToken: Notification option for ``type`` :attr:`~.NotificationType.LINENOTIFY`.
:param str, optional lunaseaTarget: Notification option for ``type`` :attr:`~.NotificationType.LUNASEA`. Allowed values: "device", "user".
:param str lunaseaUserID: Notification option for ``type`` :attr:`~.NotificationType.LUNASEA`. User ID.
:param str lunaseaDevice: Notification option for ``type`` :attr:`~.NotificationType.LUNASEA`. Device ID.
:param str, optional internalRoomId: Notification option for ``type`` :attr:`~.NotificationType.MATRIX`.
:param str, optional accessToken: Notification option for ``type`` :attr:`~.NotificationType.MATRIX`.
:param str, optional homeserverUrl: Notification option for ``type`` :attr:`~.NotificationType.MATRIX`.
:param str mattermostusername: Notification option for ``type`` :attr:`~.NotificationType.MATTERMOST`.
:param str, optional mattermostWebhookUrl: Notification option for ``type`` :attr:`~.NotificationType.MATTERMOST`.
:param str mattermostchannel: Notification option for ``type`` :attr:`~.NotificationType.MATTERMOST`.
:param str mattermosticonemo: Notification option for ``type`` :attr:`~.NotificationType.MATTERMOST`.
:param str mattermosticonurl: Notification option for ``type`` :attr:`~.NotificationType.MATTERMOST`.
:param str, optional sender: Notification option for ``type`` :attr:`~.NotificationType.NOSTR`.
:param str, optional recipients: Notification option for ``type`` :attr:`~.NotificationType.NOSTR`.
:param str, optional relays: Notification option for ``type`` :attr:`~.NotificationType.NOSTR`.
:param str ntfyAuthenticationMethod: Notification option for ``type`` :attr:`~.NotificationType.NTFY`. Authentication Method.
:param str ntfyusername: Notification option for ``type`` :attr:`~.NotificationType.NTFY`.
:param str ntfypassword: Notification option for ``type`` :attr:`~.NotificationType.NTFY`.
:param str ntfyaccesstoken: Notification option for ``type`` :attr:`~.NotificationType.NTFY`. Access Token.
:param str, optional ntfytopic: Notification option for ``type`` :attr:`~.NotificationType.NTFY`.
:param int, optional ntfyPriority: Notification option for ``type`` :attr:`~.NotificationType.NTFY`.
:param str, optional ntfyserverurl: Notification option for ``type`` :attr:`~.NotificationType.NTFY`.
:param str ntfyIcon: Notification option for ``type`` :attr:`~.NotificationType.NTFY`.
:param str octopushVersion: Notification option for ``type`` :attr:`~.NotificationType.OCTOPUSH`.
:param str, optional octopushAPIKey: Notification option for ``type`` :attr:`~.NotificationType.OCTOPUSH`.
:param str, optional octopushLogin: Notification option for ``type`` :attr:`~.NotificationType.OCTOPUSH`.
:param str, optional octopushPhoneNumber: Notification option for ``type`` :attr:`~.NotificationType.OCTOPUSH`.
:param str octopushSMSType: Notification option for ``type`` :attr:`~.NotificationType.OCTOPUSH`.
:param str octopushSenderName: Notification option for ``type`` :attr:`~.NotificationType.OCTOPUSH`.
:param str, optional httpAddr: Notification option for ``type`` :attr:`~.NotificationType.ONEBOT`.
:param str, optional accessToken: Notification option for ``type`` :attr:`~.NotificationType.ONEBOT`.
:param str msgType: Notification option for ``type`` :attr:`~.NotificationType.ONEBOT`.
:param str, optional recieverId: Notification option for ``type`` :attr:`~.NotificationType.ONEBOT`.
:param int opsgeniePriority: Notification option for ``type`` :attr:`~.NotificationType.OPSGENIE`. Priority. Available values are numbers between ``1`` and ``5``.
:param str, optional opsgenieRegion: Notification option for ``type`` :attr:`~.NotificationType.OPSGENIE`. Region. Available values are:
- ``us``: US (Default)
- ``eu``: EU
:param str, optional opsgenieApiKey: Notification option for ``type`` :attr:`~.NotificationType.OPSGENIE`. API Key.
:param str pagerdutyAutoResolve: Notification option for ``type`` :attr:`~.NotificationType.PAGERDUTY`.
:param str pagerdutyIntegrationUrl: Notification option for ``type`` :attr:`~.NotificationType.PAGERDUTY`.
:param str pagerdutyPriority: Notification option for ``type`` :attr:`~.NotificationType.PAGERDUTY`.
:param str, optional pagerdutyIntegrationKey: Notification option for ``type`` :attr:`~.NotificationType.PAGERDUTY`.
:param str pagertreeAutoResolve: Notification option for ``type`` :attr:`~.NotificationType.PAGERTREE`.
Available values are:
- ``0``: Do Nothing
- ``resolve``: Auto Resolve
:param str pagertreeIntegrationUrl: Notification option for ``type`` :attr:`~.NotificationType.PAGERTREE`.
:param str pagertreeUrgency: Notification option for ``type`` :attr:`~.NotificationType.PAGERTREE`.
Available values are:
- ``silent``: Silent
- ``low``: Low
- ``medium``: Medium
- ``high``: High
- ``critical``: Critical
:param bool promosmsAllowLongSMS: Notification option for ``type`` :attr:`~.NotificationType.PROMOSMS`. Allow long SMS.
:param str, optional promosmsLogin: Notification option for ``type`` :attr:`~.NotificationType.PROMOSMS`.
:param str, optional promosmsPassword: Notification option for ``type`` :attr:`~.NotificationType.PROMOSMS`.
:param str, optional promosmsPhoneNumber: Notification option for ``type`` :attr:`~.NotificationType.PROMOSMS`. Phone number (for Polish recipient You can skip area codes).
:param str promosmsSMSType: Notification option for ``type`` :attr:`~.NotificationType.PROMOSMS`.
Available values are:
- ``0``: SMS FLASH - Message will automatically show on recipient device. Limited only to Polish recipients.
- ``1``: SMS ECO - cheap but slow and often overloaded. Limited only to Polish recipients.
- ``3``: SMS FULL - Premium tier of SMS, You can use your Sender Name (You need to register name first). Reliable for alerts.
- ``4``: SMS SPEED - Highest priority in system. Very quick and reliable but costly (about twice of SMS FULL price).
:param str promosmsSenderName: Notification option for ``type`` :attr:`~.NotificationType.PROMOSMS`.
:param str, optional pushbulletAccessToken: Notification option for ``type`` :attr:`~.NotificationType.PUSHBULLET`.
:param str pushdeerServer: Notification option for ``type`` :attr:`~.NotificationType.PUSHDEER`.
:param str, optional pushdeerKey: Notification option for ``type`` :attr:`~.NotificationType.PUSHDEER`.
:param str, optional pushoveruserkey: Notification option for ``type`` :attr:`~.NotificationType.PUSHOVER`.
:param str, optional pushoverapptoken: Notification option for ``type`` :attr:`~.NotificationType.PUSHOVER`.
:param str pushoversounds: Notification option for ``type`` :attr:`~.NotificationType.PUSHOVER`.
:param str pushoverpriority: Notification option for ``type`` :attr:`~.NotificationType.PUSHOVER`.
:param str pushovertitle: Notification option for ``type`` :attr:`~.NotificationType.PUSHOVER`.
:param str pushoverdevice: Notification option for ``type`` :attr:`~.NotificationType.PUSHOVER`.
:param int pushoverttl: Notification option for ``type`` :attr:`~.NotificationType.PUSHOVER`. Message TTL (Seconds).
:param str, optional pushyAPIKey: Notification option for ``type`` :attr:`~.NotificationType.PUSHY`.
:param str, optional pushyToken: Notification option for ``type`` :attr:`~.NotificationType.PUSHY`.
:param str rocketchannel: Notification option for ``type`` :attr:`~.NotificationType.ROCKET_CHAT`.
:param str rocketusername: Notification option for ``type`` :attr:`~.NotificationType.ROCKET_CHAT`.
:param str rocketiconemo: Notification option for ``type`` :attr:`~.NotificationType.ROCKET_CHAT`.
:param str, optional rocketwebhookURL: Notification option for ``type`` :attr:`~.NotificationType.ROCKET_CHAT`.
:param str, optional serverChanSendKey: Notification option for ``type`` :attr:`~.NotificationType.SERVERCHAN`.
:param str, optional serwersmsUsername: Notification option for ``type`` :attr:`~.NotificationType.SERWERSMS`.
:param str, optional serwersmsPassword: Notification option for ``type`` :attr:`~.NotificationType.SERWERSMS`.
:param str, optional serwersmsPhoneNumber: Notification option for ``type`` :attr:`~.NotificationType.SERWERSMS`.
:param str serwersmsSenderName: Notification option for ``type`` :attr:`~.NotificationType.SERWERSMS`.
:param str, optional signalNumber: Notification option for ``type`` :attr:`~.NotificationType.SIGNAL`.
:param str, optional signalRecipients: Notification option for ``type`` :attr:`~.NotificationType.SIGNAL`.
:param str, optional signalURL: Notification option for ``type`` :attr:`~.NotificationType.SIGNAL`.
:param bool slackchannelnotify: Notification option for ``type`` :attr:`~.NotificationType.SLACK`.
:param str slackchannel: Notification option for ``type`` :attr:`~.NotificationType.SLACK`.
:param str slackusername: Notification option for ``type`` :attr:`~.NotificationType.SLACK`.
:param str slackiconemo: Notification option for ``type`` :attr:`~.NotificationType.SLACK`.
:param str, optional slackwebhookURL: Notification option for ``type`` :attr:`~.NotificationType.SLACK`.
:param str smscTranslit: Notification option for ``type`` :attr:`~.NotificationType.SMSC`.
:param str, optional smscLogin: Notification option for ``type`` :attr:`~.NotificationType.SMSC`.
:param str, optional smscPassword: Notification option for ``type`` :attr:`~.NotificationType.SMSC`.
:param str, optional smscToNumber: Notification option for ``type`` :attr:`~.NotificationType.SMSC`.
:param str smscSenderName: Notification option for ``type`` :attr:`~.NotificationType.SMSC`.
:param bool smseagleEncoding: Notification option for ``type`` :attr:`~.NotificationType.SMSEAGLE`. True to send messages in unicode.
:param int smseaglePriority: Notification option for ``type`` :attr:`~.NotificationType.SMSEAGLE`. Message priority (0-9, default = 0).
:param str smseagleRecipientType: Notification option for ``type`` :attr:`~.NotificationType.SMSEAGLE`. Recipient type.
Available values are:
- ``smseagle-to``: Phone number(s)
- ``smseagle-group``: Phonebook group name(s)
- ``smseagle-contact``: Phonebook contact name(s)
:param str, optional smseagleToken: Notification option for ``type`` :attr:`~.NotificationType.SMSEAGLE`. API Access token.
:param str, optional smseagleRecipient: Notification option for ``type`` :attr:`~.NotificationType.SMSEAGLE`. Recipient(s) (multiple must be separated with comma).
:param str, optional smseagleUrl: Notification option for ``type`` :attr:`~.NotificationType.SMSEAGLE`. Your SMSEagle device URL.
:param str smsmanagerApiKey: Notification option for ``type`` :attr:`~.NotificationType.SMSMANAGER`.
:param str numbers: Notification option for ``type`` :attr:`~.NotificationType.SMSMANAGER`.
:param str messageType: Notification option for ``type`` :attr:`~.NotificationType.SMSMANAGER`.
:param str, optional smtpHost: Notification option for ``type`` :attr:`~.NotificationType.SMTP`.
:param int, optional smtpPort: Notification option for ``type`` :attr:`~.NotificationType.SMTP`.
:param str smtpSecure: Notification option for ``type`` :attr:`~.NotificationType.SMTP`.
:param bool smtpIgnoreTLSError: Notification option for ``type`` :attr:`~.NotificationType.SMTP`.
:param str smtpDkimDomain: Notification option for ``type`` :attr:`~.NotificationType.SMTP`.
:param str smtpDkimKeySelector: Notification option for ``type`` :attr:`~.NotificationType.SMTP`.
:param str smtpDkimPrivateKey: Notification option for ``type`` :attr:`~.NotificationType.SMTP`.
:param str smtpDkimHashAlgo: Notification option for ``type`` :attr:`~.NotificationType.SMTP`.
:param str smtpDkimheaderFieldNames: Notification option for ``type`` :attr:`~.NotificationType.SMTP`.
:param str smtpDkimskipFields: Notification option for ``type`` :attr:`~.NotificationType.SMTP`.
:param str smtpUsername: Notification option for ``type`` :attr:`~.NotificationType.SMTP`.
:param str smtpPassword: Notification option for ``type`` :attr:`~.NotificationType.SMTP`.
:param str customSubject: Notification option for ``type`` :attr:`~.NotificationType.SMTP`.
:param str, optional smtpFrom: Notification option for ``type`` :attr:`~.NotificationType.SMTP`.
:param str smtpCC: Notification option for ``type`` :attr:`~.NotificationType.SMTP`.
:param str smtpBCC: Notification option for ``type`` :attr:`~.NotificationType.SMTP`.
:param str smtpTo: Notification option for ``type`` :attr:`~.NotificationType.SMTP`.
:param str splunkAutoResolve: Notification option for ``type`` :attr:`~.NotificationType.SPLUNK`. Auto resolve or acknowledged.
Available values are:
- ``0``: do nothing
- ``ACKNOWLEDGEMENT``: auto acknowledged
- ``RECOVERY``: auto resolve
:param str splunkSeverity: Notification option for ``type`` :attr:`~.NotificationType.SPLUNK`. Severity.
Available values are:
- ``INFO``
- ``WARNING``
- ``CRITICAL``
:param str, optional splunkRestURL: Notification option for ``type`` :attr:`~.NotificationType.SPLUNK`. Splunk Rest URL.
:param str, optional squadcastWebhookURL: Notification option for ``type`` :attr:`~.NotificationType.SQUADCAST`.
:param str, optional stackfieldwebhookURL: Notification option for ``type`` :attr:`~.NotificationType.STACKFIELD`.
:param str, optional webhookUrl: Notification option for ``type`` :attr:`~.NotificationType.TEAMS`.
:param str, optional pushAPIKey: Notification option for ``type`` :attr:`~.NotificationType.PUSHBYTECHULUS`.
:param str, optional telegramChatID: Notification option for ``type`` :attr:`~.NotificationType.TELEGRAM`.
:param bool telegramSendSilently: Notification option for ``type`` :attr:`~.NotificationType.TELEGRAM`.
:param bool telegramProtectContent: Notification option for ``type`` :attr:`~.NotificationType.TELEGRAM`.
:param str telegramMessageThreadID: Notification option for ``type`` :attr:`~.NotificationType.TELEGRAM`.
:param str, optional telegramBotToken: Notification option for ``type`` :attr:`~.NotificationType.TELEGRAM`.
:param str, optional twilioAccountSID: Notification option for ``type`` :attr:`~.NotificationType.TWILIO`. Account SID.
:param str twilioApiKey: Notification option for ``type`` :attr:`~.NotificationType.TWILIO`.
:param str, optional twilioAuthToken: Notification option for ``type`` :attr:`~.NotificationType.TWILIO`. Auth Token.
:param str, optional twilioToNumber: Notification option for ``type`` :attr:`~.NotificationType.TWILIO`. To Number.
:param str, optional twilioFromNumber: Notification option for ``type`` :attr:`~.NotificationType.TWILIO`. From Number.
:param str, optional webhookContentType: Notification option for ``type`` :attr:`~.NotificationType.WEBHOOK`.
:param str webhookCustomBody: Notification option for ``type`` :attr:`~.NotificationType.WEBHOOK`.
:param str webhookAdditionalHeaders: Notification option for ``type`` :attr:`~.NotificationType.WEBHOOK`.
:param str, optional webhookURL: Notification option for ``type`` :attr:`~.NotificationType.WEBHOOK`.
:param str, optional weComBotKey: Notification option for ``type`` :attr:`~.NotificationType.WECOM`.
:param str, optional webhookUrl: Notification option for ``type`` :attr:`~.NotificationType.ZOHOCLIQ`.
"""
def proxy_docstring(mode) -> str:
return f"""
:param ProxyProtocol{", optional" if mode == "edit" else ""} protocol: Proxy Protocol
:param str{", optional" if mode == "edit" else ""} host: Proxy Server
:param str{", optional" if mode == "edit" else ""} port: Port
:param bool, optional auth: Proxy server has authentication, defaults to False
:param str, optional username: User, defaults to None
:param str, optional password: Password, defaults to None
:param bool, optional active: Enabled. This proxy will not effect on monitor requests until it is activated. You can control temporarily disable the proxy from all monitors by activation status., defaults to True
:param bool, optional default: Set As Default. This proxy will be enabled by default for new monitors. You can still disable the proxy separately for each monitor., , defaults to False
:param bool, optional applyExisting: Apply on all existing monitors, defaults to False
"""
def docker_host_docstring(mode) -> str:
return f"""
:param str{", optional" if mode == "edit" else ""} name: Friendly Name
:param DockerType{", optional" if mode == "edit" else ""} dockerType: Connection Type
:param str, optional dockerDaemon: Docker Daemon, defaults to None
"""
def maintenance_docstring(mode) -> str:
return f"""
:param str{", optional" if mode == "edit" else ""} title: Title
:param MaintenanceStrategy{", optional" if mode == "edit" else ""} strategy: Strategy
:param bool, optional active: True if maintenance is active, defaults to ``True``
:param str, optional description: Description, defaults to ``""``
:param list, optional dateRange: DateTime Range, defaults to ``["<current date>"]``
:param int, optional intervalDay: Interval (Run once every day), defaults to ``1``
:param list, optional weekdays: List that contains the days of the week on which the maintenance is enabled (Sun = ``0``, Mon = ``1``, ..., Sat = ``6``). Required for ``strategy`` :attr:`~.MaintenanceStrategy.RECURRING_WEEKDAY`., defaults to ``[]``.
:param list, optional daysOfMonth: List that contains the days of the month on which the maintenance is enabled (Day 1 = ``1``, Day 2 = ``2``, ..., Day 31 = ``31``) and the last day of the month (``"lastDay1"``). Required for ``strategy`` :attr:`~.MaintenanceStrategy.RECURRING_DAY_OF_MONTH`., defaults to ``[]``.
:param list, optional timeRange: Maintenance Time Window of a Day, defaults to ``[{{"hours": 2, "minutes": 0}}, {{"hours": 3, "minutes": 0}}]``.
:param str, optional cron: Cron Schedule. Required for ``strategy`` :attr:`~.MaintenanceStrategy.CRON`., defaults to ``"30 3 * * *"``
:param int, optional durationMinutes: Duration (Minutes). Required for ``strategy`` :attr:`~.MaintenanceStrategy.CRON`., defaults to ``60``
:param str, optional timezone: Timezone, defaults to ``None`` (Server Timezone)
"""
def tag_docstring(mode) -> str:
return f"""
:param str{", optional" if mode == "edit" else ""} name: Tag name
:param str{", optional" if mode == "edit" else ""} color: Tag color
"""

View file

@ -15,3 +15,8 @@ class Event(str, Enum):
HEARTBEAT = "heartbeat" HEARTBEAT = "heartbeat"
INFO = "info" INFO = "info"
CERT_INFO = "certInfo" CERT_INFO = "certInfo"
DOCKER_HOST_LIST = "dockerHostList"
AUTO_LOGIN = "autoLogin"
INIT_SERVER_TIMEZONE = "initServerTimezone"
MAINTENANCE_LIST = "maintenanceList"
API_KEY_LIST = "apiKeyList"

View file

@ -1,2 +1,10 @@
class UptimeKumaException(Exception): class UptimeKumaException(Exception):
pass """
There was an exception that occurred while communicating with Uptime Kuma.
"""
class Timeout(UptimeKumaException):
"""
A timeout has occurred while communicating with Uptime Kuma.
"""

View file

@ -2,9 +2,22 @@ from enum import Enum
class IncidentStyle(str, Enum): class IncidentStyle(str, Enum):
"""Enumerate incident styles."""
INFO = "info" INFO = "info"
"""Info"""
WARNING = "warning" WARNING = "warning"
"""Warning"""
DANGER = "danger" DANGER = "danger"
"""Danger"""
PRIMARY = "primary" PRIMARY = "primary"
"""Primary"""
LIGHT = "light" LIGHT = "light"
"""Light"""
DARK = "dark" DARK = "dark"
"""Dark"""

View file

@ -0,0 +1,23 @@
from enum import Enum
class MaintenanceStrategy(str, Enum):
"""Enumerate maintenance strategies."""
MANUAL = "manual"
"""Active/Inactive Manually"""
SINGLE = "single"
"""Single Maintenance Window"""
RECURRING_INTERVAL = "recurring-interval"
"""Recurring - Interval"""
RECURRING_WEEKDAY = "recurring-weekday"
"""Recurring - Day of Week"""
RECURRING_DAY_OF_MONTH = "recurring-day-of-month"
"""Recurring - Day of Month"""
CRON = "cron"
"""Cron Expression"""

View file

@ -0,0 +1,17 @@
from enum import Enum
class MonitorStatus(int, Enum):
"""Enumerate monitor statuses."""
DOWN = 0
"""DOWN"""
UP = 1
"""UP"""
PENDING = 2
"""PENDING"""
MAINTENANCE = 3
"""MAINTENANCE"""

View file

@ -2,12 +2,70 @@ from enum import Enum
class MonitorType(str, Enum): class MonitorType(str, Enum):
"""Enumerate monitor types."""
GROUP = "group"
"""Group"""
HTTP = "http" HTTP = "http"
"""HTTP(s)"""
PORT = "port" PORT = "port"
"""TCP Port"""
PING = "ping" PING = "ping"
"""Ping"""
KEYWORD = "keyword" KEYWORD = "keyword"
"""HTTP(s) - Keyword"""
JSON_QUERY = "json-query"
"""HTTP(s) - Json Query"""
GRPC_KEYWORD = "grpc-keyword"
"""gRPC(s) - Keyword"""
DNS = "dns" DNS = "dns"
"""DNS"""
DOCKER = "docker"
"""Docker Container"""
REAL_BROWSER = "real-browser"
"""HTTP(s) - Browser Engine (Chrome/Chromium)"""
PUSH = "push" PUSH = "push"
"""Push"""
STEAM = "steam" STEAM = "steam"
"""Steam Game Server"""
GAMEDIG = "gamedig"
"""GameDig"""
MQTT = "mqtt" MQTT = "mqtt"
"""MQTT"""
KAFKA_PRODUCER = "kafka-producer"
"""Kafka Producer"""
SQLSERVER = "sqlserver" SQLSERVER = "sqlserver"
"""Microsoft SQL Server"""
POSTGRES = "postgres"
"""PostgreSQL"""
MYSQL = "mysql"
"""MySQL/MariaDB"""
MONGODB = "mongodb"
"""MongoDB"""
RADIUS = "radius"
"""Radius"""
REDIS = "redis"
"""Redis"""
TAILSCALE_PING = "tailscale-ping"
"""Tailscale Ping"""

View file

@ -2,237 +2,481 @@ from enum import Enum
class NotificationType(str, Enum): class NotificationType(str, Enum):
"""Enumerate notification types."""
ALERTA = "alerta" ALERTA = "alerta"
"""Alerta"""
ALERTNOW = "AlertNow"
"""AlertNow"""
ALIYUNSMS = "AliyunSMS" ALIYUNSMS = "AliyunSMS"
"""AliyunSMS"""
APPRISE = "apprise" APPRISE = "apprise"
"""Apprise (Support 50+ Notification services)"""
BARK = "Bark" BARK = "Bark"
"""Bark"""
CLICKSENDSMS = "clicksendsms" CLICKSENDSMS = "clicksendsms"
"""ClickSend SMS"""
DINGDING = "DingDing" DINGDING = "DingDing"
"""DingDing"""
DISCORD = "discord" DISCORD = "discord"
"""Discord"""
FEISHU = "Feishu" FEISHU = "Feishu"
"""Feishu"""
FLASHDUTY = "FlashDuty"
"""FlashDuty"""
FREEMOBILE = "FreeMobile"
"""FreeMobile (mobile.free.fr)"""
GOALERT = "GoAlert"
"""GoAlert"""
GOOGLECHAT = "GoogleChat" GOOGLECHAT = "GoogleChat"
"""Google Chat (Google Workspace)"""
GORUSH = "gorush" GORUSH = "gorush"
"""Gorush"""
GOTIFY = "gotify" GOTIFY = "gotify"
"""Gotify"""
HOMEASSISTANT = "HomeAssistant"
"""Home Assistant"""
KOOK = "Kook"
"""Kook"""
LINE = "line" LINE = "line"
"""LINE Messenger"""
LINENOTIFY = "LineNotify"
"""LINE Notify"""
LUNASEA = "lunasea" LUNASEA = "lunasea"
"""LunaSea"""
MATRIX = "matrix" MATRIX = "matrix"
"""Matrix"""
MATTERMOST = "mattermost" MATTERMOST = "mattermost"
"""Mattermost"""
NOSTR = "nostr"
"""Nostr"""
NTFY = "ntfy" NTFY = "ntfy"
"""Ntfy"""
OCTOPUSH = "octopush" OCTOPUSH = "octopush"
"""Octopush"""
ONEBOT = "OneBot" ONEBOT = "OneBot"
"""OneBot"""
OPSGENIE = "Opsgenie"
"""Opsgenie"""
PAGERDUTY = "PagerDuty" PAGERDUTY = "PagerDuty"
"""PagerDuty"""
PAGERTREE = "PagerTree"
"""PagerTree"""
PROMOSMS = "promosms" PROMOSMS = "promosms"
"""PromoSMS"""
PUSHBULLET = "pushbullet" PUSHBULLET = "pushbullet"
"""Pushbullet"""
PUSHDEER = "PushDeer" PUSHDEER = "PushDeer"
"""PushDeer"""
PUSHOVER = "pushover" PUSHOVER = "pushover"
"""Pushover"""
PUSHY = "pushy" PUSHY = "pushy"
"""Pushy"""
ROCKET_CHAT = "rocket.chat" ROCKET_CHAT = "rocket.chat"
"""Rocket.Chat"""
SERVERCHAN = "ServerChan"
"""ServerChan"""
SERWERSMS = "serwersms" SERWERSMS = "serwersms"
"""SerwerSMS.pl"""
SIGNAL = "signal" SIGNAL = "signal"
"""Signal"""
SLACK = "slack" SLACK = "slack"
"""Slack"""
SMSC = "smsc"
"""SMSC"""
SMSEAGLE = "SMSEagle"
"""SMSEagle"""
SMSMANAGER = "SMSManager"
"""SmsManager (smsmanager.cz)"""
SMTP = "smtp" SMTP = "smtp"
"""Email (SMTP)"""
SPLUNK = "Splunk"
"""Splunk"""
SQUADCAST = "squadcast"
"""SquadCast"""
STACKFIELD = "stackfield" STACKFIELD = "stackfield"
"""Stackfield"""
TEAMS = "teams" TEAMS = "teams"
"""Microsoft Teams"""
PUSHBYTECHULUS = "PushByTechulus" PUSHBYTECHULUS = "PushByTechulus"
"""Push by Techulus"""
TELEGRAM = "telegram" TELEGRAM = "telegram"
"""Telegram"""
TWILIO = "twilio"
"""Twilio"""
WEBHOOK = "webhook" WEBHOOK = "webhook"
"""Webhook"""
WECOM = "WeCom" WECOM = "WeCom"
"""WeCom"""
ZOHOCLIQ = "ZohoCliq"
"""ZohoCliq"""
notification_provider_options = { notification_provider_options = {
NotificationType.ALERTA: [ NotificationType.ALERTA: dict(
"alertaApiEndpoint", alertaApiEndpoint=dict(type="str", required=True),
"alertaApiKey", alertaApiKey=dict(type="str", required=True),
"alertaEnvironment", alertaEnvironment=dict(type="str", required=True),
"alertaAlertState", alertaAlertState=dict(type="str", required=True),
"alertaRecoverState", alertaRecoverState=dict(type="str", required=True),
], ),
NotificationType.ALIYUNSMS: [ NotificationType.ALERTNOW: dict(
"phonenumber", alertNowWebhookURL=dict(type="str", required=True),
"templateCode", ),
"signName", NotificationType.ALIYUNSMS: dict(
"accessKeyId", phonenumber=dict(type="str", required=True),
"secretAccessKey", templateCode=dict(type="str", required=True),
], signName=dict(type="str", required=True),
NotificationType.APPRISE: [ accessKeyId=dict(type="str", required=True),
"appriseURL", secretAccessKey=dict(type="str", required=True),
"title", ),
], NotificationType.APPRISE: dict(
NotificationType.BARK: [ appriseURL=dict(type="str", required=True),
"barkEndpoint", title=dict(type="str", required=False),
], ),
NotificationType.CLICKSENDSMS: [ NotificationType.BARK: dict(
"clicksendsmsLogin", barkEndpoint=dict(type="str", required=True),
"clicksendsmsPassword", barkGroup=dict(type="str", required=True),
"clicksendsmsToNumber", barkSound=dict(type="str", required=True),
"clicksendsmsSenderName", ),
], NotificationType.CLICKSENDSMS: dict(
NotificationType.DINGDING: [ clicksendsmsLogin=dict(type="str", required=True),
"webHookUrl", clicksendsmsPassword=dict(type="str", required=True),
"secretKey", clicksendsmsToNumber=dict(type="str", required=True),
], clicksendsmsSenderName=dict(type="str", required=False),
NotificationType.DISCORD: [ ),
"discordUsername", NotificationType.DINGDING: dict(
"discordWebhookUrl", webHookUrl=dict(type="str", required=True),
"discordPrefixMessage", secretKey=dict(type="str", required=True),
], ),
NotificationType.FEISHU: [ NotificationType.DISCORD: dict(
"feishuWebHookUrl", discordUsername=dict(type="str", required=False),
], discordWebhookUrl=dict(type="str", required=True),
NotificationType.GOOGLECHAT: [ discordPrefixMessage=dict(type="str", required=False),
"googleChatWebhookURL", ),
], NotificationType.FEISHU: dict(
NotificationType.GORUSH: [ feishuWebHookUrl=dict(type="str", required=True),
"gorushDeviceToken", ),
"gorushPlatform", NotificationType.FLASHDUTY: dict(
"gorushTitle", flashdutySeverity=dict(type="str", required=True),
"gorushPriority", flashdutyIntegrationKey=dict(type="str", required=False),
"gorushRetry", ),
"gorushTopic", NotificationType.FREEMOBILE: dict(
"gorushServerURL", freemobileUser=dict(type="str", required=True),
], freemobilePass=dict(type="str", required=True),
NotificationType.GOTIFY: [ ),
"gotifyserverurl", NotificationType.GOALERT: dict(
"gotifyapplicationToken", goAlertBaseURL=dict(type="str", required=True),
"gotifyPriority", goAlertToken=dict(type="str", required=True),
], ),
NotificationType.LINE: [ NotificationType.GOOGLECHAT: dict(
"lineChannelAccessToken", googleChatWebhookURL=dict(type="str", required=True),
"lineUserID", ),
], NotificationType.GORUSH: dict(
NotificationType.LUNASEA: [ gorushDeviceToken=dict(type="str", required=True),
"lunaseaDevice", gorushPlatform=dict(type="str", required=False),
], gorushTitle=dict(type="str", required=False),
NotificationType.MATRIX: [ gorushPriority=dict(type="str", required=False),
"internalRoomId", gorushRetry=dict(type="int", required=False),
"accessToken", gorushTopic=dict(type="str", required=False),
"homeserverUrl", gorushServerURL=dict(type="str", required=True),
], ),
NotificationType.MATTERMOST: [ NotificationType.GOTIFY: dict(
"mattermostusername", gotifyserverurl=dict(type="str", required=True),
"mattermostWebhookUrl", gotifyapplicationToken=dict(type="str", required=True),
"mattermostchannel", gotifyPriority=dict(type="int", required=True),
"mattermosticonemo", ),
"mattermosticonurl", NotificationType.HOMEASSISTANT: dict(
], notificationService=dict(type="str", required=False),
NotificationType.NTFY: [ homeAssistantUrl=dict(type="str", required=True),
"ntfyserverurl", longLivedAccessToken=dict(type="str", required=True),
"ntfytopic", ),
"ntfyPriority", NotificationType.KOOK: dict(
], kookGuildID=dict(type="str", required=True),
NotificationType.OCTOPUSH: [ kookBotToken=dict(type="str", required=True),
"octopushVersion", ),
"octopushAPIKey", NotificationType.LINE: dict(
"octopushLogin", lineChannelAccessToken=dict(type="str", required=True),
"octopushPhoneNumber", lineUserID=dict(type="str", required=True),
"octopushSMSType", ),
"octopushSenderName", NotificationType.LINENOTIFY: dict(
"octopushDMLogin", lineNotifyAccessToken=dict(type="str", required=True),
"octopushDMAPIKey", ),
"octopushDMPhoneNumber", NotificationType.LUNASEA: dict(
"octopushDMSenderName", lunaseaTarget=dict(type="str", required=True),
"octopushDMSMSType", lunaseaUserID=dict(type="str", required=False),
], lunaseaDevice=dict(type="str", required=False),
NotificationType.ONEBOT: [ ),
"httpAddr", NotificationType.MATRIX: dict(
"accessToken", internalRoomId=dict(type="str", required=True),
"msgType", accessToken=dict(type="str", required=True),
"recieverId", homeserverUrl=dict(type="str", required=True),
], ),
NotificationType.PAGERDUTY: [ NotificationType.MATTERMOST: dict(
"pagerdutyAutoResolve", mattermostusername=dict(type="str", required=False),
"pagerdutyIntegrationUrl", mattermostWebhookUrl=dict(type="str", required=True),
"pagerdutyPriority", mattermostchannel=dict(type="str", required=False),
"pagerdutyIntegrationKey", mattermosticonemo=dict(type="str", required=False),
], mattermosticonurl=dict(type="str", required=False),
NotificationType.PROMOSMS: [ ),
"promosmsLogin", NotificationType.NOSTR: dict(
"promosmsPassword", sender=dict(type="str", required=True),
"promosmsPhoneNumber", recipients=dict(type="str", required=True),
"promosmsSMSType", relays=dict(type="str", required=True),
"promosmsSenderName", ),
], NotificationType.NTFY: dict(
NotificationType.PUSHBULLET: [ ntfyAuthenticationMethod=dict(type="str", required=False),
"pushbulletAccessToken", ntfyusername=dict(type="str", required=False),
], ntfypassword=dict(type="str", required=False),
NotificationType.PUSHDEER: [ ntfyaccesstoken=dict(type="str", required=False),
"pushdeerKey", ntfytopic=dict(type="str", required=True),
], ntfyPriority=dict(type="int", required=True),
NotificationType.PUSHOVER: [ ntfyserverurl=dict(type="str", required=True),
"pushoveruserkey", ntfyIcon=dict(type="str", required=False),
"pushoverapptoken", ),
"pushoversounds", NotificationType.OCTOPUSH: dict(
"pushoverpriority", octopushVersion=dict(type="str", required=False),
"pushovertitle", octopushAPIKey=dict(type="str", required=True),
"pushoverdevice", octopushLogin=dict(type="str", required=True),
], octopushPhoneNumber=dict(type="str", required=True),
NotificationType.PUSHY: [ octopushSMSType=dict(type="str", required=False),
"pushyAPIKey", octopushSenderName=dict(type="str", required=False),
"pushyToken", ),
], NotificationType.ONEBOT: dict(
NotificationType.ROCKET_CHAT: [ httpAddr=dict(type="str", required=True),
"rocketchannel", accessToken=dict(type="str", required=True),
"rocketusername", msgType=dict(type="str", required=False),
"rocketiconemo", recieverId=dict(type="str", required=True),
"rocketwebhookURL", ),
"rocketbutton", NotificationType.OPSGENIE: dict(
], opsgeniePriority=dict(type="int", required=False),
NotificationType.SERWERSMS: [ opsgenieRegion=dict(type="str", required=True),
"serwersmsUsername", opsgenieApiKey=dict(type="str", required=True),
"serwersmsPassword", ),
"serwersmsPhoneNumber", NotificationType.PAGERDUTY: dict(
"serwersmsSenderName", pagerdutyAutoResolve=dict(type="str", required=False),
], pagerdutyIntegrationUrl=dict(type="str", required=False),
NotificationType.SIGNAL: [ pagerdutyPriority=dict(type="str", required=False),
"signalNumber", pagerdutyIntegrationKey=dict(type="str", required=True),
"signalRecipients", ),
"signalURL", NotificationType.PAGERTREE: dict(
], pagertreeAutoResolve=dict(type="str", required=False),
NotificationType.SLACK: [ pagertreeIntegrationUrl=dict(type="str", required=False),
"slackbutton", pagertreeUrgency=dict(type="str", required=False),
"slackchannel", ),
"slackusername", NotificationType.PROMOSMS: dict(
"slackiconemo", promosmsAllowLongSMS=dict(type="bool", required=False),
"slackwebhookURL", promosmsLogin=dict(type="str", required=True),
"slackbutton", promosmsPassword=dict(type="str", required=True),
], promosmsPhoneNumber=dict(type="str", required=True),
NotificationType.SMTP: [ promosmsSMSType=dict(type="str", required=False),
"smtpHost", promosmsSenderName=dict(type="str", required=False),
"smtpPort", ),
"smtpSecure", NotificationType.PUSHBULLET: dict(
"smtpIgnoreTLSError", pushbulletAccessToken=dict(type="str", required=True),
"smtpDkimDomain", ),
"smtpDkimKeySelector", NotificationType.PUSHDEER: dict(
"smtpDkimPrivateKey", pushdeerServer=dict(type="str", required=False),
"smtpDkimHashAlgo", pushdeerKey=dict(type="str", required=True),
"smtpDkimheaderFieldNames", ),
"smtpDkimskipFields", NotificationType.PUSHOVER: dict(
"smtpUsername", pushoveruserkey=dict(type="str", required=True),
"smtpPassword", pushoverapptoken=dict(type="str", required=True),
"customSubject", pushoversounds=dict(type="str", required=False),
"smtpFrom", pushoverpriority=dict(type="str", required=False),
"smtpCC", pushovertitle=dict(type="str", required=False),
"smtpBCC", pushoverdevice=dict(type="str", required=False),
"smtpTo", pushoverttl=dict(type="int", required=False),
], ),
NotificationType.STACKFIELD: [ NotificationType.PUSHY: dict(
"stackfieldwebhookURL", pushyAPIKey=dict(type="str", required=True),
], pushyToken=dict(type="str", required=True),
NotificationType.TEAMS: [ ),
"webhookUrl", NotificationType.ROCKET_CHAT: dict(
], rocketchannel=dict(type="str", required=False),
NotificationType.PUSHBYTECHULUS: [ rocketusername=dict(type="str", required=False),
"pushAPIKey", rocketiconemo=dict(type="str", required=False),
], rocketwebhookURL=dict(type="str", required=True),
NotificationType.TELEGRAM: [ ),
"telegramBotToken", NotificationType.SERVERCHAN: dict(
"telegramChatID", serverChanSendKey=dict(type="str", required=True),
], ),
NotificationType.WEBHOOK: [ NotificationType.SERWERSMS: dict(
"webhookContentType", serwersmsUsername=dict(type="str", required=True),
"webhookURL", serwersmsPassword=dict(type="str", required=True),
], serwersmsPhoneNumber=dict(type="str", required=True),
NotificationType.WECOM: [ serwersmsSenderName=dict(type="str", required=False),
"weComBotKey", ),
], NotificationType.SIGNAL: dict(
signalNumber=dict(type="str", required=True),
signalRecipients=dict(type="str", required=True),
signalURL=dict(type="str", required=True),
),
NotificationType.SLACK: dict(
slackchannelnotify=dict(type="bool", required=False),
slackchannel=dict(type="str", required=False),
slackusername=dict(type="str", required=False),
slackiconemo=dict(type="str", required=False),
slackwebhookURL=dict(type="str", required=True),
),
NotificationType.SMSC: dict(
smscTranslit=dict(type="str", required=False),
smscLogin=dict(type="str", required=True),
smscPassword=dict(type="str", required=True),
smscToNumber=dict(type="str", required=True),
smscSenderName=dict(type="str", required=False),
),
NotificationType.SMSEAGLE: dict(
smseagleEncoding=dict(type="bool", required=False),
smseaglePriority=dict(type="int", required=False),
smseagleRecipientType=dict(type="str", required=False),
smseagleToken=dict(type="str", required=True),
smseagleRecipient=dict(type="str", required=True),
smseagleUrl=dict(type="str", required=True),
),
NotificationType.SMSMANAGER: dict(
smsmanagerApiKey=dict(type="str", required=False),
numbers=dict(type="str", required=False),
messageType=dict(type="str", required=False),
),
NotificationType.SMTP: dict(
smtpHost=dict(type="str", required=True),
smtpPort=dict(type="int", required=True),
smtpSecure=dict(type="str", required=False),
smtpIgnoreTLSError=dict(type="bool", required=False),
smtpDkimDomain=dict(type="str", required=False),
smtpDkimKeySelector=dict(type="str", required=False),
smtpDkimPrivateKey=dict(type="str", required=False),
smtpDkimHashAlgo=dict(type="str", required=False),
smtpDkimheaderFieldNames=dict(type="str", required=False),
smtpDkimskipFields=dict(type="str", required=False),
smtpUsername=dict(type="str", required=False),
smtpPassword=dict(type="str", required=False),
customSubject=dict(type="str", required=False),
smtpFrom=dict(type="str", required=True),
smtpCC=dict(type="str", required=False),
smtpBCC=dict(type="str", required=False),
smtpTo=dict(type="str", required=False),
),
NotificationType.SPLUNK: dict(
splunkAutoResolve=dict(type="str", required=False),
splunkSeverity=dict(type="str", required=False),
splunkRestURL=dict(type="str", required=True),
),
NotificationType.SQUADCAST: dict(
squadcastWebhookURL=dict(type="str", required=True),
),
NotificationType.STACKFIELD: dict(
stackfieldwebhookURL=dict(type="str", required=True),
),
NotificationType.TEAMS: dict(
webhookUrl=dict(type="str", required=True),
),
NotificationType.PUSHBYTECHULUS: dict(
pushAPIKey=dict(type="str", required=True),
),
NotificationType.TELEGRAM: dict(
telegramChatID=dict(type="str", required=True),
telegramSendSilently=dict(type="bool", required=False),
telegramProtectContent=dict(type="bool", required=False),
telegramMessageThreadID=dict(type="str", required=False),
telegramBotToken=dict(type="str", required=True),
),
NotificationType.TWILIO: dict(
twilioAccountSID=dict(type="str", required=True),
twilioApiKey=dict(type="str", required=False),
twilioAuthToken=dict(type="str", required=True),
twilioToNumber=dict(type="str", required=True),
twilioFromNumber=dict(type="str", required=True),
),
NotificationType.WEBHOOK: dict(
webhookContentType=dict(type="str", required=True),
webhookCustomBody=dict(type="str", required=False),
webhookAdditionalHeaders=dict(type="str", required=False),
webhookURL=dict(type="str", required=True),
),
NotificationType.WECOM: dict(
weComBotKey=dict(type="str", required=True),
),
NotificationType.ZOHOCLIQ: dict(
webhookUrl=dict(type="str", required=True),
),
} }
notification_provider_conditions = dict(
gotifyPriority=dict(
min=0,
max=10,
),
ntfyPriority=dict(
min=1,
max=5,
),
opsgeniePriority=dict(
min=1,
max=5,
),
pushoverttl=dict(
min=0,
),
smseaglePriority=dict(
min=0,
max=9,
),
smtpPort=dict(
min=0,
max=65535,
),
)

View file

@ -2,8 +2,22 @@ from enum import Enum
class ProxyProtocol(str, Enum): class ProxyProtocol(str, Enum):
"""Enumerate proxy protocols."""
HTTPS = "https" HTTPS = "https"
"""HTTPS"""
HTTP = "http" HTTP = "http"
"""HTTP"""
SOCKS = "socks" SOCKS = "socks"
"""SOCKS"""
SOCKS5 = "socks5" SOCKS5 = "socks5"
"""SOCKS v5"""
SOCKS5H = "socks5h"
"""SOCKS v5 (+DNS)"""
SOCKS4 = "socks4" SOCKS4 = "socks4"
"""SOCKS v4"""