Skip to content

Commit

Permalink
Closes pklauke#42 : add HistoryProcessInstance
Browse files Browse the repository at this point in the history
  • Loading branch information
nesies authored and nesies committed Aug 2, 2023
1 parent 3faac40 commit f9e2df6
Showing 1 changed file with 254 additions and 0 deletions.
254 changes: 254 additions & 0 deletions pycamunda/historyprocessinst.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
# -*- coding: utf-8 -*-

"""This module provides access to the external task REST api of Camunda."""

from __future__ import annotations
import dataclasses
import typing

import pycamunda
import pycamunda.base
import pycamunda.variable
import pycamunda.batch
from pycamunda.request import QueryParameter

URL_SUFFIX = '/history/process-instance'


__all__ = [
'GetList'
]


@dataclasses.dataclass
class HistoryProcessInstance:
"""Data class of HistoryProcessInstance returned by the REST api of Camunda."""
id_: str
business_key: str
process_definition_id: str
process_definition_key: str
process_definition_name: str
process_definition_version: str
start_time: str
end_time: str
removal_time: str
duration_in_millis: str
start_user_id: str
start_activity_id: str
delete_reason: str
root_process_instance_id: str
super_process_instance_id: str
super_case_instance_id: str
case_instance_id: str
tenant_id: str
state: str

@classmethod
def load(cls, data: typing.Mapping[str, typing.Any]) -> HistoryProcessInstance:
history_process_instance = cls(
id_=data['id'],
business_key=data['businessKey'],
process_definition_id=data['processDefinitionId'],
process_definition_key=data['processDefinitionKey'],
process_definition_name=data['processDefinitionName'],
process_definition_version=data['processDefinitionVersion'],
start_time=data['startTime'],
end_time=data['endTime'],
removal_time=data['removalTime'],
duration_in_millis=data['durationInMillis'],
start_user_id=data['startUserId'],
start_activity_id=data['startActivityId'],
delete_reason=data['deleteReason'],
root_process_instance_id=data['rootProcessInstanceId'],
super_process_instance_id=data['superProcessInstanceId'],
super_case_instance_id=data['superCaseInstanceId'],
case_instance_id=data['caseInstanceId'],
tenant_id=data['tenantId'],
state=data['state']
)
return history_process_instance


class GetList(pycamunda.base.CamundaRequest):
process_instance_id = QueryParameter('processInstanceId')
# process_instance_ids = QueryParameter('processInstanceIds')
process_definition_id = QueryParameter('processDefinitionId')
process_definition_key = QueryParameter('processDefinitionKey')
# process_definition_key_in = QueryParameter('processDefinitionKeyIn')
process_definition_name = QueryParameter('processDefinitionName')
process_definition_name_like = QueryParameter('processDefinitionNameLike')
process_definition_key_not_in = QueryParameter('processDefinitionKeyNotIn')
first_result = QueryParameter('firstResult')
max_results = QueryParameter('maxResults')
process_instance_business_key = QueryParameter('processInstanceBusinessKey')
process_instance_business_key_in = QueryParameter('processInstanceBusinessKeyIn')
process_instance_business_key_like = QueryParameter('processInstanceBusinessKeyLike')
root_process_instances = QueryParameter('rootProcessInstances')
unfinished = QueryParameter('unfinished')
with_incidents = QueryParameter('withIncidents')
with_root_incidents = QueryParameter('withRootIncidents')
incident_type = QueryParameter('incidentType')
incident_status = QueryParameter('incidentStatus')
incident_message = QueryParameter('incidentMessage')
incident_message_like = QueryParameter('incidentMessageLike')
started_before = QueryParameter('startedBefore')
started_after = QueryParameter('startedAfter')
finished_before = QueryParameter('finishedBefore')
finished_after = QueryParameter('finishedAfter')
executed_activity_after = QueryParameter('executedActivityAfter')
executed_activity_before = QueryParameter('executedActivityBefore')
executed_job_after = QueryParameter('executedJobAfter')
executed_job_before = QueryParameter('executedJobBefore')
started_by = QueryParameter('startedBy')
super_process_instance_id = QueryParameter('superProcessInstanceId')
sub_process_instance_id = QueryParameter('subProcessInstanceId')
super_case_instance_id = QueryParameter('superCaseInstanceId')
sub_case_instance_id = QueryParameter('subCaseInstanceId')
case_instance_id = QueryParameter('caseInstanceId')
tenant_id_in = QueryParameter('tenantIdIn')
without_tenant_id = QueryParameter('withoutTenantId')
executed_activity_id_in = QueryParameter('executedActivityIdIn')
active_activity_id_in = QueryParameter('activeActivityIdIn')
active = QueryParameter('active')
suspended = QueryParameter('suspended')
completed = QueryParameter('completed')
externally_terminated = QueryParameter('externallyTerminated')
internally_terminated = QueryParameter('internallyTerminated')
variables = QueryParameter('variables')
variable_names_ignore_case = QueryParameter('variableNamesIgnoreCase')
variable_values_ignore_case = QueryParameter('variableValuesIgnoreCase')
sort_by = QueryParameter(
'sortBy',
mapping={
'id_': 'id',
'instance_id': 'instanceId',
'definition_id': 'definitionId',
'definition_key': 'definitionKey',
'definition_name': 'definitionName',
'definition_version': 'definitionVersion',
'business_key': 'businessKey',
'start_time': 'startTime',
'end_time': 'endTime',
'duration': 'duration',
'tenant_id': 'tenanId'
}
)
ascending = QueryParameter(
'sortOrder',
mapping={True: 'asc', False: 'desc'},
provide=lambda self, obj, obj_type: vars(obj).get('sort_by', None) is not None
)

