feat: add WHERE clause partitioning support for string primary key tables #58

Merged · 6 commits · May 15, 2025
2 changes: 1 addition & 1 deletion .github/workflows/push_pr.yml
@@ -29,7 +29,7 @@ jobs:
e2e-tests:
strategy:
matrix:
-      spark_version: ["3.1","3.2","3.3","3.4","3.5"]
+      spark_version: ["3.1","3.2","3.3","3.4","3.5.4"]
uses: ./.github/workflows/test.yml
with:
module: spark-connector-oceanbase-e2e-tests
14 changes: 14 additions & 0 deletions docs/spark-catalog-oceanbase.md
@@ -332,6 +332,20 @@ select * from spark_catalog.default.orders;
<td>Int</td>
<td>Controls the parallelism level for statistical queries (e.g., COUNT, MIN, MAX) by adding /*+ PARALLEL(N) */ hint to generated SQL.</td>
</tr>
<tr>
<td>spark.sql.catalog.your_catalog_name.jdbc.disable-pk-table-use-where-partition</td>
<td>No</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>When true, primary key tables will be prohibited from using WHERE clause partitioning.</td>
</tr>
<tr>
<td>spark.sql.catalog.your_catalog_name.jdbc.{database}.{table}.partition-column</td>
<td>No</td>
<td style="word-wrap: break-word;"></td>
<td>String</td>
<td>Manually specifies the partition column for a primary key table. If not set, one is selected automatically from the primary key columns.</td>
</tr>
<tr>
<td>spark.sql.catalog.your_catalog_name.string-as-varchar-length</td>
<td>No</td>
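For orientation, here is how the two new options might be set when building a Spark session. This is a minimal sketch: the catalog name `ob`, database `test`, table `orders`, and column `order_id` are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("oceanbase-partitioned-read")
  // Disable WHERE-clause partitioning for primary key tables (default: false).
  .config("spark.sql.catalog.ob.jdbc.disable-pk-table-use-where-partition", "false")
  // Pin the partition column for one table instead of auto-selecting a PK column.
  .config("spark.sql.catalog.ob.jdbc.test.orders.partition-column", "order_id")
  .getOrCreate()

spark.sql("SELECT COUNT(*) FROM ob.test.orders").show()
```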
14 changes: 14 additions & 0 deletions docs/spark-catalog-oceanbase_cn.md
@@ -331,6 +331,20 @@ select * from spark_catalog.default.orders;
<td>Int</td>
<td>通过向生成的 SQL 添加 /*+ PARALLEL(N) */ hint 来控制统计查询(例如 COUNT、MIN、MAX)的并行级别。</td>
</tr>
<tr>
<td>spark.sql.catalog.your_catalog_name.jdbc.disable-pk-table-use-where-partition</td>
<td>否</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>如果为true,则将禁止主键表使用 where 子句进行分区。</td>
</tr>
<tr>
<td>spark.sql.catalog.your_catalog_name.jdbc.{database}.{table}.partition-column</td>
<td>否</td>
<td style="word-wrap: break-word;"></td>
<td>String</td>
<td>您可以手动指定主键表分区列,否则将默认自动从主键列中选择一个。</td>
</tr>
<tr>
<td>spark.sql.catalog.your_catalog_name.string-as-varchar-length</td>
<td>否</td>
@@ -47,6 +47,12 @@ under the License.
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang.modules/scala-parallel-collections -->
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-parallel-collections_2.13</artifactId>
<version>1.2.0</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
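In Scala 2.13, the parallel collections were moved out of the standard library into this module, so any `.par` call needs both the dependency above and an import. A minimal illustration of the pattern the dependency enables:

```scala
// Scala 2.13: `.par` comes from scala-parallel-collections, not the standard library.
import scala.collection.parallel.CollectionConverters._

val partitionIds = (0 until 8).par           // parallel collection over partition indices
val doubled = partitionIds.map(_ * 2).toList // work is evaluated across threads
```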
@@ -16,6 +16,8 @@

package com.oceanbase.spark.config;

import com.oceanbase.spark.dialect.OceanBaseDialect;

import java.io.Serializable;
import java.time.Duration;
import java.util.Map;
@@ -278,14 +280,22 @@ public class OceanBaseConfig extends Config implements Serializable {
.booleanConf()
.createWithDefault(true);

-  public static final ConfigEntry<Boolean> ENABLE_ONLY_AUTO_INC_USE_WHERE_PARTITION =
-      new ConfigBuilder("jdbc.enable-only-auto-inc-use-where-partition")
+  public static final ConfigEntry<Boolean> DISABLE_PK_TABLE_USE_WHERE_PARTITION =
+      new ConfigBuilder("jdbc.disable-pk-table-use-where-partition")
           .doc(
-              "Only auto-increment primary key columns are allowed to use the where partitioning method.")
+              "When true, primary key tables will be prohibited from using WHERE clause partitioning.")
           .version(ConfigConstants.VERSION_1_2_0)
           .booleanConf()
           .createWithDefault(false);

public static final ConfigEntry<String> SPECIFY_PK_TABLE_PARTITION_COLUMN =
new ConfigBuilder("jdbc.%s.partition-column")
.doc(
"You can manually specify the primary key table partition column, and by default, one will be automatically selected from the primary key columns.")
.version(ConfigConstants.VERSION_1_2_0)
.stringConf()
.create();

public static final ConfigEntry<Integer> THE_LENGTH_STRING_TO_VARCHAR_TABLE_CREATE =
new ConfigBuilder("string-as-varchar-length")
.doc(
@@ -447,8 +457,8 @@ public Boolean getEnablePushdownTopN() {
return get(JDBC_ENABLE_PUSH_DOWN_TOP_N);
}

-  public Boolean getEnableOnlyAutoIncUseWherePartition() {
-    return get(ENABLE_ONLY_AUTO_INC_USE_WHERE_PARTITION);
+  public Boolean getDisableIntPkTableUseWherePartition() {
+    return get(DISABLE_PK_TABLE_USE_WHERE_PARTITION);
}

public Integer getLengthString2Varchar() {
@@ -490,4 +500,15 @@ public Integer getJdbcStatsParallelHintDegree() {
public Optional<Long> getJdbcMaxRecordsPrePartition() {
return Optional.ofNullable(get(JDBC_MAX_RECORDS_PER_PARTITION));
}

public Optional<String> getJdbcReaderPartitionColumn(OceanBaseDialect dialect) {
String configPartCol =
String.format(
SPECIFY_PK_TABLE_PARTITION_COLUMN.getKey(),
dialect.unQuoteIdentifier(getDbTable()));
return getProperties().entrySet().stream()
.filter(entry -> entry.getKey().contains(configPartCol))
.map(entry -> dialect.quoteIdentifier(entry.getValue()))
.findFirst();
}
}
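`getJdbcReaderPartitionColumn` fills the `%s` in `jdbc.%s.partition-column` with the unquoted `database.table` name, then scans the properties for a matching key. A rough Scala sketch of that lookup; the helper name and map contents are illustrative, and MySQL-style backtick quoting is assumed:

```scala
// Illustrative only; not the connector's API.
def readerPartitionColumn(props: Map[String, String], dbTable: String): Option[String] = {
  val unquoted = dbTable.replace("`", "")      // `test`.`orders` -> test.orders
  val key = s"jdbc.$unquoted.partition-column" // jdbc.test.orders.partition-column
  props.collectFirst { case (k, v) if k.contains(key) => s"`$v`" } // re-quote the column
}

// readerPartitionColumn(
//   Map("jdbc.test.orders.partition-column" -> "order_id"),
//   "`test`.`orders`")                        // => Some("`order_id`")
```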
@@ -55,6 +55,13 @@ public void setTenant(String tenant) {
this.tenant = tenant;
}

/**
 * Parses the user information from the OceanBase configuration. Supports multiple username
 * formats: user@tenant#cluster, cluster:tenant:user, user@tenant, or a plain user.
 *
 * @param oceanBaseConfig The OceanBase configuration containing the username.
 * @return A new OceanBaseUserInfo object with the parsed information.
 */
public static OceanBaseUserInfo parse(OceanBaseConfig oceanBaseConfig) {
final String username = oceanBaseConfig.getUsername();
final String sepUserAtTenant = "@";
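The four formats can be told apart by which separators appear in the username. A condensed sketch of that branching, using a hypothetical case class rather than the connector's actual types:

```scala
case class ParsedUser(user: String, tenant: Option[String] = None, cluster: Option[String] = None)

def parseUsername(username: String): ParsedUser = username match {
  case s if s.contains("@") && s.contains("#") => // user@tenant#cluster
    val Array(user, rest) = s.split("@", 2)
    val Array(tenant, cluster) = rest.split("#", 2)
    ParsedUser(user, Some(tenant), Some(cluster))
  case s if s.contains(":") => // cluster:tenant:user
    val Array(cluster, tenant, user) = s.split(":", 3)
    ParsedUser(user, Some(tenant), Some(cluster))
  case s if s.contains("@") => // user@tenant
    val Array(user, tenant) = s.split("@", 2)
    ParsedUser(user, Some(tenant))
  case user => ParsedUser(user) // plain user
}
```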
@@ -159,7 +159,12 @@ abstract class OceanBaseDialect extends Logging with Serializable {
s"""`$colName`"""
}

def unQuoteIdentifier(colName: String): String = {
colName.replace("`", "")
}

def getPriKeyInfo(
connection: Connection,
schemaName: String,
tableName: String,
config: OceanBaseConfig): ArrayBuffer[PriKeyColumnInfo]
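`unQuoteIdentifier` is the inverse of `quoteIdentifier`: the base (MySQL-mode) dialect strips backticks, and the Oracle-mode dialect later in this diff overrides it to strip double quotes. A quick round trip under the MySQL-mode rules shown here:

```scala
def quoteIdentifier(colName: String): String = s"`$colName`"
def unQuoteIdentifier(colName: String): String = colName.replace("`", "")

assert(quoteIdentifier("order_id") == "`order_id`")
assert(unQuoteIdentifier("`test`.`orders`") == "test.orders")
```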
@@ -178,6 +178,7 @@ class OceanBaseMySQLDialect extends OceanBaseDialect {
}

def getPriKeyInfo(
connection: Connection,
schemaName: String,
tableName: String,
config: OceanBaseConfig): ArrayBuffer[PriKeyColumnInfo] = {
@@ -192,26 +193,23 @@
| and TABLE_NAME = '$tableName';
|""".stripMargin

-    OBJdbcUtils.withConnection(config) {
-      conn =>
-        OBJdbcUtils.executeQuery(conn, config, sql) {
-          rs =>
-            {
-              val arrayBuffer = ArrayBuffer[PriKeyColumnInfo]()
-              while (rs.next()) {
-                val columnKey = rs.getString(3)
-                if (null != columnKey && columnKey.equals("PRI")) {
-                  arrayBuffer += PriKeyColumnInfo(
-                    quoteIdentifier(rs.getString(1)),
-                    rs.getString(2),
-                    columnKey,
-                    rs.getString(4),
-                    rs.getString(5))
-                }
-              }
-              arrayBuffer
-            }
-        }
-    }
+    val arrayBuffer = ArrayBuffer[PriKeyColumnInfo]()
+    OBJdbcUtils.executeQuery(connection, config, sql) {
+      rs =>
+        {
+          while (rs.next()) {
+            val columnKey = rs.getString(3)
+            if (null != columnKey && columnKey.equals("PRI")) {
+              arrayBuffer += PriKeyColumnInfo(
+                quoteIdentifier(rs.getString(1)),
+                rs.getString(2),
+                columnKey,
+                rs.getString(4),
+                rs.getString(5))
+            }
+          }
+        }
+    }
+    arrayBuffer
   }

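The refactor threads a caller-supplied `Connection` through `getPriKeyInfo` instead of opening a fresh one via `withConnection`, letting several metadata queries share a connection. A sketch of the loan-pattern helper this relies on; the real `OBJdbcUtils.executeQuery` signature may differ:

```scala
import java.sql.{Connection, ResultSet, Statement}

// Assumed shape: run a query on a borrowed connection, pass the ResultSet to a
// handler, and always release the statement and result set.
def executeQuery[T](conn: Connection, sql: String)(handle: ResultSet => T): T = {
  val stmt: Statement = conn.createStatement()
  try {
    val rs = stmt.executeQuery(sql)
    try handle(rs)
    finally rs.close()
  } finally stmt.close()
}
```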
@@ -34,6 +34,10 @@ class OceanBaseOracleDialect extends OceanBaseDialect {
s""""$colName""""
}

override def unQuoteIdentifier(colName: String): String = {
colName.replace("\"", "")
}

override def createTable(
conn: Connection,
tableName: String,
@@ -66,6 +70,7 @@ class OceanBaseOracleDialect extends OceanBaseDialect {
"Not currently supported in oracle mode")

override def getPriKeyInfo(
connection: Connection,
schemaName: String,
tableName: String,
config: OceanBaseConfig): ArrayBuffer[PriKeyColumnInfo] = {
@@ -54,6 +54,13 @@ class OBJdbcReader(
private lazy val stmt: PreparedStatement =
conn.prepareStatement(buildQuerySql(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
private lazy val rs: ResultSet = {
partition match {
case part: OBMySQLPartition =>
part.unevenlyWhereValue.zipWithIndex.foreach {
case (value, index) => stmt.setObject(index + 1, value)
}
case _ =>
}
stmt.setFetchSize(config.getJdbcFetchSize)
stmt.setQueryTimeout(config.getJdbcQueryTimeout)
stmt.executeQuery()
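The new `match` in `OBJdbcReader` binds each WHERE-clause partition's boundary values (`unevenlyWhereValue`) to the `?` placeholders of the generated query before execution. A self-contained sketch of that binding, assuming a query shaped like `... WHERE pk >= ? AND pk < ?`:

```scala
import java.sql.PreparedStatement

// JDBC parameters are 1-indexed, hence index + 1.
def bindPartitionBounds(stmt: PreparedStatement, bounds: Seq[Any]): Unit =
  bounds.zipWithIndex.foreach { case (value, index) => stmt.setObject(index + 1, value) }
```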