@@ -345,6 +345,11 @@ def update_documents(kls, entity: Entity, is_update: bool = False):
345
345
346
346
# Process entries in chunks to avoid too large mutation groups
347
347
entry_chunks = [entry_list [i : i + 100 ] for i in range (0 , len (entry_list ), 100 )]
348
+ entry_referrals : set [tuple [str , int ]] = (
349
+ set ()
350
+ ) # {(spanner_entry_id, referral_origin_entry_id), ...}
351
+
352
+ # First, write all entries and their attributes
348
353
for chunk in entry_chunks :
349
354
# Process each entry in the chunk with its own mutation group
350
355
with repo .database .mutation_groups () as mg :
@@ -387,15 +392,28 @@ def update_documents(kls, entity: Entity, is_update: bool = False):
387
392
)
388
393
389
394
for attrv in attr .prefetch_values :
390
- spanner_attribute_values .append (
391
- AdvancedSearchAttributeValue .create_instance (
392
- entry_id = spanner_entry_id ,
393
- attribute_id = spanner_attr_id ,
394
- attribute_value_id = str (uuid .uuid4 ()),
395
- entity_attr = entity_attr ,
396
- attrv = attrv ,
397
- )
395
+ value = AdvancedSearchAttributeValue .create_instance (
396
+ entry_id = spanner_entry_id ,
397
+ attribute_id = spanner_attr_id ,
398
+ attribute_value_id = str (uuid .uuid4 ()),
399
+ entity_attr = entity_attr ,
400
+ attrv = attrv ,
398
401
)
402
+ spanner_attribute_values .append (value )
403
+
404
+ # Collect referral relationships
405
+ match entity_attr .type :
406
+ case AttrType .OBJECT | AttrType .NAMED_OBJECT :
407
+ if attrv .referral :
408
+ entry_referrals .add (
409
+ (spanner_entry_id , attrv .referral .id )
410
+ )
411
+ case AttrType .ARRAY_OBJECT | AttrType .ARRAY_NAMED_OBJECT :
412
+ for array_value in attrv .data_array .all ():
413
+ if array_value .referral :
414
+ entry_referrals .add (
415
+ (spanner_entry_id , array_value .referral .id )
416
+ )
399
417
400
418
# Create a mutation group for this entry and its related data
401
419
group = mg .group ()
@@ -407,8 +425,52 @@ def update_documents(kls, entity: Entity, is_update: bool = False):
407
425
408
426
# Batch write all mutation groups for this chunk
409
427
responses = mg .batch_write ()
410
- if not all (response .status .code == 0 for response in responses ):
411
- raise Exception (f"Failed to batch write to Spanner: { responses } " )
428
+ if any (response .status .code != 0 for response in responses ):
429
+ error_details = [
430
+ (
431
+ f"code: { response .status .code } , "
432
+ f"message: { response .status .message } "
433
+ )
434
+ for response in responses
435
+ if response .status .code != 0
436
+ ]
437
+ raise Exception (f"Failed to batch write to Spanner: { error_details } " )
438
+
439
+ if entry_referrals :
440
+ # Get mapping from OriginEntryId to EntryId for referrals
441
+ referral_origin_ids = {ref_id for _ , ref_id in entry_referrals }
442
+ entry_id_mapping = repo .get_entry_id_mapping (list (referral_origin_ids ))
443
+
444
+ # Convert OriginEntryId to EntryId and filter out any missing mappings
445
+ converted_referrals = [
446
+ (entry_id , entry_id_mapping [ref_id ])
447
+ for entry_id , ref_id in entry_referrals
448
+ if ref_id in entry_id_mapping
449
+ ]
450
+
451
+ if converted_referrals :
452
+ with repo .database .mutation_groups () as mg :
453
+ referral_chunks = [
454
+ converted_referrals [i : i + 1000 ]
455
+ for i in range (0 , len (converted_referrals ), 1000 )
456
+ ]
457
+ for chunk in referral_chunks :
458
+ group = mg .group ()
459
+ repo .insert_entry_referrals (chunk , group )
460
+
461
+ responses = mg .batch_write ()
462
+ if any (response .status .code != 0 for response in responses ):
463
+ error_details = [
464
+ (
465
+ f"code: { response .status .code } , "
466
+ f"message: { response .status .message } "
467
+ )
468
+ for response in responses
469
+ if response .status .code != 0
470
+ ]
471
+ raise Exception (
472
+ f"Failed to batch write referrals to Spanner: { error_details } "
473
+ )
412
474
413
475
except Exception as e :
414
476
Logger .warning (f"Failed to sync data to Spanner: { e } " )
0 commit comments