12
12
import com .linkedin .metadata .dao .utils .RecordUtils ;
13
13
import com .linkedin .metadata .dao .utils .SQLSchemaUtils ;
14
14
import com .linkedin .metadata .dao .utils .SQLStatementUtils ;
15
+ import com .linkedin .metadata .dao .utils .SchemaValidatorUtil ;
15
16
import com .linkedin .metadata .events .IngestionTrackingContext ;
16
17
import com .linkedin .metadata .query .ExtraInfo ;
17
18
import com .linkedin .metadata .query .ExtraInfoArray ;
@@ -74,6 +75,7 @@ public class EbeanLocalAccess<URN extends Urn> implements IEbeanLocalAccess<URN>
74
75
// key: table_name,
75
76
// value: Set(column1, column2, column3 ...)
76
77
private final Map <String , Set <String >> tableColumns = new ConcurrentHashMap <>();
78
+ private final SchemaValidatorUtil validator ;
77
79
78
80
public EbeanLocalAccess (EbeanServer server , ServerConfig serverConfig , @ Nonnull Class <URN > urnClass ,
79
81
UrnPathExtractor <URN > urnPathExtractor , boolean nonDollarVirtualColumnsEnabled ) {
@@ -83,6 +85,7 @@ public EbeanLocalAccess(EbeanServer server, ServerConfig serverConfig, @Nonnull
83
85
_entityType = ModelUtils .getEntityTypeFromUrnClass (_urnClass );
84
86
_schemaEvolutionManager = createSchemaEvolutionManager (serverConfig );
85
87
_nonDollarVirtualColumnsEnabled = nonDollarVirtualColumnsEnabled ;
88
+ validator = new SchemaValidatorUtil (server );
86
89
}
87
90
88
91
public void setUrnPathExtractor (@ Nonnull UrnPathExtractor <URN > urnPathExtractor ) {
@@ -276,7 +279,7 @@ public <ASPECT extends RecordTemplate> List<EbeanMetadataAspect> batchGetUnion(
276
279
for (int index = position ; index < end ; index ++) {
277
280
final Urn entityUrn = aspectKeys .get (index ).getUrn ();
278
281
final Class <ASPECT > aspectClass = (Class <ASPECT >) aspectKeys .get (index ).getAspectClass ();
279
- if (checkColumnExists (isTestMode ? getTestTableName (entityUrn ) : getTableName (entityUrn ),
282
+ if (validator . columnExists (isTestMode ? getTestTableName (entityUrn ) : getTableName (entityUrn ),
280
283
getAspectColumnName (entityUrn .getEntityType (), aspectClass ))) {
281
284
keysToQueryMap .computeIfAbsent (aspectClass , unused -> new HashSet <>()).add (entityUrn );
282
285
}
@@ -326,7 +329,7 @@ public ListResult<URN> listUrns(@Nullable IndexFilter indexFilter, @Nullable Ind
326
329
int start , int pageSize ) {
327
330
final SqlQuery sqlQuery = createFilterSqlQuery (indexFilter , indexSortCriterion , start , pageSize );
328
331
final List <SqlRow > sqlRows = sqlQuery .findList ();
329
- if (sqlRows .size () == 0 ) {
332
+ if (sqlRows .isEmpty () ) {
330
333
final List <SqlRow > totalCountResults = createFilterSqlQuery (indexFilter , indexSortCriterion , 0 , DEFAULT_PAGE_SIZE ).findList ();
331
334
final int actualTotalCount = totalCountResults .isEmpty () ? 0 : totalCountResults .get (0 ).getInteger ("_total_count" );
332
335
return toListResult (actualTotalCount , start , pageSize );
@@ -428,13 +431,13 @@ public Map<String, Long> countAggregate(@Nullable IndexFilter indexFilter,
428
431
getGeneratedColumnName (_entityType , indexGroupByCriterion .getAspect (), indexGroupByCriterion .getPath (),
429
432
_nonDollarVirtualColumnsEnabled );
430
433
// first, check for existence of the column we want to GROUP BY
431
- if (!checkColumnExists (tableName , groupByColumn )) {
434
+ if (!validator . columnExists (tableName , groupByColumn )) {
432
435
// if we are trying to GROUP BY the results on a column that does not exist, just return an empty map
433
436
return Collections .emptyMap ();
434
437
}
435
438
436
439
// now run the actual GROUP BY query
437
- final String groupBySql = SQLStatementUtils .createGroupBySql (_entityType , indexFilter , indexGroupByCriterion , _nonDollarVirtualColumnsEnabled );
440
+ final String groupBySql = SQLStatementUtils .createGroupBySql (_entityType , indexFilter , indexGroupByCriterion , _nonDollarVirtualColumnsEnabled , validator );
438
441
final SqlQuery sqlQuery = _server .createSqlQuery (groupBySql );
439
442
final List <SqlRow > sqlRows = sqlQuery .findList ();
440
443
Map <String , Long > resultMap = new HashMap <>();
@@ -461,7 +464,7 @@ public Map<String, Long> countAggregate(@Nullable IndexFilter indexFilter,
461
464
private SqlQuery createFilterSqlQuery (@ Nullable IndexFilter indexFilter ,
462
465
@ Nullable IndexSortCriterion indexSortCriterion , int offset , int pageSize ) {
463
466
StringBuilder filterSql = new StringBuilder ();
464
- filterSql .append (SQLStatementUtils .createFilterSql (_entityType , indexFilter , true , _nonDollarVirtualColumnsEnabled ));
467
+ filterSql .append (SQLStatementUtils .createFilterSql (_entityType , indexFilter , true , _nonDollarVirtualColumnsEnabled , validator ));
465
468
filterSql .append ("\n " );
466
469
filterSql .append (parseSortCriteria (_entityType , indexSortCriterion , _nonDollarVirtualColumnsEnabled ));
467
470
filterSql .append (String .format (" LIMIT %d" , Math .max (pageSize , 0 )));
@@ -475,7 +478,7 @@ private SqlQuery createFilterSqlQuery(@Nullable IndexFilter indexFilter,
475
478
private SqlQuery createFilterSqlQuery (@ Nullable IndexFilter indexFilter ,
476
479
@ Nullable IndexSortCriterion indexSortCriterion , @ Nullable URN lastUrn , int pageSize ) {
477
480
StringBuilder filterSql = new StringBuilder ();
478
- filterSql .append (SQLStatementUtils .createFilterSql (_entityType , indexFilter , false , _nonDollarVirtualColumnsEnabled ));
481
+ filterSql .append (SQLStatementUtils .createFilterSql (_entityType , indexFilter , false , _nonDollarVirtualColumnsEnabled , validator ));
479
482
480
483
if (lastUrn != null ) {
481
484
// because createFilterSql will only include a WHERE clause if there are non-urn filters, we need to make sure
@@ -621,23 +624,6 @@ private SchemaEvolutionManager createSchemaEvolutionManager(@Nonnull ServerConfi
621
624
return new FlywaySchemaEvolutionManager (config );
622
625
}
623
626
624
- /**
625
- * Check column exists in table.
626
- */
627
- public boolean checkColumnExists (@ Nonnull String tableName , @ Nonnull String columnName ) {
628
- // Fetch table columns on very first read and cache it in tableColumns
629
- if (!tableColumns .containsKey (tableName )) {
630
- final List <SqlRow > rows = _server .createSqlQuery (SQLStatementUtils .getAllColumnForTable (tableName )).findList ();
631
- Set <String > columns = new HashSet <>();
632
- for (SqlRow row : rows ) {
633
- columns .add (row .getString ("COLUMN_NAME" ).toLowerCase ());
634
- }
635
- tableColumns .put (tableName , columns );
636
- }
637
-
638
- return tableColumns .get (tableName ).contains (columnName .toLowerCase ());
639
- }
640
-
641
627
/**
642
628
* SQL implementation of find the latest {@link EbeanMetadataAspect}.
643
629
* @param connection {@link Connection} get from the current transaction, it should not be closed manually
0 commit comments