def __init__(
self,
url: str,
id_: str = None,
process_instance_id: str = None,
process_definition_key: str = None,
sort_by: str = None,
active: bool = None,
suspended: bool = None,
completed: bool = None,
ascending: bool = True,
variables: str = None,
first_result: int = None,
max_results: int = None,
process_instance_business_key : str = None,
process_instance_business_key_in : str = None,
process_instance_business_key_like : str = None,
root_process_instances : str = None,
unfinished : boolean = None,
with_incidents : boolean = None,
with_root_incidents : str = None,
incident_type : str = None,
incident_status : str = None,
incident_message : str = None,
incident_message_like : str = None,
started_before : str = None,
started_after : str = None,
finished_before : str = None,
finished_after : str = None,
executed_activity_after : str = None,
executed_activity_before : str = None,
executed_job_after : str = None,
executed_job_before : str = None,
started_by : str = None,
super_process_instance_id : str = None,
sub_process_instance_id : str = None,
super_case_instance_id : str = None,
sub_case_instance_id : str = None,
case_instance_id : str = None,
tenant_id_in : str = None,
without_tenant_id : boolean = None,
executed_activity_id_in : str = None,
active_activity_id_in : str = None,
active : boolean = None,
suspended : boolean = None,
completed : boolean = None,
externally_terminated : boolean = None,
internally_terminated : boolean = None,
variables : str = None,
variable_names_ignore_case : boolean = None,
variable_values_ignore_case : boolean = None
):
"""Query for a list of external tasks using a list of parameters. The size of the result set
can be retrieved by using the Get Count request.
:param url: Camunda Rest engine URL.
:param id_: Filter by the id of the external task.
:param process_instance_id: Filter by the process_instance_id
"""
super().__init__(url=url + URL_SUFFIX)
self.id_ = id_
self.process_instance_id = process_instance_id
self.process_definition_key = process_definition_key
self.variables = variables
self.suspended = suspended
self.completed = completed
self.active = active
self.ascending = ascending
self.sort_by = sort_by
self.first_result = first_result
self.max_results = max_results
self.process_instance_business_key = process_instance_business_key
self.process_instance_business_key_in = process_instance_business_key_in
self.process_instance_business_key_like = process_instance_business_key_like
self.root_process_instances = root_process_instances
self.unfinished = unfinished
self.with_incidents = with_incidents
self.with_root_incidents = with_root_incidents
self.incident_type = incident_type
self.incident_status = incident_status
self.incident_message = incident_message
self.incident_message_like = incident_message_like
self.started_before = started_before
self.started_after = started_after
self.finished_before = finished_before
self.finished_after = finished_after
self.executed_activity_after = executed_activity_after
self.executed_activity_before = executed_activity_before
self.executed_job_after = executed_job_after
self.executed_job_before = executed_job_before
self.started_by = started_by
self.super_process_instance_id = super_process_instance_id
self.sub_process_instance_id = sub_process_instance_id
self.super_case_instance_id = super_case_instance_id
self.sub_case_instance_id = sub_case_instance_id
self.case_instance_id = case_instance_id
self.tenant_id_in = tenant_id_in
self.without_tenant_id = without_tenant_id
self.executed_activity_id_in = executed_activity_id_in
self.active_activity_id_in = active_activity_id_in
self.active = active
self.suspended = suspended
self.completed = completed
self.externally_terminated = externally_terminated
self.internally_terminated = internally_terminated
self.variables = variables
self.variable_names_ignore_case = variable_names_ignore_case
self.variable_values_ignore_case = variable_values_ignore_case

def __call__(self, *args, **kwargs) -> typing.Tuple[HistoryProcessInstance]:
"""Send the request."""
response = super().__call__(pycamunda.base.RequestMethod.GET, *args, **kwargs)
return tuple(HistoryProcessInstance.load(data_json) for data_json in response.json())

0 comments on commit f9e2df6

Please sign in to comment.