
Commit f566092

Enhancement: Support table truncate and overwrite. (#34)
1 parent a49d418 commit f566092

6 files changed, +203 -11 lines changed
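At the user level, this change makes TRUNCATE TABLE and INSERT OVERWRITE work against tables exposed through the OceanBase catalog. The following minimal sketch mirrors the new testTruncateAndOverWriteTable test further down; the connection values are placeholders, and the fully qualified catalog class name is inferred from the com.oceanbase.spark.catalog package in this commit rather than taken from documentation:

import org.apache.spark.sql.SparkSession

// Placeholder connection settings; the integration test reads these from its container fixture.
val spark = SparkSession
  .builder()
  .master("local[*]")
  .config("spark.sql.catalog.ob", "com.oceanbase.spark.catalog.OceanBaseCatalog") // class name assumed
  .config("spark.sql.catalog.ob.url", "jdbc:mysql://localhost:2881/test")
  .config("spark.sql.catalog.ob.username", "root@test")
  .config("spark.sql.catalog.ob.password", "password")
  .config("spark.sql.catalog.ob.schema-name", "test")
  .getOrCreate()

spark.sql("use ob")
// TRUNCATE TABLE is dispatched to OceanBaseTable.truncateTable() via the new TruncatableTable mix-in.
spark.sql("truncate table products")
// INSERT OVERWRITE truncates through the writer's overwrite(AlwaysTrue) and then writes the new rows.
spark.sql("insert overwrite products select * from products_no_pri_key")
spark.stop()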
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2024 OceanBase.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.oceanbase.spark.catalog
+
+import com.oceanbase.spark.config.OceanBaseConfig
+import com.oceanbase.spark.dialect.OceanBaseDialect
+import com.oceanbase.spark.reader.JDBCLimitScanBuilder
+import com.oceanbase.spark.reader.v2.OBJdbcScanBuilder
+import com.oceanbase.spark.utils.OBJdbcUtils
+import com.oceanbase.spark.writer.v2.{DirectLoadWriteBuilderV2, JDBCWriteBuilder}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.catalog._
+import org.apache.spark.sql.connector.catalog.TableCapability._
+import org.apache.spark.sql.connector.read.ScanBuilder
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success, Try}
+
+case class OceanBaseTable(
+    ident: Identifier,
+    schema: StructType,
+    config: OceanBaseConfig,
+    dialect: OceanBaseDialect)
+  extends Table
+  with SupportsRead
+  with SupportsWrite {
+
+  override def name(): String = ident.toString
+
+  override def capabilities(): util.Set[TableCapability] = {
+    util.EnumSet.of(BATCH_READ, BATCH_WRITE, TRUNCATE)
+  }
+
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+    val mergedOptions = new JDBCOptions(
+      config.getProperties.asScala.toMap ++ options.asCaseSensitiveMap().asScala)
+
+    mergedOptions.parameters
+      .get(OceanBaseConfig.ENABLE_LEGACY_BATCH_READER)
+      .map(_.toBoolean) match {
+      case Some(true) => JDBCLimitScanBuilder(SparkSession.active, schema, mergedOptions)
+      case _ => OBJdbcScanBuilder(schema, config, dialect)
+    }
+  }
+
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+
+    if (config.getDirectLoadEnable) {
+      DirectLoadWriteBuilderV2(schema, config)
+    } else {
+      new JDBCWriteBuilder(schema, config, dialect)
+    }
+  }
+}

spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/catalog/OceanBaseCatalog.scala

Lines changed: 14 additions & 0 deletions
@@ -32,6 +32,7 @@ import java.sql.SQLException
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
 
 class OceanBaseCatalog
   extends TableCatalog
@@ -181,6 +182,19 @@ class OceanBaseCatalog
     }
   }
 
+  override def purgeTable(ident: Identifier): Boolean = {
+    val config = genNewOceanBaseConfig(this.config, ident)
+    Try {
+      OBJdbcUtils.withConnection(config) {
+        conn =>
+          OBJdbcUtils.executeStatement(conn, config, dialect.getTruncateQuery(config.getDbTable))
+      }
+    } match {
+      case Success(_) => true
+      case Failure(_) => false
+    }
+  }
+
   override def namespaceExists(namespace: Array[String]): Boolean = namespace match {
     case Array(db) =>
       OBJdbcUtils.withConnection(config)(conn => dialect.schemaExists(conn, config, db))
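The new purgeTable override is the TableCatalog hook that Spark invokes for DROP TABLE ... PURGE on a v2 catalog. Note that in this commit it issues the dialect's truncate statement and maps success or failure to the Boolean result Spark expects, rather than physically dropping the table. A hedged sketch of triggering it, reusing the session and the assumed test schema from the example above:

// Without PURGE Spark calls dropTable; with PURGE it should route to purgeTable,
// which here truncates the table's data over a JDBC connection.
spark.sql("drop table ob.test.products purge")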

spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/catalog/OceanBaseTable.scala

Lines changed: 16 additions & 1 deletion
@@ -19,6 +19,7 @@ import com.oceanbase.spark.config.OceanBaseConfig
 import com.oceanbase.spark.dialect.OceanBaseDialect
 import com.oceanbase.spark.reader.JDBCLimitScanBuilder
 import com.oceanbase.spark.reader.v2.OBJdbcScanBuilder
+import com.oceanbase.spark.utils.OBJdbcUtils
 import com.oceanbase.spark.writer.v2.{DirectLoadWriteBuilderV2, JDBCWriteBuilder}
 
 import org.apache.spark.sql.SparkSession
@@ -33,6 +34,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import java.util
 
 import scala.collection.JavaConverters._
+import scala.util.{Failure, Success, Try}
 
 case class OceanBaseTable(
     ident: Identifier,
@@ -41,7 +43,8 @@ case class OceanBaseTable(
     dialect: OceanBaseDialect)
   extends Table
   with SupportsRead
-  with SupportsWrite {
+  with SupportsWrite
+  with TruncatableTable {
 
   override def name(): String = ident.toString
 
@@ -69,4 +72,16 @@ case class OceanBaseTable(
       new JDBCWriteBuilder(schema, config, dialect)
     }
   }
+
+  override def truncateTable(): Boolean = {
+    Try {
+      OBJdbcUtils.withConnection(config) {
+        conn =>
+          OBJdbcUtils.executeStatement(conn, config, dialect.getTruncateQuery(config.getDbTable))
+      }
+    } match {
+      case Success(_) => true
+      case Failure(_) => false
+    }
+  }
 }
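Mixing in TruncatableTable, together with the TRUNCATE capability the table already advertises, lets Spark's TRUNCATE TABLE command call truncateTable() on this v2 table. The method swallows any JDBC exception and returns false, which Spark should then surface as a failed statement. A small check, assuming the session and table names from the first sketch:

// Succeeds when truncateTable() returns true; a false return should fail the statement.
spark.sql("truncate table ob.test.products")
assert(spark.table("ob.test.products").count() == 0) // the table is expected to be empty afterwards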

spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/writer/v2/DirectLoadWriteBuilderV2.scala

Lines changed: 29 additions & 2 deletions
@@ -15,18 +15,45 @@
  */
 package com.oceanbase.spark.writer.v2
 
+import com.oceanbase.spark.catalog.OceanBaseCatalogException
 import com.oceanbase.spark.config.OceanBaseConfig
 import com.oceanbase.spark.directload.{DirectLoader, DirectLoadUtils}
+import com.oceanbase.spark.utils.OBJdbcUtils
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.metric.CustomMetric
-import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, PhysicalWriteInfo, Write, WriteBuilder, WriterCommitMessage}
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, PhysicalWriteInfo, SupportsOverwrite, SupportsTruncate, Write, WriteBuilder, WriterCommitMessage}
+import org.apache.spark.sql.sources.{AlwaysTrue, Filter}
 import org.apache.spark.sql.types.StructType
 
+import scala.util.{Failure, Try}
+
 /** Direct-load writing implementation based on Spark DataSource V2 API. */
 case class DirectLoadWriteBuilderV2(schema: StructType, config: OceanBaseConfig)
-  extends WriteBuilder {
+  extends WriteBuilder
+  with SupportsOverwrite
+  with SupportsTruncate {
   override def build(): Write = new DirectLoadWrite(schema, config)
+
+  override def overwrite(filters: Array[Filter]): WriteBuilder = {
+    if (filters.length == 1 && filters.head.isInstanceOf[AlwaysTrue]) {
+      Try {
+        OBJdbcUtils.withConnection(config) {
+          conn => OBJdbcUtils.executeStatement(conn, config, s"truncate table ${config.getDbTable}")
+        }
+      } match {
+        case Failure(exception) =>
+          throw OceanBaseCatalogException(
+            s"Failed to truncate table ${config.getDbTable}",
+            exception)
+        case _ =>
+      }
+    } else {
+      throw OceanBaseCatalogException(s"Currently only overwrite full data is supported.")
+    }
+
+    this
+  }
 }
 
 class DirectLoadWrite(schema: StructType, config: OceanBaseConfig) extends Write {
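Both this builder and the JDBC one below only accept a full-table overwrite: overwrite() requires exactly one AlwaysTrue filter and otherwise throws the "Currently only overwrite full data is supported." exception. Besides INSERT OVERWRITE, the same path can be reached through the DataFrameWriterV2 API; the sketch below assumes a literal-true condition is translated to that single AlwaysTrue filter, and reuses the table names from the integration test together with the placeholder schema from the first sketch:

import org.apache.spark.sql.functions.lit

// Overwrite the whole target table with the contents of the staging table.
// The lit(true) condition is expected to arrive at overwrite() as AlwaysTrue,
// so the target is truncated first and the new rows are then written.
spark
  .table("ob.test.products_no_pri_key")
  .writeTo("ob.test.products")
  .overwrite(lit(true))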

spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/writer/v2/JDBCWriteBuilder.scala

Lines changed: 30 additions & 2 deletions
@@ -15,17 +15,45 @@
  */
 package com.oceanbase.spark.writer.v2
 
+import com.oceanbase.spark.catalog.OceanBaseCatalogException
 import com.oceanbase.spark.config.OceanBaseConfig
 import com.oceanbase.spark.dialect.OceanBaseDialect
+import com.oceanbase.spark.utils.OBJdbcUtils
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.metric.CustomMetric
-import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, PhysicalWriteInfo, Write, WriteBuilder, WriterCommitMessage}
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, PhysicalWriteInfo, SupportsOverwrite, SupportsTruncate, Write, WriteBuilder, WriterCommitMessage}
+import org.apache.spark.sql.sources.{AlwaysTrue, Filter}
 import org.apache.spark.sql.types.StructType
 
+import scala.util.{Failure, Try}
+
 class JDBCWriteBuilder(schema: StructType, config: OceanBaseConfig, dialect: OceanBaseDialect)
-  extends WriteBuilder {
+  extends WriteBuilder
+  with SupportsOverwrite
+  with SupportsTruncate {
   override def build(): Write = new JDBCWrite(schema, config, dialect)
+
+  override def overwrite(filters: Array[Filter]): WriteBuilder = {
+    if (filters.length == 1 && filters.head.isInstanceOf[AlwaysTrue]) {
+      Try {
+        OBJdbcUtils.withConnection(config) {
+          conn =>
+            OBJdbcUtils.executeStatement(conn, config, dialect.getTruncateQuery(config.getDbTable))
+        }
+      } match {
+        case Failure(exception) =>
+          throw OceanBaseCatalogException(
+            s"Failed to truncate table ${config.getDbTable}",
+            exception)
+        case _ =>
+      }
+    } else {
+      throw OceanBaseCatalogException(s"Currently only overwrite full data is supported.")
+    }
+
+    this
+  }
 }
 
 class JDBCWrite(schema: StructType, config: OceanBaseConfig, dialect: OceanBaseDialect)

spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/scala/com/oceanbase/spark/OBCatalogMySQLITCase.scala

Lines changed: 40 additions & 6 deletions
@@ -22,6 +22,8 @@ import org.apache.spark.sql.SparkSession
 import org.junit.jupiter.api.{AfterAll, AfterEach, Assertions, BeforeAll, BeforeEach, Test}
 import org.junit.jupiter.api.function.ThrowingSupplier
 
+import java.util
+
 class OBCatalogMySQLITCase extends OceanBaseMySQLTestBase {
 
   @BeforeEach
@@ -50,7 +52,7 @@
 
     session.sql("use ob;")
     insertTestData(session, "products")
-    queryAndVerifyTableData(session, "products")
+    queryAndVerifyTableData(session, "products", expected)
 
     session.stop()
   }
@@ -113,7 +115,7 @@
 
     session.sql("use ob;")
     insertTestData(session, "products_no_pri_key")
-    queryAndVerifyTableData(session, "products_no_pri_key")
+    queryAndVerifyTableData(session, "products_no_pri_key", expected)
     session.stop()
   }
 
@@ -132,7 +134,7 @@
     session.sql("use ob;")
     insertTestData(session, "products_full_pri_key")
 
-    queryAndVerifyTableData(session, "products_full_pri_key")
+    queryAndVerifyTableData(session, "products_full_pri_key", expected)
 
     session.stop()
   }
@@ -154,7 +156,11 @@
       .getOrCreate()
 
     insertTestData(session, "products")
-    queryAndVerifyTableData(session, "products")
+    queryAndVerifyTableData(session, "products", expected)
+
+    insertTestData(session, "products_no_pri_key")
+    session.sql("insert overwrite products select * from products_no_pri_key")
+    queryAndVerifyTableData(session, "products", expected)
     session.stop()
   }
 
@@ -173,7 +179,7 @@
     insertTestData(session, "products")
     // Test CTAS
     session.sql("create table test1 as select * from products")
-    queryAndVerifyTableData(session, "test1")
+    queryAndVerifyTableData(session, "test1", expected)
 
     // test bucket partition table:
     // 1. column comment test
@@ -199,7 +205,35 @@
     session.stop()
   }
 
-  private def queryAndVerifyTableData(session: SparkSession, tableName: String): Unit = {
+  @Test
+  def testTruncateAndOverWriteTable(): Unit = {
+    val session = SparkSession
+      .builder()
+      .master("local[*]")
+      .config("spark.sql.catalog.ob", OB_CATALOG_CLASS)
+      .config("spark.sql.catalog.ob.url", getJdbcUrl)
+      .config("spark.sql.catalog.ob.username", getUsername)
+      .config("spark.sql.catalog.ob.password", getPassword)
+      .config("spark.sql.catalog.ob.schema-name", getSchemaName)
+      .getOrCreate()
+
+    session.sql("use ob;")
+    insertTestData(session, "products")
+    session.sql("truncate table products")
+    val expect = new util.ArrayList[String]()
+    queryAndVerifyTableData(session, "products", expect)
+
+    insertTestData(session, "products_no_pri_key")
+    session.sql("insert overwrite products select * from products_no_pri_key")
+    queryAndVerifyTableData(session, "products", expected)
+
+    session.stop()
+  }
+
+  private def queryAndVerifyTableData(
+      session: SparkSession,
+      tableName: String,
+      expected: util.List[String]): Unit = {
     import scala.collection.JavaConverters._
     val actual = session
       .sql(s"select * from $tableName")

0 commit comments
