
Commit 4adf65e

Enhancement: Support limit pushdown and top-n pushdown. (#56)
* Enhancement: Support limit pushdown and top-n pushdown.
1 parent cf01023 commit 4adf65e
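
As a quick illustration (editorial sketch, not part of this commit), this is the query shape the enhancement targets: an ORDER BY ... LIMIT n ("Top-N") query that can now be evaluated by OceanBase instead of being sorted in Spark after a full scan. The session setup and the table/column names below are assumptions for the sketch.

import org.apache.spark.sql.SparkSession

object TopNQuerySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("top-n-sketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // Stand-in data; a real job would load this table through the OceanBase connector,
    // in which case the ORDER BY ... LIMIT below becomes eligible for pushdown.
    Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "name").createOrReplaceTempView("orders")

    // Top-N pattern: sort plus limit.
    spark.sql("SELECT id, name FROM orders ORDER BY id DESC LIMIT 10").show()

    spark.stop()
  }
}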

8 files changed, +417 −3 lines changed


spark-connector-oceanbase/spark-connector-oceanbase-3.2/pom.xml

Lines changed: 61 additions & 0 deletions
@@ -54,7 +54,68 @@ under the License.
     </dependencies>

     <build>
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <groupId>net.alchim31.maven</groupId>
+                    <artifactId>scala-maven-plugin</artifactId>
+                    <version>${scala-maven-plugin.version}</version>
+                    <configuration>
+                        <args>
+                            <arg>-nobootcp</arg>
+                            <arg>-target:jvm-${target.java.version}</arg>
+                        </args>
+                        <checkMultipleScalaVersions>false</checkMultipleScalaVersions>
+                    </configuration>
+                </plugin>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-compiler-plugin</artifactId>
+                </plugin>
+            </plugins>
+        </pluginManagement>
         <plugins>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>${scala-maven-plugin.version}</version>
+                <executions>
+                    <!-- Run scala compiler in the process-resources phase, so that dependencies on
+                         scala classes can be resolved later in the (Java) compile phase -->
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <goals>
+                            <goal>add-source</goal>
+                            <goal>compile</goal>
+                        </goals>
+                        <phase>process-resources</phase>
+                    </execution>
+
+                    <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+                         scala classes can be resolved later in the (Java) test-compile phase -->
+                    <execution>
+                        <id>scala-test-compile</id>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                        <phase>process-test-resources</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>com.diffplug.spotless</groupId>
+                <artifactId>spotless-maven-plugin</artifactId>
+                <version>${spotless.version}</version>
+                <configuration>
+                    <scala>
+                        <scalafmt>
+                            <version>3.4.3</version>
+                            <!-- This file is in the root of the project to make sure IntelliJ picks it up automatically -->
+                            <file>${project.basedir}/../../.scalafmt.conf</file>
+                        </scalafmt>
+                    </scala>
+                </configuration>
+            </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2024 OceanBase.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.oceanbase.spark.reader.v2
+
+import com.oceanbase.spark.config.OceanBaseConfig
+import com.oceanbase.spark.dialect.OceanBaseDialect
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.ExprUtils.compileFilter
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.expressions.{NamedReference, SortOrder}
+import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
+import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder, SupportsPushDownAggregates, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsRuntimeFiltering}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+
+case class OBJdbcScanBuilder(
+    schema: StructType,
+    config: OceanBaseConfig,
+    dialect: OceanBaseDialect
+) extends ScanBuilder
+  with SupportsPushDownFilters
+  with SupportsPushDownRequiredColumns
+  with SupportsPushDownAggregates
+  with Logging {
+  private var finalSchema = schema
+  private var pushedFilter = Array.empty[Filter]
+  private var pushDownLimit = 0
+  private var sortOrders: Array[SortOrder] = Array.empty[SortOrder]
+
+  /** TODO: support org.apache.spark.sql.connector.read.SupportsPushDownV2Filters */
+  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    val (pushed, unSupported) =
+      filters.partition(f => compileFilter(f, dialect).isDefined)
+    this.pushedFilter = pushed
+    unSupported
+  }
+
+  override def pushedFilters(): Array[Filter] = pushedFilter
+
+  override def pruneColumns(requiredSchema: StructType): Unit = {
+    val requiredCols = requiredSchema.map(_.name)
+    this.finalSchema = StructType(
+      finalSchema.filter(field => requiredCols.contains(field.name))
+    )
+  }
+
+  override def pushAggregation(aggregation: Aggregation): Boolean = {
+    // TODO: support aggregation push down
+    false
+  }
+
+  override def build(): Scan =
+    OBJdbcBatchScan(
+      finalSchema: StructType,
+      config: OceanBaseConfig,
+      pushedFilter: Array[Filter],
+      pushDownLimit: Int,
+      sortOrders: Array[SortOrder],
+      dialect: OceanBaseDialect
+    )
+}
+
+case class OBJdbcBatchScan(
+    schema: StructType,
+    config: OceanBaseConfig,
+    pushedFilter: Array[Filter],
+    pushDownLimit: Int,
+    pushDownTopNSortOrders: Array[SortOrder],
+    dialect: OceanBaseDialect
+) extends Scan
+  with SupportsRuntimeFiltering {
+
+  // TODO: support spark runtime filter feat.
+  private var runtimeFilters: Array[Filter] = Array.empty
+
+  override def readSchema(): StructType = schema
+
+  override def toBatch: Batch =
+    new OBJdbcBatch(
+      schema: StructType,
+      config: OceanBaseConfig,
+      pushedFilter: Array[Filter],
+      pushDownLimit: Int,
+      pushDownTopNSortOrders: Array[SortOrder],
+      dialect: OceanBaseDialect
+    )
+
+  override def filterAttributes(): Array[NamedReference] = Array.empty
+
+  override def filter(filters: Array[Filter]): Unit = {
+    runtimeFilters = filters
+  }
+}
+
+class OBJdbcBatch(
+    schema: StructType,
+    config: OceanBaseConfig,
+    pushedFilter: Array[Filter],
+    pushDownLimit: Int,
+    pushDownTopNSortOrders: Array[SortOrder],
+    dialect: OceanBaseDialect
+) extends Batch {
+  private lazy val inputPartitions: Array[InputPartition] =
+    OBMySQLPartition.columnPartition(config, dialect)
+
+  override def planInputPartitions(): Array[InputPartition] = inputPartitions
+
+  override def createReaderFactory(): PartitionReaderFactory =
+    new OBJdbcReaderFactory(
+      schema: StructType,
+      config: OceanBaseConfig,
+      pushedFilter: Array[Filter],
+      pushDownLimit: Int,
+      pushDownTopNSortOrders: Array[SortOrder],
+      dialect: OceanBaseDialect
+    )
+}
+
+class OBJdbcReaderFactory(
+    schema: StructType,
+    config: OceanBaseConfig,
+    pushedFilter: Array[Filter],
+    pushDownLimit: Int,
+    pushDownTopNSortOrders: Array[SortOrder],
+    dialect: OceanBaseDialect
+) extends PartitionReaderFactory {
+
+  override def createReader(
+      partition: InputPartition
+  ): PartitionReader[InternalRow] =
+    new OBJdbcReader(
+      schema: StructType,
+      config: OceanBaseConfig,
+      partition: InputPartition,
+      pushedFilter: Array[Filter],
+      pushDownLimit: Int,
+      pushDownTopNSortOrders: Array[SortOrder],
+      dialect: OceanBaseDialect
+    )
+}

spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/config/OceanBaseConfig.java

Lines changed: 23 additions & 0 deletions
@@ -263,6 +263,21 @@ public class OceanBaseConfig extends Config implements Serializable {
                     .booleanConf()
                     .createWithDefault(false);

+    public static final ConfigEntry<Boolean> JDBC_ENABLE_PUSH_DOWN_LIMIT =
+            new ConfigBuilder("jdbc.enable-pushdown-limit")
+                    .doc("Whether to enable pushdown of LIMIT clause to OceanBase.")
+                    .version(ConfigConstants.VERSION_1_2_0)
+                    .booleanConf()
+                    .createWithDefault(true);
+
+    public static final ConfigEntry<Boolean> JDBC_ENABLE_PUSH_DOWN_TOP_N =
+            new ConfigBuilder("jdbc.enable-pushdown-top-n")
+                    .doc(
+                            "Whether to enable pushdown of ORDER BY ... LIMIT N (Top-N) queries to OceanBase. This configuration only takes effect when 'jdbc.enable-pushdown-limit' is true.")
+                    .version(ConfigConstants.VERSION_1_2_0)
+                    .booleanConf()
+                    .createWithDefault(true);
+
     public static final ConfigEntry<Boolean> ENABLE_ONLY_AUTO_INC_USE_WHERE_PARTITION =
             new ConfigBuilder("jdbc.enable-only-auto-inc-use-where-partition")
                     .doc(
@@ -424,6 +439,14 @@ public Boolean getEnableRewriteQuerySql() {
         return get(JDBC_ENABLE_REWRITE_QUERY_SQL);
     }

+    public Boolean getEnablePushdownLimit() {
+        return get(JDBC_ENABLE_PUSH_DOWN_LIMIT);
+    }
+
+    public Boolean getEnablePushdownTopN() {
+        return get(JDBC_ENABLE_PUSH_DOWN_TOP_N);
+    }
+
     public Boolean getEnableOnlyAutoIncUseWherePartition() {
         return get(ENABLE_ONLY_AUTO_INC_USE_WHERE_PARTITION);
     }
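
A hedged usage sketch for the two new options. The keys jdbc.enable-pushdown-limit and jdbc.enable-pushdown-top-n and their defaults come from this diff; the data source name and the connection/table options are illustrative assumptions, not taken from this commit.

import org.apache.spark.sql.SparkSession

object PushdownOptionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("pushdown-options").master("local[*]").getOrCreate()

    val df = spark.read
      .format("oceanbase") // assumed short name of the connector
      .option("url", "jdbc:mysql://localhost:2881/test") // hypothetical connection settings
      .option("username", "root@test")
      .option("password", "")
      .option("table-name", "orders") // hypothetical table
      // Added by this commit; both default to true. Top-N pushdown only takes
      // effect when limit pushdown is also enabled.
      .option("jdbc.enable-pushdown-limit", "true")
      .option("jdbc.enable-pushdown-top-n", "true")
      .load()

    // With both options on, this Top-N pattern can be pushed down as ORDER BY ... LIMIT 10.
    df.orderBy(df("id").desc).limit(10).show()

    spark.stop()
  }
}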

spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/dialect/OceanBaseDialect.scala

Lines changed: 5 additions & 0 deletions
@@ -134,6 +134,11 @@ abstract class OceanBaseDialect extends Logging with Serializable {
     sql
   }

+  /** returns the LIMIT clause for the SELECT statement */
+  def getLimitClause(limit: Integer): String = {
+    if (limit > 0) s"LIMIT $limit" else ""
+  }
+
   def getJDBCType(dt: DataType): Option[JdbcType] = None

   /** Creates a schema. */

spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/dialect/OceanBaseOracleDialect.scala

Lines changed: 9 additions & 0 deletions
@@ -83,6 +83,15 @@ class OceanBaseOracleDialect extends OceanBaseDialect {
     throw new UnsupportedOperationException("Not currently supported in oracle mode")
   }

+  /**
+   * returns the LIMIT clause for the SELECT statement
+   *
+   * Oracle mode not supported
+   */
+  override def getLimitClause(limit: Integer): String = {
+    ""
+  }
+
   override def compileValue(value: Any): Any = value match {
     // The JDBC drivers support date literals in SQL statements written in the
     // format: {d 'yyyy-mm-dd'} and timestamp literals in SQL statements written

spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/reader/v2/OBJdbcReader.scala

Lines changed: 57 additions & 1 deletion
@@ -26,6 +26,7 @@ import org.apache.spark.sql.ExprUtils.compileFilter
 import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.connector.expressions.{NullOrdering, SortDirection, SortOrder}
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CharType, DataType, DateType, Decimal, DecimalType, DoubleType, FloatType, IntegerType, LongType, Metadata, ShortType, StringType, StructType, TimestampType, VarcharType}
@@ -40,6 +41,8 @@ class OBJdbcReader(
     config: OceanBaseConfig,
     partition: InputPartition,
     pushedFilter: Array[Filter],
+    pushDownLimit: Int,
+    pushDownTopNSortOrders: Array[SortOrder],
     dialect: OceanBaseDialect)
   extends PartitionReader[InternalRow]
   with SQLConfHelper
@@ -109,16 +112,69 @@ class OBJdbcReader(
       }
     }

+    val myLimitClause: String = {
+      if (part.limitOffsetClause == null || part.limitOffsetClause.isEmpty)
+        dialect.getLimitClause(pushDownLimit)
+      else
+        ""
+    }
+
     var hint = s"/*+ PARALLEL(${config.getJdbcParallelHintDegree}) */"
     if (part.useHiddenPKColumn)
       hint =
         s"/*+ PARALLEL(${config.getJdbcParallelHintDegree}), opt_param('hidden_column_visible', 'true') */"

     s"""
        |SELECT $hint $columnStr FROM ${config.getDbTable} ${part.partitionClause}
-       |$whereClause ${part.limitOffsetClause}
+       |$whereClause $getOrderByClause ${part.limitOffsetClause} $myLimitClause
        |""".stripMargin
   }
+
+  /**
+   * Mapping between original SQL requirements and MySQL implementations:
+   *
+   * | Original Requirement         | MySQL Implementation                | Resulting Order              |
+   * |:-----------------------------|:------------------------------------|:-----------------------------|
+   * | ORDER BY id ASC NULLS FIRST  | ORDER BY id ASC (default behavior)  | NULLs first → ASC non-nulls  |
+   * | ORDER BY id ASC NULLS LAST   | ORDER BY id IS NULL, id ASC         | ASC non-nulls → NULLs last   |
+   * | ORDER BY id DESC NULLS FIRST | ORDER BY id IS NULL DESC, id DESC   | NULLs first → DESC non-nulls |
+   * | ORDER BY id DESC NULLS LAST  | ORDER BY id DESC (default behavior) | DESC non-nulls → NULLs last  |
+   *
+   * @return
+   *   MySQL-compatible ORDER BY clause
+   */
+  private def getOrderByClause: String = {
+    if (pushDownTopNSortOrders.nonEmpty) {
+      val mysqlOrderBy = pushDownTopNSortOrders
+        .map {
+          sortOrder =>
+            // Parse sort field name, direction, and null ordering rules (based on Spark's SortOrder)
+            val field = dialect.quoteIdentifier(sortOrder.expression().describe())
+
+            // Generate sorting expressions according to MySQL's null handling characteristics
+            (sortOrder.direction(), sortOrder.nullOrdering()) match {
+              // Scenario: ASC + NULLS_LAST - Add IS NULL helper sort
+              case (SortDirection.ASCENDING, NullOrdering.NULLS_LAST) =>
+                s"$field IS NULL, $field ASC" // Prioritize non-NULL values
+              // Scenario: DESC + NULLS_FIRST - Add IS NULL DESC helper sort
+              case (SortDirection.DESCENDING, NullOrdering.NULLS_FIRST) =>
+                s"$field IS NULL DESC, $field DESC" // Prioritize NULL values
+              // Default sorting behavior for other cases
+              case _ => s"$field ${sortOrder.direction().toString}"
+            }
+        }
+        .mkString(", ")
+
+      // Info output of generated ORDER BY clause
+      logInfo(s"Generated ORDER BY clause: $mysqlOrderBy")
+      s" ORDER BY $mysqlOrderBy"
+    } else {
+      ""
+    }
+  }
 }

 object OBJdbcReader extends SQLConfHelper {
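
To make the NULL-ordering mapping above concrete, here is a small standalone sketch (editorial, not connector code) that mirrors the translation rules of getOrderByClause and prints the MySQL-compatible sort expression for each Spark direction/null-ordering combination.

object OrderByTranslationSketch {
  sealed trait Direction
  case object Ascending extends Direction
  case object Descending extends Direction

  sealed trait NullOrder
  case object NullsFirst extends NullOrder
  case object NullsLast extends NullOrder

  // Same case analysis as the committed code: MySQL already sorts NULLs first for ASC
  // and last for DESC, so only the two non-default combinations need the extra
  // "<field> IS NULL" sort key in front of the real one.
  def orderByExpr(field: String, dir: Direction, nulls: NullOrder): String =
    (dir, nulls) match {
      case (Ascending, NullsLast)   => s"$field IS NULL, $field ASC"
      case (Descending, NullsFirst) => s"$field IS NULL DESC, $field DESC"
      case (Ascending, _)           => s"$field ASC"
      case (Descending, _)          => s"$field DESC"
    }

  def main(args: Array[String]): Unit = {
    println(orderByExpr("`id`", Ascending, NullsFirst))  // `id` ASC
    println(orderByExpr("`id`", Ascending, NullsLast))   // `id` IS NULL, `id` ASC
    println(orderByExpr("`id`", Descending, NullsFirst)) // `id` IS NULL DESC, `id` DESC
    println(orderByExpr("`id`", Descending, NullsLast))  // `id` DESC
  }
}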
