Skip to content

Commit

Permalink
Use celery-beat to schedule tool deletes (#1472)
Browse files Browse the repository at this point in the history
* Use celery-beat to schedule tool deletes

* Add readiness mgmt command for celery beat

Uses similar logic to celery for now. When celery beat init signal is
sent, we create a file. The management command checks for the prescence
of this file.

* Update launch.json.example to add celery beat
  • Loading branch information
michaeljcollinsuk authored Mar 4, 2025
1 parent 05a837a commit f273481
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 34 deletions.
32 changes: 22 additions & 10 deletions controlpanel/api/models/tool.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Standard library
import json
from datetime import timedelta

# Third-party
Expand All @@ -8,11 +9,12 @@
from django.db.models import JSONField
from django.dispatch import receiver
from django.utils import timezone
from django_celery_beat.models import ClockedSchedule, PeriodicTask
from django_extensions.db.models import TimeStampedModel

# First-party/Local
from controlpanel.api import cluster, helm
from controlpanel.api.tasks.tools import uninstall_helm_release, uninstall_tool
from controlpanel.api.tasks.tools import uninstall_helm_release

log = structlog.getLogger(__name__)

Expand Down Expand Up @@ -149,16 +151,26 @@ def tool_type_name(self):

def uninstall_deployments(self):
"""
Sends task to uninstall the tool from all users namespaces. If DELAY_TOOL_UNINSTALL is True,
tasks will be sent to be run at 3am the next day. This is to avoid uninstalling the tool
when users are actively using it.
Sends task to uninstall the tool from all users namespaces. Task will be sent to be run at
3am the next day. This is to avoid uninstalling the tool when users are actively using it.
This time can be updated in the celery beat admin.
"""
eta = None
if settings.DELAY_TOOL_UNINSTALL:
eta = timezone.now().replace(hour=3, minute=0, second=0, microsecond=0) + timedelta(
days=1
)
uninstall_tool.apply_async_on_commit(args=[self.pk], eta=eta)
# can be amended later in django admin
default_run_at = timezone.now().replace(
hour=3, minute=0, second=0, microsecond=0
) + timedelta(days=1)
clocked, _ = ClockedSchedule.objects.get_or_create(clocked_time=default_run_at)
PeriodicTask.objects.update_or_create(
name=f"Uninstall active deployments: {self.description} ({self.pk})",
defaults={
"clocked": clocked,
"task": "controlpanel.api.tasks.tools.uninstall_tool",
"kwargs": json.dumps({"tool_pk": self.pk}),
"expires": default_run_at + timedelta(hours=3),
"one_off": True,
"enabled": True,
},
)


class ToolDeploymentQuerySet(models.QuerySet):
Expand Down
7 changes: 5 additions & 2 deletions controlpanel/api/tasks/tools.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
# Third-party
import structlog
from celery import shared_task

# First-party/Local
from controlpanel.api import helm
from controlpanel.utils import _get_model

log = structlog.getLogger(__name__)


# TODO do we need to use acks_late? try without first
@shared_task(acks_on_failure_or_timeout=False)
@shared_task(acks_on_failure_or_timeout=False) # this does nothing without using acks_late
def uninstall_tool(tool_pk):
Tool = _get_model("Tool")
try:
Expand All @@ -24,4 +27,4 @@ def uninstall_helm_release(namespace, release_name):
try:
helm.delete(namespace, release_name)
except helm.HelmReleaseNotFound as e:
print(e)
log.info(e)
6 changes: 6 additions & 0 deletions controlpanel/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import dotenv
import structlog
from celery import Celery
from celery.signals import beat_init
from django.conf import settings
from kombu import Queue

Expand Down Expand Up @@ -43,6 +44,11 @@ def worker_health_check(self):
log.debug("Worker health ping task executed")


@beat_init.connect
def beat_ready(**_):
Path(settings.WORKER_HEALTH_FILENAME).touch()


# ensures worker picks and runs tasks from all queues rather than just default queue
# alternative is to run the worker and pass queue name to -Q flag
app.conf.task_queues = [
Expand Down
25 changes: 25 additions & 0 deletions controlpanel/frontend/management/commands/celery_beat_health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Standard library
import random
from datetime import datetime, timedelta
from pathlib import Path
from sys import exit

# Third-party
from django.conf import settings
from django.core.management.base import BaseCommand

# First-party/Local
from controlpanel.celery import worker_health_check


class Command(BaseCommand):
help = "Checks if celery beat is ready by checking for heath file"

def handle(self, *args, **options):

if not Path(settings.WORKER_HEALTH_FILENAME).is_file():
self.stderr.write(self.style.ERROR("Health file not found"))
exit(-1)

self.stdout.write(self.style.SUCCESS("Health file found"))
exit(0)
2 changes: 2 additions & 0 deletions controlpanel/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@
"controlpanel.cli",
# Health check
"django_prometheus",
# for scheduled tasks
"django_celery_beat",
]

MIDDLEWARE = [
Expand Down
15 changes: 15 additions & 0 deletions doc/launch.json.example
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,21 @@
"--loglevel=info",
],
},
{
"name": "celery_beat",
"type": "debugpy",
"request": "launch",
"module": "celery",
"args": [
"-A",
"controlpanel",
"beat",
"--loglevel=debug",
"--scheduler",
"django_celery_beat.schedulers:DatabaseScheduler",
],
"justMyCode": false,
},
{
"name": "runserver",
"type": "debugpy",
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ channels==4.2.0
channels-redis==4.2.1
daphne==4.1.2
Django==5.1.4
django-celery-beat==2.7.0
django-crequest==2018.5.11
django-extensions==3.2.3
django-filter==24.3
Expand Down
7 changes: 0 additions & 7 deletions settings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,3 @@ S3_FOLDER_BUCKET_NAME:
WORKER_HEALTH_FILENAME: "/tmp/worker_health.txt"
USE_LOCAL_MESSAGE_BROKER: false
BROKER_URL: "sqs://"


DELAY_TOOL_UNINSTALL:
_DEFAULT: true
_HOST_test: false
_HOST_dev: false
_HOST_alpha: true
43 changes: 28 additions & 15 deletions tests/api/models/test_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,22 +141,35 @@ def test_status_colour():
assert tool.status_colour == "red"


@patch("django.utils.timezone.now", return_value=timezone.datetime(2025, 1, 1, 0, 0, 0))
@pytest.mark.parametrize(
"delay, eta",
[
(False, None),
(True, timezone.datetime(2025, 1, 2, 3, 0, 0)), # 3am the next day
],
@patch(
"django.utils.timezone.now",
return_value=timezone.datetime(2025, 1, 1, 0, 0, 0, tzinfo=timezone.timezone.utc),
)
def test_uninstall_deployments(timezone_now, delay, eta):
settings.DELAY_TOOL_UNINSTALL = delay
tool = Tool(pk=123)

with patch("controlpanel.api.models.tool.uninstall_tool") as uninstall_tool:
tool.uninstall_deployments()
uninstall_tool.apply_async_on_commit.assert_called_once_with(args=[tool.pk], eta=eta)
assert bool(timezone_now.mock_calls) == delay
@patch("controlpanel.api.models.tool.ClockedSchedule")
@patch("controlpanel.api.models.tool.PeriodicTask")
def test_uninstall_deployments(task, clocked, mock_now):
tool = Tool(pk=123, description="Test description", is_retired=True)
expected_run_at = timezone.datetime(2025, 1, 2, 3, 0, 0, tzinfo=timezone.timezone.utc)
clocked_mock = MagicMock()
clocked.objects.get_or_create.return_value = (clocked_mock, True)

tool.uninstall_deployments()

mock_now.assert_called_once()
clocked.objects.get_or_create.assert_called_once_with(
clocked_time=expected_run_at,
)
task.objects.update_or_create.assert_called_once_with(
name=f"Uninstall active deployments: {tool.description} ({tool.pk})",
defaults={
"clocked": clocked_mock,
"task": "controlpanel.api.tasks.tools.uninstall_tool",
"kwargs": f'{{"tool_pk": {tool.pk}}}',
"expires": expected_run_at + timezone.timedelta(hours=3),
"one_off": True,
"enabled": True,
},
)


@pytest.mark.django_db
Expand Down

0 comments on commit f273481

Please sign in to comment.