Backport to branch (3.13): Add DTOs and other classes for task (#2532)
Co-authored-by: inv-jishnu <31100916+inv-jishnu@users.noreply.github.com>
feeblefakie and inv-jishnu authored Feb 13, 2025
1 parent 90cbc4d commit 6eeb5d4
Showing 8 changed files with 386 additions and 0 deletions.
14 changes: 14 additions & 0 deletions core/src/main/java/com/scalar/db/common/error/CoreError.java
@@ -742,6 +742,20 @@ public enum CoreError implements ScalarDbError {
"Duplicated data mappings found for column '%s' in table '%s'",
"",
""),
DATA_LOADER_MISSING_CLUSTERING_KEY_COLUMN(
Category.USER_ERROR,
"0174",
"Missing required field or column mapping for clustering key %s",
"",
""),
DATA_LOADER_MISSING_PARTITION_KEY_COLUMN(
Category.USER_ERROR,
"0175",
"Missing required field or column mapping for partition key %s",
"",
""),
DATA_LOADER_MISSING_COLUMN(
Category.USER_ERROR, "0176", "Missing field or column mapping for %s", "", ""),

//
// Errors for the concurrency error category
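Each new CoreError entry pairs a user-error category and code with a message format string; the %s placeholder is filled through buildMessage, as the validator further down does. A quick sketch of the expected text (the column name id is hypothetical, and ScalarDbError may prepend an error-code prefix to the message):

// Contains the text "Missing required field or column mapping for partition key id"
String message = CoreError.DATA_LOADER_MISSING_PARTITION_KEY_COLUMN.buildMessage("id");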
38 changes: 38 additions & 0 deletions ImportOptions.java (package com.scalar.db.dataloader.core.dataimport)
@@ -0,0 +1,38 @@
package com.scalar.db.dataloader.core.dataimport;

import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileValidationLevel;
import com.scalar.db.dataloader.core.dataimport.log.LogMode;
import lombok.Builder;
import lombok.Data;

/** Import options for importing data into one or more ScalarDB tables. */
@Builder
@Data
public class ImportOptions {

@Builder.Default private final ImportMode importMode = ImportMode.UPSERT;
@Builder.Default private final boolean requireAllColumns = false;
@Builder.Default private final FileFormat fileFormat = FileFormat.JSON;
@Builder.Default private final boolean prettyPrint = false;
@Builder.Default private final boolean ignoreNullValues = false;
@Builder.Default private final LogMode logMode = LogMode.SPLIT_BY_DATA_CHUNK;

@Builder.Default
private final ControlFileValidationLevel controlFileValidationLevel =
ControlFileValidationLevel.MAPPED;

@Builder.Default private final char delimiter = ',';

@Builder.Default private final boolean logSuccessRecords = false;
@Builder.Default private final boolean logRawRecord = false;

private final int dataChunkSize;
private final int transactionBatchSize;
private final ControlFile controlFile;
private final String namespace;
private final String tableName;
private final int maxThreads;
private final String customHeaderRow;
}
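Because the class uses Lombok's @Builder with @Builder.Default, any field left unset falls back to the default declared above. A minimal construction sketch (the namespace, table, and sizing values are hypothetical):

ImportOptions options =
    ImportOptions.builder()
        .namespace("sample_ns") // hypothetical
        .tableName("sample_table") // hypothetical
        .dataChunkSize(1000) // hypothetical sizing
        .transactionBatchSize(100)
        .maxThreads(8)
        .build();
// Unset fields keep their defaults, e.g. importMode == ImportMode.UPSERT
// and logMode == LogMode.SPLIT_BY_DATA_CHUNK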
7 changes: 7 additions & 0 deletions LogMode.java (package com.scalar.db.dataloader.core.dataimport.log)
@@ -0,0 +1,7 @@
package com.scalar.db.dataloader.core.dataimport.log;

/** Log modes available for import logging */
public enum LogMode {
SINGLE_FILE,
SPLIT_BY_DATA_CHUNK
}
28 changes: 28 additions & 0 deletions ImportDataMapping.java (package com.scalar.db.dataloader.core.dataimport.task.mapping)
@@ -0,0 +1,28 @@
package com.scalar.db.dataloader.core.dataimport.task.mapping;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTableFieldMapping;

public class ImportDataMapping {

  /**
   * Updates the source data, replacing each source field name with the target column name
   * according to the control file table mappings.
   *
   * @param source source data
   * @param controlFileTable control file table used to map the source data
   */
public static void apply(ObjectNode source, ControlFileTable controlFileTable) {
    // Move the source field value to the target column when the target column is not yet set
for (ControlFileTableFieldMapping mapping : controlFileTable.getMappings()) {
String sourceField = mapping.getSourceField();
String targetColumn = mapping.getTargetColumn();

if (source.has(sourceField) && !source.has(targetColumn)) {
source.set(targetColumn, source.get(sourceField));
source.remove(sourceField);
}
}
}
}
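A short behavioral sketch, reusing the constructors exercised by the test further down (the names are hypothetical): a mapped field is renamed in place, while a field whose target column already exists is left untouched.

ObjectMapper mapper = new ObjectMapper();
ObjectNode record = mapper.createObjectNode();
record.put("source_id", "111");

ControlFileTable table = new ControlFileTable("ns", "tbl"); // hypothetical namespace/table
table.getMappings().add(new ControlFileTableFieldMapping("source_id", "target_id"));

ImportDataMapping.apply(record, table);
// record is now {"target_id":"111"}; "source_id" was removed.
// Had "target_id" already been present, "source_id" would have been kept as-is.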
48 changes: 48 additions & 0 deletions ImportSourceRecordValidationResult.java (package com.scalar.db.dataloader.core.dataimport.task.validation)
@@ -0,0 +1,48 @@
package com.scalar.db.dataloader.core.dataimport.task.validation;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.annotation.concurrent.Immutable;

/** The validation result for a data source record */
@Immutable
public final class ImportSourceRecordValidationResult {

private final List<String> errorMessages;
private final Set<String> columnsWithErrors;

/** Constructor */
public ImportSourceRecordValidationResult() {
this.errorMessages = new ArrayList<>();
this.columnsWithErrors = new HashSet<>();
}

/**
   * Adds a validation error message for a column and marks the column as containing an error.
*
* @param columnName column name
* @param errorMessage error message
*/
public void addErrorMessage(String columnName, String errorMessage) {
this.columnsWithErrors.add(columnName);
this.errorMessages.add(errorMessage);
}

/** @return Immutable list of validation error messages */
public List<String> getErrorMessages() {
return Collections.unmodifiableList(this.errorMessages);
}

/** @return Immutable set of columns that had errors */
public Set<String> getColumnsWithErrors() {
return Collections.unmodifiableSet(this.columnsWithErrors);
}

  /** @return Whether validation passed, i.e. no error messages were recorded */
public boolean isValid() {
return this.errorMessages.isEmpty();
}
}
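A minimal consumption sketch (the column name and message text are hypothetical):

ImportSourceRecordValidationResult result = new ImportSourceRecordValidationResult();
result.addErrorMessage("id", "Missing required field or column mapping for partition key id");

if (!result.isValid()) {
  // Every error was collected, so they can all be reported in one pass
  result.getErrorMessages().forEach(System.err::println);
}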
119 changes: 119 additions & 0 deletions ImportSourceRecordValidator.java (package com.scalar.db.dataloader.core.dataimport.task.validation)
@@ -0,0 +1,119 @@
package com.scalar.db.dataloader.core.dataimport.task.validation;

import com.fasterxml.jackson.databind.JsonNode;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.common.error.CoreError;
import com.scalar.db.dataloader.core.DatabaseKeyType;
import com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils;
import java.util.Set;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class ImportSourceRecordValidator {

  /**
   * Validates a source record and collects every validation error message instead of returning on
   * the first failure. This avoids trial-and-error imports where each run surfaces only one new
   * error.
   *
   * @param partitionKeyNames List of partition keys in the table
   * @param clusteringKeyNames List of clustering keys in the table
   * @param columnNames List of all column names in the table
   * @param sourceRecord source data
   * @param allColumnsRequired If true, treat missing columns as an error
   * @param tableMetadata Metadata of the target table
   * @return Source record validation result
   */
public static ImportSourceRecordValidationResult validateSourceRecord(
Set<String> partitionKeyNames,
Set<String> clusteringKeyNames,
Set<String> columnNames,
JsonNode sourceRecord,
boolean allColumnsRequired,
TableMetadata tableMetadata) {
ImportSourceRecordValidationResult validationResult = new ImportSourceRecordValidationResult();

// check if partition keys are found
checkMissingKeys(DatabaseKeyType.PARTITION, partitionKeyNames, sourceRecord, validationResult);

// check if clustering keys are found
checkMissingKeys(
DatabaseKeyType.CLUSTERING, clusteringKeyNames, sourceRecord, validationResult);

// Check if the record is missing any columns
if (allColumnsRequired) {
checkMissingColumns(
sourceRecord,
columnNames,
validationResult,
validationResult.getColumnsWithErrors(),
tableMetadata);
}

return validationResult;
}

/**
   * Checks whether the required key columns are present in the source record.
*
* @param keyType Type of key to validate
* @param keyColumnNames List of required column names
* @param sourceRecord source data
* @param validationResult Source record validation result
*/
public static void checkMissingKeys(
DatabaseKeyType keyType,
Set<String> keyColumnNames,
JsonNode sourceRecord,
ImportSourceRecordValidationResult validationResult) {
for (String columnName : keyColumnNames) {
if (!sourceRecord.has(columnName)) {
        String errorMessage =
            keyType == DatabaseKeyType.PARTITION
                ? CoreError.DATA_LOADER_MISSING_PARTITION_KEY_COLUMN.buildMessage(columnName)
                : CoreError.DATA_LOADER_MISSING_CLUSTERING_KEY_COLUMN.buildMessage(columnName);
        validationResult.addErrorMessage(columnName, errorMessage);
}
}
}

  /**
   * Makes sure the JSON object is not missing any columns; errors are added to the validation
   * result. Transaction metadata columns are skipped.
   *
   * @param sourceRecord Source JSON object
   * @param columnNames List of column names for a table
   * @param validationResult Source record validation result
   * @param ignoreColumns Columns that can be ignored in the check
   * @param tableMetadata Metadata of the target table
   */
public static void checkMissingColumns(
JsonNode sourceRecord,
Set<String> columnNames,
ImportSourceRecordValidationResult validationResult,
Set<String> ignoreColumns,
TableMetadata tableMetadata) {
for (String columnName : columnNames) {
      // Report the column if it is not ignored, is not a transaction metadata column, and is
      // missing from the source record
if ((ignoreColumns == null || !ignoreColumns.contains(columnName))
&& !ConsensusCommitUtils.isTransactionMetaColumn(columnName, tableMetadata)
&& !sourceRecord.has(columnName)) {
validationResult.addErrorMessage(
columnName, CoreError.DATA_LOADER_MISSING_COLUMN.buildMessage(columnName));
}
}
}

  /**
   * Makes sure the JSON object is not missing any columns; errors are added to the validation
   * result. Convenience overload with no ignored columns.
   *
   * @param sourceRecord Source JSON object
   * @param columnNames List of column names for a table
   * @param validationResult Source record validation result
   * @param tableMetadata Metadata of the target table
   */
public static void checkMissingColumns(
JsonNode sourceRecord,
Set<String> columnNames,
ImportSourceRecordValidationResult validationResult,
TableMetadata tableMetadata) {
ImportSourceRecordValidator.checkMissingColumns(
sourceRecord, columnNames, validationResult, null, tableMetadata);
}
}
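A minimal end-to-end sketch under stated assumptions: the table shape and JSON record are hypothetical, TableMetadata.newBuilder() is ScalarDB's standard metadata builder, and imports mirror those of the files above plus com.scalar.db.io.DataType.

TableMetadata metadata =
    TableMetadata.newBuilder()
        .addColumn("id", DataType.TEXT)
        .addColumn("name", DataType.TEXT)
        .addPartitionKey("id")
        .build();

// A record that is missing the "id" partition key
ObjectNode record = new ObjectMapper().createObjectNode().put("name", "abc");

ImportSourceRecordValidationResult result =
    ImportSourceRecordValidator.validateSourceRecord(
        metadata.getPartitionKeyNames(),
        metadata.getClusteringKeyNames(),
        metadata.getColumnNames(),
        record,
        false,
        metadata);
// result.isValid() == false; the message is built from
// CoreError.DATA_LOADER_MISSING_PARTITION_KEY_COLUMN (code 0175)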
45 changes: 45 additions & 0 deletions ImportDataMappingTest.java (package com.scalar.db.dataloader.core.dataimport.task.mapping)
@@ -0,0 +1,45 @@
package com.scalar.db.dataloader.core.dataimport.task.mapping;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTableFieldMapping;
import java.util.ArrayList;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class ImportDataMappingTest {

ControlFileTable controlFileTable;

@BeforeEach
void setup() {
controlFileTable = new ControlFileTable("namespace", "table");
ControlFileTableFieldMapping m1 = new ControlFileTableFieldMapping("source_id", "target_id");
ControlFileTableFieldMapping m2 =
new ControlFileTableFieldMapping("source_name", "target_name");
ControlFileTableFieldMapping m3 =
new ControlFileTableFieldMapping("source_email", "target_email");
ArrayList<ControlFileTableFieldMapping> mappingArrayList = new ArrayList<>();
mappingArrayList.add(m1);
mappingArrayList.add(m2);
mappingArrayList.add(m3);
controlFileTable.getMappings().addAll(mappingArrayList);
}

@Test
void apply_withValidData_shouldUpdateSourceData() throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
ObjectNode source = objectMapper.createObjectNode();
source.put("source_id", "111");
source.put("source_name", "abc");
source.put("source_email", "sam@dsd.com");
ImportDataMapping.apply(source, controlFileTable);
// Assert changes
Assertions.assertEquals("111", source.get("target_id").asText());
Assertions.assertEquals("abc", source.get("target_name").asText());
Assertions.assertEquals("sam@dsd.com", source.get("target_email").asText());
}
}
87 changes: 87 additions & 0 deletions ImportSourceRecordValidatorTest.java (package com.scalar.db.dataloader.core.dataimport.task.validation)
@@ -0,0 +1,87 @@
package com.scalar.db.dataloader.core.dataimport.task.validation;

import com.fasterxml.jackson.databind.JsonNode;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.common.error.CoreError;
import com.scalar.db.dataloader.core.UnitTestUtils;
import java.util.HashSet;
import java.util.Set;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

class ImportSourceRecordValidatorTest {

TableMetadata mockMetadata = UnitTestUtils.createTestTableMetadata();

@Test
void
validateSourceRecord_withValidData_shouldReturnValidImportSourceRecordValidationResultWithoutErrors() {
Set<String> partitionKeyNames = mockMetadata.getPartitionKeyNames();
Set<String> clusteringKeyNames = mockMetadata.getClusteringKeyNames();
Set<String> columnNames = mockMetadata.getColumnNames();
JsonNode sourceRecord = UnitTestUtils.getOutputDataWithoutMetadata();
ImportSourceRecordValidationResult result =
ImportSourceRecordValidator.validateSourceRecord(
partitionKeyNames, clusteringKeyNames, columnNames, sourceRecord, false, mockMetadata);
Assertions.assertTrue(result.getColumnsWithErrors().isEmpty());
}

@Test
void
validateSourceRecord_withValidDataWithAllColumnsRequired_shouldReturnValidImportSourceRecordValidationResultWithoutErrors() {
Set<String> partitionKeyNames = mockMetadata.getPartitionKeyNames();
Set<String> clusteringKeyNames = mockMetadata.getClusteringKeyNames();
Set<String> columnNames = mockMetadata.getColumnNames();
JsonNode sourceRecord = UnitTestUtils.getOutputDataWithoutMetadata();
ImportSourceRecordValidationResult result =
ImportSourceRecordValidator.validateSourceRecord(
partitionKeyNames, clusteringKeyNames, columnNames, sourceRecord, true, mockMetadata);
Assertions.assertTrue(result.getColumnsWithErrors().isEmpty());
}

@Test
void
validateSourceRecord_withInvalidPartitionKey_shouldReturnValidImportSourceRecordValidationResultWithErrors() {
Set<String> partitionKeyNames = new HashSet<>();
partitionKeyNames.add("id1");
Set<String> clusteringKeyNames = mockMetadata.getClusteringKeyNames();
Set<String> columnNames = mockMetadata.getColumnNames();
JsonNode sourceRecord = UnitTestUtils.getOutputDataWithoutMetadata();
ImportSourceRecordValidationResult result =
ImportSourceRecordValidator.validateSourceRecord(
partitionKeyNames, clusteringKeyNames, columnNames, sourceRecord, false, mockMetadata);
Assertions.assertFalse(result.getColumnsWithErrors().isEmpty());
}

@Test
void
validateSourceRecord_withInvalidPartitionKeyWithAllColumnsRequired_shouldReturnValidImportSourceRecordValidationResultWithErrors() {
Set<String> partitionKeyNames = new HashSet<>();
partitionKeyNames.add("id1");
Set<String> clusteringKeyNames = mockMetadata.getClusteringKeyNames();
Set<String> columnNames = mockMetadata.getColumnNames();
JsonNode sourceRecord = UnitTestUtils.getOutputDataWithoutMetadata();
ImportSourceRecordValidationResult result =
ImportSourceRecordValidator.validateSourceRecord(
partitionKeyNames, clusteringKeyNames, columnNames, sourceRecord, true, mockMetadata);
Assertions.assertFalse(result.getColumnsWithErrors().isEmpty());
Assertions.assertEquals(1, result.getErrorMessages().size());
}

@Test
void
validateSourceRecord_withInvalidClusteringKey_shouldReturnValidImportSourceRecordValidationResultWithErrors() {
Set<String> partitionKeyNames = mockMetadata.getPartitionKeyNames();
Set<String> clusteringKeyNames = new HashSet<>();
clusteringKeyNames.add("id1");
Set<String> columnNames = mockMetadata.getColumnNames();
JsonNode sourceRecord = UnitTestUtils.getOutputDataWithoutMetadata();
ImportSourceRecordValidationResult result =
ImportSourceRecordValidator.validateSourceRecord(
partitionKeyNames, clusteringKeyNames, columnNames, sourceRecord, false, mockMetadata);
Assertions.assertFalse(result.getColumnsWithErrors().isEmpty());
Assertions.assertEquals(
CoreError.DATA_LOADER_MISSING_CLUSTERING_KEY_COLUMN.buildMessage("id1"),
result.getErrorMessages().get(0));
}
}
