Skip to content

Commit 6674f4b

Browse files
committed
[R] Verify digest of file objects in mirror (#7069)
1 parent 7859db4 commit 6674f4b

File tree

7 files changed

+95
-19
lines changed

7 files changed

+95
-19
lines changed

.mypy.ini

+5-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ modules =
5353
azul.indexer.action_controller,
5454
azul.indexer.mirror_controller,
5555
azul.service.app_controller,
56-
azul.indexer.mirror_service
56+
azul.indexer.mirror_service,
57+
azul.digests
5758
packages =
5859
azul.openapi
5960

@@ -69,3 +70,6 @@ follow_untyped_imports = True
6970

7071
[mypy-aws_requests_auth.boto_utils]
7172
follow_untyped_imports = True
73+
74+
[mypy-resumablesha256.*]
75+
ignore_missing_imports = True

requirements.txt

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ jmespath==1.0.1
1717
more-itertools==10.7.0
1818
msgpack==1.1.0
1919
requests==2.32.3
20+
resumablesha256==1.0
2021
rsa==4.7.2 # < 4.8, see https://github.com/DataBiosphere/azul/issues/6774. Resolve build-time dependency ambiguity
2122
setuptools==80.0.0 # Match requirements.pip.txt. Python 3.12 removed distutils, which we depended on transitively. Luckily, setuptools includes a vendored copy.
2223
urllib3==1.26.20 # < 2, see https://github.com/DataBiosphere/azul/issues/6777, match with types-urllib in requirements.dev.txt

scripts/mirror_file.py

+7-2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
from azul.args import (
1616
AzulArgumentHelpFormatter,
1717
)
18+
from azul.digests import (
19+
get_resumable_hasher,
20+
)
1821
from azul.http import (
1922
http_client,
2023
)
@@ -63,15 +66,17 @@ def mirror_file(catalog: CatalogName, file_uuid: str, part_size: int) -> str:
6366
file = get_file(catalog, file_uuid)
6467
service = MirrorService()
6568
upload_id = service.begin_mirroring_file(file)
69+
digest_value, digest_type = file.digest()
70+
hasher = get_resumable_hasher(digest_type)
6671

6772
def mirror_parts():
6873
part = FilePart.first(file, part_size)
6974
while part is not None:
70-
yield service.mirror_file_part(catalog, file, part, upload_id)
75+
yield service.mirror_file_part(catalog, file, part, upload_id, hasher)
7176
part = part.next(file)
7277

7378
etags = list(mirror_parts())
74-
service.finish_mirroring_file(file, upload_id, etags=etags)
79+
service.finish_mirroring_file(file, upload_id, etags=etags, hasher=hasher)
7580
return service.get_mirror_url(file)
7681

7782

src/azul/azulclient.py

+30-12
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,12 @@
4848
from azul.deployment import (
4949
aws,
5050
)
51+
from azul.digests import (
52+
Hasher,
53+
get_resumable_hasher,
54+
hasher_from_str,
55+
hasher_to_str,
56+
)
5157
from azul.es import (
5258
ESClientFactory,
5359
)
@@ -226,7 +232,8 @@ def mirror_part_message(self,
226232
file: File,
227233
part: FilePart,
228234
upload_id: str,
229-
etags: Sequence[str]
235+
etags: Sequence[str],
236+
hasher: Hasher
230237
) -> SQSFifoMessage:
231238
return SQSFifoMessage(
232239
body={
@@ -235,7 +242,8 @@ def mirror_part_message(self,
235242
'upload_id': upload_id,
236243
'action': MirrorAction.mirror_part.to_json(),
237244
'part': part.to_json(),
238-
'etags': etags
245+
'etags': etags,
246+
'hasher': hasher_to_str(hasher)
239247
},
240248
group_id=self.mirror_service.mirror_object_key(file)
241249
)
@@ -244,15 +252,17 @@ def finalize_file_message(self,
244252
catalog: CatalogName,
245253
file: File,
246254
upload_id: str,
247-
etags: Sequence[str]
255+
etags: Sequence[str],
256+
hasher: Hasher
248257
) -> SQSFifoMessage:
249258
return SQSFifoMessage(
250259
body={
251260
'catalog': catalog,
252261
'file': file.to_json(),
253262
'upload_id': upload_id,
254263
'action': MirrorAction.finalize_file.to_json(),
255-
'etags': etags
264+
'etags': etags,
265+
'hasher': hasher_to_str(hasher)
256266
},
257267
group_id=self.mirror_service.mirror_object_key(file)
258268
)
@@ -729,49 +739,57 @@ def mirror_file(self,
729739
log.info('Successfully mirrored file %r via standard upload', file.uuid)
730740
else:
731741
log.info('Mirroring file %r via multi-part upload', file.uuid)
742+
_, digest_type = file.digest()
743+
hasher = get_resumable_hasher(digest_type)
732744
upload_id = self.mirror_service.begin_mirroring_file(file)
733745
first_part = FilePart.first(file, part_size)
734-
etag = self.mirror_service.mirror_file_part(catalog, file, first_part, upload_id)
746+
etag = self.mirror_service.mirror_file_part(catalog, file, first_part, upload_id, hasher)
735747
next_part = first_part.next(file)
736748
assert next_part is not None
737-
messages = [self.mirror_part_message(catalog, file, next_part, upload_id, [etag])]
749+
messages = [self.mirror_part_message(catalog, file, next_part, upload_id, [etag], hasher)]
738750
self.queue_mirror_messages(messages)
739751

740752
def mirror_file_part(self,
                     catalog: CatalogName,
                     file_json: JSON,
                     part_json: JSON,
                     upload_id: str,
                     etags: Sequence[str],
                     hasher_data: str
                     ):
    """
    Mirror a single part of a file as one step of an ongoing multi-part
    upload, then enqueue the message that drives the next step.

    :param catalog: the catalog the file belongs to
    :param file_json: JSON representation of the file being mirrored
    :param part_json: JSON representation of the part to upload now
    :param upload_id: ID of the multi-part upload begun earlier
    :param etags: ETags of all parts uploaded so far, in order
    :param hasher_data: serialized digest state covering all parts
                        uploaded so far (see azul.digests.hasher_to_str)
    """
    file = self.load_file(catalog, file_json)
    part = FilePart.from_json(part_json)
    # Resume the digest computation where the previous worker left off
    hasher = hasher_from_str(hasher_data)
    # Mutates `hasher` to incorporate this part's content
    etag = self.mirror_service.mirror_file_part(catalog, file, part, upload_id, hasher)
    etags = [*etags, etag]
    next_part = part.next(file)
    if next_part is None:
        # All parts uploaded; the finalize message carries the complete
        # digest state so the upload can be verified before completion
        log.info('File %r fully uploaded in %d parts', file.uuid, len(etags))
        message = self.finalize_file_message(catalog,
                                             file,
                                             upload_id,
                                             etags,
                                             hasher)
    else:
        # Hand the accumulated ETags and digest state to the next part's
        # worker. NOTE(review): correctness presumably relies on FIFO
        # ordering of these messages — confirm against queue config.
        message = self.mirror_part_message(catalog,
                                           file,
                                           next_part,
                                           upload_id,
                                           etags,
                                           hasher)
    self.queue_mirror_messages([message])
765781

766782
def finalize_file(self,
                  catalog: CatalogName,
                  file_json: JSON,
                  upload_id: str,
                  etags: Sequence[str],
                  hasher_data: str
                  ):
    """
    Complete a multi-part mirror upload after all parts have been
    uploaded, verifying the file's digest in the process.

    :param catalog: the catalog the file belongs to
    :param file_json: JSON representation of the file being mirrored
    :param upload_id: ID of the multi-part upload to complete
    :param etags: ETags of every uploaded part, in part order
    :param hasher_data: serialized digest state covering the entire file
                        content (see azul.digests.hasher_to_str)
    """
    file = self.load_file(catalog, file_json)
    # A multi-part upload must have produced at least one part
    assert len(etags) > 0
    hasher = hasher_from_str(hasher_data)
    self.mirror_service.finish_mirroring_file(file, upload_id, etags, hasher)
    log.info('Successfully mirrored file %r via multi-part upload', file.uuid)
776794

777795
def load_file(self, catalog: CatalogName, file: JSON) -> File:

src/azul/digests.py

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
"""
Support for computing a file's content digest incrementally across
multiple workers by serializing intermediate hash state in between.
"""
import base64
import pickle
from typing import (
    Any,
)

import resumablesha256

from azul import (
    R,
)

# TODO: add type stub
Hasher = Any


def get_resumable_hasher(digest_type: str) -> Hasher:
    """
    Return a fresh hasher for the given digest type whose internal state
    can be serialized and later resumed. Only 'sha256' is supported.
    """
    assert digest_type == 'sha256', R('Only sha256 is currently supported')
    return resumablesha256.sha256()


def hasher_to_str(hasher: Hasher) -> str:
    """
    Serialize the given hasher's internal state to an ASCII-safe string,
    suitable for embedding in a JSON message body.
    """
    state = pickle.dumps(hasher)
    return base64.b64encode(state).decode('ascii')


def hasher_from_str(s: str) -> Hasher:
    """
    Reconstitute a hasher from a string produced by :func:`hasher_to_str`.

    NOTE(review): this unpickles its input. Unpickling attacker-controlled
    data can execute arbitrary code, so callers must ensure the string
    only ever originates from a trusted source, such as messages this
    application itself placed on its own queues.
    """
    return pickle.loads(base64.b64decode(s))

src/azul/indexer/mirror_controller.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,13 @@ def _mirror(self, message: JSON):
5757
message['file'],
5858
message['part'],
5959
message['upload_id'],
60-
message['etags'])
60+
message['etags'],
61+
message['hasher'])
6162
elif action is MirrorAction.finalize_file:
6263
self.client.finalize_file(message['catalog'],
6364
message['file'],
6465
message['upload_id'],
65-
message['etags'])
66+
message['etags'],
67+
message['hasher'])
6668
else:
6769
assert False, action

