-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathviews.py
361 lines (291 loc) · 12.9 KB
/
views.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
import re
from django.db.models import Q
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import OpenApiParameter, extend_schema
from rest_framework import generics, status, viewsets
from rest_framework.exceptions import ValidationError
from rest_framework.permissions import BasePermission, IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
import custom_view
from airone.lib.acl import ACLType
from airone.lib.types import AttrTypeValue
from entity.models import Entity
from entry.api_v2.pagination import EntryReferralPagination
from entry.api_v2.serializers import (
EntryBaseSerializer,
EntryCopySerializer,
EntryExportSerializer,
EntryRetrieveSerializer,
EntryUpdateSerializer,
GetEntrySimpleSerializer,
)
from entry.models import AttributeValue, Entry
from entry.settings import CONFIG as ENTRY_CONFIG
from job.models import Job
from user.models import User
class EntryPermission(BasePermission):
def has_object_permission(self, request, view, obj):
user: User = request.user
permisson = {
"retrieve": ACLType.Readable,
"update": ACLType.Writable,
"destroy": ACLType.Full,
"restore": ACLType.Full,
"copy": ACLType.Full,
}
if not user.has_permission(obj, permisson.get(view.action)):
return False
return True
class EntryAPI(viewsets.ModelViewSet):
queryset = Entry.objects.all()
permission_classes = [IsAuthenticated & EntryPermission]
def get_serializer_class(self):
serializer = {
"retrieve": EntryRetrieveSerializer,
"update": EntryUpdateSerializer,
"copy": EntryCopySerializer,
}
return serializer.get(self.action, EntryBaseSerializer)
def destroy(self, request, pk):
entry: Entry = self.get_object()
if not entry.is_active:
raise ValidationError("specified entry has already been deleted")
user: User = request.user
if custom_view.is_custom("before_delete_entry", entry.schema.name):
custom_view.call_custom("before_delete_entry", entry.schema.name, user, entry)
# register operation History for deleting entry
user.seth_entry_del(entry)
# delete entry
entry.delete(deleted_user=user)
if custom_view.is_custom("after_delete_entry", entry.schema.name):
custom_view.call_custom("after_delete_entry", entry.schema.name, user, entry)
# Send notification to the webhook URL
job_notify: Job = Job.new_notify_delete_entry(user, entry)
job_notify.run()
return Response(status=status.HTTP_204_NO_CONTENT)
def restore(self, request, pk):
entry: Entry = self.get_object()
if entry.is_active:
raise ValidationError("specified entry has not deleted")
# checks that a same name entry corresponding to the entity is existed, or not.
if Entry.objects.filter(
schema=entry.schema, name=re.sub(r"_deleted_[0-9_]*$", "", entry.name), is_active=True
).exists():
raise ValidationError("specified entry has already exist other")
user: User = request.user
if custom_view.is_custom("before_restore_entry", entry.schema.name):
custom_view.call_custom("before_restore_entry", entry.schema.name, user, entry)
entry.set_status(Entry.STATUS_CREATING)
# restore entry
entry.restore()
if custom_view.is_custom("after_restore_entry", entry.schema.name):
custom_view.call_custom("after_restore_entry", entry.schema.name, user, entry)
# remove status flag which is set before calling this
entry.del_status(Entry.STATUS_CREATING)
# Send notification to the webhook URL
job_notify_event = Job.new_notify_create_entry(user, entry)
job_notify_event.run()
return Response(status=status.HTTP_201_CREATED)
def copy(self, request, pk):
src_entry: Entry = self.get_object()
if not src_entry.is_active:
raise ValidationError("specified entry is not active")
# validate post parameter
serializer = self.get_serializer(src_entry, data=request.data)
serializer.is_valid(raise_exception=True)
# TODO Conversion to support the old UI
params = {
"new_name_list": request.data["copy_entry_names"],
"post_data": request.data,
}
# run copy job
job = Job.new_copy(request.user, src_entry, text="Preparing to copy entry", params=params)
job.run()
return Response({}, status=status.HTTP_200_OK)
@extend_schema(
parameters=[
OpenApiParameter("query", OpenApiTypes.STR, OpenApiParameter.QUERY),
],
)
class searchAPI(viewsets.ReadOnlyModelViewSet):
serializer_class = EntryBaseSerializer
def get_queryset(self):
queryset = []
query = self.request.query_params.get("query", None)
if not query:
return queryset
results = Entry.search_entries_for_simple(query, limit=ENTRY_CONFIG.MAX_SEARCH_ENTRIES)
return results["ret_values"]
class AdvancedSearchAPI(APIView):
"""
NOTE for now it's just copied from /api/v1/entry/search, but it should be
rewritten with DRF components.
"""
MAX_LIST_ENTRIES = 100
MAX_QUERY_SIZE = 64
def post(self, request, format=None):
hint_entities = request.data.get("entities")
hint_entry_name = request.data.get("entry_name", "")
hint_attrs = request.data.get("attrinfo")
hint_has_referral = request.data.get("has_referral", False)
hint_referral_name = request.data.get("referral_name", "")
is_output_all = request.data.get("is_output_all", True)
entry_limit = request.data.get("entry_limit", self.MAX_LIST_ENTRIES)
hint_referral = False
if hint_has_referral:
hint_referral = hint_referral_name
if (
not isinstance(hint_entities, list)
or not isinstance(hint_entry_name, str)
or not isinstance(hint_attrs, list)
or not isinstance(is_output_all, bool)
or not isinstance(hint_referral, (str, bool))
or not isinstance(entry_limit, int)
):
return Response(
"The type of parameter is incorrect", status=status.HTTP_400_BAD_REQUEST
)
# forbid to input large size request
if len(hint_entry_name) > self.MAX_QUERY_SIZE:
return Response("Sending parameter is too large", status=400)
# check attribute params
for hint_attr in hint_attrs:
if "name" not in hint_attr:
return Response("The name key is required for attrinfo parameter", status=400)
if not isinstance(hint_attr["name"], str):
return Response("Invalid value for attrinfo parameter", status=400)
if hint_attr.get("keyword"):
if not isinstance(hint_attr["keyword"], str):
return Response("Invalid value for attrinfo parameter", status=400)
# forbid to input large size request
if len(hint_attr["keyword"]) > self.MAX_QUERY_SIZE:
return Response("Sending parameter is too large", status=400)
# check entities params
if not hint_entities:
return Response("The entities parameters are required", status=400)
hint_entity_ids = []
for hint_entity in hint_entities:
entity = None
if isinstance(hint_entity, int):
entity = Entity.objects.filter(id=hint_entity, is_active=True).first()
elif isinstance(hint_entity, str):
if hint_entity.isnumeric():
entity = Entity.objects.filter(
Q(id=hint_entity) | Q(name=hint_entity), Q(is_active=True)
).first()
else:
entity = Entity.objects.filter(name=hint_entity, is_active=True).first()
if entity and request.user.has_permission(entity, ACLType.Readable):
hint_entity_ids.append(entity.id)
resp = Entry.search_entries(
request.user,
hint_entity_ids,
hint_attrs,
entry_limit,
hint_entry_name,
hint_referral,
is_output_all,
)
# convert field values to fit entry retrieve API data type, as a workaround.
# FIXME should be replaced with DRF serializer etc
for entry in resp["ret_values"]:
for name, attr in entry["attrs"].items():
def _get_typed_value(type: int) -> str:
if type & AttrTypeValue["array"]:
if type & AttrTypeValue["string"]:
return "asArrayString"
elif type & AttrTypeValue["named"]:
return "asArrayNamedObject"
elif type & AttrTypeValue["object"]:
return "asArrayObject"
elif type & AttrTypeValue["group"]:
return "asArrayGroup"
elif type & AttrTypeValue["string"] or type & AttrTypeValue["text"]:
return "asString"
elif type & AttrTypeValue["named"]:
return "asNamedObject"
elif type & AttrTypeValue["object"]:
return "asObject"
elif type & AttrTypeValue["boolean"]:
return "asBoolean"
elif type & AttrTypeValue["date"]:
return "asString"
elif type & AttrTypeValue["group"]:
return "asGroup"
raise ValidationError(f"unexpected type: {type}")
entry["attrs"][name] = {
"is_readble": attr["is_readble"],
"type": attr["type"],
"value": {
_get_typed_value(attr["type"]): attr["value"],
},
}
return Response({"result": resp})
@extend_schema(
parameters=[
OpenApiParameter("keyword", OpenApiTypes.STR, OpenApiParameter.QUERY),
],
)
class EntryReferralAPI(viewsets.ReadOnlyModelViewSet):
serializer_class = GetEntrySimpleSerializer
pagination_class = EntryReferralPagination
def get_queryset(self):
entry_id = self.kwargs["pk"]
keyword = self.request.query_params.get("keyword", None)
entry = Entry.objects.filter(pk=entry_id).first()
if not entry:
return []
ids = AttributeValue.objects.filter(
Q(referral=entry, is_latest=True) | Q(referral=entry, parent_attrv__is_latest=True)
).values_list("parent_attr__parent_entry", flat=True)
# if entity_name param exists, add schema name to reduce filter execution time
query = Q(pk__in=ids, is_active=True)
if keyword:
query &= Q(name__iregex=r"%s" % keyword)
return Entry.objects.filter(query)
class EntryExportAPI(generics.GenericAPIView):
serializer_class = EntryExportSerializer
def post(self, request, entity_id: int):
if not Entity.objects.filter(id=entity_id).exists():
return Response(
"Failed to get entity of specified id", status=status.HTTP_400_BAD_REQUEST
)
serializer = EntryExportSerializer(data=request.data)
if not serializer.is_valid():
return Response(
"Parameters in post body is invalid", status=status.HTTP_400_BAD_REQUEST
)
job_params = {
"export_format": serializer.validated_data["format"],
"target_id": entity_id,
}
# check whether same job is sent
job_status_not_finished = [Job.STATUS["PREPARING"], Job.STATUS["PROCESSING"]]
if (
Job.get_job_with_params(request.user, job_params)
.filter(status__in=job_status_not_finished)
.exists()
):
return Response(
"Same export processing is under execution", status=status.HTTP_400_BAD_REQUEST
)
entity = Entity.objects.get(id=entity_id)
if not request.user.has_permission(entity, ACLType.Readable):
return Response(
'Permission denied to _value "%s"' % entity.name, status=status.HTTP_400_BAD_REQUEST
)
# create a job to export search result and run it
job = Job.new_export(
request.user,
**{
"text": "entry_%s.%s" % (entity.name, job_params["export_format"]),
"target": entity,
"params": job_params,
},
)
job.run()
return Response(
{"result": "Succeed in registering export processing. " + "Please check Job list."},
status=status.HTTP_200_OK,
)