import logging
from collections import namedtuple

from dal import autocomplete
from django.conf import settings
from django.db.models import Q

if getattr(settings, "LDAP_SERVER_URL", None):
    import ldap
else:
    # shared.tests.testcases.TestCaseMixin.mockLDAP needs
    # an ldap object to be in the scope
    ldap = None

django_logger = logging.getLogger("django.request")


class SearchUnit:
    """Base class for all the search utilities.

    A search unit should implement a `search` method taking a list of keywords as
    argument and returning an iterable of search results.
    """

    def search(self, _keywords):
        """Return an iterable of results matching the keywords.

        Must be overridden by subclasses; the base implementation always raises
        `NotImplementedError`.
        """
        raise NotImplementedError(
            "Class implementing the SearchUnit interface should implement the search "
            "method"
        )


# ---
# Model-based search
# ---


class ModelSearch(SearchUnit):
    """Basic search engine for models based on filtering.

    The class should be configured through its `model` class attribute: the `search`
    method will return a queryset of instances of this model. The `search_fields`
    attribute indicates which fields to search in.

    Example:

    >>> from django.contrib.auth.models import User
    >>>
    >>> class UserSearch(ModelSearch):
    ...     model = User
    ...     search_fields = ["username", "first_name", "last_name"]
    >>>
    >>> user_search = UserSearch()  # has type ModelSearch[User]
    >>> user_search.search(["toto", "foo"])  # returns a queryset of Users
    """

    # Model class whose default manager (`objects`) is queried by `search`.
    model = None
    # Names of the model fields looked up with `__icontains`.
    search_fields = []

    def get_queryset_filter(self, keywords):
        """Build the `Q` object used by `search` to filter the model.

        The result is a conjunction over the keywords of a disjunction over the
        search fields of `<field>__icontains=<keyword>` lookups. An empty (or
        falsy) `keywords` yields an empty `Q()`, i.e. no restriction at all.
        """
        filter_q = Q()

        if not keywords:
            return filter_q

        for keyword in keywords:
            # The keyword may occur in any one of the search fields...
            kw_filter = Q()
            for field in self.search_fields:
                kw_filter |= Q(**{"{}__icontains".format(field): keyword})
            # ...but every keyword has to occur somewhere.
            filter_q &= kw_filter

        return filter_q

    def search(self, keywords):
        """Return the queryset of model instances matching all the keywords.

        The semantics of the search are the following: a model instance appears in
        the search results iff all of the keywords given as arguments occur in at
        least one of the search fields.
        """
        return self.model.objects.filter(self.get_queryset_filter(keywords))


class Select2QuerySetView(ModelSearch, autocomplete.Select2QuerySetView):
    """Compatibility layer between ModelSearch and Select2QuerySetView."""

    def get_queryset(self):
        """Split the query string `self.q` on whitespace and search with the result."""
        keywords = self.q.split()
        return super().search(keywords)


# ---
# LDAP search
# ---

Clipper = namedtuple("Clipper", ["clipper", "fullname"])


class LDAPSearch(SearchUnit):
    """Search unit querying an LDAP directory and returning `Clipper` tuples."""

    # URL handed to `ldap.initialize`; None when the setting is absent.
    ldap_server_url = getattr(settings, "LDAP_SERVER_URL", None)
    # Base DN under which the subtree search is performed.
    domain_component = "dc=spi,dc=ens,dc=fr"
    # Attributes that are both matched against the keywords and retrieved.
    search_fields = ["cn", "uid"]

    def get_ldap_query(self, keywords):
        """Return a search query with the following semantics:

        A Clipper appears in the search results iff all of the keywords given as
        arguments occur in at least one of the search fields.

        Keywords that are not purely alphanumeric are dropped. When no keyword
        remains, the returned query is the empty conjunction "(&)".
        """
        # Dumb but safe: discarding anything that is not alphanumeric guarantees
        # that no LDAP filter metacharacter ends up in the query.
        keywords = filter(str.isalnum, keywords)

        ldap_filters = []

        for keyword in keywords:
            ldap_filter = "(|{})".format(
                "".join(
                    "({}=*{}*)".format(field, keyword) for field in self.search_fields
                )
            )
            ldap_filters.append(ldap_filter)

        return "(&{})".format("".join(ldap_filters))

    @staticmethod
    def _clipper_from_attrs(attrs):
        """Build a `Clipper` from the attributes of one search result.

        Return None when `attrs` does not provide a first value for both "uid" and
        "cn" (attribute missing or empty, or `attrs` is not a mapping at all).
        """
        try:
            uid = attrs["uid"][0]
            fullname = attrs["cn"][0]
        except (KeyError, IndexError, TypeError):
            return None
        return Clipper(clipper=uid.decode("utf-8"), fullname=fullname.decode("utf-8"))

    def search(self, keywords):
        """Return a list of Clipper objects matching all the keywords.

        An empty list is returned when the `ldap` module is not loaded, when no
        usable keyword is left after filtering, or when the LDAP library raises an
        `ldap.LDAPError` (which is logged). Results lacking a "uid" or "cn" value
        are skipped rather than aborting the whole search.
        """
        query = self.get_ldap_query(keywords)

        if ldap is None or query == "(&)":
            return []

        try:
            ldap_obj = ldap.initialize(self.ldap_server_url)
            try:
                res = ldap_obj.search_s(
                    self.domain_component,
                    ldap.SCOPE_SUBTREE,
                    query,
                    self.search_fields,
                )
            finally:
                # Release the connection whether or not the search succeeded.
                ldap_obj.unbind_s()
        except ldap.LDAPError as err:
            django_logger.error("An LDAP error occurred", exc_info=err)
            return []

        # Parsing happens outside the `try` so that only LDAP calls are guarded.
        clippers = (self._clipper_from_attrs(attrs) for (_, attrs) in res)
        return [clipper for clipper in clippers if clipper is not None]


# ---
# Composition of autocomplete units
# ---


class Compose:
    """Search with several units and remove duplicate results.

    The `search_units` class attribute should be a list of tuples of the form `(name,
    uniq_key, search_unit)`.

    The `search` method produces a dictionary whose keys are the `name`s given in
    `search_units` and whose values are iterables produced by the different search
    units.

    The `uniq_key`s are used to remove duplicates: for instance, say that search unit
    1 has `uniq_key = "username"` and search unit 2 has `uniq_key = "clipper"`, then
    search results from unit 2 whose `.clipper` attribute is equal to the
    `.username` attribute of some result from unit 1 are omitted.

    Typical Example:

    >>> from django.contrib.auth.models import User
    >>>
    >>> class UserSearch(ModelSearch):
    ...     model = User
    ...     search_fields = ["username", "first_name", "last_name"]
    >>>
    >>> class UserAndClipperSearch(Compose):
    ...     search_units = [
    ...         ("users", "username", UserSearch),
    ...         ("clippers", "clipper", LDAPSearch),
    ...     ]

    In this example, clipper accounts that already have an associated user (i.e. with
    a username equal to the clipper login), will not appear in the results.
    """

    # List of `(name, uniq_key, search_unit)` tuples, queried in order.
    search_units = []

    def search(self, keywords):
        """Run every search unit and return a dict mapping unit names to result lists.

        Units are instantiated and queried in the order of `search_units`; a result
        is dropped when its `uniq_key` value was already produced by an earlier
        unit. Results of a single unit are not deduplicated against each other.
        """
        uniq_results = set()
        results = {}
        for name, uniq_key, search_unit in self.search_units:
            res = search_unit().search(keywords)
            # Only keys seen in *previous* units are excluded here, hence the
            # update of `uniq_results` after the filtering.
            res = [r for r in res if getattr(r, uniq_key) not in uniq_results]
            uniq_results.update(getattr(r, uniq_key) for r in res)
            results[name] = res
        return results