Merge branch 'kerl/autocomplete' into 'master'

L'autocomplétion est isolée et réutilisable par d'autres apps

See merge request klub-dev-ens/gestioCOF!390
This commit is contained in:
Ludovic Stephan 2020-05-07 16:28:46 +02:00
commit 64ceb813c6
7 changed files with 238 additions and 124 deletions

View file

@ -1,94 +1,56 @@
from django import shortcuts
from django.conf import settings
from django.contrib.auth.models import User
from django.contrib.auth import get_user_model
from django.db.models import Q
from django.http import Http404
from django.views.generic import TemplateView
from gestioncof.decorators import buro_required
from gestioncof.models import CofProfile
from shared.views import autocomplete
if getattr(settings, "LDAP_SERVER_URL", None):
from ldap3 import Connection
else:
# shared.tests.testcases.TestCaseMixin.mockLDAP needs
# Connection to be defined in order to mock it.
Connection = None
User = get_user_model()
class Clipper(object):
def __init__(self, clipper, fullname):
if fullname is None:
fullname = ""
assert isinstance(clipper, str)
assert isinstance(fullname, str)
self.clipper = clipper
self.fullname = fullname
class COFMemberSearch(autocomplete.ModelSearch):
model = User
search_fields = ["username", "first_name", "last_name"]
def __str__(self):
return "{} ({})".format(self.clipper, self.fullname)
def __eq__(self, other):
return self.clipper == other.clipper and self.fullname == other.fullname
def get_queryset_filter(self, *args, **kwargs):
qset_filter = super().get_queryset_filter(*args, **kwargs)
qset_filter &= Q(profile__is_cof=True)
return qset_filter
@buro_required
def autocomplete(request):
if "q" not in request.GET:
raise Http404
q = request.GET["q"]
data = {"q": q}
class COFOthersSearch(autocomplete.ModelSearch):
model = User
search_fields = ["username", "first_name", "last_name"]
queries = {}
bits = q.split()
def get_queryset_filter(self, *args, **kwargs):
qset_filter = super().get_queryset_filter(*args, **kwargs)
qset_filter &= Q(profile__is_cof=False)
return qset_filter
# Fetching data from User and CofProfile tables
queries["members"] = CofProfile.objects.filter(is_cof=True)
queries["users"] = User.objects.filter(profile__is_cof=False)
for bit in bits:
queries["members"] = queries["members"].filter(
Q(user__first_name__icontains=bit)
| Q(user__last_name__icontains=bit)
| Q(user__username__icontains=bit)
| Q(login_clipper__icontains=bit)
)
queries["users"] = queries["users"].filter(
Q(first_name__icontains=bit)
| Q(last_name__icontains=bit)
| Q(username__icontains=bit)
)
queries["members"] = queries["members"].distinct()
queries["users"] = queries["users"].distinct()
# Clearing redundancies
usernames = set(queries["members"].values_list("login_clipper", flat="True")) | set(
queries["users"].values_list("profile__login_clipper", flat="True")
)
class COFSearch(autocomplete.Compose):
search_units = [
("members", "username", COFMemberSearch),
("others", "username", COFOthersSearch),
("clippers", "clipper", autocomplete.LDAPSearch),
]
# Fetching data from the SPI
if getattr(settings, "LDAP_SERVER_URL", None):
# Fetching
ldap_query = "(&{:s})".format(
"".join(
"(|(cn=*{bit:s}*)(uid=*{bit:s}*))".format(bit=bit)
for bit in bits
if bit.isalnum()
)
)
if ldap_query != "(&)":
# If none of the bits were legal, we do not perform the query
entries = None
with Connection(settings.LDAP_SERVER_URL) as conn:
conn.search("dc=spi,dc=ens,dc=fr", ldap_query, attributes=["uid", "cn"])
entries = conn.entries
# Clearing redundancies
queries["clippers"] = [
Clipper(entry.uid.value, entry.cn.value)
for entry in entries
if entry.uid.value and entry.uid.value not in usernames
]
# Resulting data
data.update(queries)
data["options"] = sum(len(query) for query in queries)
cof_search = COFSearch()
return shortcuts.render(request, "autocomplete_user.html", data)
class AutocompleteView(TemplateView):
template_name = "gestioncof/search_results.html"
def get_context_data(self, *args, **kwargs):
ctx = super().get_context_data(*args, **kwargs)
if "q" not in self.request.GET:
raise Http404
q = self.request.GET["q"]
ctx["q"] = q
ctx.update(cof_search.search(q.split()))
return ctx
autocomplete = buro_required(AutocompleteView.as_view())

View file

