Skip to content

Add Asynchronous External Task Client and Worker #107

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jan 24, 2025
171 changes: 171 additions & 0 deletions camunda/client/async_external_task_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
import logging
from http import HTTPStatus

import httpx

from camunda.client.engine_client import ENGINE_LOCAL_BASE_URL
from camunda.utils.log_utils import log_with_context
from camunda.utils.response_utils import raise_exception_if_not_ok
from camunda.utils.utils import str_to_list
from camunda.utils.auth_basic import AuthBasic, obfuscate_password
from camunda.utils.auth_bearer import AuthBearer
from camunda.variables.variables import Variables

logger = logging.getLogger(__name__)


class AsyncExternalTaskClient:
default_config = {
"maxConcurrentTasks": 10, # Number of concurrent tasks you can process
"lockDuration": 300000, # in milliseconds
"asyncResponseTimeout": 30000,
"retries": 3,
"retryTimeout": 300000,
"httpTimeoutMillis": 30000,
"timeoutDeltaMillis": 5000,
"includeExtensionProperties": True, # enables Camunda Extension Properties
"deserializeValues": True, # deserialize values when fetch a task by default
"usePriority": False,
"sorting": None
}

def __init__(self, worker_id, engine_base_url=ENGINE_LOCAL_BASE_URL, config=None):
config = config if config is not None else {}
self.worker_id = worker_id
self.external_task_base_url = engine_base_url + "/external-task"
self.config = type(self).default_config.copy()
self.config.update(config)
self.is_debug = config.get('isDebug', False)
self.http_timeout_seconds = self.config.get('httpTimeoutMillis') / 1000
self._log_with_context(f"Created External Task client with config: {obfuscate_password(self.config)}")

def get_fetch_and_lock_url(self):
return f"{self.external_task_base_url}/fetchAndLock"

async def fetch_and_lock(self, topic_names, process_variables=None, variables=None):
url = self.get_fetch_and_lock_url()
body = {
"workerId": str(self.worker_id), # convert to string to make it JSON serializable
"maxTasks": 1,
"topics": self._get_topics(topic_names, process_variables, variables),
"asyncResponseTimeout": self.config["asyncResponseTimeout"],
"usePriority": self.config["usePriority"],
"sorting": self.config["sorting"]
}

if self.is_debug:
self._log_with_context(f"Trying to fetch and lock with request payload: {body}")
http_timeout_seconds = self.__get_fetch_and_lock_http_timeout_seconds()

async with httpx.AsyncClient() as client:
response = await client.post(url, headers=self._get_headers(), json=body, timeout=http_timeout_seconds)
raise_exception_if_not_ok(response)

resp_json = response.json()
if self.is_debug:
self._log_with_context(f"Fetch and lock response JSON: {resp_json} for request: {body}")
return resp_json

def __get_fetch_and_lock_http_timeout_seconds(self):
# Use HTTP timeout slightly more than async response / long polling timeout
return (self.config["timeoutDeltaMillis"] + self.config["asyncResponseTimeout"]) / 1000

def _get_topics(self, topic_names, process_variables, variables):
topics = []
for topic in str_to_list(topic_names):
topics.append({
"topicName": topic,
"lockDuration": self.config["lockDuration"],
"processVariables": process_variables if process_variables else {},
# Enables Camunda Extension Properties
"includeExtensionProperties": self.config.get("includeExtensionProperties") or False,
"deserializeValues": self.config["deserializeValues"],
"variables": variables
})
return topics

async def complete(self, task_id, global_variables, local_variables=None):
url = self.get_task_complete_url(task_id)

body = {
"workerId": self.worker_id,
"variables": Variables.format(global_variables),
"localVariables": Variables.format(local_variables)
}

async with httpx.AsyncClient() as client:
response = await client.post(url, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds)
raise_exception_if_not_ok(response)
return response.status_code == HTTPStatus.NO_CONTENT

def get_task_complete_url(self, task_id):
return f"{self.external_task_base_url}/{task_id}/complete"

async def failure(self, task_id, error_message, error_details, retries, retry_timeout):
url = self.get_task_failure_url(task_id)
logger.info(f"Setting retries to: {retries} for task: {task_id}")
body = {
"workerId": self.worker_id,
"errorMessage": error_message,
"retries": retries,
"retryTimeout": retry_timeout,
}
if error_details:
body["errorDetails"] = error_details

async with httpx.AsyncClient() as client:
response = await client.post(url, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds)
raise_exception_if_not_ok(response)
return response.status_code == HTTPStatus.NO_CONTENT

def get_task_failure_url(self, task_id):
return f"{self.external_task_base_url}/{task_id}/failure"

async def bpmn_failure(self, task_id, error_code, error_message, variables=None):
url = self.get_task_bpmn_error_url(task_id)

body = {
"workerId": self.worker_id,
"errorCode": error_code,
"errorMessage": error_message,
"variables": Variables.format(variables),
}

