Skip to content

Commit 205ee07

Browse files
authored
Enhancement: Supports setting query_timeout hint. (#60)
1 parent 81d6310 commit 205ee07

File tree

5 files changed

+61
-12
lines changed

5 files changed

+61
-12
lines changed

docs/spark-catalog-oceanbase.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,13 @@ select * from spark_catalog.default.orders;
332332
<td>Int</td>
333333
<td>Controls the parallelism level for statistical queries (e.g., COUNT, MIN, MAX) by adding /*+ PARALLEL(N) */ hint to generated SQL.</td>
334334
</tr>
335+
<tr>
336+
<td>spark.sql.catalog.your_catalog_name.jdbc.query-timeout-hint-degree</td>
337+
<td>否</td>
338+
<td style="word-wrap: break-word;">-1</td>
339+
<td>Int</td>
340+
<td>Control the query timeout by adding /*+ query_timeout(N) */ hint to the generated SQL. This parameter can be used to specify the timeout in microseconds. The default value is -1, which means that the hint is not added.</td>
341+
</tr>
335342
<tr>
336343
<td>spark.sql.catalog.your_catalog_name.jdbc.disable-pk-table-use-where-partition</td>
337344
<td>No</td>

docs/spark-catalog-oceanbase_cn.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,13 @@ select * from spark_catalog.default.orders;
331331
<td>Int</td>
332332
<td>通过向生成的 SQL 添加 /*+ PARALLEL(N) */ hint 来控制统计查询(例如 COUNT、MIN、MAX)的并行级别。</td>
333333
</tr>
334+
<tr>
335+
<td>spark.sql.catalog.your_catalog_name.jdbc.query-timeout-hint-degree</td>
336+
<td>否</td>
337+
<td style="word-wrap: break-word;">-1</td>
338+
<td>Int</td>
339+
<td>通过向生成的 SQL 添加 /*+ query_timeout(n) */ hint 来控制查询超时时间。通过该参数可以设置超时时间,单位为微妙。默认为-1,表示不添加该Hint。</td>
340+
</tr>
334341
<tr>
335342
<td>spark.sql.catalog.your_catalog_name.jdbc.disable-pk-table-use-where-partition</td>
336343
<td>否</td>

spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/config/OceanBaseConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,15 @@ public class OceanBaseConfig extends Config implements Serializable {
241241
.intConf()
242242
.createWithDefault(1);
243243

244+
// https://www.oceanbase.com/docs/enterprise-oceanbase-database-cn-10000000000881388
245+
public static final ConfigEntry<Integer> JDBC_QUERY_TIMEOUT_HINT_DEGREE =
246+
new ConfigBuilder("jdbc.query-timeout-hint-degree")
247+
.doc(
248+
"The SQL statements sent by Spark to OB will automatically carry query_timeout Hint. Set this parameter to specify the timeout in microseconds. The default is -1, which means query_timeout hint is not set.")
249+
.version(ConfigConstants.VERSION_1_2_0)
250+
.intConf()
251+
.createWithDefault(-1);
252+
244253
public static final ConfigEntry<Integer> JDBC_STATISTICS_PARALLEL_HINT_DEGREE =
245254
new ConfigBuilder("jdbc.statistics-parallel-hint-degree")
246255
.doc(
@@ -493,6 +502,10 @@ public Integer getJdbcParallelHintDegree() {
493502
return get(JDBC_PARALLEL_HINT_DEGREE);
494503
}
495504

505+
public Integer getQueryTimeoutHintDegree() {
506+
return get(JDBC_QUERY_TIMEOUT_HINT_DEGREE);
507+
}
508+
496509
public Integer getJdbcStatsParallelHintDegree() {
497510
return get(JDBC_STATISTICS_PARALLEL_HINT_DEGREE);
498511
}

spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/reader/v2/OBJdbcReader.scala

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,18 @@ class OBJdbcReader(
126126
""
127127
}
128128

129-
var hint = s"/*+ PARALLEL(${config.getJdbcParallelHintDegree}) */"
130-
if (part.useHiddenPKColumn)
131-
hint =
132-
s"/*+ PARALLEL(${config.getJdbcParallelHintDegree}), opt_param('hidden_column_visible', 'true') */"
129+
val useHiddenPKColumnHint = if (part.useHiddenPKColumn) {
130+
s", opt_param('hidden_column_visible', 'true') "
131+
} else {
132+
""
133+
}
134+
val queryTimeoutHint = if (config.getQueryTimeoutHintDegree > 0) {
135+
s", query_timeout(${config.getQueryTimeoutHintDegree}) "
136+
} else {
137+
""
138+
}
139+
val hint =
140+
s"/*+ PARALLEL(${config.getJdbcParallelHintDegree}) $useHiddenPKColumnHint $queryTimeoutHint */"
133141

134142
s"""
135143
|SELECT $hint $columnStr FROM ${config.getDbTable} ${part.partitionClause}

spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/reader/v2/OBMySQLPartition.scala

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,8 @@ object OBMySQLPartition extends Logging {
253253
val statement = connection.createStatement()
254254
val tableName = config.getDbTable
255255
val sql =
256-
s"SELECT /*+ PARALLEL(${config.getJdbcStatsParallelHintDegree}) */ count(1) AS cnt FROM $tableName $partName"
256+
s"SELECT /*+ PARALLEL(${config.getJdbcStatsParallelHintDegree}) ${queryTimeoutHint(
257+
config)} */ count(1) AS cnt FROM $tableName $partName"
257258
try {
258259
val rs = statement.executeQuery(sql)
259260
if (rs.next())
@@ -386,10 +387,12 @@ object OBMySQLPartition extends Logging {
386387
priKeyColumnName: String) = {
387388
val statement = connection.createStatement()
388389
val tableName = config.getDbTable
389-
var hint = s"/*+ PARALLEL(${config.getJdbcStatsParallelHintDegree}) */"
390-
if (priKeyColumnName.equals(HIDDEN_PK_INCREMENT))
391-
hint =
392-
s"/*+ PARALLEL(${config.getJdbcParallelHintDegree}), opt_param('hidden_column_visible', 'true') */"
390+
val useHiddenPKColHint =
391+
if (priKeyColumnName.equals(HIDDEN_PK_INCREMENT))
392+
s", opt_param('hidden_column_visible', 'true') "
393+
else EMPTY_STRING
394+
val hint =
395+
s"/*+ PARALLEL(${config.getJdbcStatsParallelHintDegree}) $useHiddenPKColHint ${queryTimeoutHint(config)} */"
393396

394397
val sql =
395398
s"""
@@ -599,7 +602,8 @@ object OBMySQLPartition extends Logging {
599602
priKeyColumnName: String,
600603
config: OceanBaseConfig): Object = {
601604
val tableName = config.getDbTable
602-
val hint = s"/*+ PARALLEL(${config.getJdbcStatsParallelHintDegree}) */"
605+
val hint =
606+
s"/*+ PARALLEL(${config.getJdbcStatsParallelHintDegree}) ${queryTimeoutHint(config)} */"
603607
val sql =
604608
s"""
605609
SELECT
@@ -629,7 +633,8 @@ object OBMySQLPartition extends Logging {
629633
priKeyColumnName: String,
630634
config: OceanBaseConfig): Object = {
631635
val tableName = config.getDbTable
632-
val hint = s"/*+ PARALLEL(${config.getJdbcStatsParallelHintDegree}) */"
636+
val hint =
637+
s"/*+ PARALLEL(${config.getJdbcStatsParallelHintDegree}) ${queryTimeoutHint(config)} */"
633638
val sql =
634639
s"""
635640
SELECT $hint
@@ -657,7 +662,8 @@ object OBMySQLPartition extends Logging {
657662
priKeyColumnName: String) = {
658663
val statement = conn.createStatement()
659664
val tableName = config.getDbTable
660-
val hint = s"/*+ PARALLEL(${config.getJdbcStatsParallelHintDegree}) */"
665+
val hint =
666+
s"/*+ PARALLEL(${config.getJdbcStatsParallelHintDegree}) ${queryTimeoutHint(config)} */"
661667
val sql =
662668
s"""
663669
SELECT $hint
@@ -682,6 +688,14 @@ object OBMySQLPartition extends Logging {
682688
obj1.toString.compareTo(obj2.toString)
683689
}
684690

691+
def queryTimeoutHint(config: OceanBaseConfig): String = if (
692+
config.getQueryTimeoutHintDegree > 0
693+
) {
694+
s", query_timeout(${config.getQueryTimeoutHintDegree}) "
695+
} else {
696+
""
697+
}
698+
685699
private case class UnevenlyPriKeyTableInfo(count: Long, min: Object, max: Object)
686700
}
687701

0 commit comments

Comments
 (0)