@ -1,29 +0,0 @@
{% load utils %}
<ul>
{% if members %}
<li class="autocomplete-header">Membres du COF</li>
{% for member in members %}{% if forloop.counter < 5 %}
<li class="autocomplete-value"><a href="{% url 'user-registration' member.user.username %}">{{ member.user|highlight_user:q }}</a></li>
{% elif forloop.counter == 5 %}<li class="autocomplete-more">...</a>{% endif %}{% endfor %}
{% endif %}
{% if users %}
<li class="autocomplete-header">Utilisateurs de GestioCOF</li>
{% for user in users %}{% if forloop.counter < 5 %}
<li class="autocomplete-value"><a href="{% url 'user-registration' user.username %}">{{ user|highlight_user:q }}</a></li>
{% elif forloop.counter == 5 %}<li class="autocomplete-more">...</a>{% endif %}{% endfor %}
{% endif %}
{% if clippers %}
<li class="autocomplete-header">Utilisateurs <tt>clipper</tt></li>
{% for clipper in clippers %}{% if forloop.counter < 5 %}
<li class="autocomplete-value"><a href="{% url 'clipper-registration' clipper.clipper clipper.fullname %}">{{ clipper|highlight_clipper:q }}</a></li>
{% elif forloop.counter == 5 %}<li class="autocomplete-more">...</a>{% endif %}{% endfor %}
{% endif %}
{% if not options %}
<li class="autocomplete-header">Aucune correspondance trouvée</li>
{% else %}
<li class="autocomplete-header">Pas dans la liste ?</li>
{% endif %}
<li><a href="{% url 'empty-registration' %}">Créer un compte</a></li>
</ul>

View file

@ -0,0 +1,56 @@
{% load utils %}
<ul>
{% if members %}
<li class="autocomplete-header">Membres</li>
{% for user in members %}
{% if forloop.counter < 5 %}
<li class="autocomplete-value">
<a href="{% url "user-registration" user.username %}">
{{ user|highlight_user:q }}
</a>
</li>
{% elif forloop.counter == 5 %}
<li class="autocomplete-more">...</li>
{% endif %}
{% endfor %}
{% endif %}
{% if others %}
<li class="autocomplete-header">Non-membres</li>
{% for user in others %}
{% if forloop.counter < 5 %}
<li class="autocomplete-value">
<a href="{% url "user-registration" user.username %}">
{{ user|highlight_user:q }}
</a>
</li>
{% elif forloop.counter == 5 %}
<li class="autocomplete-more">...</li>
{% endif %}
{% endfor %}
{% endif %}
{% if clippers %}
<li class="autocomplete-header">Utilisateurs <tt>clipper</tt></li>
{% for clipper in clippers %}
{% if forloop.counter < 5 %}
<li class="autocomplete-value">
<a href="{% url "clipper-registration" clipper.clipper clipper.fullname %}">
{{ clipper|highlight_clipper:q }}
</a>
</li>
{% elif forloop.counter == 5 %}
<li class="autocomplete-more">...</li>
{% endif %}
{% endfor %}
{% endif %}
{% if total %}
<li class="autocomplete-header">Pas dans la liste ?</li>
{% else %}
<li class="autocomplete-header">Aucune correspondance trouvée</li>
{% endif %}
<li><a href="{% url "empty-registration" %}">Créer un compte</a></li>
</ul>

View file

@ -15,9 +15,9 @@ from django.test import Client, TestCase, override_settings
from django.urls import reverse
from bda.models import Salle, Tirage
from gestioncof.autocomplete import Clipper
from gestioncof.models import CalendarSubscription, Club, Event, Survey, SurveyAnswer
from gestioncof.tests.testcases import ViewTestCaseMixin
from shared.views.autocomplete import Clipper
from .utils import create_member, create_root, create_user
@ -285,21 +285,19 @@ class RegistrationAutocompleteViewTests(ViewTestCaseMixin, TestCase):
self.mockLDAP([])
def _test(self, query, expected_users, expected_members, expected_clippers):
def _test(self, query, expected_others, expected_members, expected_clippers):
r = self.client.get(self.url, {"q": query})
self.assertEqual(r.status_code, 200)
self.assertQuerysetEqual(
r.context["users"], map(repr, expected_users), ordered=False
r.context["others"], map(repr, expected_others), ordered=False
)
self.assertQuerysetEqual(
r.context["members"],
map(lambda u: repr(u.profile), expected_members),
ordered=False,
r.context["members"], map(repr, expected_members), ordered=False,
)
self.assertCountEqual(
map(str, r.context.get("clippers", [])), map(str, expected_clippers)
map(str, r.context["clippers"]), map(str, expected_clippers)
)
def test_username(self):
@ -322,7 +320,7 @@ class RegistrationAutocompleteViewTests(ViewTestCaseMixin, TestCase):
mock_ldap.search.assert_called_once_with(
"dc=spi,dc=ens,dc=fr",
"(&(|(cn=*aa*)(uid=*aa*))(|(cn=*bb*)(uid=*bb*)))",
attributes=["uid", "cn"],
attributes=["cn", "uid"],
)
def test_clipper_escaped(self):
@ -333,14 +331,14 @@ class RegistrationAutocompleteViewTests(ViewTestCaseMixin, TestCase):
mock_ldap.search.assert_not_called()
def test_clipper_no_duplicate(self):
self.mockLDAP([("uid", "uu_u1")])
self.mockLDAP([("uid", "abc")])
self._test("uu u1", [self.u1], [], [Clipper("uid", "uu_u1")])
self._test("abc", [self.u1], [], [Clipper("uid", "abc")])
self.u1.profile.login_clipper = "uid"
self.u1.profile.save()
self.u1.username = "uid"
self.u1.save()
self._test("uu u1", [self.u1], [], [])
self._test("abc", [self.u1], [], [])
class HomeViewTests(ViewTestCaseMixin, TestCase):

