chore(vlan): Simplify management

We no longer rely on kanidm groups for attributing vlans
This commit is contained in:
Tom Hubrecht 2025-06-02 22:03:26 +02:00
parent 9d52ce5bdf
commit fbf6385e65
Signed by: thubrecht
SSH key fingerprint: SHA256:r+nK/SIcWlJ0zFZJGHtlAoRwq1Rm+WcKAm5ADYMoQPc

View file

@ -17,7 +17,7 @@ from django.utils.translation import gettext_lazy as _
from kanidm.exceptions import NoMatchingEntries
from kanidm.models.person import Person
from shared.kanidm import klient, sync_call
from shared.kanidm import klient
logger = logging.getLogger(__name__)
@ -221,15 +221,9 @@ class User(AbstractUser):
###
# VLAN attribution machinery
#
# NOTE: It is a bit cumbersome because we need to store the vlan_id
# information both in DG·SI and in Kanidm
# Now the question will be « Which is the source of truth ? »
# For now, I believe it has to be DG·SI, so a sync script will
# have to be run regularly.
@transaction.atomic
def set_unique_vlan_id(self):
def register_unique_vlan(self):
if self.vlan_id is not None:
raise ValueError(_("Ce compte a déjà un VLAN associé"))
@ -242,34 +236,8 @@ class User(AbstractUser):
)
)
# Preempt the vlan attribution
self.save(update_fields=["vlan_id"])
@transaction.atomic
def register_unique_vlan(self) -> None:
self.set_unique_vlan_id()
group_name = f"vlan_{self.vlan_id}"
# Add the user to the group requested group
sync_call("group_add_members", group_name, [self.username])
# Check that we succeeded in setting a VLAN that is unique to the current user
group = sync_call("group_get", group_name)
if group.member == []:
# Something went wrong
self.vlan_id = None
self.save(update_fields=["vlan_id"])
raise RuntimeError("VLAN attribution failed")
if group.member != [f"{self.username}@sso.dgnum.eu"]:
# Remove the user from the group
sync_call("group_delete_members", group_name, [self.username])
self.vlan_id = None
self.save(update_fields=["vlan_id"])
raise RuntimeError("Duplicate VLAN attribution detected")
def reclaim_vlan(self):
if self.vlan_id is None:
# Nothing to do, just return
@ -278,26 +246,8 @@ class User(AbstractUser):
)
return
group_name = f"vlan_{self.vlan_id}"
sync_call("group_delete_members", group_name, [self.username])
# Check that the call succeeded
try:
group = sync_call("group_get", group_name)
if "{self.username}@sso.dgnum.eu" in group.member:
raise RuntimeError(
f"Something went wrong in trying to reclaim vlan {self.vlan_id}"
)
except ValueError:
# The group does not exist apparently, keep going
logger.warning(
f"Reclaiming VLAN {self.vlan_id}, but the associated group does not exist."
)
finally:
self.vlan_id = None
self.save(update_fields=["vlan_id"])
self.vlan_id = None
self.save(update_fields=["vlan_id"])
class Meta:
constraints = [