kpsul/shared/views/autocomplete.py

227 lines
6.8 KiB
Python
Raw Normal View History

2020-06-28 23:48:20 +02:00
import logging
2019-12-11 22:00:10 +01:00
from collections import namedtuple
from dal import autocomplete
2019-12-11 22:00:10 +01:00
from django.conf import settings
from django.db.models import Q
2020-01-02 16:01:13 +01:00
2019-12-11 22:00:10 +01:00
if getattr(settings, "LDAP_SERVER_URL", None):
import ldap
2019-12-11 22:00:10 +01:00
else:
# shared.tests.testcases.TestCaseMixin.mockLDAP needs
# an ldap object to be in the scope
ldap = None
2019-12-11 22:00:10 +01:00
2020-06-28 23:48:20 +02:00
django_logger = logging.getLogger("django.request")
2019-12-11 22:00:10 +01:00
class SearchUnit:
"""Base class for all the search utilities.
2020-05-07 15:44:37 +02:00
A search unit should implement a `search` method taking a list of keywords as
2019-12-11 22:00:10 +01:00
argument and returning an iterable of search results.
It might optionally implement the following methods and attributes:
- result_uuid (method): a callable that takes one result as an input and returns an
identifier that is globally unique across search units for this object.
This is used to compare results coming from different search units in the
`Compose` class. For instance, if the same user can be returned by the LDAP
search and a model search instance, using the clipper login as a UUID in both
units avoids this user to be returned twice by `Compose`.
Returning `None` means that the object should be considered unique.
2019-12-11 22:00:10 +01:00
"""
# Mandatory method
2019-12-11 22:00:10 +01:00
def search(self, _keywords):
raise NotImplementedError(
"Class implementing the SearchUnit interface should implement the search "
2019-12-11 22:00:10 +01:00
"method"
)
2020-01-02 16:01:13 +01:00
# Optional attributes and methods
def result_uuid(self, result):
"""A universal unique identifier for the search results."""
return None
2019-12-11 22:00:10 +01:00
# ---
# Model-based search
# ---
class ModelSearch(SearchUnit):
2020-01-02 16:01:13 +01:00
"""Basic search engine for models based on filtering.
2020-05-07 15:44:37 +02:00
The class should be configured through its `model` class attribute: the `search`
method will return a queryset of instances of this model. The `search_fields`
attributes indicates which fields to search in.
2020-01-03 17:26:12 +01:00
Example:
>>> from django.contrib.auth.models import User
>>>
>>> class UserSearch(ModelSearch):
... model = User
... search_fields = ["username", "first_name", "last_name"]
>>>
>>> user_search = UserSearch() # has type ModelSearch[User]
>>> user_search.search(["toto", "foo"]) # returns a queryset of Users
2020-01-02 16:01:13 +01:00
"""
model = None
search_fields = []
2020-01-02 16:01:13 +01:00
def get_queryset_filter(self, keywords):
2020-01-02 16:01:13 +01:00
filter_q = Q()
if not keywords:
return filter_q
for keyword in keywords:
kw_filter = Q()
for field in self.search_fields:
kw_filter |= Q(**{"{}__icontains".format(field): keyword})
filter_q &= kw_filter
return filter_q
def search(self, keywords):
2020-01-02 16:01:13 +01:00
"""Returns the queryset of model instances matching all the keywords.
The semantic of the search is the following: a model instance appears in the
search results iff all of the keywords given as arguments occur in at least one
of the search fields.
"""
return self.model.objects.filter(self.get_queryset_filter(keywords))
class Select2QuerySetView(ModelSearch, autocomplete.Select2QuerySetView):
"""Compatibility layer between ModelSearch and Select2QuerySetView."""
def get_queryset(self):
keywords = self.q.split()
return super().search(keywords)
2019-12-11 22:00:10 +01:00
# ---
# LDAP search
# ---
2020-05-07 15:44:37 +02:00
Clipper = namedtuple("Clipper", ["clipper", "fullname"])
2019-12-11 22:00:10 +01:00
class LDAPSearch(SearchUnit):
ldap_server_url = getattr(settings, "LDAP_SERVER_URL", None)
domain_component = "dc=spi,dc=ens,dc=fr"
search_fields = ["cn", "uid"]
def get_ldap_query(self, keywords):
2020-05-07 15:44:37 +02:00
"""Return a search query with the following semantics:
A Clipper appears in the search results iff all of the keywords given as
arguments occur in at least one of the search fields.
"""
2019-12-11 22:00:10 +01:00
# Dumb but safe
keywords = filter(str.isalnum, keywords)
ldap_filters = []
for keyword in keywords:
ldap_filter = "(|{})".format(
"".join(
"({}=*{}*)".format(field, keyword) for field in self.search_fields
)
)
ldap_filters.append(ldap_filter)
return "(&{})".format("".join(ldap_filters))
def search(self, keywords):
2020-05-07 15:44:37 +02:00
"""Return a list of Clipper objects matching all the keywords."""
2019-12-11 22:00:10 +01:00
query = self.get_ldap_query(keywords)
if ldap is None or query == "(&)":
2019-12-11 22:00:10 +01:00
return []
2020-06-28 23:48:20 +02:00
try:
ldap_obj = ldap.initialize(self.ldap_server_url)
res = ldap_obj.search_s(
self.domain_component, ldap.SCOPE_SUBTREE, query, self.search_fields
)
2020-06-28 23:48:20 +02:00
return [
Clipper(
clipper=attrs["uid"][0].decode("utf-8"),
fullname=attrs["cn"][0].decode("utf-8"),
)
for (_, attrs) in res
]
except ldap.LDAPError as err:
django_logger.error("An LDAP error occurred", exc_info=err)
return []
2019-12-11 22:00:10 +01:00
def result_uuid(self, clipper):
return clipper.clipper
2019-12-11 22:00:10 +01:00
# ---
# Composition of autocomplete units
# ---
class Compose:
"""Search with several units and remove duplicate results.
The `search_units` class attribute should be a list of pairs of the form `(name,
search_unit)`.
2019-12-11 22:00:10 +01:00
2020-05-07 15:44:37 +02:00
The `search` method produces a dictionary whose keys are the `name`s given in
`search_units` and whose values are iterables produced by the different search
2019-12-11 22:00:10 +01:00
units.
Typical Example:
>>> from django.contrib.auth.models import User
>>>
>>> class UserSearch(ModelSearch):
... model = User
... search_fields = ["username", "first_name", "last_name"]
...
... def result_uuid(self, user):
... # Assuming that `.username` stores the clipper login of already
... # registered this avoids showing the same user twice (here and in the
... # ldap unit).
... return user.username
2019-12-11 22:00:10 +01:00
>>>
>>> class UserAndClipperSearch(Compose):
... search_units = [
... ("users", UserSearch()),
... ("clippers", LDAPSearch()),
2019-12-11 22:00:10 +01:00
... ]
In this example, clipper accounts that already have an associated user (i.e. with a
username equal to the clipper login), will not appear in the results.
"""
search_units = []
def search(self, keywords):
seen_uuids = set()
2019-12-11 22:00:10 +01:00
results = {}
for name, search_unit in self.search_units:
uniq_res = []
for r in search_unit.search(keywords):
uuid = search_unit.result_uuid(r)
if uuid is None or uuid not in seen_uuids:
uniq_res.append(r)
if uuid is not None:
seen_uuids.add(uuid)
results[name] = uniq_res
2019-12-11 22:00:10 +01:00
return results