48
48
from azul .deployment import (
49
49
aws ,
50
50
)
51
+ from azul .digests import (
52
+ Hasher ,
53
+ get_resumable_hasher ,
54
+ hasher_from_str ,
55
+ hasher_to_str ,
56
+ )
51
57
from azul .es import (
52
58
ESClientFactory ,
53
59
)
@@ -226,7 +232,8 @@ def mirror_part_message(self,
226
232
file : File ,
227
233
part : FilePart ,
228
234
upload_id : str ,
229
- etags : Sequence [str ]
235
+ etags : Sequence [str ],
236
+ hasher : Hasher
230
237
) -> SQSFifoMessage :
231
238
return SQSFifoMessage (
232
239
body = {
@@ -235,7 +242,8 @@ def mirror_part_message(self,
235
242
'upload_id' : upload_id ,
236
243
'action' : MirrorAction .mirror_part .to_json (),
237
244
'part' : part .to_json (),
238
- 'etags' : etags
245
+ 'etags' : etags ,
246
+ 'hasher' : hasher_to_str (hasher )
239
247
},
240
248
group_id = self .mirror_service .mirror_object_key (file )
241
249
)
def finalize_file_message(self,
                          catalog: CatalogName,
                          file: File,
                          upload_id: str,
                          etags: Sequence[str],
                          hasher: Hasher
                          ) -> SQSFifoMessage:
    """
    Build the FIFO queue message that requests the ``finalize_file`` mirror
    action for a multi-part upload.

    :param catalog: name of the catalog, copied verbatim into the message

    :param file: the file being mirrored; serialized via ``to_json()``

    :param upload_id: identifier of the multi-part upload to finalize

    :param etags: the ETags collected so far, one per uploaded part

    :param hasher: the hasher to hand over to the consumer of the message;
                   it is serialized to a string with ``hasher_to_str``

    :return: a message whose group ID is the mirror object key of ``file``
    """
    # The insertion order of the keys is kept the same as the order in which
    # the values are computed, so serialization and side effects match a
    # single dict literal.
    payload = {
        'catalog': catalog,
        'file': file.to_json(),
        'upload_id': upload_id,
        'action': MirrorAction.finalize_file.to_json(),
        'etags': etags,
        'hasher': hasher_to_str(hasher)
    }
    # Using the object key as the group ID ties all messages about one file
    # to the same FIFO message group.
    object_key = self.mirror_service.mirror_object_key(file)
    return SQSFifoMessage(body=payload, group_id=object_key)
@@ -729,49 +739,57 @@ def mirror_file(self,
729
739
log .info ('Successfully mirrored file %r via standard upload' , file .uuid )
730
740
else :
731
741
log .info ('Mirroring file %r via multi-part upload' , file .uuid )
742
+ _ , digest_type = file .digest ()
743
+ hasher = get_resumable_hasher (digest_type )
732
744
upload_id = self .mirror_service .begin_mirroring_file (file )
733
745
first_part = FilePart .first (file , part_size )
734
- etag = self .mirror_service .mirror_file_part (catalog , file , first_part , upload_id )
746
+ etag = self .mirror_service .mirror_file_part (catalog , file , first_part , upload_id , hasher )
735
747
next_part = first_part .next (file )
736
748
assert next_part is not None
737
- messages = [self .mirror_part_message (catalog , file , next_part , upload_id , [etag ])]
749
+ messages = [self .mirror_part_message (catalog , file , next_part , upload_id , [etag ], hasher )]
738
750
self .queue_mirror_messages (messages )
739
751
740
752
def mirror_file_part(self,
                     catalog: CatalogName,
                     file_json: JSON,
                     part_json: JSON,
                     upload_id: str,
                     etags: Sequence[str],
                     hasher_data: str
                     ):
    """
    Mirror one part of a multi-part upload and queue the follow-up message:
    either a request to mirror the next part or, if this was the last part,
    a request to finalize the upload.

    :param catalog: name of the catalog the file belongs to

    :param file_json: JSON form of the file, resolved via ``load_file``

    :param part_json: JSON form of the part to mirror now

    :param upload_id: identifier of the multi-part upload in progress

    :param etags: ETags of the parts uploaded before this one; the sequence
                  passed in is not modified

    :param hasher_data: string form of the hasher, as produced for the
                        message that triggered this call; restored with
                        ``hasher_from_str``
    """
    file = self.load_file(catalog, file_json)
    current_part = FilePart.from_json(part_json)
    hasher = hasher_from_str(hasher_data)
    # NOTE(review): the same hasher object is forwarded to the next message
    # below, so the service presumably updates it in place with this part's
    # content — confirm against MirrorService.mirror_file_part.
    new_etag = self.mirror_service.mirror_file_part(catalog,
                                                    file,
                                                    current_part,
                                                    upload_id,
                                                    hasher)
    # Build a fresh list instead of rebinding or mutating the argument
    all_etags = [*etags, new_etag]
    following_part = current_part.next(file)
    if following_part is not None:
        # More parts remain, hand the accumulated state to the next step
        message = self.mirror_part_message(catalog,
                                           file,
                                           following_part,
                                           upload_id,
                                           all_etags,
                                           hasher)
    else:
        log.info('File %r fully uploaded in %d parts', file.uuid, len(all_etags))
        message = self.finalize_file_message(catalog,
                                             file,
                                             upload_id,
                                             all_etags,
                                             hasher)
    self.queue_mirror_messages([message])
765
781
766
782
def finalize_file(self,
                  catalog: CatalogName,
                  file_json: JSON,
                  upload_id: str,
                  etags: Sequence[str],
                  hasher_data: str
                  ):
    """
    Complete a multi-part upload of a mirrored file.

    :param catalog: name of the catalog the file belongs to

    :param file_json: JSON form of the file, resolved via ``load_file``

    :param upload_id: identifier of the multi-part upload to complete

    :param etags: ETags of all uploaded parts; must not be empty

    :param hasher_data: string form of the hasher carried by the triggering
                        message; restored with ``hasher_from_str`` and
                        passed on to the mirror service
    """
    mirrored_file = self.load_file(catalog, file_json)
    # A multi-part upload without a single part cannot be finalized
    assert len(etags) > 0
    restored_hasher = hasher_from_str(hasher_data)
    # NOTE(review): presumably the service uses the hasher to verify the
    # digest of the assembled object — confirm against
    # MirrorService.finish_mirroring_file.
    self.mirror_service.finish_mirroring_file(mirrored_file,
                                              upload_id,
                                              etags,
                                              restored_hasher)
    log.info('Successfully mirrored file %r via multi-part upload', mirrored_file.uuid)
776
794
777
795
def load_file (self , catalog : CatalogName , file : JSON ) -> File :
0 commit comments