Skip to content

Commit 3a88fc0

Browse files
authored
feat: Supports table delete. (#40)
1 parent 55f4ebe commit 3a88fc0

File tree

4 files changed

+78
-1
lines changed

4 files changed

+78
-1
lines changed

spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/catalog/OceanBaseTable.scala

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@ import com.oceanbase.spark.reader.v2.OBJdbcScanBuilder
2222
import com.oceanbase.spark.utils.OBJdbcUtils
2323
import com.oceanbase.spark.writer.v2.{DirectLoadWriteBuilderV2, JDBCWriteBuilder}
2424

25+
import org.apache.spark.sql.ExprUtils.compileFilter
2526
import org.apache.spark.sql.SparkSession
2627
import org.apache.spark.sql.connector.catalog._
2728
import org.apache.spark.sql.connector.catalog.TableCapability._
2829
import org.apache.spark.sql.connector.read.ScanBuilder
2930
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
3031
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
32+
import org.apache.spark.sql.sources.Filter
3133
import org.apache.spark.sql.types.StructType
3234
import org.apache.spark.sql.util.CaseInsensitiveStringMap
3335

@@ -44,7 +46,8 @@ case class OceanBaseTable(
4446
extends Table
4547
with SupportsRead
4648
with SupportsWrite
47-
with TruncatableTable {
49+
with TruncatableTable
50+
with SupportsDelete {
4851

4952
override def name(): String = ident.toString
5053

@@ -84,4 +87,33 @@ case class OceanBaseTable(
8487
case Failure(_) => false
8588
}
8689
}
90+
91+
override def canDeleteWhere(filters: Array[Filter]): Boolean = {
92+
filters.forall(filter => compileFilter(filter, dialect).isDefined)
93+
}
94+
95+
override def deleteWhere(filters: Array[Filter]): Unit = {
96+
val filterWhereClause: String =
97+
filters
98+
.flatMap(compileFilter(_, dialect))
99+
.map(p => s"($p)")
100+
.mkString(" AND ")
101+
102+
val whereClause: String = {
103+
if (filterWhereClause.nonEmpty) {
104+
"WHERE " + filterWhereClause
105+
} else {
106+
""
107+
}
108+
}
109+
OBJdbcUtils.withConnection(config) {
110+
conn =>
111+
{
112+
OBJdbcUtils.executeStatement(
113+
conn,
114+
config,
115+
dialect.getDeleteWhereSql(config.getDbTable, whereClause))
116+
}
117+
}
118+
}
87119
}

spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/dialect/OceanBaseDialect.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,12 @@ abstract class OceanBaseDialect extends Logging with Serializable {
128128
s"TRUNCATE TABLE $table"
129129
}
130130

131+
def getDeleteWhereSql(tableName: String, whereClause: String): String = {
132+
val sql = s"DELETE FROM $tableName $whereClause"
133+
logInfo(s"The generated OceanBase delete sql statement: $sql")
134+
sql
135+
}
136+
131137
def getJDBCType(dt: DataType): Option[JdbcType] = None
132138

133139
/** Creates a schema. */

spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/utils/OBJdbcUtils.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,9 @@ object OBJdbcUtils {
112112
try {
113113
statement.setQueryTimeout(config.getJdbcQueryTimeout)
114114
statement.executeUpdate(sql)
115+
} catch {
116+
case exception: Exception =>
117+
throw new RuntimeException(s"Failed to execute sql: $sql", exception.getCause)
115118
} finally {
116119
statement.close()
117120
}

spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/scala/com/oceanbase/spark/OBCatalogMySQLITCase.scala

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,42 @@ class OBCatalogMySQLITCase extends OceanBaseMySQLTestBase {
231231
session.stop()
232232
}
233233

234+
@Test
235+
def testDeleteWhere(): Unit = {
236+
val session = SparkSession
237+
.builder()
238+
.master("local[*]")
239+
.config("spark.sql.catalog.ob", OB_CATALOG_CLASS)
240+
.config("spark.sql.catalog.ob.url", getJdbcUrl)
241+
.config("spark.sql.catalog.ob.username", getUsername)
242+
.config("spark.sql.catalog.ob.password", getPassword)
243+
.config("spark.sql.catalog.ob.schema-name", getSchemaName)
244+
.getOrCreate()
245+
246+
session.sql("use ob;")
247+
insertTestData(session, "products")
248+
session.sql("delete from products where 1 = 0")
249+
queryAndVerifyTableData(session, "products", expected)
250+
251+
session.sql("delete from products where id = 1")
252+
queryAndVerifyTableData(session, "products", expected)
253+
254+
session.sql("delete from products where description is null")
255+
queryAndVerifyTableData(session, "products", expected)
256+
257+
session.sql("delete from products where id in (101, 102, 103)")
258+
session.sql("delete from products where name = 'hammer'")
259+
260+
session.sql("delete from products where name like 'rock%'")
261+
session.sql("delete from products where name like '%jack%' and id = 108 or weight = 5.3")
262+
session.sql("delete from products where id >= 109")
263+
264+
val expect = new util.ArrayList[String]()
265+
queryAndVerifyTableData(session, "products", expect)
266+
267+
session.stop()
268+
}
269+
234270
private def queryAndVerifyTableData(
235271
session: SparkSession,
236272
tableName: String,

0 commit comments

Comments
 (0)