Generic auto-completion mechanism

This commit is contained in:
Martin Pépin 2019-12-11 22:00:10 +01:00
parent b8cd5f1da5
commit b1d8bb04c4
7 changed files with 235 additions and 122 deletions

View file

@ -1,94 +1,56 @@
from django import shortcuts from django.contrib.auth import get_user_model
from django.conf import settings
from django.contrib.auth.models import User
from django.db.models import Q from django.db.models import Q
from django.http import Http404 from django.http import Http404
from django.views.generic import TemplateView
from gestioncof.decorators import buro_required from gestioncof.decorators import buro_required
from gestioncof.models import CofProfile from shared.views import autocomplete
if getattr(settings, "LDAP_SERVER_URL", None): User = get_user_model()
from ldap3 import Connection
else:
# shared.tests.testcases.TestCaseMixin.mockLDAP needs
# Connection to be defined in order to mock it.
Connection = None
class Clipper(object): class COFMemberSearch(autocomplete.ModelSearch):
def __init__(self, clipper, fullname): model = User
if fullname is None: search_fields = ["username", "first_name", "last_name"]
fullname = ""
assert isinstance(clipper, str)
assert isinstance(fullname, str)
self.clipper = clipper
self.fullname = fullname
def __str__(self): def get_queryset_filter(self, *args, **kwargs):
return "{} ({})".format(self.clipper, self.fullname) qset_filter = super().get_queryset_filter(*args, **kwargs)
qset_filter &= Q(profile__is_cof=True)
def __eq__(self, other): return qset_filter
return self.clipper == other.clipper and self.fullname == other.fullname
@buro_required class COFOthersSearch(autocomplete.ModelSearch):
def autocomplete(request): model = User
if "q" not in request.GET: search_fields = ["username", "first_name", "last_name"]
raise Http404
q = request.GET["q"]
data = {"q": q}
queries = {} def get_queryset_filter(self, *args, **kwargs):
bits = q.split() qset_filter = super().get_queryset_filter(*args, **kwargs)
qset_filter &= Q(profile__is_cof=False)
return qset_filter
# Fetching data from User and CofProfile tables
queries["members"] = CofProfile.objects.filter(is_cof=True)
queries["users"] = User.objects.filter(profile__is_cof=False)
for bit in bits:
queries["members"] = queries["members"].filter(
Q(user__first_name__icontains=bit)
| Q(user__last_name__icontains=bit)
| Q(user__username__icontains=bit)
| Q(login_clipper__icontains=bit)
)
queries["users"] = queries["users"].filter(
Q(first_name__icontains=bit)
| Q(last_name__icontains=bit)
| Q(username__icontains=bit)
)
queries["members"] = queries["members"].distinct()
queries["users"] = queries["users"].distinct()
# Clearing redundancies class COFSearch(autocomplete.Compose):
usernames = set(queries["members"].values_list("login_clipper", flat="True")) | set( search_units = [
queries["users"].values_list("profile__login_clipper", flat="True") ("members", "username", COFMemberSearch),
) ("others", "username", COFOthersSearch),
("clippers", "clipper", autocomplete.LDAPSearch),
# Fetching data from the SPI
if getattr(settings, "LDAP_SERVER_URL", None):
# Fetching
ldap_query = "(&{:s})".format(
"".join(
"(|(cn=*{bit:s}*)(uid=*{bit:s}*))".format(bit=bit)
for bit in bits
if bit.isalnum()
)
)
if ldap_query != "(&)":
# If none of the bits were legal, we do not perform the query
entries = None
with Connection(settings.LDAP_SERVER_URL) as conn:
conn.search("dc=spi,dc=ens,dc=fr", ldap_query, attributes=["uid", "cn"])
entries = conn.entries
# Clearing redundancies
queries["clippers"] = [
Clipper(entry.uid.value, entry.cn.value)
for entry in entries
if entry.uid.value and entry.uid.value not in usernames
] ]
# Resulting data
data.update(queries)
data["options"] = sum(len(query) for query in queries)
return shortcuts.render(request, "autocomplete_user.html", data) cof_search = COFSearch()
class AutocompleteView(TemplateView):
template_name = "gestioncof/search_results.html"
def get_context_data(self, *args, **kwargs):
ctx = super().get_context_data(*args, **kwargs)
if "q" not in self.request.GET:
raise Http404
q = self.request.GET["q"]
ctx["q"] = q
ctx.update(cof_search.search(q.split()))
return ctx
autocomplete = buro_required(AutocompleteView.as_view())

