Skip to content

sigstore: extract LogEntry conversions to their own functions #992

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 86 additions & 70 deletions sigstore/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,19 @@
Field,
StrictInt,
StrictStr,
TypeAdapter,
ValidationInfo,
field_validator,
)
from pydantic.dataclasses import dataclass
from rekor_types import Dsse, Hashedrekord, ProposedEntry
from sigstore_protobuf_specs.dev.sigstore.bundle import v1 as bundle_v1
from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import (
Bundle as _Bundle,
)
from sigstore_protobuf_specs.dev.sigstore.common import v1 as common_v1
from sigstore_protobuf_specs.dev.sigstore.rekor import v1 as rekor_v1
from sigstore_protobuf_specs.dev.sigstore.rekor.v1 import (
InclusionPromise,
InclusionProof,
)

Expand Down Expand Up @@ -187,6 +188,80 @@ def _from_response(cls, dict_: dict[str, Any]) -> LogEntry:
inclusion_promise=entry["verification"]["signedEntryTimestamp"],
)

@classmethod
def _from_dict_rekor(cls, dict_: dict[str, Any]) -> LogEntry:
"""
Create a new `LogEntry` from the given Rekor TransparencyLogEntry.
"""
tlog_entry = rekor_v1.TransparencyLogEntry()
tlog_entry.from_dict(dict_)

inclusion_proof: InclusionProof | None = tlog_entry.inclusion_proof
# This check is required by us as the client, not the
# protobuf-specs themselves.
if inclusion_proof is None or inclusion_proof.checkpoint.envelope is None:
raise InvalidBundle("entry must contain inclusion proof")

parsed_inclusion_proof = LogInclusionProof(
checkpoint=inclusion_proof.checkpoint.envelope,
hashes=[h.hex() for h in inclusion_proof.hashes],
log_index=inclusion_proof.log_index,
root_hash=inclusion_proof.root_hash.hex(),
tree_size=inclusion_proof.tree_size,
)

return LogEntry(
uuid=None,
body=B64Str(base64.b64encode(tlog_entry.canonicalized_body).decode()),
integrated_time=tlog_entry.integrated_time,
log_id=tlog_entry.log_id.key_id.hex(),
log_index=tlog_entry.log_index,
inclusion_proof=parsed_inclusion_proof,
inclusion_promise=B64Str(
base64.b64encode(
tlog_entry.inclusion_promise.signed_entry_timestamp
).decode()
),
)

def _to_dict_rekor(self) -> dict[str, Any]:
inclusion_promise: rekor_v1.InclusionPromise | None = None
if self.inclusion_promise:
inclusion_promise = rekor_v1.InclusionPromise(
signed_entry_timestamp=base64.b64decode(self.inclusion_promise)
)

inclusion_proof = rekor_v1.InclusionProof(
log_index=self.inclusion_proof.log_index,
root_hash=bytes.fromhex(self.inclusion_proof.root_hash),
tree_size=self.inclusion_proof.tree_size,
hashes=[bytes.fromhex(hash_) for hash_ in self.inclusion_proof.hashes],
checkpoint=rekor_v1.Checkpoint(envelope=self.inclusion_proof.checkpoint),
)

tlog_entry = rekor_v1.TransparencyLogEntry(
log_index=self.log_index,
log_id=common_v1.LogId(key_id=bytes.fromhex(self.log_id)),
integrated_time=self.integrated_time,
inclusion_promise=inclusion_promise,
inclusion_proof=inclusion_proof,
canonicalized_body=base64.b64decode(self.body),
)

# Fill in the appropriate kind
body_entry = TypeAdapter(ProposedEntry).validate_json(
tlog_entry.canonicalized_body
)
if not isinstance(body_entry, (Hashedrekord, Dsse)):
raise ValueError("LogEntry is not of expected type")

tlog_entry.kind_version = rekor_v1.KindVersion(
kind=body_entry.kind, version=body_entry.api_version
)

tlog_entry_dict: dict[str, Any] = tlog_entry.to_dict()
return tlog_entry_dict

