7
7
import com .linkedin .data .template .StringArray ;
8
8
import com .linkedin .data .template .UnionTemplate ;
9
9
import com .linkedin .metadata .dao .AspectKey ;
10
+ import com .linkedin .metadata .dao .BaseLocalDAO ;
10
11
import com .linkedin .metadata .dao .exception .ModelValidationException ;
11
12
import com .linkedin .metadata .dao .ingestion .AspectCallbackRegistry ;
12
13
import com .linkedin .metadata .dao .ingestion .AspectCallbackResponse ;
@@ -422,6 +423,7 @@ private void ingestAspect(Set<Class<? extends RecordTemplate>> aspectsToIgnore,
422
423
IngestionTrackingContext trackingContext , IngestionParams ingestionParams , AuditStamp auditStamp ,
423
424
boolean skipExtraProcessing ) {
424
425
if (!aspectsToIgnore .contains (aspect .getClass ())) {
426
+ BaseLocalDAO <INTERNAL_ASPECT_UNION , URN > shadowLocalDao = getShadowLocalDAO ();
425
427
if (getAspectRoutingGmsClientManager ().hasRegistered (aspect .getClass ())) {
426
428
try {
427
429
// get the updated aspect if there is a preupdate routing lambda registered
@@ -448,6 +450,10 @@ private void ingestAspect(Set<Class<? extends RecordTemplate>> aspectsToIgnore,
448
450
// the value of param skipExtraProcessing since any pre-update lambdas would have already been executed
449
451
// in the code above.
450
452
getLocalDAO ().rawAdd ((URN ) urn , aspect , auditStamp , trackingContext , ingestionParams );
453
+ // if there is a shadow local DAO, also call rawAdd on it to write it to the shadow db
454
+ if (shadowLocalDao != null ) {
455
+ shadowLocalDao .rawAdd ((URN ) urn , aspect , auditStamp , trackingContext , ingestionParams );
456
+ }
451
457
} catch (Exception exception ) {
452
458
log .error ("Couldn't ingest routing aspect {} for {}" , aspect .getClass ().getSimpleName (), urn , exception );
453
459
throw exception ;
@@ -456,8 +462,14 @@ private void ingestAspect(Set<Class<? extends RecordTemplate>> aspectsToIgnore,
456
462
if (skipExtraProcessing ) {
457
463
// call a simple version of BaseLocalDAO::add which skips pre-update lambdas.
458
464
getLocalDAO ().rawAdd ((URN ) urn , aspect , auditStamp , trackingContext , ingestionParams );
465
+ if (shadowLocalDao != null ) {
466
+ shadowLocalDao .rawAdd ((URN ) urn , aspect , auditStamp , trackingContext , ingestionParams );
467
+ }
459
468
} else {
460
469
getLocalDAO ().add ((URN ) urn , aspect , auditStamp , trackingContext , ingestionParams );
470
+ if (shadowLocalDao != null ) {
471
+ shadowLocalDao .rawAdd ((URN ) urn , aspect , auditStamp , trackingContext , ingestionParams );
472
+ }
461
473
}
462
474
}
463
475
}
0 commit comments