src/azul/indexer/mirror_service.py

+21-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@
2626
from azul.deployment import (
2727
aws,
2828
)
29+
from azul.digests import (
30+
Hasher,
31+
get_resumable_hasher,
32+
)
2933
from azul.drs import (
3034
AccessMethod,
3135
)
@@ -137,6 +141,10 @@ def mirror_file(self, catalog: CatalogName, file: File):
137141
self._storage.put(object_key=self.mirror_object_key(file),
138142
data=file_content,
139143
content_type=file.content_type)
144+
_, digest_type = file.digest()
145+
hasher = get_resumable_hasher(digest_type)
146+
hasher.update(file_content)
147+
self._verify_digest(file, hasher)
140148
self.put_info(file)
141149

142150
def begin_mirroring_file(self, file: File) -> str:
@@ -152,14 +160,17 @@ def mirror_file_part(self,
152160
catalog: CatalogName,
153161
file: File,
154162
part: FilePart,
155-
upload_id: str
163+
upload_id: str,
164+
hasher: Hasher
156165
) -> str:
157166
"""
158167
Upload a part of a file to a multipart upload begun with
159168
:meth:`begin_mirroring_file` and return the uploaded part's ETag.
169+
The provided hasher is mutated to incorporate the part's content.
160170
"""
161171
upload = self._get_upload(file, upload_id)
162172
file_content = self._download(catalog, file, part)
173+
hasher.update(file_content)
163174
return self._storage.upload_multipart_part(file_content,
164175
part.index + 1,
165176
upload)
@@ -168,13 +179,14 @@ def finish_mirroring_file(self,
168179
file: File,
169180
upload_id: str,
170181
etags: Sequence[str],
171-
hasher
182+
hasher: Hasher
172183
):
173184
"""
174185
Complete a multipart upload begun with :meth:`begin_mirroring_file`.
175186
"""
176187
upload = self._get_upload(file, upload_id)
177188
self._storage.complete_multipart_upload(upload, etags)
189+
self._verify_digest(file, hasher)
178190
self._check_info(file)
179191
self.put_info(file)
180192

@@ -255,3 +267,10 @@ def _download(self,
255267
def _get_upload(self, file: File, upload_id: str) -> 'MultipartUpload':
256268
return self._storage.load_multipart_upload(object_key=self.mirror_object_key(file),
257269
upload_id=upload_id)
270+
271+
def _verify_digest(self, file: File, hasher: Hasher):
    """
    Assert that the digest accumulated in `hasher` over the mirrored
    content equals the digest value recorded in the file's metadata.
    """
    expected, digest_type = file.digest()
    actual = hasher.hexdigest()
    # NOTE(review): assumes the recorded digest uses the same (lower-case
    # hex) spelling as hexdigest() — confirm against metadata producers
    assert actual == expected, R(
        'File digest value does not match its contents',
        file.uuid, digest_type, expected, actual)

0 commit comments

Comments
 (0)