forked from DGNum/gestioCOF
Use the new shared autocomplete framework in kfet/
This commit is contained in:
parent
b9ba0a3829
commit
c5adc6b7d8
4 changed files with 79 additions and 126 deletions
|
@ -1,134 +1,89 @@
|
|||
from django.conf import settings
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.contrib.auth.mixins import PermissionRequiredMixin
|
||||
from django.db.models import Q
|
||||
from django.http import Http404
|
||||
from django.shortcuts import render
|
||||
from django.views.generic import TemplateView
|
||||
|
||||
from gestioncof.models import User
|
||||
from kfet.decorators import teamkfet_required
|
||||
from kfet.models import Account
|
||||
from shared.views import autocomplete
|
||||
|
||||
if getattr(settings, "LDAP_SERVER_URL", None):
|
||||
from ldap3 import Connection
|
||||
else:
|
||||
# shared.tests.testcases.TestCaseMixin.mockLDAP needs
|
||||
# Connection to be defined in order to mock it.
|
||||
Connection = None
|
||||
User = get_user_model()
|
||||
|
||||
|
||||
class Clipper(object):
|
||||
def __init__(self, clipper, fullname):
|
||||
if fullname is None:
|
||||
fullname = ""
|
||||
assert isinstance(clipper, str)
|
||||
assert isinstance(fullname, str)
|
||||
self.clipper = clipper
|
||||
self.fullname = fullname
|
||||
class KfetAccountSearch(autocomplete.ModelSearch):
|
||||
model = User
|
||||
search_fields = [
|
||||
"username",
|
||||
"first_name",
|
||||
"last_name",
|
||||
"profile__account_kfet__trigramme",
|
||||
]
|
||||
|
||||
def get_queryset_filter(self, *args, **kwargs):
|
||||
qset_filter = super().get_queryset_filter(*args, **kwargs)
|
||||
qset_filter &= Q(profile__account_kfet__isnull=False)
|
||||
return qset_filter
|
||||
|
||||
|
||||
@teamkfet_required
|
||||
def account_create(request):
|
||||
if "q" not in request.GET:
|
||||
class COFMemberSearch(autocomplete.ModelSearch):
|
||||
model = User
|
||||
search_fields = ["username", "first_name", "last_name"]
|
||||
|
||||
def get_queryset_filter(self, *args, **kwargs):
|
||||
qset_filter = super().get_queryset_filter(*args, **kwargs)
|
||||
qset_filter &= Q(profile__account_kfet__isnull=True) & Q(profile__is_cof=True)
|
||||
return qset_filter
|
||||
|
||||
|
||||
class OthersSearch(autocomplete.ModelSearch):
|
||||
model = User
|
||||
search_fields = ["username", "first_name", "last_name"]
|
||||
|
||||
def get_queryset_filter(self, *args, **kwargs):
|
||||
qset_filter = super().get_queryset_filter(*args, **kwargs)
|
||||
qset_filter &= Q(profile__account_kfet__isnull=True) & Q(profile__is_cof=False)
|
||||
return qset_filter
|
||||
|
||||
|
||||
class KfetAutocomplete(autocomplete.Compose):
|
||||
search_units = [
|
||||
("kfet", "username", KfetAccountSearch),
|
||||
("users_cof", "username", COFMemberSearch),
|
||||
("users_notcof", "username", OthersSearch),
|
||||
("clippers", "clipper", autocomplete.LDAPSearch),
|
||||
]
|
||||
|
||||
|
||||
kfet_autocomplete = KfetAutocomplete()
|
||||
|
||||
|
||||
class AccountCreateAutocompleteView(PermissionRequiredMixin, TemplateView):
|
||||
template_name = "kfet/account_create_autocomplete.html"
|
||||
permission_required = "kfet.is_team"
|
||||
|
||||
def get_context_data(self, **kwargs):
|
||||
ctx = super().get_context_data(**kwargs)
|
||||
if "q" not in self.request.GET:
|
||||
raise Http404
|
||||
q = request.GET.get("q")
|
||||
|
||||
if len(q) == 0:
|
||||
return render(request, "kfet/account_create_autocomplete.html")
|
||||
|
||||
data = {"q": q}
|
||||
|
||||
queries = {}
|
||||
search_words = q.split()
|
||||
|
||||
# Fetching data from User, CofProfile and Account tables
|
||||
queries["kfet"] = Account.objects
|
||||
queries["users_cof"] = User.objects.filter(profile__is_cof=True)
|
||||
queries["users_notcof"] = User.objects.filter(profile__is_cof=False)
|
||||
|
||||
for word in search_words:
|
||||
queries["kfet"] = queries["kfet"].filter(
|
||||
Q(cofprofile__user__username__icontains=word)
|
||||
| Q(cofprofile__user__first_name__icontains=word)
|
||||
| Q(cofprofile__user__last_name__icontains=word)
|
||||
)
|
||||
queries["users_cof"] = queries["users_cof"].filter(
|
||||
Q(username__icontains=word)
|
||||
| Q(first_name__icontains=word)
|
||||
| Q(last_name__icontains=word)
|
||||
)
|
||||
queries["users_notcof"] = queries["users_notcof"].filter(
|
||||
Q(username__icontains=word)
|
||||
| Q(first_name__icontains=word)
|
||||
| Q(last_name__icontains=word)
|
||||
)
|
||||
|
||||
# Clearing redundancies
|
||||
queries["kfet"] = queries["kfet"].distinct()
|
||||
usernames = set(
|
||||
queries["kfet"].values_list("cofprofile__user__username", flat=True)
|
||||
)
|
||||
queries["kfet"] = [
|
||||
(account, account.cofprofile.user) for account in queries["kfet"]
|
||||
]
|
||||
|
||||
queries["users_cof"] = (
|
||||
queries["users_cof"].exclude(username__in=usernames).distinct()
|
||||
)
|
||||
queries["users_notcof"] = (
|
||||
queries["users_notcof"].exclude(username__in=usernames).distinct()
|
||||
)
|
||||
usernames |= set(queries["users_cof"].values_list("username", flat=True))
|
||||
usernames |= set(queries["users_notcof"].values_list("username", flat=True))
|
||||
|
||||
# Fetching data from the SPI
|
||||
if getattr(settings, "LDAP_SERVER_URL", None):
|
||||
# Fetching
|
||||
ldap_query = "(&{:s})".format(
|
||||
"".join(
|
||||
"(|(cn=*{bit:s}*)(uid=*{bit:s}*))".format(bit=word)
|
||||
for word in search_words
|
||||
if word.isalnum()
|
||||
)
|
||||
)
|
||||
if ldap_query != "(&)":
|
||||
# If none of the bits were legal, we do not perform the query
|
||||
entries = None
|
||||
with Connection(settings.LDAP_SERVER_URL) as conn:
|
||||
conn.search("dc=spi,dc=ens,dc=fr", ldap_query, attributes=["uid", "cn"])
|
||||
entries = conn.entries
|
||||
# Clearing redundancies
|
||||
queries["clippers"] = [
|
||||
Clipper(entry.uid.value, entry.cn.value)
|
||||
for entry in entries
|
||||
if entry.uid.value and entry.uid.value not in usernames
|
||||
]
|
||||
|
||||
# Resulting data
|
||||
data.update(queries)
|
||||
data["options"] = sum([len(query) for query in queries])
|
||||
|
||||
return render(request, "kfet/account_create_autocomplete.html", data)
|
||||
q = self.request.GET["q"]
|
||||
ctx["q"] = q
|
||||
results = kfet_autocomplete.search(q.split())
|
||||
ctx["options"] = sum((len(res) for res in results.values()))
|
||||
ctx.update(results)
|
||||
return ctx
|
||||
|
||||
|
||||
@teamkfet_required
|
||||
def account_search(request):
|
||||
if "q" not in request.GET:
|
||||
class AccountSearchAutocompleteView(PermissionRequiredMixin, TemplateView):
|
||||
template_name = "kfet/account_search_autocomplete.html"
|
||||
permission_required = "kfet.is_team"
|
||||
|
||||
def get_context_data(self, **kwargs):
|
||||
ctx = super().get_context_data(**kwargs)
|
||||
if "q" not in self.request.GET:
|
||||
raise Http404
|
||||
q = request.GET.get("q")
|
||||
words = q.split()
|
||||
|
||||
data = {"q": q}
|
||||
|
||||
for word in words:
|
||||
query = Account.objects.filter(
|
||||
Q(cofprofile__user__username__icontains=word)
|
||||
| Q(cofprofile__user__first_name__icontains=word)
|
||||
| Q(cofprofile__user__last_name__icontains=word)
|
||||
).distinct()
|
||||
|
||||
query = [
|
||||
(account.trigramme, account.cofprofile.user.get_full_name())
|
||||
for account in query
|
||||
q = self.request.GET["q"]
|
||||
ctx["q"] = q
|
||||
ctx["accounts"] = [
|
||||
(user.profile.account_kfet.trigramme, user.get_full_name())
|
||||
for user in KfetAccountSearch().search(q.split())
|
||||
]
|
||||
|
||||
data["accounts"] = query
|
||||
return render(request, "kfet/account_search_autocomplete.html", data)
|
||||
return ctx
|
||||
|
|
|
@ -8,8 +8,8 @@
|
|||
</li>
|
||||
{% if kfet %}
|
||||
<li class="user_category"><span class="text">Comptes existants</span></li>
|
||||
{% for account, user in kfet %}
|
||||
<li><span class="text">{{ account }} [{{ user|highlight_user:q }}]</span></li>
|
||||
{% for user in kfet %}
|
||||
<li><span class="text">{{ user.account_kfet.account }} [{{ user|highlight_user:q }}]</span></li>
|
||||
{% endfor %}
|
||||
{% endif %}
|
||||
{% if users_cof %}
|
||||
|
|
|
@ -183,9 +183,7 @@ class AccountCreateAutocompleteViewTests(ViewTestCaseMixin, TestCase):
|
|||
self.assertEqual(r.status_code, 200)
|
||||
self.assertEqual(len(r.context["users_notcof"]), 0)
|
||||
self.assertEqual(len(r.context["users_cof"]), 0)
|
||||
self.assertSetEqual(
|
||||
set(r.context["kfet"]), set([(self.accounts["user"], self.users["user"])])
|
||||
)
|
||||
self.assertSetEqual(set(r.context["kfet"]), set([self.users["user"]]))
|
||||
|
||||
|
||||
class AccountSearchViewTests(ViewTestCaseMixin, TestCase):
|
||||
|
|
|
@ -38,13 +38,13 @@ urlpatterns = [
|
|||
),
|
||||
path(
|
||||
"autocomplete/account_new",
|
||||
autocomplete.account_create,
|
||||
autocomplete.AccountCreateAutocompleteView.as_view(),
|
||||
name="kfet.account.create.autocomplete",
|
||||
),
|
||||
# Account - Search
|
||||
path(
|
||||
"autocomplete/account_search",
|
||||
autocomplete.account_search,
|
||||
autocomplete.AccountSearchAutocompleteView.as_view(),
|
||||
name="kfet.account.search.autocomplete",
|
||||
),
|
||||
# Account - Read
|
||||
|
|
Loading…
Reference in a new issue