
Commit 91e6894

Enhancement: Add e2e tests for oceanbase catalog. (#22)
1 parent b286b78 commit 91e6894

5 files changed, +483 -2 lines changed
Lines changed: 142 additions & 0 deletions
@@ -0,0 +1,142 @@
/*
 * Copyright 2024 OceanBase.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.oceanbase.spark

import com.oceanbase.spark.OceanBaseCatalogE2eITCase.{MYSQL_CONNECTOR_JAVA, SINK_CONNECTOR_NAME}
import com.oceanbase.spark.OceanBaseTestBase.assertEqualsInAnyOrder
import com.oceanbase.spark.utils.SparkContainerTestEnvironment
import com.oceanbase.spark.utils.SparkContainerTestEnvironment.getResource

import org.junit.jupiter.api._
import org.junit.jupiter.api.condition.DisabledIfSystemProperty
import org.junit.jupiter.api.function.ThrowingSupplier
import org.slf4j.LoggerFactory
import org.testcontainers.containers.output.Slf4jLogConsumer

import java.util

class OceanBaseCatalogE2eITCase extends SparkContainerTestEnvironment {
  @BeforeEach
  @throws[Exception]
  override def before(): Unit = {
    super.before()
    initialize("sql/mysql/products.sql")
  }

  @AfterEach
  @throws[Exception]
  override def after(): Unit = {
    super.after()
    dropTables("products")
  }

  @Test
  @DisabledIfSystemProperty(
    named = "spark_version",
    matches = "^2\\.4\\.[0-9]$",
    disabledReason = "Catalog is only supported starting from Spark3."
  )
  def testInsertValues(): Unit = {
    val sqlLines: util.List[String] = new util.ArrayList[String]
    sqlLines.add(s"""
                    |set spark.sql.catalog.ob=com.oceanbase.spark.catalog.OceanBaseCatalog;
                    |set spark.sql.catalog.ob.url=$getJdbcUrlInContainer;
                    |set spark.sql.catalog.ob.username=$getUsername;
                    |set spark.sql.catalog.ob.password=$getPassword;
                    |set `spark.sql.catalog.ob.schema-name`=$getSchemaName;
                    |set spark.sql.defaultCatalog=ob;
                    |""".stripMargin)
    sqlLines.add(
      s"""
         |INSERT INTO $getSchemaName.products VALUES
         |(101, 'scooter', 'Small 2-wheel scooter', 3.14),
         |(102, 'car battery', '12V car battery', 8.1),
         |(103, '12-pack drill bits', '12-pack of drill bits with sizes ranging from #40 to #3', 0.8),
         |(104, 'hammer', '12oz carpenter\\'s hammer', 0.75),
         |(105, 'hammer', '14oz carpenter\\'s hammer', 0.875),
         |(106, 'hammer', '16oz carpenter\\'s hammer', 1.0),
         |(107, 'rocks', 'box of assorted rocks', 5.3),
         |(108, 'jacket', 'water resistent black wind breaker', 0.1),
         |(109, 'spare tire', '24 inch spare tire', 22.2);
         |""".stripMargin)

    sqlLines.add("select * from products limit 10;")

    submitSQLJob(sqlLines, getResource(SINK_CONNECTOR_NAME), getResource(MYSQL_CONNECTOR_JAVA))

    val expected: util.List[String] = util.Arrays.asList(
      "101,scooter,Small 2-wheel scooter,3.1400000000",
      "102,car battery,12V car battery,8.1000000000",
      "103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8000000000",
      "104,hammer,12oz carpenter's hammer,0.7500000000",
      "105,hammer,14oz carpenter's hammer,0.8750000000",
      "106,hammer,16oz carpenter's hammer,1.0000000000",
      "107,rocks,box of assorted rocks,5.3000000000",
      "108,jacket,water resistent black wind breaker,0.1000000000",
      "109,spare tire,24 inch spare tire,22.2000000000"
    )
    val actual: util.List[String] = queryTable("products")
    assertEqualsInAnyOrder(expected, actual)
  }

  @Test
  @DisabledIfSystemProperty(
    named = "spark_version",
    matches = "^2\\.4\\.[0-9]$",
    disabledReason = "Catalog is only supported starting from Spark3."
  )
  def testCatalogOp(): Unit = {
    val sqlLines: util.List[String] = new util.ArrayList[String]
    sqlLines.add(s"""
                    |set spark.sql.catalog.ob=com.oceanbase.spark.catalog.OceanBaseCatalog;
                    |set spark.sql.catalog.ob.url=$getJdbcUrlInContainer;
                    |set spark.sql.catalog.ob.username=$getUsername;
                    |set spark.sql.catalog.ob.password=$getPassword;
                    |set `spark.sql.catalog.ob.schema-name`=$getSchemaName;
                    |set spark.sql.defaultCatalog=ob;
                    |""".stripMargin)

    sqlLines.add(s"""
                    |show databases;
                    |show tables;
                    |use test;
                    |select * from products limit 10;
                    |""".stripMargin)

    Assertions.assertDoesNotThrow(new ThrowingSupplier[Unit] {
      override def get(): Unit = {
        submitSQLJob(sqlLines, getResource(SINK_CONNECTOR_NAME), getResource(MYSQL_CONNECTOR_JAVA))
      }
    })
  }
}

object OceanBaseCatalogE2eITCase extends SparkContainerTestEnvironment {
  private val LOG = LoggerFactory.getLogger(classOf[OceanBaseE2eITCase])
  private val SINK_CONNECTOR_NAME =
    "^.*spark-connector-oceanbase-\\d+\\.\\d+_\\d+\\.\\d+-[\\d\\.]+(?:-SNAPSHOT)?\\.jar$"
  private val MYSQL_CONNECTOR_JAVA = "mysql-connector-java.jar"

  @BeforeAll def setup(): Unit = {
    OceanBaseMySQLTestBase.CONTAINER.withLogConsumer(new Slf4jLogConsumer(LOG)).start()
  }

  @AfterAll def tearDown(): Unit = {
    OceanBaseMySQLTestBase.CONTAINER.stop()
  }
}
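For orientation, the catalog settings these tests issue via set statements map one-to-one onto ordinary Spark session configuration. Below is a minimal sketch of registering the same catalog outside the test harness, assuming the connector jar and MySQL driver are already on the classpath; the JDBC URL, credentials, and schema name are illustrative placeholders, not values taken from this diff.

import org.apache.spark.sql.SparkSession

// Placeholder connection values; in the tests these come from the
// Testcontainers-managed OceanBase instance.
val spark = SparkSession
  .builder()
  .appName("oceanbase-catalog-example")
  .config("spark.sql.catalog.ob", "com.oceanbase.spark.catalog.OceanBaseCatalog")
  .config("spark.sql.catalog.ob.url", "jdbc:mysql://localhost:2881/test")
  .config("spark.sql.catalog.ob.username", "root@test")
  .config("spark.sql.catalog.ob.password", "")
  .config("spark.sql.catalog.ob.schema-name", "test")
  .config("spark.sql.defaultCatalog", "ob")
  .getOrCreate()

// Same query the tests run once the products table is populated.
spark.sql("SELECT * FROM test.products LIMIT 10").show()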

spark-connector-oceanbase/spark-connector-oceanbase-3.1/pom.xml

Lines changed: 61 additions & 0 deletions
@@ -54,7 +54,68 @@ under the License.
     </dependencies>

     <build>
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <groupId>net.alchim31.maven</groupId>
+                    <artifactId>scala-maven-plugin</artifactId>
+                    <version>${scala-maven-plugin.version}</version>
+                    <configuration>
+                        <args>
+                            <arg>-nobootcp</arg>
+                            <arg>-target:jvm-${target.java.version}</arg>
+                        </args>
+                        <checkMultipleScalaVersions>false</checkMultipleScalaVersions>
+                    </configuration>
+                </plugin>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-compiler-plugin</artifactId>
+                </plugin>
+            </plugins>
+        </pluginManagement>
         <plugins>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>${scala-maven-plugin.version}</version>
+                <executions>
+                    <!-- Run scala compiler in the process-resources phase, so that dependencies on
+                         scala classes can be resolved later in the (Java) compile phase -->
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <goals>
+                            <goal>add-source</goal>
+                            <goal>compile</goal>
+                        </goals>
+                        <phase>process-resources</phase>
+                    </execution>
+
+                    <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+                         scala classes can be resolved later in the (Java) test-compile phase -->
+                    <execution>
+                        <id>scala-test-compile</id>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                        <phase>process-test-resources</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>com.diffplug.spotless</groupId>
+                <artifactId>spotless-maven-plugin</artifactId>
+                <version>${spotless.version}</version>
+                <configuration>
+                    <scala>
+                        <scalafmt>
+                            <version>3.4.3</version>
+                            <!-- This file is in the root of the project to make sure IntelliJ picks it up automatically -->
+                            <file>${project.basedir}/../../.scalafmt.conf</file>
+                        </scalafmt>
+                    </scala>
+                </configuration>
+            </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
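Build notes: the pluginManagement entry pins the scala-maven-plugin version and compiler flags (-nobootcp, -target:jvm-${target.java.version}), while the plugins section binds Scala compilation to the process-resources and process-test-resources phases and wires in Spotless with scalafmt 3.4.3 driven by the repository-level .scalafmt.conf. As a general Spotless usage note (standard plugin goals, not something introduced by this commit), running mvn spotless:check fails the build when Scala sources are unformatted and mvn spotless:apply rewrites them in place.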
Lines changed: 81 additions & 0 deletions
@@ -0,0 +1,81 @@
/*
 * Copyright 2024 OceanBase.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.oceanbase.spark.writer.v2

import com.oceanbase.spark.catalog.OceanBaseCatalog
import com.oceanbase.spark.config.OceanBaseConfig
import com.oceanbase.spark.directload.{DirectLoader, DirectLoadUtils}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, PhysicalWriteInfo, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.types.StructType

import scala.collection.JavaConverters.mapAsJavaMapConverter

/** Direct-load writing implementation based on Spark DataSource V2 API. */
case class DirectLoadWriteBuilderV2(schema: StructType, options: JDBCOptions) extends WriteBuilder {
  override def buildForBatch(): BatchWrite = {
    val map = options.parameters ++ Map(
      OceanBaseConfig.SCHEMA_NAME.getKey -> options.parameters(OceanBaseCatalog.CURRENT_DATABASE),
      OceanBaseConfig.TABLE_NAME.getKey -> options.parameters(OceanBaseCatalog.CURRENT_TABLE)
    )
    new DirectLoadBatchWrite(schema, new OceanBaseConfig(map.asJava))
  }
}

/** This will be performed on the Driver side and no serialization is required. */
class DirectLoadBatchWrite(schema: StructType, oceanBaseConfig: OceanBaseConfig)
  extends BatchWrite {

  private val directLoader: DirectLoader =
    DirectLoadUtils.buildDirectLoaderFromSetting(oceanBaseConfig)
  private val executionId: String = directLoader.begin()
  oceanBaseConfig.set(OceanBaseConfig.DIRECT_LOAD_EXECUTION_ID, executionId)

  override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = {
    new DirectLoadDataWriterFactory(schema, oceanBaseConfig)
  }

  override def commit(messages: Array[WriterCommitMessage]): Unit = {
    directLoader.commit()
    directLoader.close()
  }

  override def abort(messages: Array[WriterCommitMessage]): Unit = {
    directLoader.close()
  }
}

class DirectLoadDataWriterFactory(schema: StructType, oceanBaseConfig: OceanBaseConfig)
  extends DataWriterFactory {

  override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
    new DirectLoadWriteV2(schema, oceanBaseConfig)
  }
}
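The DirectLoadWriteV2 data writer returned by the factory above is added elsewhere in this commit and is not shown in this hunk. Purely as orientation, the following is a hypothetical skeleton (reusing the imports from the file above) of the DataWriter[InternalRow] contract such a writer has to satisfy; the class name, buffering, and comments are illustrative assumptions, not the actual implementation.

// Hypothetical sketch only -- not the DirectLoadWriteV2 shipped in this commit.
class DirectLoadWriteV2Sketch(schema: StructType, oceanBaseConfig: OceanBaseConfig)
  extends DataWriter[InternalRow] {

  // Rows collected by this task before being handed to the direct-load client.
  private val buffer = scala.collection.mutable.ArrayBuffer[InternalRow]()

  // Called once per row of the partition this task processes.
  override def write(record: InternalRow): Unit = buffer += record.copy()

  // Called when the task finishes; the direct-load transaction itself is only
  // committed later, by DirectLoadBatchWrite.commit() on the driver.
  override def commit(): WriterCommitMessage = new WriterCommitMessage {}

  // Called if the task fails; drop anything buffered locally.
  override def abort(): Unit = buffer.clear()

  override def close(): Unit = buffer.clear()
}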
Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
/*
 * Copyright 2024 OceanBase.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.oceanbase.spark.writer.v2

import com.oceanbase.spark.dialect.OceanBaseDialect

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write._
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.types.StructType

class JDBCWriteBuilder(schema: StructType, option: JDBCOptions, dialect: OceanBaseDialect)
  extends WriteBuilder {
  override def buildForBatch(): BatchWrite = new JDBCBatchWrite(schema, option, dialect)
}

class JDBCBatchWrite(schema: StructType, option: JDBCOptions, dialect: OceanBaseDialect)
  extends BatchWrite
  with DataWriterFactory {

  override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = this

  override def commit(messages: Array[WriterCommitMessage]): Unit = {}

  override def abort(messages: Array[WriterCommitMessage]): Unit = {}

  override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
    new JDBCWriter(schema, option, dialect)
  }
}
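Design note: JDBCBatchWrite doubles as its own DataWriterFactory, so createBatchWriterFactory simply returns this and the same object is shipped to executors to create one JDBCWriter per partition (the JDBCWriter class itself is added elsewhere in this commit). Below is a hedged sketch of how this write path is typically reached, assuming the ob catalog and SparkSession from the earlier sketch; whether a given write goes through this JDBC path or the direct-load path above depends on the connector configuration.

// Illustrative only: with the "ob" catalog registered, a SQL INSERT (or the
// equivalent DataFrameWriterV2 append) leads Spark to request a WriteBuilder
// for the catalog table, call buildForBatch(), and then invoke createWriter()
// once per partition on the executors.
spark.sql("INSERT INTO ob.test.products VALUES (110, 'wrench', 'adjustable wrench', 1.5)")

// Equivalent V2 API form, for an arbitrary DataFrame someDf with a matching schema:
// someDf.writeTo("ob.test.products").append()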