View file

@ -1,29 +0,0 @@
{% load utils %}
<ul>
{% if members %}
<li class="autocomplete-header">Membres du COF</li>
{% for member in members %}{% if forloop.counter < 5 %}
<li class="autocomplete-value"><a href="{% url 'user-registration' member.user.username %}">{{ member.user|highlight_user:q }}</a></li>
{% elif forloop.counter == 5 %}<li class="autocomplete-more">...</a>{% endif %}{% endfor %}
{% endif %}
{% if users %}
<li class="autocomplete-header">Utilisateurs de GestioCOF</li>
{% for user in users %}{% if forloop.counter < 5 %}
<li class="autocomplete-value"><a href="{% url 'user-registration' user.username %}">{{ user|highlight_user:q }}</a></li>
{% elif forloop.counter == 5 %}<li class="autocomplete-more">...</a>{% endif %}{% endfor %}
{% endif %}
{% if clippers %}
<li class="autocomplete-header">Utilisateurs <tt>clipper</tt></li>
{% for clipper in clippers %}{% if forloop.counter < 5 %}
<li class="autocomplete-value"><a href="{% url 'clipper-registration' clipper.clipper clipper.fullname %}">{{ clipper|highlight_clipper:q }}</a></li>
{% elif forloop.counter == 5 %}<li class="autocomplete-more">...</a>{% endif %}{% endfor %}
{% endif %}
{% if not options %}
<li class="autocomplete-header">Aucune correspondance trouvée</li>
{% else %}
<li class="autocomplete-header">Pas dans la liste ?</li>
{% endif %}
<li><a href="{% url 'empty-registration' %}">Créer un compte</a></li>
</ul>

View file

@ -0,0 +1,56 @@
{% load utils %}
<ul>
{% if members %}
<li class="autocomplete-header">Membres</li>
{% for user in members %}
{% if forloop.counter < 5 %}
<li class="autocomplete-value">
<a href="{% url "user-registration" user.username %}">
{{ user|highlight_user:q }}
</a>
</li>
{% elif forloop.counter == 5 %}
<li class="autocomplete-more">...</li>
{% endif %}
{% endfor %}
{% endif %}
{% if others %}
<li class="autocomplete-header">Non-membres</li>
{% for user in others %}
{% if forloop.counter < 5 %}
<li class="autocomplete-value">
<a href="{% url "user-registration" user.username %}">
{{ user|highlight_user:q }}
</a>
</li>
{% elif forloop.counter == 5 %}
<li class="autocomplete-more">...</li>
{% endif %}
{% endfor %}
{% endif %}
{% if clippers %}
<li class="autocomplete-header">Utilisateurs <tt>clipper</tt></li>
{% for clipper in clippers %}
{% if forloop.counter < 5 %}
<li class="autocomplete-value">
<a href="{% url "clipper-registration" clipper.clipper clipper.fullname %}">
{{ clipper|highlight_clipper:q }}
</a>
</li>
{% elif forloop.counter == 5 %}
<li class="autocomplete-more">...</li>
{% endif %}
{% endfor %}
{% endif %}
{% if total %}
<li class="autocomplete-header">Pas dans la liste ?</li>
{% else %}
<li class="autocomplete-header">Aucune correspondance trouvée</li>
{% endif %}
<li><a href="{% url "empty-registration" %}">Créer un compte</a></li>
</ul>

