diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/rowmapper/provider/PostgreSQLJdbcValueMappings.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/rowmapper/provider/PostgreSQLJdbcValueMappings.java index 9d868cbe5a..2680ec227a 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/rowmapper/provider/PostgreSQLJdbcValueMappings.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/rowmapper/provider/PostgreSQLJdbcValueMappings.java @@ -19,33 +19,138 @@ import com.google.cloud.teleport.v2.source.reader.io.jdbc.rowmapper.JdbcValueMappingsProvider; import com.google.cloud.teleport.v2.source.reader.io.jdbc.rowmapper.ResultSetValueExtractor; import com.google.cloud.teleport.v2.source.reader.io.jdbc.rowmapper.ResultSetValueMapper; +import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.unified.CustomSchema.TimeStampTz; import com.google.common.collect.ImmutableMap; +import java.nio.ByteBuffer; import java.sql.ResultSet; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.OffsetTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.time.temporal.ChronoField; import java.util.Calendar; import java.util.Map; import java.util.TimeZone; +import java.util.concurrent.TimeUnit; +import org.apache.avro.generic.GenericRecordBuilder; import org.apache.commons.lang3.tuple.Pair; /** PostgreSQL data type mapping to AVRO types. */ public class PostgreSQLJdbcValueMappings implements JdbcValueMappingsProvider { + + private static final Calendar UTC_CALENDAR = + Calendar.getInstance(TimeZone.getTimeZone(ZoneOffset.UTC)); + + private static final DateTimeFormatter TIMESTAMPTZ_FORMAT = + new DateTimeFormatterBuilder() + .appendPattern("yyyy-MM-dd HH:mm:ss") + .optionalStart() + .appendFraction(ChronoField.NANO_OF_SECOND, 1, 6, true) + .optionalEnd() + .appendOffset("+HH:mm", "+00") + .toFormatter(); + + private static long toMicros(Instant instant) { + return TimeUnit.SECONDS.toMicros(instant.getEpochSecond()) + + TimeUnit.NANOSECONDS.toMicros(instant.getNano()); + } + + private static long toMicros(OffsetTime offsetTime) { + return TimeUnit.HOURS.toMicros(offsetTime.getHour()) + + TimeUnit.MINUTES.toMicros(offsetTime.getMinute()) + + TimeUnit.SECONDS.toMicros(offsetTime.getSecond()) + + TimeUnit.NANOSECONDS.toMicros(offsetTime.getNano()); + } + private static final ResultSetValueMapper valuePassThrough = (value, schema) -> value; - private static final Calendar utcCalendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + private static final ResultSetValueExtractor bytesExtractor = + (rs, fieldName) -> { + byte[] bytes = rs.getBytes(fieldName); + if (bytes == null) { + return null; + } + return ByteBuffer.wrap(bytes); + }; + + private static final ResultSetValueExtractor dateExtractor = + (rs, fieldName) -> rs.getDate(fieldName, UTC_CALENDAR); + + private static final ResultSetValueExtractor timestampExtractor = + (rs, fieldName) -> rs.getTimestamp(fieldName, UTC_CALENDAR); - private static final ResultSetValueExtractor utcDateExtractor = - (rs, fieldName) -> rs.getDate(fieldName, utcCalendar); + private static final ResultSetValueExtractor timestamptzExtractor = + (rs, fieldName) -> { + String timestampTz = rs.getString(fieldName); + if (timestampTz == null) { + return null; + } + 
return OffsetDateTime.parse(timestampTz, TIMESTAMPTZ_FORMAT); + }; - private static final ResultSetValueMapper sqlDateToAvro = + // Value might be a Double.NaN or a valid BigDecimal + private static final ResultSetValueMapper numericToAvro = + (value, schema) -> value.toString(); + + private static final ResultSetValueMapper dateToAvro = (value, schema) -> (int) value.toLocalDate().toEpochDay(); - // TODO(thiagotnunes): Add missing type mappings + private static final ResultSetValueMapper timestampToAvro = + (value, schema) -> toMicros(value.toInstant()); + + private static final ResultSetValueMapper timestamptzToAvro = + (value, schema) -> + new GenericRecordBuilder(TimeStampTz.SCHEMA) + .set(TimeStampTz.TIMESTAMP_FIELD_NAME, toMicros(value.toInstant())) + .set( + TimeStampTz.OFFSET_FIELD_NAME, + TimeUnit.SECONDS.toMillis(value.getOffset().getTotalSeconds())) + .build(); + private static final ImmutableMap> SCHEMA_MAPPINGS = ImmutableMap., ResultSetValueMapper>>builder() .put("BIGINT", Pair.of(ResultSet::getLong, valuePassThrough)) + .put("BIGSERIAL", Pair.of(ResultSet::getLong, valuePassThrough)) + .put("BIT", Pair.of(bytesExtractor, valuePassThrough)) + .put("BIT VARYING", Pair.of(bytesExtractor, valuePassThrough)) + .put("BOOL", Pair.of(ResultSet::getBoolean, valuePassThrough)) + .put("BOOLEAN", Pair.of(ResultSet::getBoolean, valuePassThrough)) + .put("BYTEA", Pair.of(bytesExtractor, valuePassThrough)) + .put("CHAR", Pair.of(ResultSet::getString, valuePassThrough)) + .put("CHARACTER", Pair.of(ResultSet::getString, valuePassThrough)) .put("CHARACTER VARYING", Pair.of(ResultSet::getString, valuePassThrough)) - .put("DATE", Pair.of(utcDateExtractor, sqlDateToAvro)) + .put("CITEXT", Pair.of(ResultSet::getString, valuePassThrough)) + .put("DATE", Pair.of(dateExtractor, dateToAvro)) + .put("DECIMAL", Pair.of(ResultSet::getObject, numericToAvro)) + .put("DOUBLE PRECISION", Pair.of(ResultSet::getDouble, valuePassThrough)) + .put("FLOAT4", Pair.of(ResultSet::getFloat, valuePassThrough)) + .put("FLOAT8", Pair.of(ResultSet::getDouble, valuePassThrough)) + .put("INT", Pair.of(ResultSet::getInt, valuePassThrough)) + .put("INTEGER", Pair.of(ResultSet::getInt, valuePassThrough)) + .put("INT2", Pair.of(ResultSet::getInt, valuePassThrough)) + .put("INT4", Pair.of(ResultSet::getInt, valuePassThrough)) .put("INT8", Pair.of(ResultSet::getLong, valuePassThrough)) + .put("JSON", Pair.of(ResultSet::getString, valuePassThrough)) + .put("JSONB", Pair.of(ResultSet::getString, valuePassThrough)) + .put("MONEY", Pair.of(ResultSet::getDouble, valuePassThrough)) + .put("NUMERIC", Pair.of(ResultSet::getObject, numericToAvro)) + .put("OID", Pair.of(ResultSet::getLong, valuePassThrough)) + .put("REAL", Pair.of(ResultSet::getFloat, valuePassThrough)) + .put("SERIAL", Pair.of(ResultSet::getInt, valuePassThrough)) + .put("SERIAL2", Pair.of(ResultSet::getInt, valuePassThrough)) + .put("SERIAL4", Pair.of(ResultSet::getInt, valuePassThrough)) + .put("SERIAL8", Pair.of(ResultSet::getLong, valuePassThrough)) + .put("SMALLINT", Pair.of(ResultSet::getInt, valuePassThrough)) + .put("SMALLSERIAL", Pair.of(ResultSet::getInt, valuePassThrough)) .put("TEXT", Pair.of(ResultSet::getString, valuePassThrough)) + .put("TIMESTAMP", Pair.of(timestampExtractor, timestampToAvro)) + .put("TIMESTAMPTZ", Pair.of(timestamptzExtractor, timestamptzToAvro)) + .put("TIMESTAMP WITH TIME ZONE", Pair.of(timestamptzExtractor, timestamptzToAvro)) + .put("TIMESTAMP WITHOUT TIME ZONE", Pair.of(timestampExtractor, timestampToAvro)) + .put("UUID", 
Pair.of(ResultSet::getString, valuePassThrough)) + .put("VARBIT", Pair.of(bytesExtractor, valuePassThrough)) .put("VARCHAR", Pair.of(ResultSet::getString, valuePassThrough)) .build() .entrySet() diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/PostgreSQLMappingProvider.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/PostgreSQLMappingProvider.java index 0e73e44610..b29eecd4b0 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/PostgreSQLMappingProvider.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/PostgreSQLMappingProvider.java @@ -33,20 +33,67 @@ public final class PostgreSQLMappingProvider { private static final ImmutableMap MAPPING = ImmutableMap.builder() .put("BIGINT", UnifiedMappingProvider.Type.LONG) + .put("BIGSERIAL", UnifiedMappingProvider.Type.LONG) + .put("BIT", UnifiedMappingProvider.Type.BYTES) + .put("BIT VARYING", UnifiedMappingProvider.Type.BYTES) + .put("BOOL", UnifiedMappingProvider.Type.BOOLEAN) + .put("BOOLEAN", UnifiedMappingProvider.Type.BOOLEAN) + .put("BYTEA", UnifiedMappingProvider.Type.BYTES) + // TODO(thiagotnunes): Refine mapping type according to + // https://cloud.google.com/datastream/docs/unified-types#map-psql (if there is a limit + // for length we should use varchar instead) + .put("CHAR", UnifiedMappingProvider.Type.STRING) + // TODO(thiagotnunes): Refine mapping type according to + // https://cloud.google.com/datastream/docs/unified-types#map-psql (if there is a limit + // for length we should use varchar instead) + .put("CHARACTER", UnifiedMappingProvider.Type.STRING) // TODO(thiagotnunes): Refine mapping type according to // https://cloud.google.com/datastream/docs/unified-types#map-psql (if there is a limit // for length we should use varchar instead) .put("CHARACTER VARYING", UnifiedMappingProvider.Type.STRING) + .put("CITEXT", UnifiedMappingProvider.Type.STRING) .put("DATE", UnifiedMappingProvider.Type.DATE) + // TODO(thiagotnunes): Refine mapping type according to + // https://cloud.google.com/datastream/docs/unified-types#map-psql (if precision + // and scale are >= 0, map to DECIMAL) + .put("DECIMAL", UnifiedMappingProvider.Type.NUMBER) + .put("DOUBLE PRECISION", UnifiedMappingProvider.Type.DOUBLE) + .put("FLOAT4", UnifiedMappingProvider.Type.FLOAT) + .put("FLOAT8", UnifiedMappingProvider.Type.DOUBLE) + .put("INT", UnifiedMappingProvider.Type.INTEGER) + .put("INTEGER", UnifiedMappingProvider.Type.INTEGER) + .put("INT2", UnifiedMappingProvider.Type.INTEGER) + .put("INT4", UnifiedMappingProvider.Type.INTEGER) .put("INT8", UnifiedMappingProvider.Type.LONG) + .put("JSON", UnifiedMappingProvider.Type.JSON) + .put("JSONB", UnifiedMappingProvider.Type.JSON) + .put("MONEY", UnifiedMappingProvider.Type.DOUBLE) + // https://cloud.google.com/datastream/docs/unified-types#map-psql (if precision + // and scale are >= 0, map to DECIMAL) + .put("NUMERIC", UnifiedMappingProvider.Type.NUMBER) + .put("OID", UnifiedMappingProvider.Type.LONG) + .put("REAL", UnifiedMappingProvider.Type.FLOAT) + .put("SERIAL", UnifiedMappingProvider.Type.INTEGER) + .put("SERIAL2", UnifiedMappingProvider.Type.INTEGER) + .put("SERIAL4", UnifiedMappingProvider.Type.INTEGER) + .put("SERIAL8", UnifiedMappingProvider.Type.LONG) + .put("SMALLINT", 
UnifiedMappingProvider.Type.INTEGER) + .put("SMALLSERIAL", UnifiedMappingProvider.Type.INTEGER) // TODO(thiagotnunes): Refine mapping type according to // https://cloud.google.com/datastream/docs/unified-types#map-psql (if there is a limit // for length we should use varchar instead) .put("TEXT", UnifiedMappingProvider.Type.STRING) + .put("TIMESTAMP", UnifiedMappingProvider.Type.TIMESTAMP) + .put("TIMESTAMPTZ", UnifiedMappingProvider.Type.TIMESTAMP_WITH_TIME_ZONE) + .put("TIMESTAMP WITH TIME ZONE", UnifiedMappingProvider.Type.TIMESTAMP_WITH_TIME_ZONE) + .put("TIMESTAMP WITHOUT TIME ZONE", UnifiedMappingProvider.Type.TIMESTAMP) + .put("UUID", UnifiedMappingProvider.Type.STRING) + .put("VARBIT", UnifiedMappingProvider.Type.BYTES) // TODO(thiagotnunes): Refine mapping type according to // https://cloud.google.com/datastream/docs/unified-types#map-psql (if there is a limit // for length we should use varchar instead) .put("VARCHAR", UnifiedMappingProvider.Type.STRING) + .put("UNSUPPORTED", UnifiedMappingProvider.Type.UNSUPPORTED) .build() .entrySet() .stream() diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/CustomAsserts.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/CustomAsserts.java new file mode 100644 index 0000000000..350c824a76 --- /dev/null +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/CustomAsserts.java @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package com.google.cloud.teleport.v2.source.reader.io; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.nio.ByteBuffer; + +public class CustomAsserts { + + private static final double DELTA = 0.0001f; + + private CustomAsserts() {} + + public static void assertColumnEquals(String message, Object expected, Object actual) { + if (expected instanceof byte[] && actual instanceof byte[]) { + assertArrayEquals(message, (byte[]) expected, (byte[]) actual); + } else if (expected instanceof byte[] && actual instanceof ByteBuffer) { + assertArrayEquals(message, (byte[]) expected, ((ByteBuffer) actual).array()); + } else if (expected instanceof Float && actual instanceof Float) { + assertEquals(message, (float) expected, (float) actual, DELTA); + } else if (expected instanceof Double && actual instanceof Double) { + assertEquals(message, (double) expected, (double) actual, DELTA); + } else { + assertEquals(message, expected, actual); + } + } +} diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/rowmapper/JdbcSourceRowMapperTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/rowmapper/JdbcSourceRowMapperTest.java index 2e18ed5392..d1fe943d38 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/rowmapper/JdbcSourceRowMapperTest.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/rowmapper/JdbcSourceRowMapperTest.java @@ -15,6 +15,7 @@ */ package com.google.cloud.teleport.v2.source.reader.io.jdbc.rowmapper; +import static com.google.cloud.teleport.v2.source.reader.io.CustomAsserts.assertColumnEquals; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -33,6 +34,7 @@ import com.google.cloud.teleport.v2.source.reader.io.schema.SourceTableSchema; import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.UnifiedTypeMapper.MapperType; import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.unified.CustomSchema.DateTime; +import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.unified.CustomSchema.TimeStampTz; import com.google.cloud.teleport.v2.spanner.migrations.schema.SourceColumnType; import com.google.common.collect.ImmutableList; import java.nio.ByteBuffer; @@ -43,6 +45,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.sql.Timestamp; +import java.util.Locale; import java.util.stream.Collectors; import org.apache.avro.generic.GenericRecordBuilder; import org.junit.After; @@ -65,6 +68,7 @@ public static void beforeClass() { // and detect the lock faster, we decrease this timeout System.setProperty("derby.locks.waitTimeout", "2"); System.setProperty("derby.stream.error.file", "build/derby.log"); + Locale.setDefault(Locale.US); } @Before @@ -115,7 +119,7 @@ public void testMapMSQLRows() throws SQLException { assertEquals(schemaReference, sourceRow.sourceSchemaReference()); assertEquals(table, sourceRow.tableName()); assertEquals(shardId, sourceRow.shardId()); - assertEquals( + assertColumnEquals( "Failed for column: " + expectedColumn.name + " for value index: " + i, expectedColumn.mappedValue, sourceRow.getPayload().get(i)); @@ -169,7 +173,7 @@ public void testMapPostgreSQLRows() throws SQLException { assertEquals(table, sourceRow.tableName()); 
assertEquals(shardId, sourceRow.shardId()); - assertEquals( + assertColumnEquals( "Failed for column: " + expectedColumn.name + " for value index: " + i, expectedColumn.mappedValue, sourceRow.getPayload().get(i)); @@ -490,12 +494,84 @@ private ImmutableList postgreSQLColumns() { .sourceColumnType("BIGINT") .mappedValue(12345L) .build()) + .add( + Column.builder() + .derbyColumnType("BIGINT") + .sourceColumnType("BIGSERIAL") + .mappedValue(1L) + .build()) + .add( + Column.builder() + .derbyColumnType("CHAR(8) FOR BIT DATA") + .sourceColumnType("BIT") + .mappedValue(ByteBuffer.allocate(8).putLong(Byte.MAX_VALUE).array()) + .build()) + .add( + Column.builder() + .derbyColumnType("CHAR(16) FOR BIT DATA") + .sourceColumnType("BIT VARYING") + .mappedValue(ByteBuffer.allocate(16).putLong(Short.MAX_VALUE).array()) + .build()) + .add( + Column.builder() + .derbyColumnType("BOOLEAN") + .sourceColumnType("BOOL") + .mappedValue(false) + .build()) + .add( + Column.builder() + .derbyColumnType("BOOLEAN") + .sourceColumnType("BOOLEAN") + .mappedValue(true) + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("BOX") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("CHAR(64) FOR BIT DATA") + .sourceColumnType("BYTEA") + .mappedValue(ByteBuffer.allocate(64).putLong(Long.MAX_VALUE).array()) + .build()) + .add( + Column.builder() + .derbyColumnType("CHAR(1)") + .sourceColumnType("CHAR") + .mappedValue("a") + .build()) + .add( + Column.builder() + .derbyColumnType("CHAR(1)") + .sourceColumnType("CHARACTER") + .mappedValue("b") + .build()) .add( Column.builder() .derbyColumnType("VARCHAR(100)") .sourceColumnType("CHARACTER VARYING", new Long[] {100L}) .mappedValue("character varying value") .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("CIDR") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("CIRCLE") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("CITEXT") + .mappedValue("ci text") + .build()) .add( Column.builder() .derbyColumnType("DATE") @@ -503,24 +579,415 @@ private ImmutableList postgreSQLColumns() { .inputValue(java.sql.Date.valueOf("2024-08-30")) .mappedValue((int) java.sql.Date.valueOf("2024-08-30").toLocalDate().toEpochDay()) .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("DATEMULTIRANGE") + .inputValue("{[0001-01-01, 9999-12-31]}") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("DATERANGE") + .inputValue("[0001-01-01, 9999-12-31]") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("DECIMAL(9,3)") + .sourceColumnType("DECIMAL", new Long[] {9L, 3L}) + .inputValue(123456.789) + .mappedValue("123456.789") + .build()) + .add( + Column.builder() + .derbyColumnType("DOUBLE") + .sourceColumnType("DOUBLE PRECISION") + .mappedValue(1.2345D) + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("ENUM") + .inputValue("ENUM VALUE") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("DOUBLE") + .sourceColumnType("FLOAT4") + .mappedValue(1.23F) + .build()) + .add( + Column.builder() + .derbyColumnType("DOUBLE") + .sourceColumnType("FLOAT8") + .mappedValue(1.2345D) + 
.build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("INET") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("INTEGER") + .sourceColumnType("INT") + .mappedValue(Integer.MAX_VALUE) + .build()) + .add( + Column.builder() + .derbyColumnType("INTEGER") + .sourceColumnType("INTEGER") + .mappedValue(Integer.MAX_VALUE) + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("INTERVAL") + .inputValue("1 hour") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("SMALLINT") + .sourceColumnType("INT2") + .inputValue(Short.MAX_VALUE) + .mappedValue((int) Short.MAX_VALUE) + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("INT4MULTIRANGE") + .inputValue("{[10, 20]}") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("INT4RANGE") + .inputValue("[10, 20]") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("INTEGER") + .sourceColumnType("INT4") + .mappedValue(Integer.MAX_VALUE) + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("INT8MULTIRANGE") + .inputValue("{[30, 40]}") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("INT8RANGE") + .inputValue("[30, 40]") + .mappedValue(null) // Unsupported + .build()) .add( Column.builder() .derbyColumnType("BIGINT") .sourceColumnType("INT8") .mappedValue(Long.MAX_VALUE) .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("JSON") + .mappedValue("{\"key\":\"value\"}") + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("JSONB") + .mappedValue("{\"key\":\"value\"}") + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("LINE") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("LSEG") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("MACADDR") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("MACADDR8") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("MONEY") + .inputValue("1.23") + .mappedValue(1.23D) + .build()) + .add( + Column.builder() + .derbyColumnType("DECIMAL(9,3)") + .sourceColumnType("NUMERIC", new Long[] {9L, 3L}) + .inputValue(123456.789) + .mappedValue("123456.789") + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("NUMMULTIRANGE") + .inputValue("{[50, 60]}") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("NUMRANGE") + .inputValue("[50, 60]") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("BIGINT") + .sourceColumnType("OID") + .mappedValue(1000L) + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("PATH") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("PG_LSN") + 
+ .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("PG_SNAPSHOT") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("POINT") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("POLYGON") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("DOUBLE") + .sourceColumnType("REAL") + .mappedValue(1.23F) + .build()) + .add( + Column.builder() + .derbyColumnType("INTEGER") + .sourceColumnType("SERIAL") + .mappedValue(Integer.MAX_VALUE) + .build()) + .add( + Column.builder() + .derbyColumnType("SMALLINT") + .sourceColumnType("SERIAL2") + .inputValue(Short.MAX_VALUE) + .mappedValue((int) Short.MAX_VALUE) + .build()) + .add( + Column.builder() + .derbyColumnType("INTEGER") + .sourceColumnType("SERIAL4") + .mappedValue(Integer.MAX_VALUE) + .build()) + .add( + Column.builder() + .derbyColumnType("BIGINT") + .sourceColumnType("SERIAL8") + .mappedValue(Long.MAX_VALUE) + .build()) + .add( + Column.builder() + .derbyColumnType("SMALLINT") + .sourceColumnType("SMALLINT") + .inputValue(Short.MAX_VALUE) + .mappedValue((int) Short.MAX_VALUE) + .build()) + .add( + Column.builder() + .derbyColumnType("SMALLINT") + .sourceColumnType("SMALLSERIAL") + .inputValue(Short.MAX_VALUE) + .mappedValue((int) Short.MAX_VALUE) + .build()) .add( Column.builder() .derbyColumnType("VARCHAR(100)") .sourceColumnType("TEXT", new Long[] {100L}) .mappedValue("text value") .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("TIME") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("TIME WITHOUT TIME ZONE") + .inputValue("01:02:03") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("TIMETZ") + .inputValue("01:02:03-05") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("TIMESTAMP") + .sourceColumnType("TIMESTAMP") + .inputValue(Timestamp.valueOf("1970-01-02 01:02:03.123456")) + .mappedValue(90123123456L) + .build()) + .add( + Column.builder() + .derbyColumnType("TIMESTAMP") + .sourceColumnType("TIMESTAMP WITHOUT TIME ZONE") + .inputValue(Timestamp.valueOf("1970-01-02 01:02:03")) + .mappedValue(90123000000L) + .build()) + .add( + Column.builder() + .name("timestamptz_hour_offset") + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("TIMESTAMPTZ") + .inputValue("1970-01-02 01:02:03.123456-05") + .mappedValue( + new GenericRecordBuilder(TimeStampTz.SCHEMA) + .set(TimeStampTz.TIMESTAMP_FIELD_NAME, 108123123456L) + .set(TimeStampTz.OFFSET_FIELD_NAME, -18000000L) + .build()) + .build()) + .add( + Column.builder() + .name("timestamptz_hour_minute_offset") + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("TIMESTAMPTZ") + .inputValue("1970-01-02 01:02:03.123456-05:00") + .mappedValue( + new GenericRecordBuilder(TimeStampTz.SCHEMA) + .set(TimeStampTz.TIMESTAMP_FIELD_NAME, 108123123456L) + .set(TimeStampTz.OFFSET_FIELD_NAME, -18000000L) + .build()) + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("TIMESTAMP WITH TIME ZONE") + .inputValue("1970-01-02 01:02:03.12345+01:00") + .mappedValue( + new GenericRecordBuilder(TimeStampTz.SCHEMA) + 
.set(TimeStampTz.TIMESTAMP_FIELD_NAME, 86523123450L) + .set(TimeStampTz.OFFSET_FIELD_NAME, 3600000L) + .build()) + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("TSMULTIRANGE") + .inputValue("{[1970-01-01 01:00, 1970-01-01 02:00]}") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("TSRANGE") + .inputValue("[1970-01-01 01:00, 1970-01-01 02:00]") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("TSTZMULTIRANGE") + .inputValue("{[1970-01-01 01:00+10:00, 1970-01-01 02:00+10:00]}") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("TSTZRANGE") + .inputValue("[1970-01-01 01:00+10:00, 1970-01-01 02:00+10:00]") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("TSQUERY") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("TSVECTOR") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("TXID_SNAPSHOT") + .mappedValue(null) // Unsupported + .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("UUID") + .mappedValue("a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11") + .build()) + .add( + Column.builder() + .derbyColumnType("CHAR(32) FOR BIT DATA") + .sourceColumnType("VARBIT") + .mappedValue(ByteBuffer.allocate(32).putLong(Integer.MAX_VALUE).array()) + .build()) .add( Column.builder() .derbyColumnType("VARCHAR(100)") .sourceColumnType("VARCHAR", new Long[] {100L}) .mappedValue("varchar value") .build()) + .add( + Column.builder() + .derbyColumnType("VARCHAR(100)") + .sourceColumnType("XML") + .mappedValue(null) // Unsupported + .build()) .build(); } @@ -626,9 +1093,6 @@ Column build() { if (sourceColumnType == null) { throw new IllegalStateException("Source column type must be defined for column"); } - if (mappedValue == null) { - throw new IllegalStateException("Mapped value must be defined for column"); - } if (name == null) { name = String.join("_", sourceColumnType.getName().toLowerCase().split(" ")); name += "_col"; diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/PostgreSQLMappingProviderTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/PostgreSQLMappingProviderTest.java index a1be2dba3e..f3172e176f 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/PostgreSQLMappingProviderTest.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/PostgreSQLMappingProviderTest.java @@ -44,11 +44,53 @@ public void testPostgreSQLMappingProvider() { private ImmutableMap expectedMapping() { return ImmutableMap.builder() .put("BIGINT", "\"long\"") + .put("BIGSERIAL", "\"long\"") + .put("BIT", "\"bytes\"") + .put("BIT VARYING", "\"bytes\"") + .put("BOOL", "\"boolean\"") + .put("BOOLEAN", "\"boolean\"") + .put("BYTEA", "\"bytes\"") + .put("CHAR", "\"string\"") + .put("CHARACTER", "\"string\"") .put("CHARACTER VARYING", "\"string\"") + .put("CITEXT", "\"string\"") .put("DATE", 
"{\"type\":\"int\",\"logicalType\":\"date\"}") + .put("DECIMAL", "{\"type\":\"string\",\"logicalType\":\"number\"}") + .put("DOUBLE PRECISION", "\"double\"") + .put("FLOAT4", "\"float\"") + .put("FLOAT8", "\"double\"") + .put("INT", "\"int\"") + .put("INTEGER", "\"int\"") + .put("INT2", "\"int\"") + .put("INT4", "\"int\"") .put("INT8", "\"long\"") + .put("JSON", "{\"type\":\"string\",\"logicalType\":\"json\"}") + .put("JSONB", "{\"type\":\"string\",\"logicalType\":\"json\"}") + .put("MONEY", "\"double\"") + .put("NUMERIC", "{\"type\":\"string\",\"logicalType\":\"number\"}") + .put("OID", "\"long\"") + .put("REAL", "\"float\"") + .put("SERIAL", "\"int\"") + .put("SERIAL2", "\"int\"") + .put("SERIAL4", "\"int\"") + .put("SERIAL8", "\"long\"") + .put("SMALLINT", "\"int\"") + .put("SMALLSERIAL", "\"int\"") .put("TEXT", "\"string\"") + .put("TIMESTAMP", "{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}") + .put( + "TIMESTAMPTZ", + "{\"type\":\"record\",\"name\":\"timestampTz\",\"fields\":[{\"name\":\"timestamp\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}},{\"name\":\"offset\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}}]}") + .put( + "TIMESTAMP WITH TIME ZONE", + "{\"type\":\"record\",\"name\":\"timestampTz\",\"fields\":[{\"name\":\"timestamp\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}},{\"name\":\"offset\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}}]}") + .put( + "TIMESTAMP WITHOUT TIME ZONE", + "{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}") + .put("UUID", "\"string\"") + .put("VARBIT", "\"bytes\"") .put("VARCHAR", "\"string\"") + .put("UNSUPPORTED", "{\"type\":\"null\",\"logicalType\":\"unsupported\"}") .build() .entrySet() .stream() diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataTypesIt.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataTypesIt.java index c3a158ba8e..77593fc0b8 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataTypesIt.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataTypesIt.java @@ -55,9 +55,9 @@ public class DataTypesIt extends SourceDbToSpannerITBase { public static MySQLResourceManager mySQLResourceManager; public static SpannerResourceManager spannerResourceManager; - private static final String MYSQL_DUMP_FILE_RESOURCE = "DataTypesIt/data-types.sql"; + private static final String MYSQL_DUMP_FILE_RESOURCE = "DataTypesIt/mysql/data-types.sql"; - private static final String SPANNER_DDL_RESOURCE = "DataTypesIt/spanner-schema.sql"; + private static final String SPANNER_DDL_RESOURCE = "DataTypesIt/mysql/spanner-schema.sql"; /** * Setup resource managers and Launch dataflow job once during the execution of this test class. 
\ diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SourceDbToSpannerITBase.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SourceDbToSpannerITBase.java index bef4cd9c89..2120ea8703 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SourceDbToSpannerITBase.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SourceDbToSpannerITBase.java @@ -189,7 +189,11 @@ protected PipelineLauncher.LaunchInfo launchDataflowJob( options.setParameters(params); options.addEnvironment("additionalExperiments", List.of("disable_runner_v2")); - options.addEnvironment("numWorkers", 2); + if (System.getProperty("numWorkers") != null) { + options.addEnvironment("numWorkers", Integer.parseInt(System.getProperty("numWorkers"))); + } else { + options.addEnvironment("numWorkers", 2); + } // Run PipelineLauncher.LaunchInfo jobInfo = launchTemplate(options, false); assertThatPipeline(jobInfo).isRunning(); diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/postgresql/DataTypesIT.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/postgresql/DataTypesIT.java new file mode 100644 index 0000000000..c3e995332a --- /dev/null +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/postgresql/DataTypesIT.java @@ -0,0 +1,223 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.templates.postgresql; + +import static com.google.common.truth.Truth.assertThat; +import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult; + +import com.google.cloud.spanner.Struct; +import com.google.cloud.teleport.metadata.SkipDirectRunnerTest; +import com.google.cloud.teleport.metadata.TemplateIntegrationTest; +import com.google.cloud.teleport.v2.templates.SourceDbToSpanner; +import com.google.cloud.teleport.v2.templates.SourceDbToSpannerITBase; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.it.common.PipelineLauncher; +import org.apache.beam.it.common.PipelineOperator; +import org.apache.beam.it.common.utils.ResourceManagerUtils; +import org.apache.beam.it.gcp.spanner.SpannerResourceManager; +import org.apache.beam.it.gcp.spanner.matchers.SpannerAsserts; +import org.apache.beam.it.jdbc.PostgresResourceManager; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An integration test for {@link SourceDbToSpanner} Flex template which tests all PostgreSQL data + * types migration. 
+ */ +@Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class}) +@TemplateIntegrationTest(SourceDbToSpanner.class) +@RunWith(JUnit4.class) +public class DataTypesIT extends SourceDbToSpannerITBase { + private static final Logger LOG = LoggerFactory.getLogger(DataTypesIT.class); + + public static PostgresResourceManager postgreSQLResourceManager; + public static SpannerResourceManager spannerResourceManager; + + private static final String POSTGRESQL_DDL_RESOURCE = "DataTypesIt/postgresql/data-types.sql"; + private static final String SPANNER_DDL_RESOURCE = "DataTypesIt/postgresql/spanner-schema.sql"; + + /** Setup resource managers. */ + @Before + public void setUp() { + postgreSQLResourceManager = setUpPostgreSQLResourceManager(); + spannerResourceManager = setUpSpannerResourceManager(); + } + + /** Cleanup dataflow job, all the resources, and resource managers. */ + @After + public void tearDown() { + ResourceManagerUtils.cleanResources(spannerResourceManager, postgreSQLResourceManager); + } + + @Test + public void allTypesTest() throws Exception { + loadSQLFileResource(postgreSQLResourceManager, POSTGRESQL_DDL_RESOURCE); + createSpannerDDL(spannerResourceManager, SPANNER_DDL_RESOURCE); + System.setProperty("numWorkers", "20"); + Map jobParameters = new HashMap<>(); + jobParameters.put("numPartitions", "100"); + PipelineLauncher.LaunchInfo jobInfo = + launchDataflowJob( + getClass().getSimpleName(), + null, + null, + postgreSQLResourceManager, + spannerResourceManager, + jobParameters, + null); + PipelineOperator.Result result = pipelineOperator().waitUntilDone(createConfig(jobInfo)); + assertThatResult(result).isLaunchFinished(); + + Map>> expectedData = getExpectedData(); + for (Map.Entry>> entry : expectedData.entrySet()) { + String type = entry.getKey(); + String tableName = String.format("t_%s", type); + LOG.info("Asserting type:{}", type); + + List rows = spannerResourceManager.readTableRecords(tableName, "id", "col"); + for (Struct row : rows) { + LOG.info("Found row: {}", row); + } + SpannerAsserts.assertThatStructs(rows) + .hasRecordsUnorderedCaseInsensitiveColumns(entry.getValue()); + } + + // Validate unsupported types. + List unsupportedTypeTables = + List.of( + "t_box", + "t_cidr", + "t_circle", + "t_datemultirange", + "t_daterange", + "t_enum", + "t_inet", + "t_int4multirange", + "t_int4range", + "t_int8multirange", + "t_int8range", + "t_interval", + "t_line", + "t_lseg", + "t_macaddr", + "t_macaddr8", + "t_nummultirange", + "t_numrange", + "t_path", + "t_pg_lsn", + "t_pg_snapshot", + "t_point", + "t_polygon", + "t_time", + "t_time_with_time_zone", + "t_time_without_time_zone", + "t_timetz", + "t_tsmultirange", + "t_tsquery", + "t_tsrange", + "t_tstzmultirange", + "t_tstzrange", + "t_tsvector", + "t_txid_snapshot", + "t_xml"); + + for (String table : unsupportedTypeTables) { + // Unsupported rows should still be migrated. Each source table has 2 rows. 
+ assertThat(spannerResourceManager.getRowCount(table)).isEqualTo(2L); + } + } + + private Map>> getExpectedData() { + HashMap>> result = new HashMap<>(); + result.put("bigint", createRows("-9223372036854775808", "9223372036854775807", "42", "NULL")); + result.put("bigserial", createRows("-9223372036854775808", "9223372036854775807", "42")); + result.put("bit", createRows("MA==", "MQ==", "NULL")); + result.put("bit_varying", createRows("MDEwMQ==", "NULL")); + result.put("bool", createRows("false", "true", "NULL")); + result.put("boolean", createRows("false", "true", "NULL")); + result.put("bytea", createRows("YWJj", "NULL")); + result.put("char", createRows("a", "Θ", "NULL")); + result.put("character", createRows("a", "Ξ", "NULL")); + result.put("character_varying", createRows("testing character varying", "NULL")); + result.put("date", createRows("0001-01-01", "9999-12-31", "NULL")); + result.put("decimal", createRows("0.12", "NULL")); + result.put( + "double_precision", + createRows( + "-1.9876542E307", "1.9876542E307", "NaN", "-Infinity", "Infinity", "1.23", "NULL")); + result.put( + "float4", + createRows( + "-1.9876542E38", "1.9876542E38", "NaN", "-Infinity", "Infinity", "2.34", "NULL")); + result.put( + "float8", + createRows( + "-1.9876542E307", "1.9876542E307", "NaN", "-Infinity", "Infinity", "3.45", "NULL")); + result.put("int", createRows("-2147483648", "2147483647", "1", "NULL")); + result.put("integer", createRows("-2147483648", "2147483647", "2", "NULL")); + result.put("int2", createRows("-32768", "32767", "3", "NULL")); + result.put("int4", createRows("-2147483648", "2147483647", "4", "NULL")); + result.put("int8", createRows("-9223372036854775808", "9223372036854775807", "5", "NULL")); + result.put("json", createRows("{\"duplicate_key\":1}", "{\"null_key\":null}", "NULL")); + result.put("jsonb", createRows("{\"duplicate_key\":2}", "{\"null_key\":null}", "NULL")); + result.put("money", createRows("123.45", "NULL")); + result.put("numeric", createRows("4.56", "NULL")); + result.put("oid", createRows("1000", "NULL")); + result.put( + "real", + createRows( + "-1.9876542E38", "1.9876542E38", "NaN", "-Infinity", "Infinity", "5.67", "NULL")); + result.put("serial", createRows("-2147483648", "2147483647", "6")); + result.put("serial2", createRows("-32768", "32767", "7")); + result.put("serial4", createRows("-2147483648", "2147483647", "8")); + result.put("serial8", createRows("-9223372036854775808", "9223372036854775807", "9")); + result.put("smallint", createRows("-32768", "32767", "10", "NULL")); + result.put("smallserial", createRows("-32768", "32767", "11")); + result.put("text", createRows("testing text", "NULL")); + result.put("timestamp", createRows("1970-01-02T03:04:05.123456Z", "NULL")); + result.put( + "timestamptz", + createRows("1970-02-02T18:05:06.123456000Z", "1970-02-03T05:05:06.123456000Z", "NULL")); + result.put( + "timestamp_with_time_zone", + createRows("1970-02-02T18:05:06.123456000Z", "1970-02-03T05:05:06.123456000Z", "NULL")); + result.put("timestamp_without_time_zone", createRows("1970-01-02T03:04:05.123456Z", "NULL")); + result.put("uuid", createRows("a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11", "NULL")); + result.put("varbit", createRows("MTEwMA==", "NULL")); + result.put("varchar", createRows("testing varchar", "NULL")); + return result; + } + + private List> createRows(Object... 
values) { + List> rows = new ArrayList<>(); + for (int i = 0; i < values.length; i++) { + Map row = new HashMap<>(); + row.put("id", i + 1); + row.put("col", values[i]); + rows.add(row); + } + return rows; + } +} diff --git a/v2/sourcedb-to-spanner/src/test/resources/DataTypesIt/data-types.sql b/v2/sourcedb-to-spanner/src/test/resources/DataTypesIt/mysql/data-types.sql similarity index 100% rename from v2/sourcedb-to-spanner/src/test/resources/DataTypesIt/data-types.sql rename to v2/sourcedb-to-spanner/src/test/resources/DataTypesIt/mysql/data-types.sql diff --git a/v2/sourcedb-to-spanner/src/test/resources/DataTypesIt/spanner-schema.sql b/v2/sourcedb-to-spanner/src/test/resources/DataTypesIt/mysql/spanner-schema.sql similarity index 100% rename from v2/sourcedb-to-spanner/src/test/resources/DataTypesIt/spanner-schema.sql rename to v2/sourcedb-to-spanner/src/test/resources/DataTypesIt/mysql/spanner-schema.sql diff --git a/v2/sourcedb-to-spanner/src/test/resources/DataTypesIt/postgresql/data-types.sql b/v2/sourcedb-to-spanner/src/test/resources/DataTypesIt/postgresql/data-types.sql new file mode 100644 index 0000000000..b5d8183a4f --- /dev/null +++ b/v2/sourcedb-to-spanner/src/test/resources/DataTypesIt/postgresql/data-types.sql @@ -0,0 +1,153 @@ +CREATE TYPE myenum AS ENUM ('enum1', 'enum2', 'enum3'); + +CREATE TABLE t_bigint (id serial primary key, col bigint); +CREATE TABLE t_bigserial (id serial primary key, col bigserial); +CREATE TABLE t_bit (id serial primary key, col bit); +CREATE TABLE t_bit_varying (id serial primary key, col bit varying); +CREATE TABLE t_bool (id serial primary key, col bool); +CREATE TABLE t_boolean (id serial primary key, col boolean); +CREATE TABLE t_box (id serial primary key, col box); +CREATE TABLE t_bytea (id serial primary key, col bytea); +CREATE TABLE t_char (id serial primary key, col char); +CREATE TABLE t_character (id serial primary key, col character); +CREATE TABLE t_character_varying (id serial primary key, col character varying); +CREATE TABLE t_cidr (id serial primary key, col cidr); +CREATE TABLE t_circle (id serial primary key, col circle); +CREATE TABLE t_date (id serial primary key, col date); +CREATE TABLE t_datemultirange (id serial primary key, col datemultirange); +CREATE TABLE t_daterange (id serial primary key, col daterange); +CREATE TABLE t_decimal (id serial primary key, col numeric(10,2)); +CREATE TABLE t_double_precision (id serial primary key, col double precision); +CREATE TABLE t_enum (id serial primary key, col myenum); +CREATE TABLE t_float4 (id serial primary key, col float4); +CREATE TABLE t_float8 (id serial primary key, col float8); +CREATE TABLE t_inet (id serial primary key, col inet); +CREATE TABLE t_int (id serial primary key, col int); +CREATE TABLE t_int2 (id serial primary key, col int2); +CREATE TABLE t_int4 (id serial primary key, col int4); +CREATE TABLE t_int4multirange (id serial primary key, col int4multirange); +CREATE TABLE t_int4range (id serial primary key, col int4range); +CREATE TABLE t_int8 (id serial primary key, col int8); +CREATE TABLE t_int8multirange (id serial primary key, col int8multirange); +CREATE TABLE t_int8range (id serial primary key, col int8range); +CREATE TABLE t_integer (id serial primary key, col integer); +CREATE TABLE t_interval (id serial primary key, col interval); +CREATE TABLE t_json (id serial primary key, col json); +CREATE TABLE t_jsonb (id serial primary key, col jsonb); +CREATE TABLE t_line (id serial primary key, col line); +CREATE TABLE t_lseg (id serial primary key, 
col lseg); +CREATE TABLE t_macaddr (id serial primary key, col macaddr); +CREATE TABLE t_macaddr8 (id serial primary key, col macaddr8); +CREATE TABLE t_money (id serial primary key, col money); +CREATE TABLE t_numeric (id serial primary key, col numeric(10,2)); +CREATE TABLE t_nummultirange (id serial primary key, col nummultirange); +CREATE TABLE t_numrange (id serial primary key, col numrange); +CREATE TABLE t_oid (id serial primary key, col oid); +CREATE TABLE t_path (id serial primary key, col path); +CREATE TABLE t_pg_lsn (id serial primary key, col pg_lsn); +CREATE TABLE t_pg_snapshot (id serial primary key, col pg_snapshot); +CREATE TABLE t_point (id serial primary key, col point); +CREATE TABLE t_polygon (id serial primary key, col polygon); +CREATE TABLE t_real (id serial primary key, col real); +CREATE TABLE t_serial (id serial primary key, col serial); +CREATE TABLE t_serial2 (id serial primary key, col serial2); +CREATE TABLE t_serial4 (id serial primary key, col serial4); +CREATE TABLE t_serial8 (id serial primary key, col serial8); +CREATE TABLE t_smallint (id serial primary key, col smallint); +CREATE TABLE t_smallserial (id serial primary key, col smallserial); +CREATE TABLE t_text (id serial primary key, col text); +CREATE TABLE t_time (id serial primary key, col time); +CREATE TABLE t_time_with_time_zone (id serial primary key, col time with time zone); +CREATE TABLE t_time_without_time_zone (id serial primary key, col time without time zone); +CREATE TABLE t_timestamp (id serial primary key, col timestamp); +CREATE TABLE t_timestamp_with_time_zone (id serial primary key, col timestamp with time zone); +CREATE TABLE t_timestamp_without_time_zone (id serial primary key, col timestamp without time zone); +CREATE TABLE t_timestamptz (id serial primary key, col timestamptz); +CREATE TABLE t_timetz (id serial primary key, col timetz); +CREATE TABLE t_tsmultirange (id serial primary key, col tsmultirange); +CREATE TABLE t_tsquery (id serial primary key, col tsquery); +CREATE TABLE t_tsrange (id serial primary key, col tsrange); +CREATE TABLE t_tstzmultirange (id serial primary key, col tstzmultirange); +CREATE TABLE t_tstzrange (id serial primary key, col tstzrange); +CREATE TABLE t_tsvector (id serial primary key, col tsvector); +CREATE TABLE t_txid_snapshot (id serial primary key, col txid_snapshot); +CREATE TABLE t_uuid (id serial primary key, col uuid); +CREATE TABLE t_varbit (id serial primary key, col varbit); +CREATE TABLE t_varchar (id serial primary key, col varchar); +CREATE TABLE t_xml (id serial primary key, col xml); + +INSERT INTO t_bigint (col) VALUES (-9223372036854775808), (9223372036854775807), (42), (NULL); +INSERT INTO t_bigserial (col) VALUES (-9223372036854775808), (9223372036854775807), (42); +INSERT INTO t_bit (col) VALUES (0::bit), (1::bit), (NULL); +INSERT INTO t_bit_varying (col) VALUES ('0101'::bit varying), (NULL); +INSERT INTO t_bool (col) VALUES (false), (true), (NULL); +INSERT INTO t_boolean (col) VALUES (false), (true), (NULL); +INSERT INTO t_box (col) VALUES ('((1, 2), (3, 4))'), (NULL); +INSERT INTO t_bytea (col) VALUES ('abc'::bytea), (NULL); +INSERT INTO t_char (col) VALUES ('a'), ('Θ'), (NULL); +INSERT INTO t_character (col) VALUES ('a'), ('Ξ'), (NULL); +INSERT INTO t_character_varying (col) VALUES ('testing character varying'), (NULL); +INSERT INTO t_cidr (col) VALUES ('192.168.100.128/25'), (NULL); +INSERT INTO t_circle (col) VALUES ('((1, 2), 3)'), (NULL); +INSERT INTO t_date (col) VALUES ('0001-01-01'::date), ('9999-12-31'::date), 
(NULL); +INSERT INTO t_datemultirange (col) VALUES ('{[0001-01-01, 9999-12-31]}'), (NULL); +INSERT INTO t_daterange (col) VALUES ('[0001-01-01, 9999-12-31]'), (NULL); +INSERT INTO t_decimal (col) VALUES (0.12), (NULL); +INSERT INTO t_double_precision (col) VALUES ('-1.9876542e307'), ('1.9876542e307'), ('NaN'), ('-Infinity'), ('Infinity'), (1.23), (NULL); +INSERT INTO t_enum (col) VALUES ('enum1'), (NULL); +INSERT INTO t_float4 (col) VALUES ('-1.9876542e38'), ('1.9876542e38'), ('NaN'), ('-Infinity'), ('Infinity'), (2.34), (NULL); +INSERT INTO t_float8 (col) VALUES ('-1.9876542e307'), ('1.9876542e307'), ('NaN'), ('-Infinity'), ('Infinity'), (3.45), (NULL); +INSERT INTO t_inet (col) VALUES ('192.168.1.0/24'), (NULL); +INSERT INTO t_int (col) VALUES (-2147483648), (2147483647), (1), (NULL); +INSERT INTO t_int2 (col) VALUES (-32768), (32767), (3), (NULL); +INSERT INTO t_int4 (col) VALUES (-2147483648), (2147483647), (4), (NULL); +INSERT INTO t_int4multirange (col) VALUES ('{[10, 20]}'), (NULL); +INSERT INTO t_int4range (col) VALUES ('[10, 20]'), (NULL); +INSERT INTO t_int8 (col) VALUES (-9223372036854775808), (9223372036854775807), (5), (NULL); +INSERT INTO t_int8multirange (col) VALUES ('{[30, 40]}'), (NULL); +INSERT INTO t_int8range (col) VALUES ('[30, 40]'), (NULL); +INSERT INTO t_integer (col) VALUES (-2147483648), (2147483647), (2), (NULL); +INSERT INTO t_interval (col) VALUES ('1 hour'), (NULL); +INSERT INTO t_json (col) VALUES ('{"duplicate_key": 1, "duplicate_key": 2}'), ('{"null_key": null}'), (NULL); +INSERT INTO t_jsonb (col) VALUES ('{"duplicate_key": 1, "duplicate_key": 2}'), ('{"null_key": null}'), (NULL); +INSERT INTO t_line (col) VALUES ('{ 1, 2, 3 }'), (NULL); +INSERT INTO t_lseg (col) VALUES ('[ (1, 2), (3, 4) ]'), (NULL); +INSERT INTO t_macaddr (col) VALUES ('08:00:2b:01:02:03'), (NULL); +INSERT INTO t_macaddr8 (col) VALUES ('08:00:2b:01:02:03:04:05'), (NULL); +INSERT INTO t_money (col) VALUES ('123.45'::money), (NULL); +INSERT INTO t_numeric (col) VALUES (4.56), (NULL); +INSERT INTO t_nummultirange (col) VALUES ('{[50, 60]}'), (NULL); +INSERT INTO t_numrange (col) VALUES ('[50, 60]'), (NULL); +INSERT INTO t_oid (col) VALUES (1000::oid), (NULL); +INSERT INTO t_path (col) VALUES ('[ (1, 2), (3, 4), (5, 6) ]'), (NULL); +INSERT INTO t_pg_lsn (col) VALUES ('123/0'::pg_lsn), (NULL); +INSERT INTO t_pg_snapshot (col) VALUES ('1000:1000:'::pg_snapshot), (NULL); +INSERT INTO t_point (col) VALUES ('(1, 2)'), (NULL); +INSERT INTO t_polygon (col) VALUES ('( (1, 2), (3, 4) )'), (NULL); +INSERT INTO t_real (col) VALUES ('-1.9876542e38'), ('1.9876542e38'), ('NaN'), ('-Infinity'), ('Infinity'), (5.67), (NULL); +INSERT INTO t_serial (col) VALUES (-2147483648), (2147483647), (6); +INSERT INTO t_serial2 (col) VALUES (-32768), (32767), (7); +INSERT INTO t_serial4 (col) VALUES (-2147483648), (2147483647), (8); +INSERT INTO t_serial8 (col) VALUES (-9223372036854775808), (9223372036854775807), (9); +INSERT INTO t_smallint (col) VALUES (-32768), (32767), (10), (NULL); +INSERT INTO t_smallserial (col) VALUES (-32768), (32767), (11); +INSERT INTO t_text (col) VALUES ('testing text'), (NULL); +INSERT INTO t_time (col) VALUES ('24:00:00'::time), (NULL); +INSERT INTO t_time_with_time_zone (col) VALUES ('23:59:59+10:00'), (NULL); +INSERT INTO t_time_without_time_zone (col) VALUES ('24:00:00'::time), (NULL); +INSERT INTO t_timestamp (col) VALUES ('1970-01-02 03:04:05.123456'::timestamp), (NULL); +INSERT INTO t_timestamp_with_time_zone (col) VALUES ('1970-02-03 04:05:06.123456+10:00'::timestamptz), 
('1970-02-03 04:05:06.123456-01'::timestamptz), (NULL); +INSERT INTO t_timestamp_without_time_zone (col) VALUES ('1970-01-02 03:04:05.123456'::timestamp), (NULL); +INSERT INTO t_timestamptz (col) VALUES ('1970-02-03 04:05:06.123456+10:00'::timestamptz), ('1970-02-03 04:05:06.123456-01'::timestamptz), (NULL); +INSERT INTO t_timetz (col) VALUES ('23:59:59+10:00'), (NULL); +INSERT INTO t_tsmultirange (col) VALUES ('{[1970-01-01 01:00, 1970-01-01 02:00]}'), (NULL); +INSERT INTO t_tsquery (col) VALUES ('fat & rat'::tsquery), (NULL); +INSERT INTO t_tsrange (col) VALUES ('[1970-01-01 01:00, 1970-01-01 02:00]'), (NULL); +INSERT INTO t_tstzmultirange (col) VALUES ('{[1970-01-01 01:00+10:00, 1970-01-01 02:00+10:00]}'), (NULL); +INSERT INTO t_tstzrange (col) VALUES ('[1970-01-01 01:00+10:00, 1970-01-01 02:00+10:00]'), (NULL); +INSERT INTO t_tsvector (col) VALUES ('a fat cat sat on a mat'::tsvector), (NULL); +INSERT INTO t_txid_snapshot (col) VALUES ('10:20:10,14,15'::txid_snapshot), (NULL); +INSERT INTO t_uuid (col) VALUES ('a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid), (NULL); +INSERT INTO t_varbit (col) VALUES ('1100'::varbit), (NULL); +INSERT INTO t_varchar (col) VALUES ('testing varchar'), (NULL); +INSERT INTO t_xml (col) VALUES ('123'::xml), (NULL); diff --git a/v2/sourcedb-to-spanner/src/test/resources/DataTypesIt/postgresql/spanner-schema.sql b/v2/sourcedb-to-spanner/src/test/resources/DataTypesIt/postgresql/spanner-schema.sql new file mode 100644 index 0000000000..2111c20cfe --- /dev/null +++ b/v2/sourcedb-to-spanner/src/test/resources/DataTypesIt/postgresql/spanner-schema.sql @@ -0,0 +1,75 @@ +CREATE TABLE t_bigint (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE t_bigserial (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE t_bit (id INT64, col BYTES(MAX)) PRIMARY KEY (id); +CREATE TABLE t_bit_varying (id INT64, col BYTES(MAX)) PRIMARY KEY (id); +CREATE TABLE t_bool (id INT64, col BOOL) PRIMARY KEY (id); +CREATE TABLE t_boolean (id INT64, col BOOL) PRIMARY KEY (id); +CREATE TABLE t_box (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_bytea (id INT64, col BYTES(MAX)) PRIMARY KEY (id); +CREATE TABLE t_char (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_character (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_character_varying (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_cidr (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_circle (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_date (id INT64, col DATE) PRIMARY KEY (id); +CREATE TABLE t_datemultirange (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_daterange (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_decimal (id INT64, col NUMERIC) PRIMARY KEY (id); +CREATE TABLE t_double_precision (id INT64, col FLOAT64) PRIMARY KEY (id); +CREATE TABLE t_enum (id INT64 , col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_float4 (id INT64, col FLOAT64) PRIMARY KEY (id); +CREATE TABLE t_float8 (id INT64, col FLOAT64) PRIMARY KEY (id); +CREATE TABLE t_inet (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_int (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE t_int2 (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE t_int4 (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE t_int4multirange (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_int4range (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_int8 (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE t_int8multirange (id INT64, col 
STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_int8range (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_integer (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE t_interval (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_json (id INT64, col JSON) PRIMARY KEY (id); +CREATE TABLE t_jsonb (id INT64, col JSON) PRIMARY KEY (id); +CREATE TABLE t_line (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_lseg (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_macaddr (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_macaddr8 (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_money (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_numeric (id INT64, col NUMERIC) PRIMARY KEY (id); +CREATE TABLE t_nummultirange (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_numrange (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_oid (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE t_path (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_pg_lsn (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_pg_snapshot (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_point (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_polygon (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_real (id INT64, col FLOAT64) PRIMARY KEY (id); +CREATE TABLE t_serial (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE t_serial2 (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE t_serial4 (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE t_serial8 (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE t_smallint (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE t_smallserial (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE t_text (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_time (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_time_with_time_zone (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_time_without_time_zone (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_timestamp (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_timestamp_with_time_zone (id INT64, col TIMESTAMP) PRIMARY KEY (id); +CREATE TABLE t_timestamp_without_time_zone (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_timestamptz (id INT64, col TIMESTAMP) PRIMARY KEY (id); +CREATE TABLE t_timetz (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_tsmultirange (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_tsquery (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_tsrange (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_tstzmultirange (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_tstzrange (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_tsvector (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_txid_snapshot (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_uuid (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_varbit (id INT64, col BYTES(MAX)) PRIMARY KEY (id); +CREATE TABLE t_varchar (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE t_xml (id INT64, col STRING(MAX)) PRIMARY KEY (id);
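A quick way to sanity-check the TIMESTAMPTZ handling introduced above: the sketch below (illustrative only, not part of this change; the class name and main method are arbitrary) rebuilds the same formatter pattern as TIMESTAMPTZ_FORMAT in PostgreSQLJdbcValueMappings.java and applies the same micros/offset arithmetic as timestamptzToAvro, reproducing the TimeStampTz record values that JdbcSourceRowMapperTest expects for the input "1970-01-02 01:02:03.123456-05".

import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.util.concurrent.TimeUnit;

public class TimestampTzMappingSketch {

  // Same pattern as TIMESTAMPTZ_FORMAT: date and time, an optional 1-6 digit fraction,
  // and a UTC offset that may omit minutes (e.g. "-05" or "-05:00").
  private static final DateTimeFormatter FORMAT =
      new DateTimeFormatterBuilder()
          .appendPattern("yyyy-MM-dd HH:mm:ss")
          .optionalStart()
          .appendFraction(ChronoField.NANO_OF_SECOND, 1, 6, true)
          .optionalEnd()
          .appendOffset("+HH:mm", "+00")
          .toFormatter();

  public static void main(String[] args) {
    // One of the timestamptz literals used in JdbcSourceRowMapperTest.
    OffsetDateTime value = OffsetDateTime.parse("1970-01-02 01:02:03.123456-05", FORMAT);

    // Corresponds to TimeStampTz.TIMESTAMP_FIELD_NAME: the instant as epoch microseconds (UTC).
    long micros =
        TimeUnit.SECONDS.toMicros(value.toInstant().getEpochSecond())
            + TimeUnit.NANOSECONDS.toMicros(value.toInstant().getNano());

    // Corresponds to TimeStampTz.OFFSET_FIELD_NAME: the zone offset in milliseconds.
    long offsetMillis = TimeUnit.SECONDS.toMillis(value.getOffset().getTotalSeconds());

    // Prints "108123123456 -18000000", matching the expected record for the
    // timestamptz_hour_offset test column.
    System.out.println(micros + " " + offsetMillis);
  }
}

Because the offset pattern treats minutes as optional, "-05" and "-05:00" parse to the same offset, which is why the timestamptz_hour_offset and timestamptz_hour_minute_offset test columns expect identical records.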