Skip to content

Commit addf131

Browse files
committed
Enhancement: Supports setting query_timeout hint.
1 parent d915ea7 commit addf131

File tree

3 files changed

+47
-12
lines changed

3 files changed

+47
-12
lines changed

spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/config/OceanBaseConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,15 @@ public class OceanBaseConfig extends Config implements Serializable {
241241
.intConf()
242242
.createWithDefault(1);
243243

244+
// https://www.oceanbase.com/docs/enterprise-oceanbase-database-cn-10000000000881388
245+
public static final ConfigEntry<Integer> JDBC_QUERY_TIMEOUT_HINT_DEGREE =
246+
new ConfigBuilder("jdbc.query-timeout-hint-degree")
247+
.doc(
248+
"The SQL statements sent by Spark to OB will automatically carry query_timeout Hint. Set this parameter to specify the timeout in microseconds. The default is -1, which means query_timeout hint is not set.")
249+
.version(ConfigConstants.VERSION_1_2_0)
250+
.intConf()
251+
.createWithDefault(-1);
252+
244253
public static final ConfigEntry<Integer> JDBC_STATISTICS_PARALLEL_HINT_DEGREE =
245254
new ConfigBuilder("jdbc.statistics-parallel-hint-degree")
246255
.doc(
@@ -493,6 +502,10 @@ public Integer getJdbcParallelHintDegree() {
493502
return get(JDBC_PARALLEL_HINT_DEGREE);
494503
}
495504

505+
public Integer getQueryTimeoutHintDegree() {
506+
return get(JDBC_QUERY_TIMEOUT_HINT_DEGREE);
507+
}
508+
496509
public Integer getJdbcStatsParallelHintDegree() {
497510
return get(JDBC_STATISTICS_PARALLEL_HINT_DEGREE);
498511
}

spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/reader/v2/OBJdbcReader.scala

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,18 @@ class OBJdbcReader(
126126
""
127127
}
128128

129-
var hint = s"/*+ PARALLEL(${config.getJdbcParallelHintDegree}) */"
130-
if (part.useHiddenPKColumn)
131-
hint =
132-
s"/*+ PARALLEL(${config.getJdbcParallelHintDegree}), opt_param('hidden_column_visible', 'true') */"
129+
val useHiddenPKColumnHint = if (part.useHiddenPKColumn) {
130+
s", opt_param('hidden_column_visible', 'true') "
131+
} else {
132+
""
133+
}
134+
val queryTimeoutHint = if (config.getQueryTimeoutHintDegree > 0) {
135+
s", query_timeout(${config.getQueryTimeoutHintDegree}) "
136+
} else {
137+
""
138+
}
139+
val hint =
140+
s"/*+ PARALLEL(${config.getJdbcParallelHintDegree}) $useHiddenPKColumnHint $queryTimeoutHint */"
133141

134142
s"""
135143
|SELECT $hint $columnStr FROM ${config.getDbTable} ${part.partitionClause}

spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/reader/v2/OBMySQLPartition.scala

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,8 @@ object OBMySQLPartition extends Logging {
253253
val statement = connection.createStatement()
254254
val tableName = config.getDbTable
255255
val sql =
256-
s"SELECT /*+ PARALLEL(${config.getJdbcStatsParallelHintDegree}) */ count(1) AS cnt FROM $tableName $partName"
256+
s"SELECT /*+ PARALLEL(${config.getJdbcStatsParallelHintDegree}) ${queryTimeoutHint(
257+
config)} */ count(1) AS cnt FROM $tableName $partName"
257258
try {
258259
val rs = statement.executeQuery(sql)
259260
if (rs.next())
@@ -386,10 +387,12 @@ object OBMySQLPartition extends Logging {
386387
priKeyColumnName: String) = {
387388
val statement = connection.createStatement()
388389
val tableName = config.getDbTable
389-
var hint = s"/*+ PARALLEL(${config.getJdbcStatsParallelHintDegree}) */"
390-
if (priKeyColumnName.equals(HIDDEN_PK_INCREMENT))
391-
hint =
392-
s"/*+ PARALLEL(${config.getJdbcParallelHintDegree}), opt_param('hidden_column_visible', 'true') */"
390+
val useHiddenPKColHint =
391+
if (priKeyColumnName.equals(HIDDEN_PK_INCREMENT))
392+
s", opt_param('hidden_column_visible', 'true') "
393+
else EMPTY_STRING
394+
val hint =
395+
s"/*+ PARALLEL(${config.getJdbcStatsParallelHintDegree}) $useHiddenPKColHint ${queryTimeoutHint(config)} */"
393396

394397
val sql =
395398
s"""
@@ -599,7 +602,8 @@ object OBMySQLPartition extends Logging {
599602
priKeyColumnName: String,
600603
config: OceanBaseConfig): Object = {
601604
val tableName = config.getDbTable
602-
val hint = s"/*+ PARALLEL(${config.getJdbcStatsParallelHintDegree}) */"
605+
val hint =
606+
s"/*+ PARALLEL(${config.getJdbcStatsParallelHintDegree}) ${queryTimeoutHint(config)} */"
603607
val sql =
604608
s"""
605609
SELECT
@@ -629,7 +633,8 @@ object OBMySQLPartition extends Logging {
629633
priKeyColumnName: String,
630634
config: OceanBaseConfig): Object = {
631635
val tableName = config.getDbTable
632-
val hint = s"/*+ PARALLEL(${config.getJdbcStatsParallelHintDegree}) */"
636+
val hint =
637+
s"/*+ PARALLEL(${config.getJdbcStatsParallelHintDegree}) ${queryTimeoutHint(config)} */"
633638
val sql =
634639
s"""
635640
SELECT $hint
@@ -657,7 +662,8 @@ object OBMySQLPartition extends Logging {
657662
priKeyColumnName: String) = {
658663
val statement = conn.createStatement()
659664
val tableName = config.getDbTable
660-
val hint = s"/*+ PARALLEL(${config.getJdbcStatsParallelHintDegree}) */"
665+
val hint =
666+
s"/*+ PARALLEL(${config.getJdbcStatsParallelHintDegree}) ${queryTimeoutHint(config)} */"
661667
val sql =
662668
s"""
663669
SELECT $hint
@@ -682,6 +688,14 @@ object OBMySQLPartition extends Logging {
682688
obj1.toString.compareTo(obj2.toString)
683689
}
684690

691+
def queryTimeoutHint(config: OceanBaseConfig): String = if (
692+
config.getQueryTimeoutHintDegree > 0
693+
) {
694+
s", query_timeout(${config.getQueryTimeoutHintDegree}) "
695+
} else {
696+
""
697+
}
698+
685699
private case class UnevenlyPriKeyTableInfo(count: Long, min: Object, max: Object)
686700
}
687701

0 commit comments

Comments
 (0)