diff --git a/core/src/main/java/com/scalar/db/common/error/CoreError.java b/core/src/main/java/com/scalar/db/common/error/CoreError.java
index 6aac4fbed..d10969ddd 100644
--- a/core/src/main/java/com/scalar/db/common/error/CoreError.java
+++ b/core/src/main/java/com/scalar/db/common/error/CoreError.java
@@ -742,6 +742,20 @@ public enum CoreError implements ScalarDbError {
       "Duplicated data mappings found for column '%s' in table '%s'",
       "",
       ""),
+  DATA_LOADER_MISSING_CLUSTERING_KEY_COLUMN(
+      Category.USER_ERROR,
+      "0174",
+      "Missing required field or column mapping for clustering key %s",
+      "",
+      ""),
+  DATA_LOADER_MISSING_PARTITION_KEY_COLUMN(
+      Category.USER_ERROR,
+      "0175",
+      "Missing required field or column mapping for partition key %s",
+      "",
+      ""),
+  DATA_LOADER_MISSING_COLUMN(
+      Category.USER_ERROR, "0176", "Missing field or column mapping for %s", "", ""),
 
   //
   // Errors for the concurrency error category
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportOptions.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportOptions.java
new file mode 100644
index 000000000..9cb6225d3
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportOptions.java
@@ -0,0 +1,38 @@
+package com.scalar.db.dataloader.core.dataimport;
+
+import com.scalar.db.dataloader.core.FileFormat;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileValidationLevel;
+import com.scalar.db.dataloader.core.dataimport.log.LogMode;
+import lombok.Builder;
+import lombok.Data;
+
+/** Import options to import data into one or more ScalarDB tables */
+@Builder
+@Data
+public class ImportOptions {
+
+  @Builder.Default private final ImportMode importMode = ImportMode.UPSERT;
+  @Builder.Default private final boolean requireAllColumns = false;
+  @Builder.Default private final FileFormat fileFormat = FileFormat.JSON;
+  @Builder.Default private final boolean prettyPrint = false;
+  @Builder.Default private final boolean ignoreNullValues = false;
+  @Builder.Default private final LogMode logMode = LogMode.SPLIT_BY_DATA_CHUNK;
+
+  @Builder.Default
+  private final ControlFileValidationLevel controlFileValidationLevel =
+      ControlFileValidationLevel.MAPPED;
+
+  @Builder.Default private final char delimiter = ',';
+
+  @Builder.Default private final boolean logSuccessRecords = false;
+  @Builder.Default private final boolean logRawRecord = false;
+
+  private final int dataChunkSize;
+  private final int transactionBatchSize;
+  private final ControlFile controlFile;
+  private final String namespace;
+  private final String tableName;
+  private final int maxThreads;
+  private final String customHeaderRow;
+}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/LogMode.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/LogMode.java
new file mode 100644
index 000000000..cf0349366
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/LogMode.java
@@ -0,0 +1,7 @@
+package com.scalar.db.dataloader.core.dataimport.log;
+
+/** Log modes available for import logging */
+public enum LogMode {
+  SINGLE_FILE,
+  SPLIT_BY_DATA_CHUNK
+}
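For reference, a minimal sketch of how the Lombok-generated builder on ImportOptions might be used. The namespace, table name, and numeric values below are illustrative only; options that are not set fall back to the @Builder.Default values declared above.

    // Minimal usage sketch (illustrative values only).
    ImportOptions options =
        ImportOptions.builder()
            .namespace("sample_ns")        // hypothetical namespace
            .tableName("sample_table")     // hypothetical table
            .dataChunkSize(500)
            .transactionBatchSize(10)
            .maxThreads(4)
            .build();
    // Unset options keep their defaults, e.g. importMode == ImportMode.UPSERT,
    // fileFormat == FileFormat.JSON, and logMode == LogMode.SPLIT_BY_DATA_CHUNK.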
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/mapping/ImportDataMapping.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/mapping/ImportDataMapping.java
new file mode 100644
index 000000000..7f7524d26
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/mapping/ImportDataMapping.java
@@ -0,0 +1,28 @@
+package com.scalar.db.dataloader.core.dataimport.task.mapping;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTableFieldMapping;
+
+public class ImportDataMapping {
+
+  /**
+   * Updates the source data by replacing each source column name with the target column name
+   * according to the control file table data.
+   *
+   * @param source source data
+   * @param controlFileTable control file table to map source data
+   */
+  public static void apply(ObjectNode source, ControlFileTable controlFileTable) {
+    // Copy the source field data to the target column if missing
+    for (ControlFileTableFieldMapping mapping : controlFileTable.getMappings()) {
+      String sourceField = mapping.getSourceField();
+      String targetColumn = mapping.getTargetColumn();
+
+      if (source.has(sourceField) && !source.has(targetColumn)) {
+        source.set(targetColumn, source.get(sourceField));
+        source.remove(sourceField);
+      }
+    }
+  }
+}
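A short sketch of what ImportDataMapping.apply does to a record, assuming a hypothetical control file mapping from "source_id" to "target_id" built elsewhere; the field name and value are illustrative.

    // Illustrative only: controlFileTable is assumed to contain the mapping
    // "source_id" -> "target_id".
    ObjectMapper mapper = new ObjectMapper();
    ObjectNode record = mapper.createObjectNode();
    record.put("source_id", "111");

    ImportDataMapping.apply(record, controlFileTable);
    // The value is copied to the target name and the source name is removed,
    // so the record is now {"target_id":"111"}. If "target_id" had already been
    // present, the record would have been left unchanged.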
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidationResult.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidationResult.java
new file mode 100644
index 000000000..30b878b9e
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidationResult.java
@@ -0,0 +1,48 @@
+package com.scalar.db.dataloader.core.dataimport.task.validation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import javax.annotation.concurrent.Immutable;
+
+/** The validation result for a data source record */
+@Immutable
+public final class ImportSourceRecordValidationResult {
+
+  private final List<String> errorMessages;
+  private final Set<String> columnsWithErrors;
+
+  /** Constructor */
+  public ImportSourceRecordValidationResult() {
+    this.errorMessages = new ArrayList<>();
+    this.columnsWithErrors = new HashSet<>();
+  }
+
+  /**
+   * Add a validation error message for a column. The column is also marked as containing an error.
+   *
+   * @param columnName column name
+   * @param errorMessage error message
+   */
+  public void addErrorMessage(String columnName, String errorMessage) {
+    this.columnsWithErrors.add(columnName);
+    this.errorMessages.add(errorMessage);
+  }
+
+  /** @return Immutable list of validation error messages */
+  public List<String> getErrorMessages() {
+    return Collections.unmodifiableList(this.errorMessages);
+  }
+
+  /** @return Immutable set of columns that had errors */
+  public Set<String> getColumnsWithErrors() {
+    return Collections.unmodifiableSet(this.columnsWithErrors);
+  }
+
+  /** @return whether the validation passed without errors */
+  public boolean isValid() {
+    return this.errorMessages.isEmpty();
+  }
+}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidator.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidator.java
new file mode 100644
index 000000000..6d773ffcc
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidator.java
@@ -0,0 +1,119 @@
+package com.scalar.db.dataloader.core.dataimport.task.validation;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.common.error.CoreError;
+import com.scalar.db.dataloader.core.DatabaseKeyType;
+import com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils;
+import java.util.Set;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class ImportSourceRecordValidator {
+
+  /**
+   * Validates a source record and collects every validation error instead of stopping at the
+   * first one, which avoids trial-and-error imports where each run reveals only one new error.
+   *
+   * @param partitionKeyNames List of partition keys in table
+   * @param clusteringKeyNames List of clustering keys in table
+   * @param columnNames List of all column names in table
+   * @param sourceRecord source data
+   * @param allColumnsRequired If true, treat missing columns as an error
+   * @return Source record validation result
+   */
+  public static ImportSourceRecordValidationResult validateSourceRecord(
+      Set<String> partitionKeyNames,
+      Set<String> clusteringKeyNames,
+      Set<String> columnNames,
+      JsonNode sourceRecord,
+      boolean allColumnsRequired,
+      TableMetadata tableMetadata) {
+    ImportSourceRecordValidationResult validationResult = new ImportSourceRecordValidationResult();
+
+    // check if partition keys are found
+    checkMissingKeys(DatabaseKeyType.PARTITION, partitionKeyNames, sourceRecord, validationResult);
+
+    // check if clustering keys are found
+    checkMissingKeys(
+        DatabaseKeyType.CLUSTERING, clusteringKeyNames, sourceRecord, validationResult);
+
+    // Check if the record is missing any columns
+    if (allColumnsRequired) {
+      checkMissingColumns(
+          sourceRecord,
+          columnNames,
+          validationResult,
+          validationResult.getColumnsWithErrors(),
+          tableMetadata);
+    }
+
+    return validationResult;
+  }
+
+  /**
+   * Check if the required key columns are present in the source record.
+   *
+   * @param keyType Type of key to validate
+   * @param keyColumnNames List of required column names
+   * @param sourceRecord source data
+   * @param validationResult Source record validation result
+   */
+  public static void checkMissingKeys(
+      DatabaseKeyType keyType,
+      Set<String> keyColumnNames,
+      JsonNode sourceRecord,
+      ImportSourceRecordValidationResult validationResult) {
+    for (String columnName : keyColumnNames) {
+      if (!sourceRecord.has(columnName)) {
+        String errorMessageFormat =
+            keyType == DatabaseKeyType.PARTITION
+                ? CoreError.DATA_LOADER_MISSING_PARTITION_KEY_COLUMN.buildMessage(columnName)
+                : CoreError.DATA_LOADER_MISSING_CLUSTERING_KEY_COLUMN.buildMessage(columnName);
+        validationResult.addErrorMessage(columnName, errorMessageFormat);
+      }
+    }
+  }
+
+  /**
+   * Make sure the JSON object is not missing any columns. Errors are added to the validation result.
+   *
+   * @param sourceRecord Source json object
+   * @param columnNames List of column names for a table
+   * @param validationResult Source record validation result
+   * @param ignoreColumns Columns that can be ignored in the check
+   */
+  public static void checkMissingColumns(
+      JsonNode sourceRecord,
+      Set<String> columnNames,
+      ImportSourceRecordValidationResult validationResult,
+      Set<String> ignoreColumns,
+      TableMetadata tableMetadata) {
+    for (String columnName : columnNames) {
+      // If the field is not a metadata column and is missing and should not be ignored
+      if ((ignoreColumns == null || !ignoreColumns.contains(columnName))
+          && !ConsensusCommitUtils.isTransactionMetaColumn(columnName, tableMetadata)
+          && !sourceRecord.has(columnName)) {
+        validationResult.addErrorMessage(
+            columnName, CoreError.DATA_LOADER_MISSING_COLUMN.buildMessage(columnName));
+      }
+    }
+  }
+
+  /**
+   * Make sure the JSON object is not missing any columns. Errors are added to the validation result.
+   *
+   * @param sourceRecord Source json object
+   * @param columnNames List of column names for a table
+   * @param validationResult Source record validation result
+   */
+  public static void checkMissingColumns(
+      JsonNode sourceRecord,
+      Set<String> columnNames,
+      ImportSourceRecordValidationResult validationResult,
+      TableMetadata tableMetadata) {
+    ImportSourceRecordValidator.checkMissingColumns(
+        sourceRecord, columnNames, validationResult, null, tableMetadata);
+  }
+}
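A sketch of how an import task might wire the mapping and validation steps together; the surrounding variables (record, controlFileTable, tableMetadata, importOptions) are assumed to be provided by the caller, and this is not the actual task implementation from this change.

    // Hypothetical wiring: rename source fields first, then validate the mapped record.
    ImportDataMapping.apply(record, controlFileTable);

    ImportSourceRecordValidationResult result =
        ImportSourceRecordValidator.validateSourceRecord(
            tableMetadata.getPartitionKeyNames(),
            tableMetadata.getClusteringKeyNames(),
            tableMetadata.getColumnNames(),
            record,
            importOptions.isRequireAllColumns(),
            tableMetadata);

    if (!result.isValid()) {
      // Every missing key or column is collected in one pass, so all problems
      // can be reported together instead of one per import attempt.
      result.getErrorMessages().forEach(System.err::println);
    }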
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/task/mapping/ImportDataMappingTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/task/mapping/ImportDataMappingTest.java
new file mode 100644
index 000000000..b5e2e7041
--- /dev/null
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/task/mapping/ImportDataMappingTest.java
@@ -0,0 +1,45 @@
+package com.scalar.db.dataloader.core.dataimport.task.mapping;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTableFieldMapping;
+import java.util.ArrayList;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class ImportDataMappingTest {
+
+  ControlFileTable controlFileTable;
+
+  @BeforeEach
+  void setup() {
+    controlFileTable = new ControlFileTable("namespace", "table");
+    ControlFileTableFieldMapping m1 = new ControlFileTableFieldMapping("source_id", "target_id");
+    ControlFileTableFieldMapping m2 =
+        new ControlFileTableFieldMapping("source_name", "target_name");
+    ControlFileTableFieldMapping m3 =
+        new ControlFileTableFieldMapping("source_email", "target_email");
+    ArrayList<ControlFileTableFieldMapping> mappingArrayList = new ArrayList<>();
+    mappingArrayList.add(m1);
+    mappingArrayList.add(m2);
+    mappingArrayList.add(m3);
+    controlFileTable.getMappings().addAll(mappingArrayList);
+  }
+
+  @Test
+  void apply_withValidData_shouldUpdateSourceData() throws JsonProcessingException {
+    ObjectMapper objectMapper = new ObjectMapper();
+    ObjectNode source = objectMapper.createObjectNode();
+    source.put("source_id", "111");
+    source.put("source_name", "abc");
+    source.put("source_email", "sam@dsd.com");
+    ImportDataMapping.apply(source, controlFileTable);
+    // Assert changes
+    Assertions.assertEquals("111", source.get("target_id").asText());
+    Assertions.assertEquals("abc", source.get("target_name").asText());
+    Assertions.assertEquals("sam@dsd.com", source.get("target_email").asText());
+  }
+}
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidatorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidatorTest.java
new file mode 100644
index 000000000..65a85b3c3
--- /dev/null
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidatorTest.java
@@ -0,0 +1,87 @@
+package com.scalar.db.dataloader.core.dataimport.task.validation;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.common.error.CoreError;
+import com.scalar.db.dataloader.core.UnitTestUtils;
+import java.util.HashSet;
+import java.util.Set;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class ImportSourceRecordValidatorTest {
+
+  TableMetadata mockMetadata = UnitTestUtils.createTestTableMetadata();
+
+  @Test
+  void
+      validateSourceRecord_withValidData_shouldReturnValidImportSourceRecordValidationResultWithoutErrors() {
+    Set<String> partitionKeyNames = mockMetadata.getPartitionKeyNames();
+    Set<String> clusteringKeyNames = mockMetadata.getClusteringKeyNames();
+    Set<String> columnNames = mockMetadata.getColumnNames();
+    JsonNode sourceRecord = UnitTestUtils.getOutputDataWithoutMetadata();
+    ImportSourceRecordValidationResult result =
+        ImportSourceRecordValidator.validateSourceRecord(
+            partitionKeyNames, clusteringKeyNames, columnNames, sourceRecord, false, mockMetadata);
+    Assertions.assertTrue(result.getColumnsWithErrors().isEmpty());
+  }
+
+  @Test
+  void
+      validateSourceRecord_withValidDataWithAllColumnsRequired_shouldReturnValidImportSourceRecordValidationResultWithoutErrors() {
+    Set<String> partitionKeyNames = mockMetadata.getPartitionKeyNames();
+    Set<String> clusteringKeyNames = mockMetadata.getClusteringKeyNames();
+    Set<String> columnNames = mockMetadata.getColumnNames();
+    JsonNode sourceRecord = UnitTestUtils.getOutputDataWithoutMetadata();
+    ImportSourceRecordValidationResult result =
+        ImportSourceRecordValidator.validateSourceRecord(
+            partitionKeyNames, clusteringKeyNames, columnNames, sourceRecord, true, mockMetadata);
+    Assertions.assertTrue(result.getColumnsWithErrors().isEmpty());
+  }
+
+  @Test
+  void
+      validateSourceRecord_withInValidPartitionKey_shouldReturnValidImportSourceRecordValidationResultWithErrors() {
+    Set<String> partitionKeyNames = new HashSet<>();
+    partitionKeyNames.add("id1");
+    Set<String> clusteringKeyNames = mockMetadata.getClusteringKeyNames();
+    Set<String> columnNames = mockMetadata.getColumnNames();
+    JsonNode sourceRecord = UnitTestUtils.getOutputDataWithoutMetadata();
+    ImportSourceRecordValidationResult result =
+        ImportSourceRecordValidator.validateSourceRecord(
+            partitionKeyNames, clusteringKeyNames, columnNames, sourceRecord, false, mockMetadata);
+    Assertions.assertFalse(result.getColumnsWithErrors().isEmpty());
+  }
+
+  @Test
+  void
+      validateSourceRecord_withInValidPartitionKeyWithAllColumnsRequired_shouldReturnValidImportSourceRecordValidationResultWithErrors() {
+    Set<String> partitionKeyNames = new HashSet<>();
+    partitionKeyNames.add("id1");
+    Set<String> clusteringKeyNames = mockMetadata.getClusteringKeyNames();
+    Set<String> columnNames = mockMetadata.getColumnNames();
+    JsonNode sourceRecord = UnitTestUtils.getOutputDataWithoutMetadata();
+    ImportSourceRecordValidationResult result =
+        ImportSourceRecordValidator.validateSourceRecord(
+            partitionKeyNames, clusteringKeyNames, columnNames, sourceRecord, true, mockMetadata);
+    Assertions.assertFalse(result.getColumnsWithErrors().isEmpty());
+    Assertions.assertEquals(1, result.getErrorMessages().size());
+  }
+
+  @Test
+  void
+      validateSourceRecord_withInValidClusteringKey_shouldReturnValidImportSourceRecordValidationResultWithErrors() {
+    Set<String> partitionKeyNames = mockMetadata.getPartitionKeyNames();
+    Set<String> clusteringKeyNames = new HashSet<>();
+    clusteringKeyNames.add("id1");
+    Set<String> columnNames = mockMetadata.getColumnNames();
+    JsonNode sourceRecord = UnitTestUtils.getOutputDataWithoutMetadata();
+    ImportSourceRecordValidationResult result =
+        ImportSourceRecordValidator.validateSourceRecord(
+            partitionKeyNames, clusteringKeyNames, columnNames, sourceRecord, false, mockMetadata);
+    Assertions.assertFalse(result.getColumnsWithErrors().isEmpty());
+    Assertions.assertEquals(
+        CoreError.DATA_LOADER_MISSING_CLUSTERING_KEY_COLUMN.buildMessage("id1"),
+        result.getErrorMessages().get(0));
+  }
+}