diff --git a/octue/cloud/pub_sub/service.py b/octue/cloud/pub_sub/service.py index 2433df085..0acc0da6d 100644 --- a/octue/cloud/pub_sub/service.py +++ b/octue/cloud/pub_sub/service.py @@ -17,6 +17,7 @@ from octue.cloud.events.utils import make_attributes from octue.cloud.events.validation import raise_if_event_is_invalid from octue.cloud.pub_sub import Subscription, Topic +from octue.cloud.pub_sub.bigquery import get_events from octue.cloud.pub_sub.events import GoogleCloudPubSubEventHandler, extract_event from octue.cloud.pub_sub.logging import GoogleCloudPubSubHandler from octue.cloud.service_id import ( @@ -451,30 +452,31 @@ def wait_for_answer( finally: subscription.delete() - def cancel( - self, - question_uuid, - parent_question_uuid, - originator_question_uuid, - parent, - originator, - retry_count, - timeout=30, - ): + def cancel(self, question_uuid, event_store_table_id, timeout=30): + questions = get_events(table_id=event_store_table_id, question_uuid=question_uuid, kinds=["question"]) + + if len(questions) == 0: + raise ValueError("No question found with question UUID %r.", question_uuid) + + if len(questions) > 1: + raise ValueError("Multiple questions found with same question UUID %r.", question_uuid) + + question_attributes = questions[0] + self._emit_event( {"kind": "cancellation"}, question_uuid=question_uuid, - parent_question_uuid=parent_question_uuid, - originator_question_uuid=originator_question_uuid, - parent=parent, - originator=originator, - recipient=parent, - retry_count=retry_count, + parent_question_uuid=question_attributes["parent_question_uuid"], + originator_question_uuid=question_attributes["originator_question_uuid"], + parent=question_attributes["parent"], + originator=question_attributes["originator"], + recipient=question_attributes["recipient"], + retry_count=question_attributes["retry_count"], attributes={"sender_type": PARENT_SENDER_TYPE}, timeout=timeout, ) - logger.info("%r requested cancellation of question %r.", self, question_uuid) + logger.info("Cancellation of question %r requested.", self, question_uuid) def send_exception( self, diff --git a/octue/resources/child.py b/octue/resources/child.py index 707292cd2..feb1b3ad0 100644 --- a/octue/resources/child.py +++ b/octue/resources/child.py @@ -237,22 +237,5 @@ def ask_multiple( # Convert dictionary to list in asking order. return [answer[1] for answer in sorted(answers.items(), key=lambda item: item[0])] - def cancel( - self, - question_uuid, - parent_question_uuid, - originator_question_uuid, - parent, - originator, - retry_count, - timeout=30, - ): - self._service.cancel( - question_uuid, - parent_question_uuid, - originator_question_uuid, - parent, - originator, - retry_count, - timeout=timeout, - ) + def cancel(self, question_uuid, event_store_table_id, timeout=30): + self._service.cancel(question_uuid=question_uuid, event_store_table_id=event_store_table_id, timeout=timeout)