View file

@ -15,9 +15,9 @@ from django.test import Client, TestCase, override_settings
from django.urls import reverse from django.urls import reverse
from bda.models import Salle, Tirage from bda.models import Salle, Tirage
from gestioncof.autocomplete import Clipper
from gestioncof.models import CalendarSubscription, Club, Event, Survey, SurveyAnswer from gestioncof.models import CalendarSubscription, Club, Event, Survey, SurveyAnswer
from gestioncof.tests.testcases import ViewTestCaseMixin from gestioncof.tests.testcases import ViewTestCaseMixin
from shared.views.autocomplete import Clipper
from .utils import create_member, create_root, create_user from .utils import create_member, create_root, create_user
@ -285,21 +285,19 @@ class RegistrationAutocompleteViewTests(ViewTestCaseMixin, TestCase):
self.mockLDAP([]) self.mockLDAP([])
def _test(self, query, expected_users, expected_members, expected_clippers): def _test(self, query, expected_others, expected_members, expected_clippers):
r = self.client.get(self.url, {"q": query}) r = self.client.get(self.url, {"q": query})
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertQuerysetEqual( self.assertQuerysetEqual(
r.context["users"], map(repr, expected_users), ordered=False r.context["others"], map(repr, expected_others), ordered=False
) )
self.assertQuerysetEqual( self.assertQuerysetEqual(
r.context["members"], r.context["members"], map(repr, expected_members), ordered=False,
map(lambda u: repr(u.profile), expected_members),
ordered=False,
) )
self.assertCountEqual( self.assertCountEqual(
map(str, r.context.get("clippers", [])), map(str, expected_clippers) map(str, r.context["clippers"]), map(str, expected_clippers)
) )
def test_username(self): def test_username(self):
@ -322,7 +320,7 @@ class RegistrationAutocompleteViewTests(ViewTestCaseMixin, TestCase):
mock_ldap.search.assert_called_once_with( mock_ldap.search.assert_called_once_with(
"dc=spi,dc=ens,dc=fr", "dc=spi,dc=ens,dc=fr",
"(&(|(cn=*aa*)(uid=*aa*))(|(cn=*bb*)(uid=*bb*)))", "(&(|(cn=*aa*)(uid=*aa*))(|(cn=*bb*)(uid=*bb*)))",
attributes=["uid", "cn"], attributes=["cn", "uid"],
) )
def test_clipper_escaped(self): def test_clipper_escaped(self):
@ -333,14 +331,14 @@ class RegistrationAutocompleteViewTests(ViewTestCaseMixin, TestCase):
mock_ldap.search.assert_not_called() mock_ldap.search.assert_not_called()
def test_clipper_no_duplicate(self): def test_clipper_no_duplicate(self):
self.mockLDAP([("uid", "uu_u1")]) self.mockLDAP([("uid", "abc")])
self._test("uu u1", [self.u1], [], [Clipper("uid", "uu_u1")]) self._test("abc", [self.u1], [], [Clipper("uid", "abc")])
self.u1.profile.login_clipper = "uid" self.u1.username = "uid"
self.u1.profile.save() self.u1.save()
self._test("uu u1", [self.u1], [], []) self._test("abc", [self.u1], [], [])
class HomeViewTests(ViewTestCaseMixin, TestCase): class HomeViewTests(ViewTestCaseMixin, TestCase):

0
shared/__init__.py Normal file
View file

View file

@ -111,7 +111,7 @@ class TestCaseMixin:
mock_context_manager.return_value.__enter__.return_value = mock_connection mock_context_manager.return_value.__enter__.return_value = mock_connection
patcher = mock.patch( patcher = mock.patch(
"gestioncof.autocomplete.Connection", new=mock_context_manager "shared.views.autocomplete.Connection", new=mock_context_manager
) )
patcher.start() patcher.start()
self.addCleanup(patcher.stop) self.addCleanup(patcher.stop)

View file

