diff --git a/CHANGELOG.md b/CHANGELOG.md index 963af4a0..e27928a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,10 @@ All versions prior to 0.9.0 are untracked. ### Added +* Added `LogEntry._kind_version`, which is now parsed earlier upon receipt from the rekor API, + either from the root of the response, or from the reponse's inner base64-encoded JSON `body`. + [#1370](https://github.com/sigstore/sigstore-python/pull/1370) + * Added support for ed25519 keys. [#1377](https://github.com/sigstore/sigstore-python/pull/1377) diff --git a/sigstore/models.py b/sigstore/models.py index bbcb1cc7..e4ff5462 100644 --- a/sigstore/models.py +++ b/sigstore/models.py @@ -58,9 +58,7 @@ from sigstore_protobuf_specs.dev.sigstore.common import v1 as common_v1 from sigstore_protobuf_specs.dev.sigstore.common.v1 import Rfc3161SignedTimestamp from sigstore_protobuf_specs.dev.sigstore.rekor import v1 as rekor_v1 -from sigstore_protobuf_specs.dev.sigstore.rekor.v1 import ( - InclusionProof, -) +from sigstore_protobuf_specs.dev.sigstore.rekor.v1 import InclusionProof, KindVersion from sigstore import dsse from sigstore._internal.merkle import verify_merkle_inclusion @@ -173,6 +171,11 @@ class LogEntry: log entry. """ + _kind_version: KindVersion + """ + The kind and version of the log entry. + """ + @classmethod def _from_response(cls, dict_: dict[str, Any]) -> LogEntry: """ @@ -183,8 +186,15 @@ def _from_response(cls, dict_: dict[str, Any]) -> LogEntry: entries = list(dict_.items()) if len(entries) != 1: raise ValueError("Received multiple entries in response") - uuid, entry = entries[0] + + # Fill in the appropriate kind + body_entry: ProposedEntry = TypeAdapter(ProposedEntry).validate_json( + base64.b64decode(entry["body"]) + ) + if not isinstance(body_entry, (Hashedrekord, Dsse)): + raise InvalidBundle("log entry is not of expected type") + return LogEntry( uuid=uuid, body=entry["body"], @@ -195,6 +205,9 @@ def _from_response(cls, dict_: dict[str, Any]) -> LogEntry: entry["verification"]["inclusionProof"] ), inclusion_promise=entry["verification"]["signedEntryTimestamp"], + _kind_version=KindVersion( + kind=body_entry.kind, version=body_entry.api_version + ), ) @classmethod @@ -234,6 +247,7 @@ def _from_dict_rekor(cls, dict_: dict[str, Any]) -> LogEntry: log_id=tlog_entry.log_id.key_id.hex(), log_index=tlog_entry.log_index, inclusion_proof=parsed_inclusion_proof, + _kind_version=tlog_entry.kind_version, inclusion_promise=inclusion_promise, ) @@ -256,6 +270,7 @@ def _to_rekor(self) -> rekor_v1.TransparencyLogEntry: log_id=common_v1.LogId(key_id=bytes.fromhex(self.log_id)), integrated_time=self.integrated_time, inclusion_proof=inclusion_proof, + kind_version=self._kind_version, canonicalized_body=base64.b64decode(self.body), ) if self.inclusion_promise: @@ -264,17 +279,6 @@ def _to_rekor(self) -> rekor_v1.TransparencyLogEntry: ) tlog_entry.inclusion_promise = inclusion_promise - # Fill in the appropriate kind - body_entry: ProposedEntry = TypeAdapter(ProposedEntry).validate_json( - tlog_entry.canonicalized_body - ) - if not isinstance(body_entry, (Hashedrekord, Dsse)): - raise InvalidBundle("log entry is not of expected type") - - tlog_entry.kind_version = rekor_v1.KindVersion( - kind=body_entry.kind, version=body_entry.api_version - ) - return tlog_entry def encode_canonical(self) -> bytes: