diff --git a/CHANGELOG.md b/CHANGELOG.md index 9170b90d9..9ece652b4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,17 +10,12 @@ All versions prior to 0.9.0 are untracked. ### Added -* API: `Signer.sign()` can now take a `Hashed` as an input, - performing a signature on a pre-computed hash value - ([#860](https://github.com/sigstore/sigstore-python/pull/860)) +* API: `Signer.sign_artifact()` has been added, replacing the removed + `Signer.sign()` API -* API: `Signer.sign()` can now take an in-toto `Statement` as an input, - producing a DSSE-formatted signature rather than a "bare" signature - ([#804](https://github.com/sigstore/sigstore-python/pull/804)) - -* API: `SigningResult.content` has been added, representing either the - `hashedrekord` entry's message signature or the `dsse` entry's envelope - ([#804](https://github.com/sigstore/sigstore-python/pull/804)) +* API: `Signer.sign_intoto()` has been added. It takes an in-toto `Statement` + as an input, producing a DSSE-formatted signature rather than a "bare" + signature ([#804](https://github.com/sigstore/sigstore-python/pull/804)) * API: "v3" Sigstore bundles are now supported during verification ([#901](https://github.com/sigstore/sigstore-python/pull/901)) @@ -41,15 +36,16 @@ All versions prior to 0.9.0 are untracked. * **BREAKING API CHANGE**: `VerificationMaterials` has been removed. The public verification APIs now accept `sigstore.verify.models.Bundle`. +* **BREAKING API CHANGE**: `Signer.sign(...)` has been removed. Use + either `sign_artifact(...)` or `sign_intoto(...)`, depending on whether + you're signing opaque bytes or an in-toto statement. + * **BREAKING API CHANGE**: `VerificationResult` has been removed. The public verification and policy APIs now raise `sigstore.errors.VerificationError` on failure. ### Changed -* **BREAKING API CHANGE**: The `Signer.sign(...)` API now returns a `sigstore.verify.models.Bundle`, - instead of a `SigningResult` ([#862](https://github.com/sigstore/sigstore-python/pull/862)) - * **BREAKING API CHANGE**: `Verifier.verify(...)` now takes a `bytes | Hashed` as its verification input, rather than implicitly receiving the input through the `VerificationMaterials` parameter @@ -59,10 +55,6 @@ All versions prior to 0.9.0 are untracked. a `Hashed` parameter to convey the digest used for Rekor entry lookup ([#904](https://github.com/sigstore/sigstore-python/pull/904)) -* **BREAKING API CHANGE**: `Signer.sign(...)` now takes a `bytes` instead of - an `IO[bytes]` for input. Other input types (such as `Hashed` and - `Statement`) are unchanged ([#921](https://github.com/sigstore/sigstore-python/pull/921)) - * **BREAKING API CHANGE**: `Verifier.verify(...)` now takes a `sigstore.verify.models.Bundle`, instead of a `VerificationMaterials` ([#937](https://github.com/sigstore/sigstore-python/pull/937)) diff --git a/sigstore/_cli.py b/sigstore/_cli.py index 9f6063db7..6cc4298a4 100644 --- a/sigstore/_cli.py +++ b/sigstore/_cli.py @@ -622,7 +622,7 @@ def _sign(args: argparse.Namespace) -> None: # digest and sign the prehash rather than buffering it fully. digest = sha256_digest(io) try: - result = signer.sign(input_=digest) + result = signer.sign_artifact(input_=digest) except ExpiredIdentity as exp_identity: print("Signature failed: identity token has expired") raise exp_identity diff --git a/sigstore/sign.py b/sigstore/sign.py index 66978280d..0a2cac007 100644 --- a/sigstore/sign.py +++ b/sigstore/sign.py @@ -31,7 +31,7 @@ signing_ctx = SigningContext.production() with signing_ctx.signer(identity, cache=True) as signer: - result = signer.sign(artifact) + result = signer.sign_artifact(artifact) print(result) ``` """ @@ -58,7 +58,6 @@ from sigstore import hashes as sigstore_hashes from sigstore._internal.fulcio import ( ExpiredCertificate, - FulcioCertificateSigningResponse, FulcioClient, ) from sigstore._internal.rekor.client import RekorClient @@ -98,14 +97,12 @@ def __init__( self._identity_token = identity_token self._signing_ctx: SigningContext = signing_ctx self.__cached_private_key: Optional[ec.EllipticCurvePrivateKey] = None - self.__cached_signing_certificate: Optional[ - FulcioCertificateSigningResponse - ] = None + self.__cached_signing_certificate: Optional[x509.Certificate] = None if cache: _logger.debug("Generating ephemeral keys...") self.__cached_private_key = ec.generate_private_key(ec.SECP256R1()) _logger.debug("Requesting ephemeral certificate...") - self.__cached_signing_certificate = self._signing_cert(self._private_key) + self.__cached_signing_certificate = self._signing_cert() @property def _private_key(self) -> ec.EllipticCurvePrivateKey: @@ -117,12 +114,22 @@ def _private_key(self) -> ec.EllipticCurvePrivateKey: def _signing_cert( self, - private_key: ec.EllipticCurvePrivateKey, - ) -> FulcioCertificateSigningResponse: - """Get or request a signing certificate from Fulcio.""" + ) -> x509.Certificate: + """ + Get or request a signing certificate from Fulcio. + + Internally, this performs a CSR against Fulcio and verifies that + the returned certificate is present in Fulcio's CT log. + """ + + # Our CSR cannot possibly succeed if our underlying identity token + # is expired. + if not self._identity_token.in_validity_period(): + raise ExpiredIdentity + # If it exists, verify if the current certificate is expired if self.__cached_signing_certificate: - not_valid_after = self.__cached_signing_certificate.cert.not_valid_after_utc + not_valid_after = self.__cached_signing_certificate.not_valid_after_utc if datetime.now(timezone.utc) > not_valid_after: raise ExpiredCertificate return self.__cached_signing_certificate @@ -147,50 +154,91 @@ def _signing_cert( critical=True, ) ) - certificate_request = builder.sign(private_key, hashes.SHA256()) + certificate_request = builder.sign(self._private_key, hashes.SHA256()) certificate_response = self._signing_ctx._fulcio.signing_cert.post( certificate_request, self._identity_token ) - return certificate_response + # Verify the SCT + sct = certificate_response.sct + cert = certificate_response.cert + chain = certificate_response.chain + + verify_sct(sct, cert, chain, self._signing_ctx._trusted_root.ct_keyring()) - def sign( + _logger.debug("Successfully verified SCT...") + + return cert + + def _finalize_sign( self, - input_: bytes | dsse.Statement | sigstore_hashes.Hashed, + cert: x509.Certificate, + content: MessageSignature | dsse.Envelope, + proposed_entry: rekor_types.Hashedrekord | rekor_types.Dsse, ) -> Bundle: """ - Sign an input, and return a `Bundle` corresponding to the signed result. + Perform the common "finalizing" steps in a Sigstore signing flow. + """ + # Submit the proposed entry to the transparency log + entry = self._signing_ctx._rekor.log.entries.post(proposed_entry) - The input can be one of three forms: + _logger.debug(f"Transparency log entry created with index: {entry.log_index}") - 1. A `bytes` buffer; - 2. A `Hashed` object, containing a pre-hashed input (e.g., for inputs - that are too large to buffer into memory); - 3. An in-toto `Statement` object. + return Bundle._from_parts(cert, content, entry) - In cases (1) and (2), the signing operation will produce a `hashedrekord` - entry within the bundle. In case (3), the signing operation will produce - a DSSE envelope and corresponding `dsse` entry within the bundle. + def sign_intoto( + self, + input_: dsse.Statement, + ) -> Bundle: """ - private_key = self._private_key + Sign the given in-toto statement, and return a `Bundle` containing + the signed result. - if not self._identity_token.in_validity_period(): - raise ExpiredIdentity + This API is **only** for in-toto statements; to sign arbitrary artifacts, + use `sign_artifact` instead. + """ + cert = self._signing_cert() - try: - certificate_response = self._signing_cert(private_key) - except ExpiredCertificate as e: - raise e + # Prepare inputs + b64_cert = base64.b64encode( + cert.public_bytes(encoding=serialization.Encoding.PEM) + ) - # Verify the SCT - sct = certificate_response.sct - cert = certificate_response.cert - chain = certificate_response.chain + # Sign the statement, producing a DSSE envelope + content = dsse._sign(self._private_key, input_) - verify_sct(sct, cert, chain, self._signing_ctx._trusted_root.ct_keyring()) + # Create the proposed DSSE log entry + proposed_entry = rekor_types.Dsse( + spec=rekor_types.dsse.DsseV001Schema( + proposed_content=rekor_types.dsse.ProposedContent( + envelope=content.to_json(), + verifiers=[b64_cert.decode()], + ), + ), + ) - _logger.debug("Successfully verified SCT...") + return self._finalize_sign(cert, content, proposed_entry) + + def sign_artifact( + self, + input_: bytes | sigstore_hashes.Hashed, + ) -> Bundle: + """ + Sign an artifact, and return a `Bundle` corresponding to the signed result. + + The input can be one of two forms: + + 1. A `bytes` buffer; + 2. A `Hashed` object, containing a pre-hashed input (e.g., for inputs + that are too large to buffer into memory). + + Regardless of the input format, the signing operation will produce a + `hashedrekord` entry within the bundle. No other entry types + are supported by this API. + """ + + cert = self._signing_cert() # Prepare inputs b64_cert = base64.b64encode( @@ -198,59 +246,39 @@ def sign( ) # Sign artifact - content: MessageSignature | dsse.Envelope - proposed_entry: rekor_types.Hashedrekord | rekor_types.Dsse - if isinstance(input_, dsse.Statement): - content = dsse._sign(private_key, input_) - - # Create the proposed DSSE entry - proposed_entry = rekor_types.Dsse( - spec=rekor_types.dsse.DsseV001Schema( - proposed_content=rekor_types.dsse.ProposedContent( - envelope=content.to_json(), - verifiers=[b64_cert.decode()], - ), - ), - ) - else: - hashed_input = sha256_digest(input_) + hashed_input = sha256_digest(input_) - artifact_signature = private_key.sign( - hashed_input.digest, ec.ECDSA(hashed_input._as_prehashed()) - ) + artifact_signature = self._private_key.sign( + hashed_input.digest, ec.ECDSA(hashed_input._as_prehashed()) + ) - content = MessageSignature( - message_digest=HashOutput( - algorithm=hashed_input.algorithm, - digest=hashed_input.digest, - ), - signature=artifact_signature, - ) + content = MessageSignature( + message_digest=HashOutput( + algorithm=hashed_input.algorithm, + digest=hashed_input.digest, + ), + signature=artifact_signature, + ) - # Create the proposed hashedrekord entry - proposed_entry = rekor_types.Hashedrekord( - spec=rekor_types.hashedrekord.HashedrekordV001Schema( - signature=rekor_types.hashedrekord.Signature( - content=base64.b64encode(artifact_signature).decode(), - public_key=rekor_types.hashedrekord.PublicKey( - content=b64_cert.decode() - ), - ), - data=rekor_types.hashedrekord.Data( - hash=rekor_types.hashedrekord.Hash( - algorithm=hashed_input._as_hashedrekord_algorithm(), - value=hashed_input.digest.hex(), - ) + # Create the proposed hashedrekord entry + proposed_entry = rekor_types.Hashedrekord( + spec=rekor_types.hashedrekord.HashedrekordV001Schema( + signature=rekor_types.hashedrekord.Signature( + content=base64.b64encode(artifact_signature).decode(), + public_key=rekor_types.hashedrekord.PublicKey( + content=b64_cert.decode() ), ), - ) - - # Submit the proposed entry to the transparency log - entry = self._signing_ctx._rekor.log.entries.post(proposed_entry) - - _logger.debug(f"Transparency log entry created with index: {entry.log_index}") + data=rekor_types.hashedrekord.Data( + hash=rekor_types.hashedrekord.Hash( + algorithm=hashed_input._as_hashedrekord_algorithm(), + value=hashed_input.digest.hex(), + ) + ), + ), + ) - return Bundle._from_parts(cert, content, entry) + return self._finalize_sign(cert, content, proposed_entry) class SigningContext: diff --git a/test/unit/test_sign.py b/test/unit/test_sign.py index db755ef6d..fe7b69a9b 100644 --- a/test/unit/test_sign.py +++ b/test/unit/test_sign.py @@ -48,7 +48,7 @@ def test_sign_rekor_entry_consistent(signer_and_ident): payload = secrets.token_bytes(32) with ctx.signer(identity) as signer: - expected_entry = signer.sign(payload).log_entry + expected_entry = signer.sign_artifact(payload).log_entry actual_entry = ctx._rekor.log.entries.get(log_index=expected_entry.log_index) @@ -74,7 +74,7 @@ def test_sct_verify_keyring_lookup_error(signer_and_ident, monkeypatch): payload = secrets.token_bytes(32) with pytest.raises(VerificationError, match=r"SCT verify failed:"): with ctx.signer(identity) as signer: - signer.sign(payload) + signer.sign_artifact(payload) @pytest.mark.online @@ -95,7 +95,7 @@ def test_sct_verify_keyring_error(signer_and_ident, monkeypatch): with pytest.raises(VerificationError): with ctx.signer(identity) as signer: - signer.sign(payload) + signer.sign_artifact(payload) @pytest.mark.online @@ -112,7 +112,7 @@ def test_identity_proof_claim_lookup(signer_and_ident, monkeypatch): payload = secrets.token_bytes(32) with ctx.signer(identity) as signer: - expected_entry = signer.sign(payload).log_entry + expected_entry = signer.sign_artifact(payload).log_entry actual_entry = ctx._rekor.log.entries.get(log_index=expected_entry.log_index) assert expected_entry.body == actual_entry.body @@ -135,7 +135,7 @@ def test_sign_prehashed(staging): ) with sign_ctx.signer(identity) as signer: - bundle = signer.sign(hashed) + bundle = signer.sign_artifact(hashed) assert bundle._inner.message_signature.message_digest.algorithm == hashed.algorithm assert bundle._inner.message_signature.message_digest.digest == hashed.digest @@ -167,6 +167,6 @@ def test_sign_dsse(staging): ).build() with ctx.signer(identity) as signer: - bundle = signer.sign(stmt) + bundle = signer.sign_intoto(stmt) # Ensures that all of our inner types serialize as expected. bundle.to_json()