kpsul/shared/autocomplete.py

252 lines
7.9 KiB
Python
Raw Normal View History

2020-06-28 23:48:20 +02:00
import logging
2019-12-11 22:00:10 +01:00
from collections import namedtuple
from django.conf import settings
from django.db.models import Q
2020-07-01 22:29:07 +02:00
from django.utils.translation import gettext_lazy as _
2020-01-02 16:01:13 +01:00
2019-12-11 22:00:10 +01:00
if getattr(settings, "LDAP_SERVER_URL", None):
import ldap
2019-12-11 22:00:10 +01:00
else:
# shared.tests.testcases.TestCaseMixin.mockLDAP needs
# an ldap object to be in the scope
ldap = None
2019-12-11 22:00:10 +01:00
2020-06-28 23:48:20 +02:00
django_logger = logging.getLogger("django.request")
2019-12-11 22:00:10 +01:00
class SearchUnit:
    """Common interface implemented by every search utility.

    A concrete search unit must provide a `search` method which receives a list
    of keywords and returns an iterable of search results.

    It may additionally override the following optional hooks:

    - verbose_name (attribute): a human-friendly name for the results produced
      by this unit, meant for templates. Examples: "COF Members",
      "K-Fêt accounts", etc.
    - result_verbose_name (method): takes one search result and returns a
      human-friendly name for that particular result, meant for templates.
      Example: `lambda user: user.get_full_name()`
    - result_link (method): takes one search result and returns a URL that makes
      this result clickable on the search page, for instance a link to a detail
      view of the object.
    - result_uuid (method): takes one search result and returns an identifier
      that is globally unique across search units for this object. The
      `Compose` class uses it to compare results coming from different units:
      if the same user can be returned both by the LDAP search and by a model
      search, using the clipper login as the identifier in both units prevents
      `Compose` from returning that user twice. Returning `None` means the
      object is always considered unique.
    """

    # --- Optional attribute ---

    verbose_name = None

    # --- Mandatory method ---

    def search(self, _keywords):
        """Return an iterable of results for the keywords (abstract here)."""
        message = (
            "Class implementing the SearchUnit interface should implement the "
            "search method"
        )
        raise NotImplementedError(message)

    # --- Optional methods ---

    def result_verbose_name(self, result):
        """Hook controlling how a single result is displayed."""
        display_name = str(result)
        return display_name

    def result_link(self, result):
        """Hook providing a URL for a single result; no link by default."""
        return None

    def result_uuid(self, result):
        """Hook providing a cross-unit identifier; `None` means always unique."""
        return None
2019-12-11 22:00:10 +01:00
# ---
# Model-based search
# ---
class ModelSearch(SearchUnit):
    """Filtering-based search unit for Django models.

    Configure a subclass through two class attributes:

    - `model`: the model class whose instances are searched; `search` returns a
      queryset of this model.
    - `search_fields`: the names of the model fields the keywords are looked up
      in.

    Example:

    >>> from django.contrib.auth.models import User
    >>>
    >>> class UserSearch(ModelSearch):
    ...     model = User
    ...     search_fields = ["username", "first_name", "last_name"]
    >>>
    >>> user_search = UserSearch()  # has type ModelSearch[User]
    >>> user_search.search(["toto", "foo"])  # returns a queryset of Users
    """

    model = None
    search_fields = []

    def __init__(self):
        # An explicitly configured verbose name always wins.
        if self.verbose_name is not None:
            return
        # Otherwise derive one from the model's own verbose name.
        model_label = self.model._meta.verbose_name
        self.verbose_name = "{} search".format(model_label)

    def get_queryset_filter(self, keywords):
        """Build the `Q` object selecting instances that match all keywords.

        Each keyword contributes an OR over every search field (case-insensitive
        containment); the per-keyword clauses are then ANDed together. An empty
        or missing keyword list yields an empty `Q()`.
        """
        combined = Q()
        if keywords:
            for word in keywords:
                any_field = Q()
                for field_name in self.search_fields:
                    lookup = "{}__icontains".format(field_name)
                    any_field = any_field | Q(**{lookup: word})
                combined = combined & any_field
        return combined

    def search(self, keywords):
        """Return the queryset of model instances matching all the keywords.

        A model instance is part of the results iff every keyword occurs in at
        least one of the search fields.
        """
        criteria = self.get_queryset_filter(keywords)
        return self.model.objects.filter(criteria)
2019-12-11 22:00:10 +01:00
# ---
# LDAP search
# ---
2020-08-03 14:30:12 +02:00
# Lightweight record describing one clipper account returned by the LDAP search.
Clipper = namedtuple("Clipper", ["clipper", "fullname", "mail"])


