31
31
32
32
signing_ctx = SigningContext.production()
33
33
with signing_ctx.signer(identity, cache=True) as signer:
34
- result = signer.sign (artifact)
34
+ result = signer.sign_artifact (artifact)
35
35
print(result)
36
36
```
37
37
"""
58
58
from sigstore import hashes as sigstore_hashes
59
59
from sigstore ._internal .fulcio import (
60
60
ExpiredCertificate ,
61
- FulcioCertificateSigningResponse ,
62
61
FulcioClient ,
63
62
)
64
63
from sigstore ._internal .rekor .client import RekorClient
@@ -98,14 +97,12 @@ def __init__(
98
97
self ._identity_token = identity_token
99
98
self ._signing_ctx : SigningContext = signing_ctx
100
99
self .__cached_private_key : Optional [ec .EllipticCurvePrivateKey ] = None
101
- self .__cached_signing_certificate : Optional [
102
- FulcioCertificateSigningResponse
103
- ] = None
100
+ self .__cached_signing_certificate : Optional [x509 .Certificate ] = None
104
101
if cache :
105
102
_logger .debug ("Generating ephemeral keys..." )
106
103
self .__cached_private_key = ec .generate_private_key (ec .SECP256R1 ())
107
104
_logger .debug ("Requesting ephemeral certificate..." )
108
- self .__cached_signing_certificate = self ._signing_cert (self . _private_key )
105
+ self .__cached_signing_certificate = self ._signing_cert ()
109
106
110
107
@property
111
108
def _private_key (self ) -> ec .EllipticCurvePrivateKey :
@@ -117,12 +114,22 @@ def _private_key(self) -> ec.EllipticCurvePrivateKey:
117
114
118
115
def _signing_cert (
119
116
self ,
120
- private_key : ec .EllipticCurvePrivateKey ,
121
- ) -> FulcioCertificateSigningResponse :
122
- """Get or request a signing certificate from Fulcio."""
117
+ ) -> x509 .Certificate :
118
+ """
119
+ Get or request a signing certificate from Fulcio.
120
+
121
+ Internally, this performs a CSR against Fulcio and verifies that
122
+ the returned certificate is present in Fulcio's CT log.
123
+ """
124
+
125
+ # Our CSR cannot possibly succeed if our underlying identity token
126
+ # is expired.
127
+ if not self ._identity_token .in_validity_period ():
128
+ raise ExpiredIdentity
129
+
123
130
# If it exists, verify if the current certificate is expired
124
131
if self .__cached_signing_certificate :
125
- not_valid_after = self .__cached_signing_certificate .cert . not_valid_after_utc
132
+ not_valid_after = self .__cached_signing_certificate .not_valid_after_utc
126
133
if datetime .now (timezone .utc ) > not_valid_after :
127
134
raise ExpiredCertificate
128
135
return self .__cached_signing_certificate
@@ -147,110 +154,131 @@ def _signing_cert(
147
154
critical = True ,
148
155
)
149
156
)
150
- certificate_request = builder .sign (private_key , hashes .SHA256 ())
157
+ certificate_request = builder .sign (self . _private_key , hashes .SHA256 ())
151
158
152
159
certificate_response = self ._signing_ctx ._fulcio .signing_cert .post (
153
160
certificate_request , self ._identity_token
154
161
)
155
162
156
- return certificate_response
163
+ # Verify the SCT
164
+ sct = certificate_response .sct
165
+ cert = certificate_response .cert
166
+ chain = certificate_response .chain
167
+
168
+ verify_sct (sct , cert , chain , self ._signing_ctx ._trusted_root .ct_keyring ())
157
169
158
- def sign (
170
+ _logger .debug ("Successfully verified SCT..." )
171
+
172
+ return cert
173
+
174
+ def _finalize_sign (
159
175
self ,
160
- input_ : bytes | dsse .Statement | sigstore_hashes .Hashed ,
176
+ cert : x509 .Certificate ,
177
+ content : MessageSignature | dsse .Envelope ,
178
+ proposed_entry : rekor_types .Hashedrekord | rekor_types .Dsse ,
161
179
) -> Bundle :
162
180
"""
163
- Sign an input, and return a `Bundle` corresponding to the signed result.
181
+ Perform the common "finalizing" steps in a Sigstore signing flow.
182
+ """
183
+ # Submit the proposed entry to the transparency log
184
+ entry = self ._signing_ctx ._rekor .log .entries .post (proposed_entry )
164
185
165
- The input can be one of three forms:
186
+ _logger . debug ( f"Transparency log entry created with index: { entry . log_index } " )
166
187
167
- 1. A `bytes` buffer;
168
- 2. A `Hashed` object, containing a pre-hashed input (e.g., for inputs
169
- that are too large to buffer into memory);
170
- 3. An in-toto `Statement` object.
188
+ return Bundle ._from_parts (cert , content , entry )
171
189
172
- In cases (1) and (2), the signing operation will produce a `hashedrekord`
173
- entry within the bundle. In case (3), the signing operation will produce
174
- a DSSE envelope and corresponding `dsse` entry within the bundle.
190
+ def sign_intoto (
191
+ self ,
192
+ input_ : dsse .Statement ,
193
+ ) -> Bundle :
175
194
"""
176
- private_key = self ._private_key
195
+ Sign the given in-toto statement, and return a `Bundle` containing
196
+ the signed result.
177
197
178
- if not self ._identity_token .in_validity_period ():
179
- raise ExpiredIdentity
198
+ This API is **only** for in-toto statements; to sign arbitrary artifacts,
199
+ use `sign_artifact` instead.
200
+ """
201
+ cert = self ._signing_cert ()
180
202
181
- try :
182
- certificate_response = self . _signing_cert ( private_key )
183
- except ExpiredCertificate as e :
184
- raise e
203
+ # Prepare inputs
204
+ b64_cert = base64 . b64encode (
205
+ cert . public_bytes ( encoding = serialization . Encoding . PEM )
206
+ )
185
207
186
- # Verify the SCT
187
- sct = certificate_response .sct
188
- cert = certificate_response .cert
189
- chain = certificate_response .chain
208
+ # Sign the statement, producing a DSSE envelope
209
+ content = dsse ._sign (self ._private_key , input_ )
190
210
191
- verify_sct (sct , cert , chain , self ._signing_ctx ._trusted_root .ct_keyring ())
211
+ # Create the proposed DSSE log entry
212
+ proposed_entry = rekor_types .Dsse (
213
+ spec = rekor_types .dsse .DsseV001Schema (
214
+ proposed_content = rekor_types .dsse .ProposedContent (
215
+ envelope = content .to_json (),
216
+ verifiers = [b64_cert .decode ()],
217
+ ),
218
+ ),
219
+ )
192
220
193
- _logger .debug ("Successfully verified SCT..." )
221
+ return self ._finalize_sign (cert , content , proposed_entry )
222
+
223
+ def sign_artifact (
224
+ self ,
225
+ input_ : bytes | sigstore_hashes .Hashed ,
226
+ ) -> Bundle :
227
+ """
228
+ Sign an artifact, and return a `Bundle` corresponding to the signed result.
229
+
230
+ The input can be one of two forms:
231
+
232
+ 1. A `bytes` buffer;
233
+ 2. A `Hashed` object, containing a pre-hashed input (e.g., for inputs
234
+ that are too large to buffer into memory).
235
+
236
+ Regardless of the input format, the signing operation will produce a
237
+ `hashedrekord` entry within the bundle. No other entry types
238
+ are supported by this API.
239
+ """
240
+
241
+ cert = self ._signing_cert ()
194
242
195
243
# Prepare inputs
196
244
b64_cert = base64 .b64encode (
197
245
cert .public_bytes (encoding = serialization .Encoding .PEM )
198
246
)
199
247
200
248
# Sign artifact
201
- content : MessageSignature | dsse .Envelope
202
- proposed_entry : rekor_types .Hashedrekord | rekor_types .Dsse
203
- if isinstance (input_ , dsse .Statement ):
204
- content = dsse ._sign (private_key , input_ )
205
-
206
- # Create the proposed DSSE entry
207
- proposed_entry = rekor_types .Dsse (
208
- spec = rekor_types .dsse .DsseV001Schema (
209
- proposed_content = rekor_types .dsse .ProposedContent (
210
- envelope = content .to_json (),
211
- verifiers = [b64_cert .decode ()],
212
- ),
213
- ),
214
- )
215
- else :
216
- hashed_input = sha256_digest (input_ )
249
+ hashed_input = sha256_digest (input_ )
217
250
218
- artifact_signature = private_key .sign (
219
- hashed_input .digest , ec .ECDSA (hashed_input ._as_prehashed ())
220
- )
251
+ artifact_signature = self . _private_key .sign (
252
+ hashed_input .digest , ec .ECDSA (hashed_input ._as_prehashed ())
253
+ )
221
254
222
- content = MessageSignature (
223
- message_digest = HashOutput (
224
- algorithm = hashed_input .algorithm ,
225
- digest = hashed_input .digest ,
226
- ),
227
- signature = artifact_signature ,
228
- )
255
+ content = MessageSignature (
256
+ message_digest = HashOutput (
257
+ algorithm = hashed_input .algorithm ,
258
+ digest = hashed_input .digest ,
259
+ ),
260
+ signature = artifact_signature ,
261
+ )
229
262
230
- # Create the proposed hashedrekord entry
231
- proposed_entry = rekor_types .Hashedrekord (
232
- spec = rekor_types .hashedrekord .HashedrekordV001Schema (
233
- signature = rekor_types .hashedrekord .Signature (
234
- content = base64 .b64encode (artifact_signature ).decode (),
235
- public_key = rekor_types .hashedrekord .PublicKey (
236
- content = b64_cert .decode ()
237
- ),
238
- ),
239
- data = rekor_types .hashedrekord .Data (
240
- hash = rekor_types .hashedrekord .Hash (
241
- algorithm = hashed_input ._as_hashedrekord_algorithm (),
242
- value = hashed_input .digest .hex (),
243
- )
263
+ # Create the proposed hashedrekord entry
264
+ proposed_entry = rekor_types .Hashedrekord (
265
+ spec = rekor_types .hashedrekord .HashedrekordV001Schema (
266
+ signature = rekor_types .hashedrekord .Signature (
267
+ content = base64 .b64encode (artifact_signature ).decode (),
268
+ public_key = rekor_types .hashedrekord .PublicKey (
269
+ content = b64_cert .decode ()
244
270
),
245
271
),
246
- )
247
-
248
- # Submit the proposed entry to the transparency log
249
- entry = self ._signing_ctx ._rekor .log .entries .post (proposed_entry )
250
-
251
- _logger .debug (f"Transparency log entry created with index: { entry .log_index } " )
272
+ data = rekor_types .hashedrekord .Data (
273
+ hash = rekor_types .hashedrekord .Hash (
274
+ algorithm = hashed_input ._as_hashedrekord_algorithm (),
275
+ value = hashed_input .digest .hex (),
276
+ )
277
+ ),
278
+ ),
279
+ )
252
280
253
- return Bundle . _from_parts (cert , content , entry )
281
+ return self . _finalize_sign (cert , content , proposed_entry )
254
282
255
283
256
284
class SigningContext :
0 commit comments