from django.contrib.auth import get_user_model
from django.contrib.auth.models import Group as DjangoGroup
from django.db import models
from django.db.models.signals import m2m_changed, post_save
from django.dispatch import receiver

User = get_user_model()


class WikiGroup(models.Model):
    """A structured user group, used to grant permissions on sub-wikis

    This model contains a structured group of users, in the sense that a group
    contains both users and other groups, allowing a DAG group structure.

    To make it work with django-wiki, we then flatten the set of users in the
    group into a native Django group. This might seem a bit inelegant, but it
    has been discussed in
    https://git.eleves.ens.fr/klub-dev-ens/WikiENS/issues/5
    and seemed the best way at the time, in the sense that it seems stable and
    reliable wrt. internal behaviour and upstream changes in django-wiki.
    """

    class CyclicStructureException(Exception):
        """Exception raised when a new edge introduces a cycle in the groups'
        structure"""

        def __init__(self, from_group, to_group):
            super().__init__(from_group, to_group)
            self.from_group = from_group
            self.to_group = to_group

        def __str__(self):
            return (
                "The new relation {} --> {} introduces a cycle in the groups' graph"
            ).format(self.from_group, self.to_group)

    django_group = models.OneToOneField(DjangoGroup, on_delete=models.CASCADE)
    includes_groups = models.ManyToManyField(
        "self",
        symmetrical=False,  # without this Django assumes that (a -> b) => (b -> a) on
        # a many-to-many on self
        related_name="included_in_groups",
        blank=True,
    )
    users = models.ManyToManyField(User, blank=True)
    managers = models.ManyToManyField(User, related_name="managed_groups", blank=True)

    def __str__(self):
        return str(self.django_group)

    def _collect_related(self, relation_name):
        """Return the set made of `self` plus every group transitively reachable
        by following the many-to-many manager named `relation_name`
        (`"includes_groups"` or `"included_in_groups"`).

        Each group is expanded once only, so shared sub-structures (diamonds) are
        not walked repeatedly and the traversal terminates even on a cyclic
        graph."""
        found = {self}
        pending = [self]
        while pending:
            current = pending.pop()
            for neighbour in getattr(current, relation_name).all():
                if neighbour not in found:
                    found.add(neighbour)
                    pending.append(neighbour)
        return found

    def get_all_users(self):
        """Get the queryset of all the users in this group, including recursively
        included users"""
        subgroups = self._collect_related("includes_groups")
        subgroups.discard(self)

        own_users = self.users.all()
        if not subgroups:
            return own_users
        # A single flat UNION over every (transitively) included group
        return own_users.union(*(group.users.all() for group in subgroups))

    def is_manager(self, user):
        """Checks whether the user is a manager of this group or any subgroup"""
        # Base case: the user is an explicit manager. Ask the database directly
        # instead of loading every manager to test membership in Python.
        if self.managers.filter(pk=user.pk).exists():
            return True

        # Otherwise, the user may be a manager of a (transitively) included group
        subgroups = self._collect_related("includes_groups")
        subgroups.discard(self)
        return any(
            subgroup.managers.filter(pk=user.pk).exists() for subgroup in subgroups
        )

    def get_all_groups(self):
        """Returns the set of metagroups i.e. self plus the list of groups
        including this group recursively"""
        return self._collect_related("included_in_groups")

    def propagate_update(self, already_notified=None):
        """Commits itself to the Django group, and calls this method on every
        group in `included_in_groups`

        `already_notified` is the set of primary keys of the groups already
        handled during this propagation; it is filled in place, so a caller may
        share one set across several calls to avoid committing a group twice."""
        # Check that we did not already propagate the update signal to this group
        if already_notified is None:
            already_notified = set()
        elif self.pk in already_notified:
            return
        already_notified.add(self.pk)

        self.commit_to_django_group()
        for metagroup in self.included_in_groups.all():
            metagroup.propagate_update(already_notified=already_notified)

    def commit_to_django_group(self):
        """Writes this model's data to the related Django group"""
        self.django_group.user_set.set(self.get_all_users())

    def group_in_cycle(self, with_children):
        """Checks whether this group would be in a group cycle if it had
        `with_children` as child nodes. This assumes that the graph currently
        stored is acyclic.

        Returns `None` if no cycle is found, else returns a child from
        `with_children` causing the cycle to appear."""
        # Shared between the searches started from each child: a node from which
        # `self` was not reachable during an earlier search need not be expanded
        # again.
        visited = set()
        for child in with_children:
            # Depth-first search from `child`, looking for `self`
            pending = [child]
            while pending:
                node = pending.pop()
                if node.pk in visited:
                    continue
                if node.pk == self.pk:
                    return child
                visited.add(node.pk)
                pending.extend(node.includes_groups.all())
        return None