class LDAPSearch(SearchUnit):
    """Search unit looking up clipper accounts in an LDAP directory.

    Class attributes:

    - `ldap_server_url`: URL of the LDAP server, read from the
      `LDAP_SERVER_URL` setting (`None` when the setting is absent).
    - `domain_component`: base DN under which the subtree search is performed.
    - `search_fields`: LDAP attributes the keywords are matched against.
    - `attr_list`: LDAP attributes requested from the server for each entry.
    """

    ldap_server_url = getattr(settings, "LDAP_SERVER_URL", None)
    domain_component = "dc=spi,dc=ens,dc=fr"
    search_fields = ["cn", "uid"]
    attr_list = ["cn", "uid", "mail"]

    verbose_name = _("Comptes clippers")

    def get_ldap_query(self, keywords):
        """Return a search query with the following semantics:

        A Clipper appears in the search results iff all of the keywords given as
        arguments occur in at least one of the search fields.

        Keywords that are not purely alphanumeric are dropped. When no keyword
        survives, the returned query is the empty conjunction `"(&)"`.
        """
        # Dumb but safe: only alphanumeric keywords are interpolated into the
        # filter string, so LDAP filter metacharacters never reach the query.
        keywords = filter(str.isalnum, keywords)

        ldap_filters = []
        for keyword in keywords:
            ldap_filter = "(|{})".format(
                "".join(
                    "({}=*{}*)".format(field, keyword) for field in self.search_fields
                )
            )
            ldap_filters.append(ldap_filter)
        return "(&{})".format("".join(ldap_filters))

    @staticmethod
    def _first_value(attrs, name):
        """Return the first value of an LDAP attribute decoded as UTF-8.

        An absent or empty attribute yields `""` instead of raising, since an
        entry does not necessarily carry every requested attribute.
        """
        values = attrs[name] if name in attrs else []
        return values[0].decode("utf-8") if values else ""

    def search(self, keywords):
        """Return a list of Clipper objects matching all the keywords.

        Returns an empty list when the `ldap` module is unavailable, when no
        usable keyword was given, or when the LDAP query fails (the error is
        logged on the "django.request" logger).
        """
        query = self.get_ldap_query(keywords)

        if ldap is None or query == "(&)":
            return []

        # Only the LDAP calls can raise `LDAPError`; keep the `try` body minimal.
        try:
            ldap_obj = ldap.initialize(self.ldap_server_url)
            res = ldap_obj.search_s(
                self.domain_component, ldap.SCOPE_SUBTREE, query, self.attr_list
            )
        except ldap.LDAPError as err:
            django_logger.error("An LDAP error occurred", exc_info=err)
            return []

        return [
            Clipper(
                clipper=attrs["uid"][0].decode("utf-8"),
                # Entries lacking `cn` or `mail` must not crash the whole search.
                fullname=self._first_value(attrs, "cn"),
                mail=self._first_value(attrs, "mail"),
            )
            # `_dn` rather than `_`, which is the gettext alias in this module.
            for (_dn, attrs) in res
            if "uid" in attrs  # Hack to discard weird accounts like root
        ]

    def result_verbose_name(self, clipper):
        """Display a clipper as "Full Name (login)"."""
        return "{} ({})".format(clipper.fullname, clipper.clipper)

    def result_uuid(self, clipper):
        """Use the clipper login as the cross-unit identifier."""
        return clipper.clipper
2019-12-11 22:00:10 +01:00
# ---
# Composition of autocomplete units
# ---
class Compose:
    """Run several search units and drop duplicate results.

    The `search_units` class attribute is a list of `(name, search_unit)` pairs.

    `search` returns a dictionary keyed by the `name`s given in `search_units`;
    each value is the list of results produced by the corresponding unit, minus
    the results whose identifier was already seen (in an earlier unit or earlier
    in the same unit).

    Typical example:

    >>> from django.contrib.auth.models import User
    >>>
    >>> class UserSearch(ModelSearch):
    ...     model = User
    ...     search_fields = ["username", "first_name", "last_name"]
    ...
    ...     def result_uuid(self, user):
    ...         # Assuming that `.username` stores the clipper login of already
    ...         # registered users, this avoids showing the same user twice (here
    ...         # and in the LDAP unit).
    ...         return user.username
    >>>
    >>> class UserAndClipperSearch(Compose):
    ...     search_units = [
    ...         ("users", UserSearch()),
    ...         ("clippers", LDAPSearch()),
    ...     ]

    In this example, clipper accounts that already have an associated user (i.e.
    a user whose username equals the clipper login) do not appear in the results.
    """

    search_units = []

    def search(self, keywords):
        """Query every unit in order and return `{name: unique results}`."""
        already_seen = set()
        per_unit = {}

        for unit_name, unit in self.search_units:
            kept = []
            for item in unit.search(keywords):
                key = unit.result_uuid(item)
                if key is not None:
                    # Identified results are kept only on first occurrence.
                    if key in already_seen:
                        continue
                    already_seen.add(key)
                # Results without an identifier are always considered unique.
                kept.append(item)
            per_unit[unit_name] = kept

        return per_unit