
Commit 846f1be

Enhancement: Fix compatibility issues with Spark 3.2.
1 parent 0fc505a commit 846f1be

File tree

1 file changed: +127 -0 lines changed
  • spark-connector-oceanbase/spark-connector-oceanbase-3.2/src/main/scala/com/oceanbase/spark/catalog

@@ -0,0 +1,127 @@
/*
 * Copyright 2024 OceanBase.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.oceanbase.spark.catalog

import com.oceanbase.spark.config.OceanBaseConfig
import com.oceanbase.spark.dialect.OceanBaseDialect
import com.oceanbase.spark.reader.JDBCLimitScanBuilder
import com.oceanbase.spark.reader.v2.OBJdbcScanBuilder
import com.oceanbase.spark.utils.OBJdbcUtils
import com.oceanbase.spark.writer.v2.{DirectLoadWriteBuilderV2, JDBCWriteBuilder}

import org.apache.spark.sql.ExprUtils.compileFilter
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import java.util
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

/** DataSource V2 table for OceanBase: batch read/write, truncate, and filter-based delete. */
case class OceanBaseTable(
    ident: Identifier,
    schema: StructType,
    config: OceanBaseConfig,
    dialect: OceanBaseDialect)
  extends Table
  with SupportsRead
  with SupportsWrite
  with TruncatableTable
  with SupportsDelete {

  override def name(): String = ident.toString

  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(BATCH_READ, BATCH_WRITE, TRUNCATE)

  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
    // Merge catalog-level config with per-read options; the read options win on conflict.
    val mergedOptions = new JDBCOptions(
      config.getProperties.asScala.toMap ++ options.asCaseSensitiveMap().asScala)

    // Use the legacy LIMIT-based scan only when it is explicitly enabled.
    mergedOptions.parameters
      .get(OceanBaseConfig.ENABLE_LEGACY_BATCH_READER)
      .map(_.toBoolean) match {
      case Some(true) => JDBCLimitScanBuilder(SparkSession.active, schema, mergedOptions)
      case _ => OBJdbcScanBuilder(schema, config, dialect)
    }
  }

  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
    // Direct load bypasses the SQL layer; otherwise fall back to a plain JDBC write.
    if (config.getDirectLoadEnable) {
      DirectLoadWriteBuilderV2(schema, config)
    } else {
      new JDBCWriteBuilder(schema, config, dialect)
    }
  }

  override def truncateTable(): Boolean = {
    // Report success or failure instead of propagating the SQLException.
    Try {
      OBJdbcUtils.withConnection(config) { conn =>
        OBJdbcUtils.executeStatement(conn, config, dialect.getTruncateQuery(config.getDbTable))
      }
    } match {
      case Success(_) => true
      case Failure(_) => false
    }
  }

  override def canDeleteWhere(filters: Array[Filter]): Boolean = {
    // Accept the pushed-down delete only if every filter compiles to SQL.
    filters.forall(filter => compileFilter(filter, dialect).isDefined)
  }

  override def deleteWhere(filters: Array[Filter]): Unit = {
    val filterWhereClause: String =
      filters.flatMap(compileFilter(_, dialect)).map(p => s"($p)").mkString(" AND ")

    val whereClause: String =
      if (filterWhereClause.nonEmpty) "WHERE " + filterWhereClause else ""

    OBJdbcUtils.withConnection(config) { conn =>
      OBJdbcUtils.executeStatement(
        conn,
        config,
        dialect.getDeleteWhereSql(config.getDbTable, whereClause))
    }
  }
}
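For context on how the scan-builder toggle above surfaces to users, here is a minimal read-side sketch. The `oceanbase` format short name and every option key shown are assumptions for illustration, not taken from this commit; the exact string behind `OceanBaseConfig.ENABLE_LEGACY_BATCH_READER` is likewise assumed.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical read sketch; format name and option keys are assumptions.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("oceanbase-read-demo")
  .getOrCreate()

val df = spark.read
  .format("oceanbase")                               // assumed short name
  .option("url", "jdbc:mysql://localhost:2881/test") // assumed option key
  .option("table-name", "orders")                    // assumed option key
  // Per-read option that reaches newScanBuilder via CaseInsensitiveStringMap;
  // the key string is assumed to mirror OceanBaseConfig.ENABLE_LEGACY_BATCH_READER.
  .option("enable-legacy-batch-reader", "true")
  .load()

df.show()
```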

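The WHERE-clause assembly in `deleteWhere` can be sanity-checked in isolation. The standalone sketch below substitutes hypothetical pre-compiled predicates for the output of `compileFilter(_, dialect)` and reproduces the `(p1) AND (p2)` concatenation together with the empty-filter case.

```scala
object WhereClauseDemo extends App {
  // Stand-ins for compileFilter(_, dialect) results; the predicate strings are hypothetical.
  val compiled: Seq[Option[String]] =
    Seq(Some("`status` = 'done'"), Some("`ts` < '2024-01-01'"))

  // Same assembly as deleteWhere: drop uncompiled filters, parenthesize, join with AND.
  val filterWhereClause = compiled.flatten.map(p => s"($p)").mkString(" AND ")
  val whereClause = if (filterWhereClause.nonEmpty) "WHERE " + filterWhereClause else ""

  println(whereClause)
  // WHERE (`status` = 'done') AND (`ts` < '2024-01-01')
}
```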
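On the write side, `newWriteBuilder` branches on `config.getDirectLoadEnable`. A sketch of triggering the direct-load path from the DataFrame API, again with an assumed format name and assumed option keys:

```scala
// Hypothetical write sketch; "oceanbase" and all option keys are assumed names.
df.write
  .format("oceanbase")
  .option("url", "jdbc:mysql://localhost:2881/test") // assumed option key
  .option("table-name", "orders")                    // assumed option key
  .option("direct-load.enabled", "true")             // assumed key behind getDirectLoadEnable
  .mode("append")
  .save()
```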