0
shared/__init__.py Normal file
View file

View file

@ -111,7 +111,7 @@ class TestCaseMixin:
mock_context_manager.return_value.__enter__.return_value = mock_connection
patcher = mock.patch(
"gestioncof.autocomplete.Connection", new=mock_context_manager
"shared.views.autocomplete.Connection", new=mock_context_manager
)
patcher.start()
self.addCleanup(patcher.stop)

View file

@ -1,12 +1,41 @@
from collections import namedtuple
from dal import autocomplete
from django.conf import settings
from django.db.models import Q
if getattr(settings, "LDAP_SERVER_URL", None):
from ldap3 import Connection
else:
# shared.tests.testcases.TestCaseMixin.mockLDAP needs
# Connection to be defined
Connection = None
class ModelSearch:
class SearchUnit:
"""Base class for all the search utilities.
A search unit should implement a `search` method taking a list of keywords as
argument and returning an iterable of search results.
"""
def search(self, _keywords):
raise NotImplementedError(
"Class implementing the SeachUnit interface should implement the search "
"method"
)
# ---
# Model-based search
# ---
class ModelSearch(SearchUnit):
"""Basic search engine for models based on filtering.
The class should be configured through its ``model`` class attribute: the ``search``
method will return a queryset of instances of this model. The ``search_fields``
The class should be configured through its `model` class attribute: the `search`
method will return a queryset of instances of this model. The `search_fields`
attributes indicates which fields to search in.
Example:
@ -55,3 +84,101 @@ class Select2QuerySetView(ModelSearch, autocomplete.Select2QuerySetView):
def get_queryset(self):
keywords = self.q.split()
return super().search(keywords)
# ---
# LDAP search
# ---
Clipper = namedtuple("Clipper", ["clipper", "fullname"])
class LDAPSearch(SearchUnit):
ldap_server_url = getattr(settings, "LDAP_SERVER_URL", None)
domain_component = "dc=spi,dc=ens,dc=fr"
search_fields = ["cn", "uid"]
def get_ldap_query(self, keywords):
"""Return a search query with the following semantics:
A Clipper appears in the search results iff all of the keywords given as
arguments occur in at least one of the search fields.
"""
# Dumb but safe
keywords = filter(str.isalnum, keywords)
ldap_filters = []
for keyword in keywords:
ldap_filter = "(|{})".format(
"".join(
"({}=*{}*)".format(field, keyword) for field in self.search_fields
)
)
ldap_filters.append(ldap_filter)
return "(&{})".format("".join(ldap_filters))
def search(self, keywords):
"""Return a list of Clipper objects matching all the keywords."""
query = self.get_ldap_query(keywords)
if Connection is None or query == "(&)":
return []
with Connection(self.ldap_server_url) as conn:
conn.search(self.domain_component, query, attributes=self.search_fields)
return [Clipper(entry.uid.value, entry.cn.value) for entry in conn.entries]
# ---
# Composition of autocomplete units
# ---
class Compose:
"""Search with several units and remove duplicate results.
The `search_units` class attribute should be a list of tuples of the form `(name,
uniq_key, search_unit)`.
The `search` method produces a dictionary whose keys are the `name`s given in
`search_units` and whose values are iterables produced by the different search
units.
The `uniq_key`s are used to remove duplicates: for instance, say that search unit
1 has `uniq_key = "username"` and search unit 2 has `uniq_key = "clipper"`, then
search results from unit 2 whose `.clipper` attribute is equal to the
`.username` attribute of some result from unit 1 are omitted.
Typical Example:
>>> from django.contrib.auth.models import User
>>>
>>> class UserSearch(ModelSearch):
... model = User
... search_fields = ["username", "first_name", "last_name"]
>>>
>>> class UserAndClipperSearch(Compose):
... search_units = [
... ("users", "username", UserSearch),
... ("clippers", "clipper", LDAPSearch),
... ]
In this example, clipper accounts that already have an associated user (i.e. with a
username equal to the clipper login), will not appear in the results.
"""
search_units = []
def search(self, keywords):
uniq_results = set()
results = {}
for name, uniq_key, search_unit in self.search_units:
res = search_unit().search(keywords)
res = [r for r in res if getattr(r, uniq_key) not in uniq_results]
uniq_results |= set((getattr(r, uniq_key) for r in res))
results[name] = res
return results