From b1d8bb04c4d9e772c4cc205de22147b893e01991 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20P=C3=A9pin?= Date: Wed, 11 Dec 2019 22:00:10 +0100 Subject: [PATCH] Generic auto-completion mechanism --- gestioncof/autocomplete.py | 118 ++++++---------- gestioncof/templates/autocomplete_user.html | 29 ---- .../templates/gestioncof/search_results.html | 56 ++++++++ gestioncof/tests/test_views.py | 24 ++-- shared/__init__.py | 0 shared/tests/testcases.py | 2 +- shared/views/autocomplete.py | 128 +++++++++++++++++- 7 files changed, 235 insertions(+), 122 deletions(-) delete mode 100644 gestioncof/templates/autocomplete_user.html create mode 100644 gestioncof/templates/gestioncof/search_results.html create mode 100644 shared/__init__.py diff --git a/gestioncof/autocomplete.py b/gestioncof/autocomplete.py index e27cdb92..239317f8 100644 --- a/gestioncof/autocomplete.py +++ b/gestioncof/autocomplete.py @@ -1,94 +1,56 @@ -from django import shortcuts -from django.conf import settings -from django.contrib.auth.models import User +from django.contrib.auth import get_user_model from django.db.models import Q from django.http import Http404 +from django.views.generic import TemplateView from gestioncof.decorators import buro_required -from gestioncof.models import CofProfile +from shared.views import autocomplete -if getattr(settings, "LDAP_SERVER_URL", None): - from ldap3 import Connection -else: - # shared.tests.testcases.TestCaseMixin.mockLDAP needs - # Connection to be defined in order to mock it. - Connection = None +User = get_user_model() -class Clipper(object): - def __init__(self, clipper, fullname): - if fullname is None: - fullname = "" - assert isinstance(clipper, str) - assert isinstance(fullname, str) - self.clipper = clipper - self.fullname = fullname +class COFMemberSearch(autocomplete.ModelSearch): + model = User + search_fields = ["username", "first_name", "last_name"] - def __str__(self): - return "{} ({})".format(self.clipper, self.fullname) - - def __eq__(self, other): - return self.clipper == other.clipper and self.fullname == other.fullname + def get_queryset_filter(self, *args, **kwargs): + qset_filter = super().get_queryset_filter(*args, **kwargs) + qset_filter &= Q(profile__is_cof=True) + return qset_filter -@buro_required -def autocomplete(request): - if "q" not in request.GET: - raise Http404 - q = request.GET["q"] - data = {"q": q} +class COFOthersSearch(autocomplete.ModelSearch): + model = User + search_fields = ["username", "first_name", "last_name"] - queries = {} - bits = q.split() + def get_queryset_filter(self, *args, **kwargs): + qset_filter = super().get_queryset_filter(*args, **kwargs) + qset_filter &= Q(profile__is_cof=False) + return qset_filter - # Fetching data from User and CofProfile tables - queries["members"] = CofProfile.objects.filter(is_cof=True) - queries["users"] = User.objects.filter(profile__is_cof=False) - for bit in bits: - queries["members"] = queries["members"].filter( - Q(user__first_name__icontains=bit) - | Q(user__last_name__icontains=bit) - | Q(user__username__icontains=bit) - | Q(login_clipper__icontains=bit) - ) - queries["users"] = queries["users"].filter( - Q(first_name__icontains=bit) - | Q(last_name__icontains=bit) - | Q(username__icontains=bit) - ) - queries["members"] = queries["members"].distinct() - queries["users"] = queries["users"].distinct() - # Clearing redundancies - usernames = set(queries["members"].values_list("login_clipper", flat="True")) | set( - queries["users"].values_list("profile__login_clipper", flat="True") - ) +class COFSearch(autocomplete.Compose): + search_units = [ + ("members", "username", COFMemberSearch), + ("others", "username", COFOthersSearch), + ("clippers", "clipper", autocomplete.LDAPSearch), + ] - # Fetching data from the SPI - if getattr(settings, "LDAP_SERVER_URL", None): - # Fetching - ldap_query = "(&{:s})".format( - "".join( - "(|(cn=*{bit:s}*)(uid=*{bit:s}*))".format(bit=bit) - for bit in bits - if bit.isalnum() - ) - ) - if ldap_query != "(&)": - # If none of the bits were legal, we do not perform the query - entries = None - with Connection(settings.LDAP_SERVER_URL) as conn: - conn.search("dc=spi,dc=ens,dc=fr", ldap_query, attributes=["uid", "cn"]) - entries = conn.entries - # Clearing redundancies - queries["clippers"] = [ - Clipper(entry.uid.value, entry.cn.value) - for entry in entries - if entry.uid.value and entry.uid.value not in usernames - ] - # Resulting data - data.update(queries) - data["options"] = sum(len(query) for query in queries) +cof_search = COFSearch() - return shortcuts.render(request, "autocomplete_user.html", data) + +class AutocompleteView(TemplateView): + template_name = "gestioncof/search_results.html" + + def get_context_data(self, *args, **kwargs): + ctx = super().get_context_data(*args, **kwargs) + if "q" not in self.request.GET: + raise Http404 + q = self.request.GET["q"] + ctx["q"] = q + ctx.update(cof_search.search(q.split())) + return ctx + + +autocomplete = buro_required(AutocompleteView.as_view()) diff --git a/gestioncof/templates/autocomplete_user.html b/gestioncof/templates/autocomplete_user.html deleted file mode 100644 index face824d..00000000 --- a/gestioncof/templates/autocomplete_user.html +++ /dev/null @@ -1,29 +0,0 @@ -{% load utils %} - diff --git a/gestioncof/templates/gestioncof/search_results.html b/gestioncof/templates/gestioncof/search_results.html new file mode 100644 index 00000000..ba8b6580 --- /dev/null +++ b/gestioncof/templates/gestioncof/search_results.html @@ -0,0 +1,56 @@ +{% load utils %} + + diff --git a/gestioncof/tests/test_views.py b/gestioncof/tests/test_views.py index 31cb8d8a..f757b4c2 100644 --- a/gestioncof/tests/test_views.py +++ b/gestioncof/tests/test_views.py @@ -15,9 +15,9 @@ from django.test import Client, TestCase, override_settings from django.urls import reverse from bda.models import Salle, Tirage -from gestioncof.autocomplete import Clipper from gestioncof.models import CalendarSubscription, Club, Event, Survey, SurveyAnswer from gestioncof.tests.testcases import ViewTestCaseMixin +from shared.views.autocomplete import Clipper from .utils import create_member, create_root, create_user @@ -285,21 +285,19 @@ class RegistrationAutocompleteViewTests(ViewTestCaseMixin, TestCase): self.mockLDAP([]) - def _test(self, query, expected_users, expected_members, expected_clippers): + def _test(self, query, expected_others, expected_members, expected_clippers): r = self.client.get(self.url, {"q": query}) self.assertEqual(r.status_code, 200) self.assertQuerysetEqual( - r.context["users"], map(repr, expected_users), ordered=False + r.context["others"], map(repr, expected_others), ordered=False ) self.assertQuerysetEqual( - r.context["members"], - map(lambda u: repr(u.profile), expected_members), - ordered=False, + r.context["members"], map(repr, expected_members), ordered=False, ) self.assertCountEqual( - map(str, r.context.get("clippers", [])), map(str, expected_clippers) + map(str, r.context["clippers"]), map(str, expected_clippers) ) def test_username(self): @@ -322,7 +320,7 @@ class RegistrationAutocompleteViewTests(ViewTestCaseMixin, TestCase): mock_ldap.search.assert_called_once_with( "dc=spi,dc=ens,dc=fr", "(&(|(cn=*aa*)(uid=*aa*))(|(cn=*bb*)(uid=*bb*)))", - attributes=["uid", "cn"], + attributes=["cn", "uid"], ) def test_clipper_escaped(self): @@ -333,14 +331,14 @@ class RegistrationAutocompleteViewTests(ViewTestCaseMixin, TestCase): mock_ldap.search.assert_not_called() def test_clipper_no_duplicate(self): - self.mockLDAP([("uid", "uu_u1")]) + self.mockLDAP([("uid", "abc")]) - self._test("uu u1", [self.u1], [], [Clipper("uid", "uu_u1")]) + self._test("abc", [self.u1], [], [Clipper("uid", "abc")]) - self.u1.profile.login_clipper = "uid" - self.u1.profile.save() + self.u1.username = "uid" + self.u1.save() - self._test("uu u1", [self.u1], [], []) + self._test("abc", [self.u1], [], []) class HomeViewTests(ViewTestCaseMixin, TestCase): diff --git a/shared/__init__.py b/shared/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/shared/tests/testcases.py b/shared/tests/testcases.py index 35d697e7..507e1361 100644 --- a/shared/tests/testcases.py +++ b/shared/tests/testcases.py @@ -111,7 +111,7 @@ class TestCaseMixin: mock_context_manager.return_value.__enter__.return_value = mock_connection patcher = mock.patch( - "gestioncof.autocomplete.Connection", new=mock_context_manager + "shared.views.autocomplete.Connection", new=mock_context_manager ) patcher.start() self.addCleanup(patcher.stop) diff --git a/shared/views/autocomplete.py b/shared/views/autocomplete.py index 095dc3f8..5254f8c8 100644 --- a/shared/views/autocomplete.py +++ b/shared/views/autocomplete.py @@ -1,8 +1,37 @@ +from collections import namedtuple + from dal import autocomplete +from django.conf import settings from django.db.models import Q +if getattr(settings, "LDAP_SERVER_URL", None): + from ldap3 import Connection +else: + # shared.tests.testcases.TestCaseMixin.mockLDAP needs + # Connection to be defined + Connection = None -class ModelSearch: + +class SearchUnit: + """Base class for all the search utilities. + + A search unit should implement a ``search`` method taking a list of keywords as + argument and returning an iterable of search results. + """ + + def search(self, _keywords): + raise NotImplementedError( + "Class implementing the SeachUnit interface should implement the search " + "method" + ) + + +# --- +# Model-based search +# --- + + +class ModelSearch(SearchUnit): """Basic search engine for models based on filtering. The class should be configured through its ``model`` class attribute: the ``search`` @@ -55,3 +84,100 @@ class Select2QuerySetView(ModelSearch, autocomplete.Select2QuerySetView): def get_queryset(self): keywords = self.q.split() return super().search(keywords) + + +# --- +# LDAP search +# --- + +Clipper = namedtuple("Clipper", "clipper fullname") + + +class LDAPSearch(SearchUnit): + ldap_server_url = getattr(settings, "LDAP_SERVER_URL", None) + domain_component = "dc=spi,dc=ens,dc=fr" + search_fields = ["cn", "uid"] + + def get_ldap_query(self, keywords): + # Dumb but safe + keywords = filter(str.isalnum, keywords) + + ldap_filters = [] + + for keyword in keywords: + ldap_filter = "(|{})".format( + "".join( + "({}=*{}*)".format(field, keyword) for field in self.search_fields + ) + ) + ldap_filters.append(ldap_filter) + + return "(&{})".format("".join(ldap_filters)) + + def search(self, keywords): + """Return a list of Clipper objects matching all the keywords. + + The semantic of the search is the following: a Clipper appears in the + search results iff all of the keywords given as arguments occur in at least one + of the search fields. + """ + + query = self.get_ldap_query(keywords) + + if Connection is None or query == "(&)": + return [] + + with Connection(self.ldap_server_url) as conn: + conn.search(self.domain_component, query, attributes=self.search_fields) + return [Clipper(entry.uid.value, entry.cn.value) for entry in conn.entries] + + +# --- +# Composition of autocomplete units +# --- + + +class Compose: + """Search with several units and remove duplicate results. + + The ``search_units`` class attribute should be a list of tuples of the form ``(name, + uniq_key, search_unit)``. + + The ``search`` method produces a dictionnary whose keys are the ``name``s given in + ``search_units`` and whose values are iterables produced by the different search + units. + + The ``uniq_key``s are used to remove duplicates: for instance, say that search unit + 1 has ``uniq_key = "username"`` and search unit 2 has ``uniq_key = "clipper"``, then + search results from unit 2 whose ``.clipper`` attribute is equal to the + ``.username`` attribute of some result from unit 1 are omitted. + + Typical Example: + + >>> from django.contrib.auth.models import User + >>> + >>> class UserSearch(ModelSearch): + ... model = User + ... search_fields = ["username", "first_name", "last_name"] + >>> + >>> class UserAndClipperSearch(Compose): + ... search_units = [ + ... ("users", "username", UserSearch), + ... ("clippers", "clipper", LDAPSearch), + ... ] + + In this example, clipper accounts that already have an associated user (i.e. with a + username equal to the clipper login), will not appear in the results. + """ + + search_units = [] + + def search(self, keywords): + uniq_results = set() + results = {} + for name, uniq_key, search_unit in self.search_units: + res = search_unit().search(keywords) + res = [r for r in res if getattr(r, uniq_key) not in uniq_results] + uniq_results |= set((getattr(r, uniq_key) for r in res)) + results[name] = res + return results