feat: add support for uptime kuma 1.21.0

This commit is contained in:
lucasheld 2023-03-20 15:14:39 +01:00
parent 550be17817
commit 42c040f451
8 changed files with 329 additions and 12 deletions

View file

@ -424,7 +424,8 @@ class UptimeKumaApi(object):
Event.CERT_INFO: None,
Event.DOCKER_HOST_LIST: None,
Event.AUTO_LOGIN: None,
Event.MAINTENANCE_LIST: None
Event.MAINTENANCE_LIST: None,
Event.API_KEY_LIST: None
}
self.sio.on(Event.CONNECT, self._event_connect)
@ -444,6 +445,7 @@ class UptimeKumaApi(object):
self.sio.on(Event.AUTO_LOGIN, self._event_auto_login)
self.sio.on(Event.INIT_SERVER_TIMEZONE, self._event_init_server_timezone)
self.sio.on(Event.MAINTENANCE_LIST, self._event_maintenance_list)
self.sio.on(Event.API_KEY_LIST, self._event_api_key_list)
self.connect()
@ -566,6 +568,9 @@ class UptimeKumaApi(object):
def _event_maintenance_list(self, data) -> None:
self._event_data[Event.MAINTENANCE_LIST] = data
def _event_api_key_list(self, data) -> None:
self._event_data[Event.API_KEY_LIST] = data
# connection
def connect(self) -> None:
@ -601,12 +606,14 @@ class UptimeKumaApi(object):
self,
type: MonitorType,
name: str,
description: str = None,
interval: int = 60,
retryInterval: int = 60,
resendInterval: int = 0,
maxretries: int = 0,
upsideDown: bool = False,
notificationIDList: list = None,
httpBodyEncoding: str = "json",
# HTTP, KEYWORD
url: str = None,
@ -619,6 +626,9 @@ class UptimeKumaApi(object):
body: str = None,
headers: str = None,
authMethod: AuthMethod = AuthMethod.NONE,
tlsCert: str = None,
tlsKey: str = None,
tlsCa: str = None,
basic_auth_user: str = None,
basic_auth_pass: str = None,
authDomain: str = None,
@ -690,6 +700,12 @@ class UptimeKumaApi(object):
"resendInterval": resendInterval
})
if parse_version(self.version) >= parse_version("1.21"):
data.update({
"description": description,
"httpBodyEncoding": httpBodyEncoding
})
if type in [MonitorType.KEYWORD, MonitorType.GRPC_KEYWORD]:
data.update({
"keyword": keyword,
@ -721,6 +737,13 @@ class UptimeKumaApi(object):
"authWorkstation": authWorkstation,
})
if authMethod == AuthMethod.MTLS:
data.update({
"tlsCert": tlsCert,
"tlsKey": tlsKey,
"tlsCa": tlsCa,
})
# GRPC_KEYWORD
if type == MonitorType.GRPC_KEYWORD:
data.update({
@ -878,6 +901,9 @@ class UptimeKumaApi(object):
}
]
"""
# TODO: replace with getMonitorList?
r = list(self._get_event_data(Event.MONITOR_LIST).values())
for monitor in r:
_convert_monitor_return(monitor)
@ -1095,7 +1121,10 @@ class UptimeKumaApi(object):
]
"""
r = self._call('getGameList')
if not r: # workaround, gamelist is not available on first call. TODO: remove when fixed
# Workaround, gamelist is not available on first call.
# Fixed in https://github.com/louislam/uptime-kuma/commit/7b8ed01f272fc4c6b69ff6299185e936a5e63735
# Exists in 1.20.0 - 1.21.0
if not r:
r = self._call('getGameList')
return r["gameList"]
@ -2781,12 +2810,14 @@ class UptimeKumaApi(object):
with self.wait_for_event(Event.DOCKER_HOST_LIST):
return self._call('deleteDockerHost', id_)
# maintenance
def get_maintenances(self) -> list:
"""
Get all maintenances.
:return: All maintenances.
:rtype: dict
:rtype: list
:raises UptimeKumaException: If the server returns an error.
Example::
@ -3176,7 +3207,7 @@ class UptimeKumaApi(object):
:param int id_: Id of the maintenance to get the monitors from.
:return: All monitors of the maintenance.
:rtype: dict
:rtype: list
:raises UptimeKumaException: If the server returns an error.
Example::
@ -3234,7 +3265,7 @@ class UptimeKumaApi(object):
:param int id_: Id of the maintenance to get the status pages from.
:return: All status pages of the maintenance.
:rtype: dict
:rtype: list
:raises UptimeKumaException: If the server returns an error.
Example::
@ -3281,3 +3312,172 @@ class UptimeKumaApi(object):
}
"""
return self._call('addMaintenanceStatusPage', (id_, status_pages))
# api key
def get_api_keys(self) -> list:
"""
Get all api keys.
:return: All api keys.
:rtype: list
:raises UptimeKumaException: If the server returns an error.
Example::
>>> api.get_api_key_list()
[
{
"id": 1,
"name": "test",
"userID": 1,
"createdDate": "2023-03-20 11:15:05",
"active": False,
"expires": null,
"status": "inactive"
},
{
"id": 2,
"name": "test2",
"userID": 1,
"createdDate": "2023-03-20 11:20:29",
"active": True,
"expires": "2023-03-30 12:20:00",
"status": "active"
}
]
"""
# TODO: replace with getAPIKeyList?
r = self._get_event_data(Event.API_KEY_LIST)
int_to_bool(r, ["active"])
return r
def get_api_key(self, id_: int) -> dict:
"""
Get an api key.
:param int id_: Id of the api key to get.
:return: The api key.
:rtype: dict
:raises UptimeKumaException: If the api key does not exist.
Example::
>>> api.get_api_key(1)
{
"id": 1,
"name": "test",
"userID": 1,
"createdDate": "2023-03-20 11:15:05",
"active": False,
"expires": null,
"status": "inactive"
}
"""
api_keys = self.get_api_keys()
for api_key in api_keys:
if api_key["id"] == id_:
return api_key
raise UptimeKumaException("notification does not exist")
def add_api_key(self, name: str, expires: str, active: bool) -> dict:
"""
Adds a new api key.
:param str name: Name of the api key.
:param str expires: Expiration date of the api key. Set to ``None`` to disable expiration.
:param bool active: True to activate api key.
:return: The server response.
:rtype: dict
:raises UptimeKumaException: If the server returns an error.
Example::
>>> api.add_api_key(
... name="test",
... expires="2023-03-30 12:20:00",
... active=True
... )
{
"msg": "Added Successfully.",
"key": "uk1_9XPRjV7ilGj9CvWRKYiBPq9GLtQs74UzTxKfCxWY",
"keyID": 1
}
>>> api.add_api_key(
... name="test2",
... expires=None,
... active=True
... )
{
"msg": "Added Successfully.",
"key": "uk2_jsB9H1Zmt9eEjycNFMTKgse1B0Vfvb944H4_aRqW",
"keyID": 2
}
"""
data = {
"name": name,
"expires": expires,
"active": 1 if active else 0
}
with self.wait_for_event(Event.API_KEY_LIST):
return self._call('addAPIKey', data)
def enable_api_key(self, id_: int) -> dict:
"""
Enable an api key.
:param int id_: Id of the api key to enable.
:return: The server response.
:rtype: dict
:raises UptimeKumaException: If the server returns an error.
Example::
>>> api.enable_api_key(1)
{
"msg": "Enabled Successfully"
}
"""
with self.wait_for_event(Event.API_KEY_LIST):
return self._call('enableAPIKey', id_)
def disable_api_key(self, id_: int) -> dict:
"""
Disable an api key.
:param int id_: Id of the api key to disable.
:return: The server response.
:rtype: dict
:raises UptimeKumaException: If the server returns an error.
Example::
>>> api.disable_api_key(1)
{
"msg": "Disabled Successfully."
}
"""
with self.wait_for_event(Event.API_KEY_LIST):
return self._call('disableAPIKey', id_)
def delete_api_key(self, id_: int) -> dict:
"""
Enable an api key.
:param int id_: Id of the api key to delete.
:return: The server response.
:rtype: dict
:raises UptimeKumaException: If the server returns an error.
Example::
>>> api.delete_api_key(1)
{
"msg": "Deleted Successfully."
}
"""
with self.wait_for_event(Event.API_KEY_LIST):
return self._call('deleteAPIKey', id_)