Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support celery tasks on APIv2 to process heavy operations smoothly #1094

Merged
merged 15 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions acl/tests/test_api_v2.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import json

from mock import mock

from acl.models import ACLBase
from airone.lib.acl import ACLType
from airone.lib.test import AironeViewTest
from airone.lib.types import AttrTypeValue
from entity import tasks
from entity.models import Entity, EntityAttr
from role.models import Role

Expand Down Expand Up @@ -474,6 +477,9 @@ def _put_acl():
self.assertEqual(resp.status_code, 200)
self.assertTrue(role.is_permitted(acl, ACLType.Full))

@mock.patch(
"entity.tasks.create_entity_v2.delay", mock.Mock(side_effect=tasks.create_entity_v2)
)
def test_list_history(self):
self.initialization_for_retrieve_test()
self.client.post("/entity/api/v2/", json.dumps({"name": "test"}), "application/json")
Expand Down Expand Up @@ -594,6 +600,9 @@ def test_list_history_with_role(self):
],
)

@mock.patch(
"entity.tasks.create_entity_v2.delay", mock.Mock(side_effect=tasks.create_entity_v2)
)
def test_list_history_with_entity_attr(self):
self.initialization_for_retrieve_test()

Expand Down
15 changes: 15 additions & 0 deletions airone/lib/drf.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import yaml
from django.conf import settings
from rest_framework import serializers
from rest_framework.exceptions import APIException, ParseError, ValidationError
from rest_framework.parsers import BaseParser
from rest_framework.renderers import BaseRenderer
Expand Down Expand Up @@ -150,3 +151,17 @@ def _convert_error_code(detail):
response.data = _convert_error_code(response.data)

return response


class AironeUserDefault(serializers.CurrentUserDefault):
"""
It enables to get user from the custom field in the context.
The original CurrentUserDefault fetches it from request context,
so it fails if the context doesn't have request.
"""

def __call__(self, serializer_field):
if "_user" in serializer_field.context:
return serializer_field.context["_user"]

return super().__call__(serializer_field)
7 changes: 5 additions & 2 deletions airone/lib/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ def wrapper(kls, job_id):
# update Job status from PREPARING to PROCEEDING
job.update(Job.STATUS["PROCESSING"])

# running Job processing
ret = func(kls, job)
try:
# running Job processing
ret: int | tuple = func(kls, job)
except Exception:
ret = Job.STATUS["ERROR"]

# update Job status after finishing Job processing
if isinstance(ret, int):
Expand Down
2 changes: 1 addition & 1 deletion apiclient/typescript-fetch/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@dmm-com/airone-apiclient-typescript-fetch",
"version": "0.0.9",
"version": "0.0.10",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@userlocalhost @hinashi please update the apiclient package if this PR seems okay.