def _propagate_to_groups(group_pks):
    """Propagates an update from every WikiGroup whose primary key is in
    `group_pks`, committing each affected group at most once."""
    already_notified = set()
    for group in WikiGroup.objects.filter(pk__in=group_pks):
        group.propagate_update(already_notified=already_notified)


def _handle_reverse_m2m_change(instance, action, pk_set, stash_attr, current_pks):
    """Handles an `m2m_changed` signal fired from the reverse side of a relation,
    where the WikiGroups to refresh are the ones listed in `pk_set` rather than
    `instance`.

    On a clear, `pk_set` is `None` and the relations are already gone when
    `post_clear` fires, so the affected primary keys (given by the `current_pks`
    callable) are stashed on `instance` under `stash_attr` at `pre_clear`."""
    if action == "pre_clear":
        setattr(instance, stash_attr, set(current_pks()))
    elif action == "post_clear":
        _propagate_to_groups(vars(instance).pop(stash_attr, set()))
    elif action in ("post_add", "post_remove"):
        _propagate_to_groups(pk_set or ())


@receiver(post_save, sender=WikiGroup, dispatch_uid="on_wiki_group_changed")
def on_wiki_group_changed(sender, instance, **kwargs):
    """Commit the related WikiGroups to Django Group upon model change"""
    instance.propagate_update()


@receiver(
    m2m_changed,
    sender=WikiGroup.includes_groups.through,
    dispatch_uid="on_wiki_group_includes_changed",
)
def on_wiki_group_includes_changed(
    sender, instance, action, reverse=False, pk_set=None, **kwargs
):
    """Commit the related WikiGroups to Django Group upon change of the set of
    included other groups"""
    if reverse:
        # Changed through `included_in_groups`: `instance` is the included group,
        # and the groups whose user set changed are the metagroups in `pk_set`.
        _handle_reverse_m2m_change(
            instance,
            action,
            pk_set,
            "_wikigroup_metagroup_pks_before_clear",
            lambda: instance.included_in_groups.values_list("pk", flat=True),
        )
    elif action in ["post_add", "post_remove", "post_clear"]:
        instance.propagate_update()


@receiver(
    m2m_changed,
    sender=WikiGroup.users.through,
    dispatch_uid="on_wiki_group_users_changed",
)
def on_wiki_group_users_changed(
    sender, instance, action, reverse=False, pk_set=None, **kwargs
):
    """Commit the related WikiGroups to Django Group upon change of included
    users"""
    if reverse:
        # Changed from the user's side: `instance` is a user, and the groups to
        # refresh are the WikiGroups in `pk_set`.
        _handle_reverse_m2m_change(
            instance,
            action,
            pk_set,
            "_wikigroup_group_pks_before_clear",
            lambda: WikiGroup.objects.filter(users=instance).values_list(
                "pk", flat=True
            ),
        )
    elif action in ["post_add", "post_remove", "post_clear"]:
        instance.propagate_update()


@receiver(
    m2m_changed,
    sender=WikiGroup.includes_groups.through,
    dispatch_uid="on_wiki_group_includes_check_acyclic",
)
def on_wiki_group_includes_check_acyclic(
    sender, instance, action, pk_set, reverse=False, **kwargs
):
    """Checks the acyclicity of the groups' graph before committing new edges.

    PLEASE NOTE that this check is only a fallback, and that forms should
    validate the acyclicity before committing anything.

    Raises `WikiGroup.CyclicStructureException` if one of the new edges would
    close a cycle.
    """
    if action != "pre_add":
        return

    other_groups = WikiGroup.objects.filter(pk__in=pk_set)
    if reverse:
        # Added through `included_in_groups`: the new edges are
        # (metagroup --> instance) for each metagroup in `pk_set`.
        for metagroup in other_groups:
            if metagroup.group_in_cycle([instance]) is not None:
                raise WikiGroup.CyclicStructureException(metagroup, instance)
    else:
        cycle_child = instance.group_in_cycle(other_groups)
        if cycle_child is not None:
            raise WikiGroup.CyclicStructureException(instance, cycle_child)