if self.is_debug:
self._log_with_context(f"Trying to report BPMN error with request payload: {body}")

async with httpx.AsyncClient() as client:
response = await client.post(url, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds)
response.raise_for_status()
return response.status_code == HTTPStatus.NO_CONTENT

def get_task_bpmn_error_url(self, task_id):
return f"{self.external_task_base_url}/{task_id}/bpmnError"

@property
def auth_basic(self) -> dict:
if not self.config.get("auth_basic") or not isinstance(self.config.get("auth_basic"), dict):
return {}
token = AuthBasic(**self.config.get("auth_basic").copy()).token
return {"Authorization": token}

@property
def auth_bearer(self) -> dict:
if not self.config.get("auth_bearer") or not isinstance(self.config.get("auth_bearer"), dict):
return {}
token = AuthBearer(access_token=self.config["auth_bearer"]).access_token
return {"Authorization": token}

def _get_headers(self):
headers = {
"Content-Type": "application/json"
}
if self.auth_basic:
headers.update(self.auth_basic)
if self.auth_bearer:
headers.update(self.auth_bearer)
return headers

def _log_with_context(self, msg, log_level='info', **kwargs):
context = {"WORKER_ID": self.worker_id}
log_with_context(msg, context=context, log_level=log_level, **kwargs)
128 changes: 128 additions & 0 deletions camunda/client/tests/test_async_external_task_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import unittest
from http import HTTPStatus
from unittest.mock import patch, AsyncMock

import httpx

# Adjust the import based on your actual module path
from camunda.client.async_external_task_client import AsyncExternalTaskClient, ENGINE_LOCAL_BASE_URL


class AsyncExternalTaskClientTest(unittest.IsolatedAsyncioTestCase):
"""
Tests for async_external_task_client.py
"""

def setUp(self):
# Common setup if needed
self.default_worker_id = 1
self.default_engine_url = ENGINE_LOCAL_BASE_URL

async def test_creation_with_no_debug_config(self):
client = AsyncExternalTaskClient(self.default_worker_id, self.default_engine_url, {})
self.assertFalse(client.is_debug)
self.assertFalse(client.config.get("isDebug"))
# Check default_config merges:
self.assertEqual(client.config["maxConcurrentTasks"], 10)
self.assertEqual(client.config["lockDuration"], 300000)

async def test_creation_with_debug_config(self):
client = AsyncExternalTaskClient(self.default_worker_id, self.default_engine_url, {"isDebug": True})
self.assertTrue(client.is_debug)
self.assertTrue(client.config.get("isDebug"))

@patch("httpx.AsyncClient.post")
async def test_fetch_and_lock_success(self, mock_post):
# Provide actual JSON as bytes
content = b'[{"id": "someExternalTaskId", "topicName": "topicA"}]'
mock_post.return_value = httpx.Response(
status_code=200,
request=httpx.Request("POST", "http://example.com"),
content=content
)

client = AsyncExternalTaskClient(self.default_worker_id, self.default_engine_url, {})
# Perform call
tasks = await client.fetch_and_lock("topicA")

# Assertions
expected_url = f"{ENGINE_LOCAL_BASE_URL}/external-task/fetchAndLock"
self.assertEqual([{"id": "someExternalTaskId", "topicName": "topicA"}], tasks)
mock_post.assert_awaited_once() # Check post was awaited exactly once
args, kwargs = mock_post.call_args
self.assertEqual(expected_url, args[0], "Expected correct fetchAndLock endpoint URL")
# You could also check the payload or headers here:
self.assertIn("json", kwargs)
self.assertEqual(kwargs["json"]["workerId"], "1") # str(worker_id)

@patch("httpx.AsyncClient.post")
async def test_fetch_and_lock_server_error(self, mock_post):
# Create a real httpx.Response with status=500
server_err_resp = httpx.Response(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
request=httpx.Request("POST", "http://example.com/external-task/fetchAndLock"),
content=b"Internal Server Error"
)
# Each call to mock_post() returns this real response object
mock_post.return_value = server_err_resp

client = AsyncExternalTaskClient(self.default_worker_id, self.default_engine_url, {})

# Now we expect an exception
with self.assertRaises(httpx.HTTPStatusError) as ctx:
await client.fetch_and_lock("topicA")

# Optional: confirm the error message
self.assertIn("500 Internal Server Error", str(ctx.exception))

@patch("httpx.AsyncClient.post", new_callable=AsyncMock)
async def test_complete_success(self, mock_post):
mock_post.return_value.status_code = HTTPStatus.NO_CONTENT

client = AsyncExternalTaskClient(self.default_worker_id, self.default_engine_url, {})
result = await client.complete("myTaskId", {"globalVar": 1})

self.assertTrue(result)
mock_post.assert_awaited_once()
complete_url = f"{ENGINE_LOCAL_BASE_URL}/external-task/myTaskId/complete"
self.assertEqual(complete_url, mock_post.call_args[0][0])