"description": "AirOne APIv2 client in TypeScript",
"main": "src/autogenerated/index.ts",
"scripts": {
Expand Down
29 changes: 22 additions & 7 deletions entity/api_v2/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from rest_framework.exceptions import PermissionDenied, ValidationError

import custom_view
from airone.lib import drf
from airone.lib.acl import ACLType
from airone.lib.drf import DuplicatedObjectExistsError, ObjectNotExistsError, RequiredParameterError
from airone.lib.log import Logger
Expand Down Expand Up @@ -69,7 +70,7 @@ def validate(self, webhook):


class EntityAttrCreateSerializer(serializers.ModelSerializer):
created_user = serializers.HiddenField(default=serializers.CurrentUserDefault())
created_user = serializers.HiddenField(default=drf.AironeUserDefault())

class Meta:
model = EntityAttr
Expand Down Expand Up @@ -141,7 +142,7 @@ def validate(self, attr):
if "type" in attr and attr["type"] != entity_attr.type:
raise ValidationError("type cannot be changed")

user: User = self.context["request"].user
user: User = self.context.get("_user") or self.context["request"].user
if attr["is_deleted"] and not user.has_permission(entity_attr, ACLType.Full):
raise PermissionDenied("Does not have permission to delete")
if not attr["is_deleted"] and not user.has_permission(entity_attr, ACLType.Writable):
Expand Down Expand Up @@ -204,7 +205,7 @@ def _update_or_create(

entity: Entity
entity, is_created_entity = Entity.objects.get_or_create(
id=entity_id, defaults={**validated_data}
id=entity_id, created_user=user, defaults={**validated_data}
)
if not is_created_entity:
# record history for specific fields on update
Expand Down Expand Up @@ -324,11 +325,10 @@ class EntityCreateSerializer(EntitySerializer):
child=EntityAttrCreateSerializer(), write_only=True, required=False, default=[]
)
webhooks = WebhookCreateUpdateSerializer(many=True, write_only=True, required=False, default=[])
created_user = serializers.HiddenField(default=serializers.CurrentUserDefault())

class Meta:
model = Entity
fields = ["id", "name", "note", "is_toplevel", "attrs", "webhooks", "created_user"]
fields = ["id", "name", "note", "is_toplevel", "attrs", "webhooks"]
extra_kwargs = {"note": {"write_only": True}}

def validate_name(self, name):
Expand All @@ -353,8 +353,16 @@ def validate_webhooks(self, webhooks: list[WebhookCreateUpdateSerializer]):
return webhooks

def create(self, validated_data: EntityCreateData):
user: User = self.context["request"].user
user: User | None = None
if "request" in self.context:
user = self.context["request"].user
if "_user" in self.context:
user = self.context["_user"]

if user is None:
raise RequiredParameterError("user is required")

validated_data["created_user"] = user
if custom_view.is_custom("before_create_entity_V2"):
validated_data = custom_view.call_custom(
"before_create_entity_v2", None, user, validated_data
Expand Down Expand Up @@ -415,7 +423,14 @@ def validate_webhooks(self, webhooks: list[WebhookCreateUpdateSerializer]):
return webhooks

def update(self, entity: Entity, validated_data: EntityUpdateData):
user: User = self.context["request"].user
user: User | None = None
if "request" in self.context:
user = self.context["request"].user
if "_user" in self.context:
user = self.context["_user"]

if user is None:
raise RequiredParameterError("user is required")

if custom_view.is_custom("before_update_entity_v2"):
validated_data = custom_view.call_custom(
Expand Down
68 changes: 45 additions & 23 deletions entity/api_v2/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from rest_framework.response import Response
from rest_framework.serializers import Serializer

import custom_view
from airone.lib.acl import ACLType, get_permitted_objects
from airone.lib.drf import ObjectNotExistsError, YAMLParser, YAMLRenderer
from airone.lib.http import http_get
Expand All @@ -29,6 +28,7 @@
from entity.models import Entity, EntityAttr
from entry.api_v2.serializers import EntryBaseSerializer, EntryCreateSerializer
from entry.models import Entry
from job.models import Job
from user.models import History, User


Expand Down Expand Up @@ -110,8 +110,8 @@ class EntityAPI(viewsets.ModelViewSet):
def get_serializer_class(self):
serializer = {
"list": EntityListSerializer,
"create": EntityCreateSerializer,
"update": EntityUpdateSerializer,
"create": serializers.Serializer,
"update": serializers.Serializer,
}
return serializer.get(self.action, EntityDetailSerializer)

Expand All @@ -129,9 +129,36 @@ def get_queryset(self):

return Entity.objects.filter(**filter_condition).exclude(**exclude_condition)

def destroy(self, request, pk):
@extend_schema(request=EntityCreateSerializer)
def create(self, request, *args, **kwargs):
user: User = request.user

serializer = EntityCreateSerializer(data=request.data, context={"_user": user})
serializer.is_valid(raise_exception=True)

job = Job.new_create_entity_v2(user, None, params=request.data)
job.run()

return Response(status=status.HTTP_202_ACCEPTED)

@extend_schema(request=EntityUpdateSerializer)
def update(self, request, *args, **kwargs):
user: User = request.user
entity: Entity = self.get_object()

serializer = EntityUpdateSerializer(
instance=entity, data=request.data, context={"_user": user}
)
serializer.is_valid(raise_exception=True)

job = Job.new_edit_entity_v2(user, entity, params=request.data)
job.run()

return Response(status=status.HTTP_202_ACCEPTED)

def destroy(self, request, *args, **kwargs):
user: User = request.user
entity: Entity = self.get_object()

if not entity.is_active:
raise ObjectNotExistsError("specified entity has already been deleted")
Expand All @@ -141,24 +168,10 @@ def destroy(self, request, pk):
"cannot delete Entity because one or more Entries are not deleted"
)

if custom_view.is_custom("before_delete_entity_v2"):
custom_view.call_custom("before_delete_entity_v2", None, user, entity)

# register operation History for deleting entity
history: History = user.seth_entity_del(entity)

entity.delete()

# Delete all attributes which target Entity have
entity_attr: EntityAttr
for entity_attr in entity.attrs.filter(is_active=True):
history.del_attr(entity_attr)
entity_attr.delete()

if custom_view.is_custom("after_delete_entity_v2"):
custom_view.call_custom("after_delete_entity_v2", None, user, entity)
job = Job.new_delete_entity_v2(user, entity, params=request.data)
job.run()

return Response(status=status.HTTP_204_NO_CONTENT)
return Response(status=status.HTTP_202_ACCEPTED)


class EntityEntryAPI(viewsets.ModelViewSet):
Expand All @@ -172,7 +185,7 @@ class EntityEntryAPI(viewsets.ModelViewSet):

def get_serializer_class(self):
serializer = {
"create": EntryCreateSerializer,
"create": serializers.Serializer,
}
return serializer.get(self.action, EntryBaseSerializer)

Expand All @@ -182,9 +195,18 @@ def get_queryset(self):
raise Http404
return self.queryset.filter(schema=entity)

@extend_schema(request=EntryCreateSerializer)
def create(self, request, entity_id):
user: User = request.user
request.data["schema"] = entity_id
return super().create(request)

serializer = EntryCreateSerializer(data=request.data, context={"_user": user})
serializer.is_valid(raise_exception=True)

job = Job.new_create_entry_v2(user, None, params=request.data)
job.run()

return Response(status=status.HTTP_202_ACCEPTED)


class EntityHistoryAPI(viewsets.ReadOnlyModelViewSet):
Expand Down
63 changes: 62 additions & 1 deletion entity/tasks.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import json

import custom_view
from airone.celery import app
from airone.lib.job import may_schedule_until_job_is_ready
from airone.lib.types import AttrTypeValue
from entity.api_v2.serializers import EntityCreateSerializer, EntityUpdateSerializer
from entity.models import Entity, EntityAttr
from job.models import Job
from user.models import User
from user.models import History, User


@app.task(bind=True)
Expand Down Expand Up @@ -237,3 +240,61 @@ def delete_entity(self, job_id):

# update job status and save it
job.update(Job.STATUS["DONE"])


@app.task(bind=True)
@may_schedule_until_job_is_ready
def create_entity_v2(self, job: Job):
serializer = EntityCreateSerializer(data=json.loads(job.params), context={"_user": job.user})
if not serializer.is_valid():
return Job.STATUS["ERROR"]

serializer.create(serializer.validated_data)

# update job status and save it
return Job.STATUS["DONE"]


@app.task(bind=True)
@may_schedule_until_job_is_ready
def edit_entity_v2(self, job: Job):
entity: Entity | None = Entity.objects.filter(id=job.target.id, is_active=True).first()
if not entity:
job.update(Job.STATUS["ERROR"])
return

serializer = EntityUpdateSerializer(
instance=entity, data=json.loads(job.params), context={"_user": job.user}
)
if not serializer.is_valid():
return Job.STATUS["ERROR"]

serializer.update(entity, serializer.validated_data)

return Job.STATUS["DONE"]


@app.task(bind=True)
@may_schedule_until_job_is_ready
def delete_entity_v2(self, job: Job):
entity: Entity | None = Entity.objects.filter(id=job.target.id, is_active=True).first()
if not entity:
return Job.STATUS["ERROR"]

if custom_view.is_custom("before_delete_entity_v2"):
custom_view.call_custom("before_delete_entity_v2", None, job.user, entity)

# register operation History for deleting entity
history: History = job.user.seth_entity_del(entity)
entity.delete()

# Delete all attributes which target Entity have
entity_attr: EntityAttr
for entity_attr in entity.attrs.filter(is_active=True):
history.del_attr(entity_attr)
entity_attr.delete()

if custom_view.is_custom("after_delete_entity_v2"):
custom_view.call_custom("after_delete_entity_v2", None, job.user, entity)

return Job.STATUS["DONE"]
Loading
Loading