@@ -46,14 +46,16 @@ def get_s3_key(
46
46
prefix : str ,
47
47
) -> str :
48
48
"""
49
+ Figure out the S3 key location for the large attribute based on the DynamoDB
50
+ item's partition key, sort key, attribute name, and the value of the attribute.
51
+
49
52
:param pk: partition key.
50
53
:param sk: sort key, use None if no sort key.
51
54
:param attr: large attribute name.
52
55
:param value: large attribute value in binary format.
53
56
:param prefix: common S3 prefix.
54
57
55
58
:return: example "${prefix}/pk={pk}/sk={sk}/attr={attr}/md5={md5}"
56
-
57
59
"""
58
60
parts = list ()
59
61
if prefix : # pragma: no cover
@@ -71,6 +73,18 @@ def get_s3_key(
71
73
return "/" .join (parts )
72
74
73
75
76
+ T_S3_KEY_GETTER = T .Callable [
77
+ [
78
+ T .Union [str , int ],
79
+ T .Optional [T .Union [str , int ]],
80
+ str ,
81
+ bytes ,
82
+ str ,
83
+ ],
84
+ str ,
85
+ ]
86
+
87
+
74
88
def split_s3_uri (s3_uri : str ) -> T .Tuple [str , str ]:
75
89
parts = s3_uri .split ("/" , 3 )
76
90
return parts [2 ], parts [3 ]
@@ -224,6 +238,7 @@ def put_s3(
224
238
prefix : str ,
225
239
update_at : datetime ,
226
240
s3_put_object_kwargs : T .Optional [T .Dict [str , T .Dict [str , T .Any ]]] = None ,
241
+ s3_key_getter : T_S3_KEY_GETTER = get_s3_key ,
227
242
) -> PutS3Response :
228
243
"""
229
244
Put large attribute data to S3.
@@ -263,7 +278,7 @@ def put_s3(
263
278
s3_put_object_kwargs = dict ()
264
279
put_s3_response = PutS3Response (actions = [])
265
280
for attr , value in kvs .items ():
266
- s3_key = get_s3_key (pk = pk , sk = sk , attr = attr , value = value , prefix = prefix )
281
+ s3_key = s3_key_getter (pk , sk , attr , value , prefix )
267
282
s3_uri = join_s3_uri (bucket , s3_key )
268
283
if is_s3_object_exists (s3_client , bucket = bucket , key = s3_key ):
269
284
put_executed = False
@@ -303,6 +318,7 @@ def create_large_attribute_item(
303
318
prefix : str ,
304
319
update_at : datetime ,
305
320
s3_put_object_kwargs : T .Optional [T .Dict [str , T .Dict [str , T .Any ]]] = None ,
321
+ s3_key_getter : T_S3_KEY_GETTER = get_s3_key ,
306
322
attributes : T .Optional [T .Dict [str , T .Any ]] = None ,
307
323
clean_up_when_failed : bool = True ,
308
324
_error : T .Optional [Exception ] = None ,
@@ -340,6 +356,7 @@ def create_large_attribute_item(
340
356
prefix = prefix ,
341
357
update_at = update_at ,
342
358
s3_put_object_kwargs = s3_put_object_kwargs ,
359
+ s3_key_getter = s3_key_getter ,
343
360
)
344
361
try :
345
362
# this is for unit test purpose to simulate the DynamoDB operation failed
@@ -370,6 +387,7 @@ def update_large_attribute_item(
370
387
prefix : str ,
371
388
update_at : datetime ,
372
389
s3_put_object_kwargs : T .Optional [T .Dict [str , T .Dict [str , T .Any ]]] = None ,
390
+ s3_key_getter : T_S3_KEY_GETTER = get_s3_key ,
373
391
update_actions : T .Optional [T .List [Action ]] = None ,
374
392
consistent_read : bool = False ,
375
393
clean_up_when_succeeded : bool = True ,
@@ -412,6 +430,7 @@ def update_large_attribute_item(
412
430
prefix = prefix ,
413
431
update_at = update_at ,
414
432
s3_put_object_kwargs = s3_put_object_kwargs ,
433
+ s3_key_getter = s3_key_getter ,
415
434
)
416
435
got_old_model = (
417
436
False # IDE show warning that this line is useless, but we do need it
0 commit comments