def encode_canonical(self) -> bytes:
"""
Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry.
Expand Down Expand Up @@ -366,55 +441,22 @@ def _verify_bundle(self) -> None:
# The inclusion promise is NOT required; if present, the client
# SHOULD verify it.
#
# Beneath all of this, we require that the inclusion proof be present.
inclusion_promise: InclusionPromise | None = tlog_entry.inclusion_promise
inclusion_proof: InclusionProof | None = tlog_entry.inclusion_proof
# Before all of this, we require that the inclusion proof be present
# (when constructing the LogEntry).
log_entry = LogEntry._from_dict_rekor(tlog_entry.to_dict())

if media_type == BundleType.BUNDLE_0_1:
if not inclusion_promise:
if not log_entry.inclusion_promise:
raise InvalidBundle("bundle must contain an inclusion promise")
if inclusion_proof and not inclusion_proof.checkpoint.envelope:
if not log_entry.inclusion_proof.checkpoint:
_logger.debug(
"0.1 bundle contains inclusion proof without checkpoint; ignoring"
)
else:
if not inclusion_proof:
raise InvalidBundle("bundle must contain an inclusion proof")
if not inclusion_proof.checkpoint.envelope:
if not log_entry.inclusion_proof.checkpoint:
raise InvalidBundle("expected checkpoint in inclusion proof")

parsed_inclusion_proof: InclusionProof | None = None
if (
inclusion_proof is not None
and inclusion_proof.checkpoint.envelope is not None
):
parsed_inclusion_proof = LogInclusionProof(
checkpoint=inclusion_proof.checkpoint.envelope,
hashes=[h.hex() for h in inclusion_proof.hashes],
log_index=inclusion_proof.log_index,
root_hash=inclusion_proof.root_hash.hex(),
tree_size=inclusion_proof.tree_size,
)

# Sanity: the only way we can hit this is with a v1 bundle without
# an inclusion proof. Putting this check here rather than above makes
# it clear that this check is required by us as the client, not the
# protobuf-specs themselves.
if parsed_inclusion_proof is None:
raise InvalidBundle("bundle must contain inclusion proof")

self._log_entry = LogEntry(
uuid=None,
body=B64Str(base64.b64encode(tlog_entry.canonicalized_body).decode()),
integrated_time=tlog_entry.integrated_time,
log_id=tlog_entry.log_id.key_id.hex(),
log_index=tlog_entry.log_index,
inclusion_proof=parsed_inclusion_proof,
inclusion_promise=B64Str(
base64.b64encode(
tlog_entry.inclusion_promise.signed_entry_timestamp
).decode()
),
)
self._log_entry = log_entry

@property
def signing_certificate(self) -> Certificate:
Expand Down Expand Up @@ -476,30 +518,6 @@ def _from_parts(
"""
@private
"""
inclusion_promise: rekor_v1.InclusionPromise | None = None
if log_entry.inclusion_promise:
inclusion_promise = rekor_v1.InclusionPromise(
signed_entry_timestamp=base64.b64decode(log_entry.inclusion_promise)
)

inclusion_proof = rekor_v1.InclusionProof(
log_index=log_entry.inclusion_proof.log_index,
root_hash=bytes.fromhex(log_entry.inclusion_proof.root_hash),
tree_size=log_entry.inclusion_proof.tree_size,
hashes=[bytes.fromhex(hash_) for hash_ in log_entry.inclusion_proof.hashes],
checkpoint=rekor_v1.Checkpoint(
envelope=log_entry.inclusion_proof.checkpoint
),
)

tlog_entry = rekor_v1.TransparencyLogEntry(
log_index=log_entry.log_index,
log_id=common_v1.LogId(key_id=bytes.fromhex(log_entry.log_id)),
integrated_time=log_entry.integrated_time,
inclusion_promise=inclusion_promise,
inclusion_proof=inclusion_proof,
canonicalized_body=base64.b64decode(log_entry.body),
)

inner = _Bundle(
media_type=BundleType.BUNDLE_0_3.value,
Expand All @@ -511,13 +529,11 @@ def _from_parts(
# Fill in the appropriate variants.
if isinstance(content, common_v1.MessageSignature):
inner.message_signature = content
tlog_entry.kind_version = rekor_v1.KindVersion(
kind="hashedrekord", version="0.0.1"
)
else:
inner.dsse_envelope = content._inner
tlog_entry.kind_version = rekor_v1.KindVersion(kind="dsse", version="0.0.1")

tlog_entry = rekor_v1.TransparencyLogEntry()
tlog_entry.from_dict(log_entry._to_dict_rekor())
inner.verification_material.tlog_entries = [tlog_entry]

return cls(inner)
8 changes: 8 additions & 0 deletions test/unit/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ def test_missing_inclusion_proof(self):
inclusion_promise=None,
)

def test_logentry_roundtrip(self, signing_bundle):
_, bundle = signing_bundle("bundle.txt")

assert (
LogEntry._from_dict_rekor(bundle.log_entry._to_dict_rekor())
== bundle.log_entry
)


class TestLogInclusionProof:
def test_valid(self):
Expand Down