@ -1,8 +1,37 @@
from collections import namedtuple
from dal import autocomplete from dal import autocomplete
from django.conf import settings
from django.db.models import Q from django.db.models import Q
if getattr(settings, "LDAP_SERVER_URL", None):
from ldap3 import Connection
else:
# shared.tests.testcases.TestCaseMixin.mockLDAP needs
# Connection to be defined
Connection = None
class ModelSearch:
class SearchUnit:
"""Base class for all the search utilities.
A search unit should implement a ``search`` method taking a list of keywords as
argument and returning an iterable of search results.
"""
def search(self, _keywords):
raise NotImplementedError(
"Class implementing the SeachUnit interface should implement the search "
"method"
)
# ---
# Model-based search
# ---
class ModelSearch(SearchUnit):
"""Basic search engine for models based on filtering. """Basic search engine for models based on filtering.
The class should be configured through its ``model`` class attribute: the ``search`` The class should be configured through its ``model`` class attribute: the ``search``
@ -55,3 +84,100 @@ class Select2QuerySetView(ModelSearch, autocomplete.Select2QuerySetView):
def get_queryset(self): def get_queryset(self):
keywords = self.q.split() keywords = self.q.split()
return super().search(keywords) return super().search(keywords)
# ---
# LDAP search
# ---
Clipper = namedtuple("Clipper", "clipper fullname")
class LDAPSearch(SearchUnit):
ldap_server_url = getattr(settings, "LDAP_SERVER_URL", None)
domain_component = "dc=spi,dc=ens,dc=fr"
search_fields = ["cn", "uid"]
def get_ldap_query(self, keywords):
# Dumb but safe
keywords = filter(str.isalnum, keywords)
ldap_filters = []
for keyword in keywords:
ldap_filter = "(|{})".format(
"".join(
"({}=*{}*)".format(field, keyword) for field in self.search_fields
)
)
ldap_filters.append(ldap_filter)
return "(&{})".format("".join(ldap_filters))
def search(self, keywords):
"""Return a list of Clipper objects matching all the keywords.
The semantic of the search is the following: a Clipper appears in the
search results iff all of the keywords given as arguments occur in at least one
of the search fields.
"""
query = self.get_ldap_query(keywords)
if Connection is None or query == "(&)":
return []
with Connection(self.ldap_server_url) as conn:
conn.search(self.domain_component, query, attributes=self.search_fields)
return [Clipper(entry.uid.value, entry.cn.value) for entry in conn.entries]
# ---
# Composition of autocomplete units
# ---
class Compose:
"""Search with several units and remove duplicate results.
The ``search_units`` class attribute should be a list of tuples of the form ``(name,
uniq_key, search_unit)``.
The ``search`` method produces a dictionnary whose keys are the ``name``s given in
``search_units`` and whose values are iterables produced by the different search
units.
The ``uniq_key``s are used to remove duplicates: for instance, say that search unit
1 has ``uniq_key = "username"`` and search unit 2 has ``uniq_key = "clipper"``, then
search results from unit 2 whose ``.clipper`` attribute is equal to the
``.username`` attribute of some result from unit 1 are omitted.
Typical Example:
>>> from django.contrib.auth.models import User
>>>
>>> class UserSearch(ModelSearch):
... model = User
... search_fields = ["username", "first_name", "last_name"]
>>>
>>> class UserAndClipperSearch(Compose):
... search_units = [
... ("users", "username", UserSearch),
... ("clippers", "clipper", LDAPSearch),
... ]
In this example, clipper accounts that already have an associated user (i.e. with a
username equal to the clipper login), will not appear in the results.
"""
search_units = []
def search(self, keywords):
uniq_results = set()
results = {}
for name, uniq_key, search_unit in self.search_units:
res = search_unit().search(keywords)
res = [r for r in res if getattr(r, uniq_key) not in uniq_results]
uniq_results |= set((getattr(r, uniq_key) for r in res))
results[name] = res
return results