Skip to content

Commit

Permalink
REF: Factor out attributes into EventAttributes class
Browse files Browse the repository at this point in the history
skipci
  • Loading branch information
cortadocodes committed Feb 11, 2025
1 parent 9a86bce commit 74f11d8
Show file tree
Hide file tree
Showing 8 changed files with 226 additions and 492 deletions.
45 changes: 21 additions & 24 deletions octue/cloud/emulators/_pub_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,31 +405,28 @@ def ask(
# If the originator isn't provided, assume that this service revision is the originator.
originator = originator or self.id

attributes = make_minimal_dictionary(
datetime="2024-04-11T10:46:48.236064",
uuid="a9de11b1-e88f-43fa-b3a4-40a590c3443f",
question_uuid=question_uuid,
parent_question_uuid=parent_question_uuid,
originator_question_uuid=originator_question_uuid,
forward_logs=subscribe_to_logs,
save_diagnostics=save_diagnostics,
parent=self.id,
originator=originator,
sender=self.id,
sender_type=PARENT_SENDER_TYPE,
sender_sdk_version=parent_sdk_version,
recipient=service_id,
retry_count=retry_count,
cpus=cpus,
memory=memory,
ephemeral_storage=ephemeral_storage,
)

try:
self.children[service_id].answer(
MockMessage.from_primitive(
data=question,
attributes={
"datetime": "2024-04-11T10:46:48.236064",
"uuid": "a9de11b1-e88f-43fa-b3a4-40a590c3443f",
"question_uuid": question_uuid,
"parent_question_uuid": parent_question_uuid,
"originator_question_uuid": originator_question_uuid,
"forward_logs": subscribe_to_logs,
"save_diagnostics": save_diagnostics,
"parent": self.id,
"originator": originator,
"sender": self.id,
"sender_type": PARENT_SENDER_TYPE,
"sender_sdk_version": parent_sdk_version,
"recipient": service_id,
"retry_count": retry_count,
"cpus": cpus,
"memory": memory,
"ephemeral_storage": ephemeral_storage,
},
)
)
self.children[service_id].answer(MockMessage.from_primitive(data=question, attributes=attributes))
except Exception as e: # noqa
logger.exception(e)

Expand Down
95 changes: 95 additions & 0 deletions octue/cloud/events/attributes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import datetime as dt
import json
import uuid as uuid_library

from octue.cloud import LOCAL_SDK_VERSION
from octue.utils.dictionaries import make_minimal_dictionary

SENDER_TYPE_OPPOSITES = {"CHILD": "PARENT", "PARENT": "CHILD"}


class EventAttributes:
def __init__(
self,
originator_question_uuid,
parent,
originator,
sender,
sender_type,
recipient,
uuid=None,
datetime=None,
question_uuid=None,
parent_question_uuid=None,
sender_sdk_version=None,
retry_count=0,
forward_logs=True,
save_diagnostics=True,
cpus=None,
memory=None,
ephemeral_storage=None,
):
# Attributes for all event kinds.
self.uuid = uuid or str(uuid_library.uuid4())
self.datetime = datetime or dt.datetime.now(tz=dt.timezone.utc).isoformat()
self.question_uuid = question_uuid or str(uuid_library.uuid4())
self.parent_question_uuid = parent_question_uuid
self.originator_question_uuid = originator_question_uuid
self.parent = parent
self.originator = originator
self.sender = sender
self.sender_type = sender_type
self.sender_sdk_version = sender_sdk_version or LOCAL_SDK_VERSION
self.recipient = recipient
self.retry_count = int(retry_count)

# Question event attributes.
self.forward_logs = bool(forward_logs)
self.save_diagnostics = save_diagnostics
self.cpus = cpus
self.memory = memory
self.ephemeral_storage = ephemeral_storage

def make_response_attributes(self):
attributes = self.to_dict()
attributes["sender"] = self.recipient
attributes["sender_type"] = SENDER_TYPE_OPPOSITES[self.sender_type]
attributes["sender_sdk_version"] = LOCAL_SDK_VERSION
attributes["recipient"] = self.sender
return EventAttributes(**attributes)

def to_dict(self):
return make_minimal_dictionary(
uuid=self.uuid,
datetime=self.datetime,
question_uuid=self.question_uuid,
parent_question_uuid=self.parent_question_uuid,
originator_question_uuid=self.originator_question_uuid,
parent=self.parent,
originator=self.originator,
sender=self.sender,
sender_type=self.sender_type,
sender_sdk_version=self.sender_sdk_version,
recipient=self.recipient,
retry_count=self.retry_count,
forward_logs=self.forward_logs,
save_diagnostics=self.save_diagnostics,
cpus=self.cpus,
memory=self.memory,
ephemeral_storage=self.ephemeral_storage,
)

def to_serialised_attributes(self):
serialised_attributes = {}

for key, value in self.to_dict().items():
if isinstance(value, bool):
value = str(int(value))
elif isinstance(value, (int, float)):
value = str(value)
elif value is None:
value = json.dumps(value)

serialised_attributes[key] = value

return serialised_attributes
54 changes: 3 additions & 51 deletions octue/cloud/events/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import datetime
import uuid

from octue.cloud import LOCAL_SDK_VERSION
from octue.cloud.events.attributes import EventAttributes
from octue.utils.dictionaries import make_minimal_dictionary


Expand All @@ -26,7 +25,7 @@ def make_question_event(
if not attributes:
question_uuid = question_uuid or str(uuid.uuid4())

attributes = make_attributes(
attributes = EventAttributes(
question_uuid=question_uuid,
parent_question_uuid=question_uuid,
originator_question_uuid=question_uuid,
Expand All @@ -41,52 +40,5 @@ def make_question_event(

return {
"event": make_minimal_dictionary(input_values=input_values, input_manifest=input_manifest, kind="question"),
"attributes": attributes,
"attributes": attributes.to_dict(),
}


def make_attributes(
parent_question_uuid,
originator_question_uuid,
parent,
originator,
sender,
sender_type,
recipient,
question_uuid=None,
retry_count=0,
forward_logs=None,
save_diagnostics=None,
cpus=None,
memory=None,
ephemeral_storage=None,
):
attributes = {
"uuid": str(uuid.uuid4()),
"datetime": datetime.datetime.now(tz=datetime.timezone.utc).isoformat(),
"question_uuid": question_uuid or str(uuid.uuid4()),
"parent_question_uuid": parent_question_uuid,
"originator_question_uuid": originator_question_uuid,
"parent": parent,
"originator": originator,
"sender": sender,
"sender_type": sender_type,
"sender_sdk_version": LOCAL_SDK_VERSION,
"recipient": recipient,
"retry_count": int(retry_count),
}

if sender_type == "PARENT":
if forward_logs:
attributes["forward_logs"] = bool(forward_logs)

attributes.update(
make_minimal_dictionary(
save_diagnostics=save_diagnostics,
cpus=cpus,
memory=memory,
ephemeral_storage=ephemeral_storage,
)
)

return attributes
20 changes: 4 additions & 16 deletions octue/cloud/events/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,8 @@
"description": "The UUID of the question the event is related to.",
},
"parent_question_uuid": {
"oneOf": [
{
"type": "string",
"description": "The UUID of the question that triggered this question.",
},
{"type": "null", "description": "If this is the originating question."},
]
"type": "string",
"description": "If this isn't the originating question, the UUID of the question that triggered this question. If it is, don't provide this.",
},
"originator_question_uuid": {
"type": "string",
Expand Down Expand Up @@ -106,7 +101,6 @@
"datetime",
"uuid",
"question_uuid",
"parent_question_uuid",
"originator_question_uuid",
"forward_logs",
"save_diagnostics",
Expand Down Expand Up @@ -137,13 +131,8 @@
"description": "The UUID of the question the event is related to.",
},
"parent_question_uuid": {
"oneOf": [
{
"type": "string",
"description": "The UUID of the question that triggered this question.",
},
{"type": "null", "description": "If this is the originating question."},
]
"type": "string",
"description": "If this isn't the originating question, the UUID of the question that triggered this question. If it is, don't provide this.",
},
"originator_question_uuid": {
"type": "string",
Expand Down Expand Up @@ -188,7 +177,6 @@
"datetime",
"uuid",
"question_uuid",
"parent_question_uuid",
"originator_question_uuid",
"parent",
"originator",
Expand Down
44 changes: 6 additions & 38 deletions octue/cloud/pub_sub/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,39 +8,14 @@ class GoogleCloudPubSubHandler(logging.Handler):
"""A log handler that publishes log records to a Google Cloud Pub/Sub topic.
:param callable event_emitter: the `_emit_event` method of the service that instantiated this instance
:param str question_uuid: the UUID of the question to handle log records for
:param str|None parent_question_uuid: the UUID of the question these log records are related to
:param str|None originator_question_uuid: the UUID of the question that triggered all ancestor questions of this question
:param str parent: the SRUID of the parent that asked the question these log records are related to
:param str originator: the SRUID of the service revision that triggered the tree of questions these log records are related to
:param str recipient: the SRUID of the service to send these log records to
:param int retry_count: the retry count of the question (this is zero if it's the first attempt at the question)
:param octue.cloud.events.attributes.EventAttributes attributes:
:param float timeout: timeout in seconds for attempting to publish each log record
:return None:
"""

def __init__(
self,
event_emitter,
question_uuid,
parent_question_uuid,
originator_question_uuid,
parent,
originator,
recipient,
retry_count,
timeout=60,
*args,
**kwargs,
):
def __init__(self, event_emitter, attributes, timeout=60, *args, **kwargs):
super().__init__(*args, **kwargs)
self.question_uuid = question_uuid
self.parent_question_uuid = parent_question_uuid
self.originator_question_uuid = originator_question_uuid
self.parent = parent
self.originator = originator
self.recipient = recipient
self.retry_count = retry_count
self.attributes = attributes
self.timeout = timeout
self._emit_event = event_emitter

Expand All @@ -56,22 +31,15 @@ def emit(self, record):
"kind": "log_record",
"log_record": self._convert_log_record_to_primitives(record),
},
parent=self.parent,
originator=self.originator,
recipient=self.recipient,
retry_count=self.retry_count,
question_uuid=self.question_uuid,
parent_question_uuid=self.parent_question_uuid,
originator_question_uuid=self.originator_question_uuid,
# The sender type is repeated here as a string to avoid a circular import.
attributes={"sender_type": "CHILD"},
attributes=self.attributes,
wait=False,
)

except Exception: # noqa
self.handleError(record)

def _convert_log_record_to_primitives(self, log_record):
@staticmethod
def _convert_log_record_to_primitives(log_record):
"""Convert a log record to JSON-serialisable primitives by interpolating the args into the message, and
removing the exception info, which is potentially not JSON-serialisable. This is similar to the approach in
`logging.handlers.SocketHandler.makePickle`. Also strip any ANSI escape sequences from the message.
Expand Down
Loading

0 comments on commit 74f11d8

Please sign in to comment.