Skip to content

Commit

Permalink
ENH: Simplify question cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
cortadocodes committed Feb 6, 2025
1 parent a514d4c commit cf91180
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 36 deletions.
36 changes: 19 additions & 17 deletions octue/cloud/pub_sub/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from octue.cloud.events.utils import make_attributes
from octue.cloud.events.validation import raise_if_event_is_invalid
from octue.cloud.pub_sub import Subscription, Topic
from octue.cloud.pub_sub.bigquery import get_events
from octue.cloud.pub_sub.events import GoogleCloudPubSubEventHandler, extract_event
from octue.cloud.pub_sub.logging import GoogleCloudPubSubHandler
from octue.cloud.service_id import (
Expand Down Expand Up @@ -451,30 +452,31 @@ def wait_for_answer(
finally:
subscription.delete()

def cancel(
self,
question_uuid,
parent_question_uuid,
originator_question_uuid,
parent,
originator,
retry_count,
timeout=30,
):
def cancel(self, question_uuid, event_store_table_id, timeout=30):
questions = get_events(table_id=event_store_table_id, question_uuid=question_uuid, kinds=["question"])

if len(questions) == 0:
raise ValueError("No question found with question UUID %r.", question_uuid)

if len(questions) > 1:
raise ValueError("Multiple questions found with same question UUID %r.", question_uuid)

question_attributes = questions[0]

self._emit_event(
{"kind": "cancellation"},
question_uuid=question_uuid,
parent_question_uuid=parent_question_uuid,
originator_question_uuid=originator_question_uuid,
parent=parent,
originator=originator,
recipient=parent,
retry_count=retry_count,
parent_question_uuid=question_attributes["parent_question_uuid"],
originator_question_uuid=question_attributes["originator_question_uuid"],
parent=question_attributes["parent"],
originator=question_attributes["originator"],
recipient=question_attributes["recipient"],
retry_count=question_attributes["retry_count"],
attributes={"sender_type": PARENT_SENDER_TYPE},
timeout=timeout,
)

logger.info("%r requested cancellation of question %r.", self, question_uuid)
logger.info("Cancellation of question %r requested.", self, question_uuid)

def send_exception(
self,
Expand Down
21 changes: 2 additions & 19 deletions octue/resources/child.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,22 +237,5 @@ def ask_multiple(
# Convert dictionary to list in asking order.
return [answer[1] for answer in sorted(answers.items(), key=lambda item: item[0])]

def cancel(
self,
question_uuid,
parent_question_uuid,
originator_question_uuid,
parent,
originator,
retry_count,
timeout=30,
):
self._service.cancel(
question_uuid,
parent_question_uuid,
originator_question_uuid,
parent,
originator,
retry_count,
timeout=timeout,
)
def cancel(self, question_uuid, event_store_table_id, timeout=30):
self._service.cancel(question_uuid=question_uuid, event_store_table_id=event_store_table_id, timeout=timeout)

0 comments on commit cf91180

Please sign in to comment.