feat(dgsi/user): Add VLAN information and machinery

Only via the shell for now, we can attribute a VLAN to a user and
reclaim it if needed
This commit is contained in:
Tom Hubrecht 2025-01-30 10:03:48 +01:00
parent 6860d6cb3b
commit d3d342879c
Signed by: thubrecht
SSH key fingerprint: SHA256:r+nK/SIcWlJ0zFZJGHtlAoRwq1Rm+WcKAm5ADYMoQPc
4 changed files with 135 additions and 2 deletions

View file

@ -238,6 +238,9 @@ AUTH_USER_MODEL = "dgsi.User"
DGSI_STAFF_GROUP = credentials.get("STAFF_GROUP", "dgnum_admins@sso.dgnum.eu")
DGSI_SUPERUSER_GROUP = credentials.get("SUPERUSER_GROUP", "dgnum_admins@sso.dgnum.eu")
VLAN_ID_MAX = 4094
VLAN_ID_MIN = (VLAN_ID_MAX - 850) + 1
###
# Internationalization configuration

View file

@ -44,9 +44,15 @@ class UserAdmin(DjangoUserAdmin, ImportExportMixin, ModelAdmin):
export_form_class = ExportForm
export_form_class = SelectableFieldsExportForm
readonly_fields = ("vlan_id",)
# Add the local fields
fieldsets = (
*DjangoUserAdmin.fieldsets,
(
_("Informations réseau"),
{"fields": ("vlan_id",)},
),
(
_("Documents DGNum"),
{"fields": ("accepted_statutes", "accepted_bylaws")},

View file

@ -0,0 +1,30 @@
# Generated by Django 4.2.16 on 2025-01-30 08:20
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("dgsi", "0009_archive"),
]
operations = [
migrations.AlterModelOptions(
name="user",
options={},
),
migrations.AddField(
model_name="user",
name="vlan_id",
field=models.PositiveSmallIntegerField(
null=True, verbose_name="VLAN associé au compte"
),
),
migrations.AddConstraint(
model_name="user",
constraint=models.UniqueConstraint(
fields=("vlan_id",), name="unique_vlan_attribution"
),
),
]

View file

@ -9,7 +9,7 @@ from asgiref.sync import async_to_sync
from django.conf import settings
from django.contrib.auth.models import AbstractUser
from django.core.files import storage
from django.db import models
from django.db import models, transaction
from django.db.models.signals import pre_delete
from django.dispatch import receiver
from django.http import HttpRequest
@ -17,7 +17,9 @@ from django.utils.translation import gettext_lazy as _
from kanidm.exceptions import NoMatchingEntries
from kanidm.models.person import Person
from shared.kanidm import klient
from shared.kanidm import klient, sync_call
logger = logging.getLogger(__name__)
class Service(models.Model):
@ -177,6 +179,8 @@ class User(AbstractUser):
)
# accepted_terms = models.ManyToManyField(TermsAndConditions)
vlan_id = models.PositiveSmallIntegerField(_("VLAN associé au compte"), null=True)
@classmethod
def from_request(cls, request: HttpRequest) -> Self:
u = request.user
@ -211,3 +215,93 @@ class User(AbstractUser):
def can_access_archive(self, archive: Archive) -> bool:
# Prepare a more complex workflow
return True
###
# VLAN attribution machinery
#
# NOTE: It is a bit cumbersome because we need to store the vlan_id
# information both in DG·SI and in Kanidm
# Now the question will be « Which is the source of truth ? »
# For now, I believe it has to be DG·SI, so a sync script will
# have to be run regularly.
@transaction.atomic
def set_unique_vlan_id(self):
if self.vlan_id is not None:
raise ValueError(_("Ce compte a déjà un VLAN associé"))
self.vlan_id = min(
set(range(settings.VLAN_ID_MIN, settings.VLAN_ID_MAX))
- set(User.objects.exclude(vlan_id__isnull=True).values_list("vlan_id"))
)
# Preempt the vlan attribution
self.save(update_fields=["vlan_id"])
@transaction.atomic
def register_unique_vlan(self) -> None:
self.set_unique_vlan_id()
group_name = f"vlan_{self.vlan_id}"
res = sync_call("group_create", group_name)
if res.data is not None:
if (
res.data.get("plugin", {}).get("attrunique")
== "duplicate value detected"
):
logger.info(f"The {group_name} group already exists")
group = sync_call("group_get", group_name)
if group.member != []:
raise ValueError(
_("Le VLAN {} est déjà attribué.").format(self.vlan_id)
)
# The group is created and should be empty, so we can add the user to it
sync_call("group_add_members", group_name, [self.username])
# Check that we succeeded in setting a VLAN that is unique to the current user
group = sync_call("group_get", group_name)
if group.member != [f"{self.username}@sso.dgnum.eu"]:
# Remove the user from the group
sync_call("group_delete_members", self.username)
self.vlan_id = None
self.save(update_fields=["vlan_id"])
raise RuntimeError("Duplicate VLAN attribution detected")
def reclaim_vlan(self):
if self.vlan_id is None:
# Nothing to do, just return
logger.warning(
f"Reclaiming VLAN for {self.username} who does not have one."
)
return
group_name = f"vlan_{self.vlan_id}"
sync_call("group_delete_members", group_name, [self.username])
# Check that the call succeeded
try:
group = sync_call("group_get", group_name)
if "{self.username}@sso.dgnum.eu" in group.member:
raise RuntimeError(
f"Something went wrong in trying to reclaim vlan {self.vlan_id}"
)
except ValueError:
# The group does not exist apparently, keep going
logger.warning(
f"Reclaiming VLAN {self.vlan_id}, but the associated group does not exist."
)
finally:
self.vlan_id = None
self.save(update_fields=["vlan_id"])
class Meta:
constraints = [
models.UniqueConstraint(fields=["vlan_id"], name="unique_vlan_attribution")
]