import logging
from collections import namedtuple

from django.conf import settings
from django.db.models import Q

if getattr(settings, "LDAP_SERVER_URL", None):
    import ldap
else:
    # shared.tests.testcases.TestCaseMixin.mockLDAP needs
    # an ldap object to be in the scope
    ldap = None


django_logger = logging.getLogger("django.request")


class SearchUnit:
    """Base class for all the search utilities.

    A search unit should implement a `search` method taking a list of keywords as
    argument and returning an iterable of search results.

    It might optionally implement the following methods and attributes:

    - result_uuid (method): a callable that takes one result as an input and returns
      an identifier that is globally unique across search units for this object.
      This is used to compare results coming from different search units in the
      `Compose` class. For instance, if the same user can be returned by the LDAP
      search and a model search instance, using the clipper login as a UUID in both
      units avoids this user to be returned twice by `Compose`.
      Returning `None` means that the object should be considered unique.
    """

    # Mandatory method

    def search(self, _keywords):
        """Return an iterable of results matching the keywords (abstract)."""
        raise NotImplementedError(
            "Class implementing the SearchUnit interface should implement the search "
            "method"
        )

    # Optional attributes and methods

    def result_uuid(self, result):
        """A universal unique identifier for the search results."""
        return None


# ---
# Model-based search
# ---


class ModelSearch(SearchUnit):
    """Basic search engine for models based on filtering.

    The class should be configured through its `model` class attribute: the `search`
    method will return a queryset of instances of this model. The `search_fields`
    attributes indicates which fields to search in.

    Example:

    >>> from django.contrib.auth.models import User
    >>>
    >>> class UserSearch(ModelSearch):
    ...     model = User
    ...     search_fields = ["username", "first_name", "last_name"]
    >>>
    >>> user_search = UserSearch()  # has type ModelSearch[User]
    >>> user_search.search(["toto", "foo"])  # returns a queryset of Users
    """

    model = None
    search_fields = []

    def get_queryset_filter(self, keywords):
        """Build the `Q` object selecting instances that match all the keywords.

        Each keyword contributes a disjunction of case-insensitive containment
        tests (one per search field); the per-keyword filters are then and-ed
        together. An empty `keywords` yields an empty `Q()`.
        """
        filter_q = Q()

        if not keywords:
            return filter_q

        for keyword in keywords:
            kw_filter = Q()
            for field in self.search_fields:
                kw_filter |= Q(**{"{}__icontains".format(field): keyword})
            filter_q &= kw_filter

        return filter_q

    def search(self, keywords):
        """Returns the queryset of model instances matching all the keywords.

        The semantics of the search are the following: a model instance appears in
        the search results iff all of the keywords given as arguments occur in at
        least one of the search fields.
        """

        return self.model.objects.filter(self.get_queryset_filter(keywords))


# ---
# LDAP search
# ---

Clipper = namedtuple("Clipper", ["clipper", "fullname"])


class LDAPSearch(SearchUnit):
    """Search unit looking up clipper accounts in an LDAP directory.

    The server URL is read from the `LDAP_SERVER_URL` setting. When the `ldap`
    module is not available in this module's scope, `search` returns no result.
    """

    ldap_server_url = getattr(settings, "LDAP_SERVER_URL", None)
    domain_component = "dc=spi,dc=ens,dc=fr"
    search_fields = ["cn", "uid"]

    def get_ldap_query(self, keywords):
        """Return a search query with the following semantics:

        A Clipper appears in the search results iff all of the keywords given as
        arguments occur in at least one of the search fields.

        Keywords that are not purely alphanumeric are dropped. If no keyword
        remains, the returned query is the empty conjunction `"(&)"`.
        """

        # Dumb but safe: keeping only alphanumeric keywords guarantees that no
        # LDAP filter metacharacter ends up in the query string.
        keywords = filter(str.isalnum, keywords)

        ldap_filters = []

        for keyword in keywords:
            ldap_filter = "(|{})".format(
                "".join(
                    "({}=*{}*)".format(field, keyword) for field in self.search_fields
                )
            )
            ldap_filters.append(ldap_filter)

        return "(&{})".format("".join(ldap_filters))

    @staticmethod
    def _clipper_from_attrs(attrs):
        """Build a `Clipper` from the attributes of one LDAP result.

        Return `None` when the result cannot be turned into a `Clipper`: `uid` or
        `cn` is missing or empty, or `attrs` is not an attribute mapping at all
        (e.g. a search reference, for which the second element of the result
        tuple is not a dict).
        """
        try:
            uid = attrs["uid"][0]
            fullname = attrs["cn"][0]
        except (KeyError, IndexError, TypeError):
            return None
        return Clipper(clipper=uid.decode("utf-8"), fullname=fullname.decode("utf-8"))

    def search(self, keywords):
        """Return a list of Clipper objects matching all the keywords.

        An empty list is returned when LDAP is unavailable, when no usable keyword
        is given, or when the LDAP request fails (the error is logged). Results
        lacking a `uid` or a `cn` are skipped.
        """

        query = self.get_ldap_query(keywords)

        if ldap is None or query == "(&)":
            return []

        try:
            ldap_obj = ldap.initialize(self.ldap_server_url)
            try:
                res = ldap_obj.search_s(
                    self.domain_component,
                    ldap.SCOPE_SUBTREE,
                    query,
                    self.search_fields,
                )
            finally:
                # Always release the connection, even if the search failed.
                ldap_obj.unbind_s()
        except ldap.LDAPError as err:
            django_logger.error("An LDAP error occurred", exc_info=err)
            return []

        clippers = []
        for _dn, attrs in res:
            clipper = self._clipper_from_attrs(attrs)
            if clipper is not None:
                clippers.append(clipper)
        return clippers

    def result_uuid(self, clipper):
        """Use the clipper login as the cross-unit identifier."""
        return clipper.clipper


# ---
# Composition of autocomplete units
# ---


class Compose:
    """Search with several units and remove duplicate results.

    The `search_units` class attribute should be a list of pairs of the form `(name,
    search_unit)`.

    The `search` method produces a dictionary whose keys are the `name`s given in
    `search_units` and whose values are iterables produced by the different search
    units.

    Typical Example:

    >>> from django.contrib.auth.models import User
    >>>
    >>> class UserSearch(ModelSearch):
    ...     model = User
    ...     search_fields = ["username", "first_name", "last_name"]
    ...
    ...     def result_uuid(self, user):
    ...         # Assuming that `.username` stores the clipper login of already
    ...         # registered users, this avoids showing the same user twice (here and in
    ...         # the LDAP unit).
    ...         return user.username
    >>>
    >>> class UserAndClipperSearch(Compose):
    ...     search_units = [
    ...         ("users", UserSearch()),
    ...         ("clippers", LDAPSearch()),
    ...     ]

    In this example, clipper accounts that already have an associated user (i.e. with a
    username equal to the clipper login), will not appear in the results.
    """

    search_units = []

    def search(self, keywords):
        """Run every search unit and return `{name: [results]}` without duplicates.

        Units are queried in the order of `search_units`. A result whose
        `result_uuid` was already produced earlier (by the same or a previous
        unit) is dropped; results whose UUID is `None` are always kept.
        """
        seen_uuids = set()
        results = {}
        for name, search_unit in self.search_units:
            uniq_res = []
            for r in search_unit.search(keywords):
                uuid = search_unit.result_uuid(r)
                if uuid is None or uuid not in seen_uuids:
                    uniq_res.append(r)
                    if uuid is not None:
                        seen_uuids.add(uuid)
            results[name] = uniq_res
        return results