feat(radius/vlan_id): request vlan_id to REST api
Some checks failed
Linting checks / clippy (push) Has been cancelled
Linting checks / fmt (push) Has been cancelled
Spell Check / codespell (push) Has been cancelled
Container - Kanidm / Set image tag values (push) Has been cancelled
Container - Kanidmd / Set image tag values (push) Has been cancelled
Container - Radiusd / Set image tag values (push) Has been cancelled
Javascript Linting / javascript_lint (push) Has been cancelled
Javascript Linting / javascript_fmt (push) Has been cancelled
Container - Kanidm / Build kanidm Docker image (push) Has been cancelled
Container - Kanidm / Push kanidm Docker image (push) Has been cancelled
Container - Kanidmd / Build kanidmd Docker image (push) Has been cancelled
Container - Kanidmd / Push kanidmd Docker image (push) Has been cancelled
Container - Radiusd / Build radius Docker image (push) Has been cancelled
Container - Radiusd / Push radius Docker image (push) Has been cancelled

This commit is contained in:
catvayor 2025-04-11 20:13:20 +02:00
parent d9f4dbdd3e
commit 834894831e
Signed by: lbailly
GPG key ID: CE3E645251AC63F3
3 changed files with 13 additions and 45 deletions

View file

@ -8,6 +8,7 @@ import logging
import os
from pathlib import Path
import sys
import requests
from typing import Any, Dict, Optional, Union
from kanidm.exceptions import NoMatchingEntries
@ -15,7 +16,6 @@ from kanidm.types import AuthState, RadiusTokenResponse
from .. import KanidmClient
from . import radiusd
from .utils import check_vlan
CONTAINER_CONFIG_FILE_PATH = "/data/radius.toml"
@ -147,13 +147,15 @@ def authorize(
logging.info("User %s doesn't have a group from the required list.", name)
return radiusd.RLM_MODULE_REJECT
# look up them in config for group vlan if possible.
# TODO: work out the typing on this, WTF.
uservlan: int = reduce(
check_vlan,
tok.groups,
kanidm_client.config.radius_default_vlan,
)
dgsi_info = requests.get(kanidm_client.config.dgsi_endpoint + "/" + name, headers={
"Authorization": "Token " + kanidm_client.config.dgsi_token
})
if dgsi_info.status_code != 200:
logging.error("dgsi: error getting vlan of %s : %s.", name, dgsi_info.status_code)
return radiusd.RLM_MODULE_FAIL
uservlan: int = dgsi_info.json().get("vlan_id", default=kanidm_client.config.radius_default_vlan);
if uservlan == int(0):
logging.info("Invalid uservlan of 0")

View file

@ -1,37 +0,0 @@
""" class utils """
from typing import Optional
import logging
import os
from .. import KanidmClient
from ..types import RadiusTokenGroup
def check_vlan(
acc: int,
group: RadiusTokenGroup,
kanidm_client: Optional[KanidmClient] = None,
) -> int:
"""checks if a vlan is in the config,
acc is the default vlan
"""
logging.debug("acc=%s", acc)
if kanidm_client is None:
if "KANIDM_CONFIG_FILE" in os.environ:
kanidm_client = KanidmClient(config_file=os.environ["KANIDM_CONFIG_FILE"])
else:
raise ValueError("Need to pass this a kanidm_client")
for radius_group in kanidm_client.config.radius_groups:
logging.debug(
"Checking vlan group '%s' against user group %s",
radius_group.spn,
group.spn,
)
if radius_group.spn == group.spn:
logging.info("returning new vlan: %s", radius_group.vlan)
return radius_group.vlan
logging.debug("returning already set vlan: %s", acc)
return acc

View file

@ -179,6 +179,9 @@ class KanidmClientConfig(BaseModel):
radius_groups: List[RadiusGroup] = []
radius_clients: List[RadiusClient] = []
dgsi_endpoint: str = ""
dgsi_token: str = ""
connect_timeout: int = 30
@classmethod