
Commit d915ea7

feat: add WHERE clause partitioning support for string primary key tables (#58)

1 parent 4adf65e

16 files changed: +629 −164 lines changed

.github/workflows/push_pr.yml

Lines changed: 1 addition & 1 deletion

@@ -29,7 +29,7 @@ jobs:
   e2e-tests:
     strategy:
       matrix:
-        spark_version: ["3.1","3.2","3.3","3.4","3.5"]
+        spark_version: ["3.1","3.2","3.3","3.4","3.5.4"]
     uses: ./.github/workflows/test.yml
     with:
       module: spark-connector-oceanbase-e2e-tests

docs/spark-catalog-oceanbase.md

Lines changed: 14 additions & 0 deletions

@@ -332,6 +332,20 @@ select * from spark_catalog.default.orders;
       <td>Int</td>
       <td>Controls the parallelism level for statistical queries (e.g., COUNT, MIN, MAX) by adding /*+ PARALLEL(N) */ hint to generated SQL.</td>
     </tr>
+    <tr>
+      <td>spark.sql.catalog.your_catalog_name.jdbc.disable-pk-table-use-where-partition</td>
+      <td>No</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>When true, primary key tables will be prohibited from using WHERE clause partitioning.</td>
+    </tr>
+    <tr>
+      <td>spark.sql.catalog.your_catalog_name.jdbc.{database}.{table}.partition-column</td>
+      <td>No</td>
+      <td style="word-wrap: break-word;"></td>
+      <td>String</td>
+      <td>You can manually specify the primary key table partition column, and by default, one will be automatically selected from the primary key columns.</td>
+    </tr>
     <tr>
       <td>spark.sql.catalog.your_catalog_name.string-as-varchar-length</td>
       <td>No</td>
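For illustration, the two new catalog options can be set when building a Spark session. A minimal sketch, assuming a catalog named ob, a table test.orders with string primary key order_id, and the catalog implementation class name shown below (all placeholders, not values taken from this commit):

    import org.apache.spark.sql.SparkSession

    object WherePartitionOptionsDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("oceanbase-catalog-demo")
          // Hypothetical catalog registration; connection options omitted for brevity.
          .config("spark.sql.catalog.ob", "com.oceanbase.spark.catalog.OceanBaseCatalog")
          // Keep WHERE clause partitioning enabled for primary key tables (the default).
          .config("spark.sql.catalog.ob.jdbc.disable-pk-table-use-where-partition", "false")
          // Pin the partition column for test.orders rather than letting the
          // connector pick one from the primary key columns automatically.
          .config("spark.sql.catalog.ob.jdbc.test.orders.partition-column", "order_id")
          .getOrCreate()

        spark.sql("select count(*) from ob.test.orders").show()
      }
    }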

docs/spark-catalog-oceanbase_cn.md

Lines changed: 14 additions & 0 deletions

@@ -331,6 +331,20 @@ select * from spark_catalog.default.orders;
       <td>Int</td>
       <td>通过向生成的 SQL 添加 /*+ PARALLEL(N) */ hint 来控制统计查询(例如 COUNT、MIN、MAX)的并行级别。</td>
     </tr>
+    <tr>
+      <td>spark.sql.catalog.your_catalog_name.jdbc.disable-pk-table-use-where-partition</td>
+      <td>否</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>如果为true,则将禁止主键表使用 where 子句进行分区。</td>
+    </tr>
+    <tr>
+      <td>spark.sql.catalog.your_catalog_name.jdbc.{database}.{table}.partition-column</td>
+      <td>否</td>
+      <td style="word-wrap: break-word;"></td>
+      <td>String</td>
+      <td>您可以手动指定主键表分区列,否则将默认自动从主键列中选择一个。</td>
+    </tr>
     <tr>
       <td>spark.sql.catalog.your_catalog_name.string-as-varchar-length</td>
       <td>否</td>

spark-connector-oceanbase/spark-connector-oceanbase-base/pom.xml

Lines changed: 6 additions & 0 deletions

@@ -47,6 +47,12 @@ under the License.
             <artifactId>scala-compiler</artifactId>
             <version>${scala.version}</version>
         </dependency>
+        <!-- https://mvnrepository.com/artifact/org.scala-lang.modules/scala-parallel-collections -->
+        <dependency>
+            <groupId>org.scala-lang.modules</groupId>
+            <artifactId>scala-parallel-collections_2.13</artifactId>
+            <version>1.2.0</version>
+        </dependency>

         <dependency>
             <groupId>org.apache.spark</groupId>
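The new dependency reflects that parallel collections were moved out of the standard library into a separate module in Scala 2.13, so any connector code that parallelizes work with .par needs this artifact on the classpath. An illustrative sketch of what the module provides (not code from this commit):

    import scala.collection.parallel.CollectionConverters._

    object ParallelCollectionsDemo {
      def main(args: Array[String]): Unit = {
        val partitionIds = (1 to 8).toList
        // .par comes from scala-parallel-collections on Scala 2.13; the map
        // below fans out over the default fork-join pool.
        val results = partitionIds.par.map(id => id * id).toList
        println(results)
      }
    }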

spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/config/OceanBaseConfig.java

Lines changed: 26 additions & 5 deletions

@@ -16,6 +16,8 @@

 package com.oceanbase.spark.config;

+import com.oceanbase.spark.dialect.OceanBaseDialect;
+
 import java.io.Serializable;
 import java.time.Duration;
 import java.util.Map;
@@ -278,14 +280,22 @@ public class OceanBaseConfig extends Config implements Serializable {
                     .booleanConf()
                     .createWithDefault(true);

-    public static final ConfigEntry<Boolean> ENABLE_ONLY_AUTO_INC_USE_WHERE_PARTITION =
-            new ConfigBuilder("jdbc.enable-only-auto-inc-use-where-partition")
+    public static final ConfigEntry<Boolean> DISABLE_PK_TABLE_USE_WHERE_PARTITION =
+            new ConfigBuilder("jdbc.disable-pk-table-use-where-partition")
                     .doc(
-                            "Only auto-increment primary key columns are allowed to use the where partitioning method.")
+                            "When true, primary key tables will be prohibited from using WHERE clause partitioning.")
                     .version(ConfigConstants.VERSION_1_2_0)
                     .booleanConf()
                     .createWithDefault(false);

+    public static final ConfigEntry<String> SPECIFY_PK_TABLE_PARTITION_COLUMN =
+            new ConfigBuilder("jdbc.%s.partition-column")
+                    .doc(
+                            "You can manually specify the primary key table partition column, and by default, one will be automatically selected from the primary key columns.")
+                    .version(ConfigConstants.VERSION_1_2_0)
+                    .stringConf()
+                    .create();
+
     public static final ConfigEntry<Integer> THE_LENGTH_STRING_TO_VARCHAR_TABLE_CREATE =
             new ConfigBuilder("string-as-varchar-length")
                     .doc(
@@ -447,8 +457,8 @@ public Boolean getEnablePushdownTopN() {
         return get(JDBC_ENABLE_PUSH_DOWN_TOP_N);
     }

-    public Boolean getEnableOnlyAutoIncUseWherePartition() {
-        return get(ENABLE_ONLY_AUTO_INC_USE_WHERE_PARTITION);
+    public Boolean getDisableIntPkTableUseWherePartition() {
+        return get(DISABLE_PK_TABLE_USE_WHERE_PARTITION);
     }

     public Integer getLengthString2Varchar() {
@@ -490,4 +500,15 @@ public Integer getJdbcStatsParallelHintDegree() {
     public Optional<Long> getJdbcMaxRecordsPrePartition() {
         return Optional.ofNullable(get(JDBC_MAX_RECORDS_PER_PARTITION));
     }
+
+    public Optional<String> getJdbcReaderPartitionColumn(OceanBaseDialect dialect) {
+        String configPartCol =
+                String.format(
+                        SPECIFY_PK_TABLE_PARTITION_COLUMN.getKey(),
+                        dialect.unQuoteIdentifier(getDbTable()));
+        return getProperties().entrySet().stream()
+                .filter(entry -> entry.getKey().contains(configPartCol))
+                .map(entry -> dialect.quoteIdentifier(entry.getValue()))
+                .findFirst();
+    }
 }
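getJdbcReaderPartitionColumn expands the jdbc.%s.partition-column template with the unquoted database.table name, then scans the user-supplied properties for a matching key and quotes the configured value for use in generated SQL. A standalone sketch of the same resolution logic, using hypothetical inputs and MySQL-mode backtick quoting:

    object PartitionColumnLookup {
      def resolve(properties: Map[String, String], dbTable: String): Option[String] = {
        // Expanded key, e.g. "jdbc.test.orders.partition-column".
        val configKey = s"jdbc.$dbTable.partition-column"
        properties.collectFirst {
          // Mirror the Java stream: match on the expanded key, quote the value.
          case (key, value) if key.contains(configKey) => s"`$value`"
        }
      }

      def main(args: Array[String]): Unit = {
        val props = Map("jdbc.test.orders.partition-column" -> "order_id")
        println(resolve(props, "test.orders")) // Some(`order_id`)
        println(resolve(props, "test.items"))  // None: falls back to auto-selection
      }
    }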

spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/config/OceanBaseUserInfo.java

Lines changed: 7 additions & 0 deletions

@@ -55,6 +55,13 @@ public void setTenant(String tenant) {
         this.tenant = tenant;
     }

+    /**
+     * Parses the user information from the OceanBase configuration. Supports multiple username
+     * formats: user@tenant#cluster, cluster:tenant:user, user@tenant, or a plain user.
+     *
+     * @param oceanBaseConfig The OceanBase configuration containing the username.
+     * @return A new OceanBaseUserInfo object with the parsed information.
+     */
     public static OceanBaseUserInfo parse(OceanBaseConfig oceanBaseConfig) {
         final String username = oceanBaseConfig.getUsername();
         final String sepUserAtTenant = "@";
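A standalone sketch of the four documented username formats, as a simplified, hypothetical reimplementation (the authoritative logic lives in OceanBaseUserInfo.parse):

    case class UserInfo(cluster: Option[String], tenant: Option[String], user: String)

    object UserInfoParser {
      def parse(username: String): UserInfo = username match {
        // user@tenant#cluster
        case s if s.contains("@") && s.contains("#") =>
          val Array(user, rest) = s.split("@", 2)
          val Array(tenant, cluster) = rest.split("#", 2)
          UserInfo(Some(cluster), Some(tenant), user)
        // cluster:tenant:user
        case s if s.contains(":") =>
          val Array(cluster, tenant, user) = s.split(":", 3)
          UserInfo(Some(cluster), Some(tenant), user)
        // user@tenant
        case s if s.contains("@") =>
          val Array(user, tenant) = s.split("@", 2)
          UserInfo(None, Some(tenant), user)
        // plain user
        case s => UserInfo(None, None, s)
      }

      def main(args: Array[String]): Unit = {
        println(parse("root@sys#obcluster")) // UserInfo(Some(obcluster),Some(sys),root)
        println(parse("obcluster:sys:root")) // UserInfo(Some(obcluster),Some(sys),root)
        println(parse("root@sys"))           // UserInfo(None,Some(sys),root)
        println(parse("root"))               // UserInfo(None,None,root)
      }
    }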

spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/dialect/OceanBaseDialect.scala

Lines changed: 5 additions & 0 deletions

@@ -159,7 +159,12 @@ abstract class OceanBaseDialect extends Logging with Serializable {
     s"""`$colName`"""
   }

+  def unQuoteIdentifier(colName: String): String = {
+    colName.replace("`", "")
+  }
+
   def getPriKeyInfo(
+      connection: Connection,
       schemaName: String,
       tableName: String,
       config: OceanBaseConfig): ArrayBuffer[PriKeyColumnInfo]
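unQuoteIdentifier is the inverse of the existing quoteIdentifier: it strips the dialect's quoting so identifiers can be matched against unquoted configuration keys (see getJdbcReaderPartitionColumn above). A trivial round-trip sketch under MySQL-mode backtick quoting:

    object IdentifierQuotingDemo {
      def quoteIdentifier(colName: String): String = s"`$colName`"
      def unQuoteIdentifier(colName: String): String = colName.replace("`", "")

      def main(args: Array[String]): Unit = {
        val quoted = quoteIdentifier("order_id")
        println(quoted)                        // `order_id`
        println(unQuoteIdentifier(quoted))     // order_id
        // Also a no-op on already-unquoted input:
        println(unQuoteIdentifier("order_id")) // order_id
      }
    }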

spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/dialect/OceanBaseMySQLDialect.scala

Lines changed: 16 additions & 18 deletions

@@ -178,6 +178,7 @@ class OceanBaseMySQLDialect extends OceanBaseDialect {
   }

   def getPriKeyInfo(
+      connection: Connection,
       schemaName: String,
       tableName: String,
       config: OceanBaseConfig): ArrayBuffer[PriKeyColumnInfo] = {
@@ -192,26 +193,23 @@ class OceanBaseMySQLDialect extends OceanBaseDialect {
       | and TABLE_NAME = '$tableName';
       |""".stripMargin

-    OBJdbcUtils.withConnection(config) {
-      val arrayBuffer = ArrayBuffer[PriKeyColumnInfo]()
-      conn =>
-        OBJdbcUtils.executeQuery(conn, config, sql) {
-          rs =>
-            {
-              while (rs.next()) {
-                val columnKey = rs.getString(3)
-                if (null != columnKey && columnKey.equals("PRI")) {
-                  arrayBuffer += PriKeyColumnInfo(
-                    quoteIdentifier(rs.getString(1)),
-                    rs.getString(2),
-                    columnKey,
-                    rs.getString(4),
-                    rs.getString(5))
-                }
-              }
+    val arrayBuffer = ArrayBuffer[PriKeyColumnInfo]()
+    OBJdbcUtils.executeQuery(connection, config, sql) {
+      rs =>
+        {
+          while (rs.next()) {
+            val columnKey = rs.getString(3)
+            if (null != columnKey && columnKey.equals("PRI")) {
+              arrayBuffer += PriKeyColumnInfo(
+                quoteIdentifier(rs.getString(1)),
+                rs.getString(2),
+                columnKey,
+                rs.getString(4),
+                rs.getString(5))
             }
+          }
+          arrayBuffer
         }
-        arrayBuffer
     }
   }

spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/dialect/OceanBaseOracleDialect.scala

Lines changed: 5 additions & 0 deletions

@@ -34,6 +34,10 @@ class OceanBaseOracleDialect extends OceanBaseDialect {
     s""""$colName""""
   }

+  override def unQuoteIdentifier(colName: String): String = {
+    colName.replace("\"", "")
+  }
+
   override def createTable(
       conn: Connection,
       tableName: String,
@@ -66,6 +70,7 @@ class OceanBaseOracleDialect extends OceanBaseDialect {
       "Not currently supported in oracle mode")

   override def getPriKeyInfo(
+      connection: Connection,
       schemaName: String,
       tableName: String,
       config: OceanBaseConfig): ArrayBuffer[PriKeyColumnInfo] = {

spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/reader/v2/OBJdbcReader.scala

Lines changed: 7 additions & 0 deletions

@@ -54,6 +54,13 @@ class OBJdbcReader(
   private lazy val stmt: PreparedStatement =
     conn.prepareStatement(buildQuerySql(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
   private lazy val rs: ResultSet = {
+    partition match {
+      case part: OBMySQLPartition =>
+        part.unevenlyWhereValue.zipWithIndex.foreach {
+          case (value, index) => stmt.setObject(index + 1, value)
+        }
+      case _ =>
+    }
     stmt.setFetchSize(config.getJdbcFetchSize)
     stmt.setQueryTimeout(config.getJdbcQueryTimeout)
     stmt.executeQuery()
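The new block binds each per-partition boundary value to its ? placeholder in the generated query before execution; JDBC parameter indices are 1-based, hence index + 1. A self-contained sketch of the same binding pattern with a hypothetical query and values (OBMySQLPartition and buildQuerySql belong to the connector and are not reproduced here):

    import java.sql.DriverManager

    object WhereClauseBindingDemo {
      def main(args: Array[String]): Unit = {
        // Hypothetical URL; the real reader receives its connection from the connector.
        val conn = DriverManager.getConnection("jdbc:mysql://localhost:2881/test", "root", "")
        // A range over a string primary key: lower <= order_id < upper.
        val sql = "SELECT * FROM orders WHERE order_id >= ? AND order_id < ?"
        val whereValues: Seq[AnyRef] = Seq("A0000", "M0000")
        val stmt = conn.prepareStatement(sql)
        // Same loop as in OBJdbcReader: bind values to 1-based placeholders.
        whereValues.zipWithIndex.foreach {
          case (value, index) => stmt.setObject(index + 1, value)
        }
        stmt.setFetchSize(1024)
        val rs = stmt.executeQuery()
        while (rs.next()) println(rs.getString("order_id"))
        conn.close()
      }
    }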