@patch("httpx.AsyncClient.post", new_callable=AsyncMock)
async def test_failure_with_error_details(self, mock_post):
mock_post.return_value.status_code = HTTPStatus.NO_CONTENT

client = AsyncExternalTaskClient(self.default_worker_id, self.default_engine_url, {})
result = await client.failure(
task_id="myTaskId",
error_message="some error",
error_details="stacktrace info",
retries=3,
retry_timeout=10000
)

self.assertTrue(result)
mock_post.assert_awaited_once()
failure_url = f"{ENGINE_LOCAL_BASE_URL}/external-task/myTaskId/failure"
self.assertEqual(failure_url, mock_post.call_args[0][0])
self.assertIn("errorDetails", mock_post.call_args[1]["json"])

@patch("httpx.AsyncClient.post", new_callable=AsyncMock)
async def test_bpmn_failure_success(self, mock_post):
mock_post.return_value.status_code = HTTPStatus.NO_CONTENT

client = AsyncExternalTaskClient(self.default_worker_id, self.default_engine_url, {"isDebug": True})
result = await client.bpmn_failure(
task_id="myTaskId",
error_code="BPMN_ERROR",
error_message="an example BPMN error",
variables={"foo": "bar"}
)

self.assertTrue(result)
mock_post.assert_awaited_once()
bpmn_url = f"{ENGINE_LOCAL_BASE_URL}/external-task/myTaskId/bpmnError"
args, kwargs = mock_post.call_args
self.assertEqual(bpmn_url, args[0])
self.assertEqual(kwargs["json"]["errorCode"], "BPMN_ERROR")
self.assertTrue(client.is_debug) # Confirm the debug flag is set

42 changes: 42 additions & 0 deletions camunda/client/tests/test_async_external_task_client_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import unittest
from http import HTTPStatus
from unittest.mock import AsyncMock, patch

from camunda.client.async_external_task_client import AsyncExternalTaskClient
from camunda.client.engine_client import ENGINE_LOCAL_BASE_URL


class AsyncExternalTaskClientAuthTest(unittest.IsolatedAsyncioTestCase):
async def test_auth_basic_fetch_and_lock_no_debug(self):
with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post:
mock_post.return_value.status_code = HTTPStatus.OK
mock_post.return_value.json.return_value = []

client = AsyncExternalTaskClient(
1,
ENGINE_LOCAL_BASE_URL,
{"auth_basic": {"username": "demo", "password": "demo"}}
)
await client.fetch_and_lock("someTopic")

# Confirm "Authorization" header is present
headers_used = mock_post.call_args[1]["headers"]
self.assertIn("Authorization", headers_used)
self.assertTrue(headers_used["Authorization"].startswith("Basic "))

async def test_auth_basic_fetch_and_lock_with_debug(self):
with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post:
mock_post.return_value.status_code = HTTPStatus.OK
mock_post.return_value.json.return_value = []

client = AsyncExternalTaskClient(
1,
ENGINE_LOCAL_BASE_URL,
{"auth_basic": {"username": "demo", "password": "demo"}, "isDebug": True}
)
await client.fetch_and_lock("someTopic")

# Confirm "Authorization" header is present
headers_used = mock_post.call_args[1]["headers"]
self.assertIn("Authorization", headers_used)
self.assertTrue(headers_used["Authorization"].startswith("Basic "))
43 changes: 43 additions & 0 deletions camunda/client/tests/test_async_external_task_client_bearer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import unittest
from http import HTTPStatus
from unittest.mock import patch, AsyncMock

from camunda.client.async_external_task_client import AsyncExternalTaskClient
from camunda.client.engine_client import ENGINE_LOCAL_BASE_URL


class AsyncExternalTaskClientAuthTest(unittest.IsolatedAsyncioTestCase):

async def test_auth_bearer_fetch_and_lock_no_debug(self):
token = "some.super.long.jwt"
with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post:
mock_post.return_value.status_code = HTTPStatus.OK
mock_post.return_value.json.return_value = []

client = AsyncExternalTaskClient(
1,
ENGINE_LOCAL_BASE_URL,
{"auth_bearer": {"access_token": token}}
)
await client.fetch_and_lock("someTopic")

headers_used = mock_post.call_args[1]["headers"]
self.assertIn("Authorization", headers_used)
self.assertEqual(f"Bearer {token}", headers_used["Authorization"])

async def test_auth_bearer_fetch_and_lock_with_debug(self):
token = "some.super.long.jwt"
with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post:
mock_post.return_value.status_code = HTTPStatus.OK
mock_post.return_value.json.return_value = []

client = AsyncExternalTaskClient(
1,
ENGINE_LOCAL_BASE_URL,
{"auth_bearer": {"access_token": token}, "isDebug": True}
)
await client.fetch_and_lock("someTopic")

headers_used = mock_post.call_args[1]["headers"]
self.assertIn("Authorization", headers_used)
self.assertEqual(f"Bearer {token}", headers_used["Authorization"])
Loading
Loading