
Commit f4feecd

Enhancement: Refactor the configuration system in the catalog implementation. (#27)
1 parent f213f4f commit f4feecd

File tree

26 files changed: +736 -377 lines


spark-connector-oceanbase-common/src/main/java/com/oceanbase/spark/config/Config.java

Lines changed: 1 addition & 1 deletion
@@ -32,7 +32,7 @@ public abstract class Config implements Serializable {

     private static final Logger LOG = LoggerFactory.getLogger(Config.class);

-    private final ConcurrentMap<String, String> configMap;
+    protected final ConcurrentMap<String, String> configMap;

     private final Map<String, DeprecatedConfig> deprecatedConfigMap;

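Widening configMap from private to protected is what lets the OceanBaseConfig subclass (changed later in this commit) expose and mutate the raw key/value store through its new getProperties()/setProperty() accessors. A minimal sketch of what that enables at a call site; the keys come from this commit, the values are illustrative only:

    import com.oceanbase.spark.config.OceanBaseConfig

    import scala.collection.JavaConverters._

    // Build a config from a plain map, then go through the accessors that the
    // protected configMap makes possible.
    val config = new OceanBaseConfig(Map("jdbc.fetch-size" -> "500").asJava)
    config.setProperty(OceanBaseConfig.DB_TABLE, "test.orders")   // writes into configMap
    val raw: java.util.Map[String, String] = config.getProperties // same backing map
    println(raw.get(OceanBaseConfig.DB_TABLE))                    // test.orders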

spark-connector-oceanbase-common/src/main/java/com/oceanbase/spark/config/ConfigConstants.java

Lines changed: 1 addition & 0 deletions
@@ -28,4 +28,5 @@ public interface ConfigConstants {
     String POSITIVE_NUMBER_ERROR_MSG = "The value must be a positive number";

     String VERSION_1_0_0 = "1.0";
+    String VERSION_1_1_0 = "1.1";
 }

spark-connector-oceanbase/spark-connector-oceanbase-3.1/src/main/scala/com/oceanbase/spark/reader/v2/OBJdbcReader.scala

Lines changed: 6 additions & 6 deletions
@@ -16,6 +16,7 @@

 package com.oceanbase.spark.reader.v2

+import com.oceanbase.spark.config.OceanBaseConfig
 import com.oceanbase.spark.dialect.OceanBaseDialect
 import com.oceanbase.spark.reader.v2.OBJdbcReader.{makeGetters, OBValueGetter}
 import com.oceanbase.spark.utils.OBJdbcUtils
@@ -26,7 +27,6 @@ import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}
-import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -37,7 +37,7 @@ import java.util.concurrent.TimeUnit

 class OBJdbcReader(
     schema: StructType,
-    options: JDBCOptions,
+    config: OceanBaseConfig,
     partition: InputPartition,
     pushedFilter: Array[Filter],
     dialect: OceanBaseDialect)
@@ -47,12 +47,12 @@ class OBJdbcReader(

   private val getters: Array[OBValueGetter] = makeGetters(schema)
   private val mutableRow = new SpecificInternalRow(schema.fields.map(x => x.dataType))
-  private lazy val conn = OBJdbcUtils.getConnection(options)
+  private lazy val conn = OBJdbcUtils.getConnection(config)
   private lazy val stmt: PreparedStatement =
     conn.prepareStatement(buildQuerySql(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
   private lazy val rs: ResultSet = {
-    stmt.setFetchSize(options.fetchSize)
-    stmt.setQueryTimeout(options.queryTimeout)
+    stmt.setFetchSize(config.getJdbcFetchSize)
+    stmt.setQueryTimeout(config.getJdbcQueryTimeout)
     stmt.executeQuery()
   }

@@ -105,7 +105,7 @@
     }
     val part: OBMySQLPartition = partition.asInstanceOf[OBMySQLPartition]
     s"""
-       |SELECT $columnStr FROM ${options.tableOrQuery} ${part.partitionClause}
+       |SELECT $columnStr FROM ${config.getDbTable} ${part.partitionClause}
        |$whereClause ${part.limitOffsetClause}
        |""".stripMargin
   }
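With this change the reader no longer consults Spark's JDBCOptions: fetch size, query timeout, and the target table all come from the connector's own OceanBaseConfig. A minimal sketch of that mapping, using only the keys and getters introduced in this commit (values illustrative):

    import com.oceanbase.spark.config.OceanBaseConfig

    import scala.collection.JavaConverters._

    val config = new OceanBaseConfig(Map(
      "jdbc.fetch-size"    -> "500", // read via config.getJdbcFetchSize -> stmt.setFetchSize
      "jdbc.query-timeout" -> "30"   // read via config.getJdbcQueryTimeout -> stmt.setQueryTimeout
    ).asJava)
    config.setProperty(OceanBaseConfig.DB_TABLE, "test.orders") // read via config.getDbTable in the SELECT

    println(config.getJdbcFetchSize)    // 500
    println(config.getJdbcQueryTimeout) // 30
    println(config.getDbTable)          // test.orders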

spark-connector-oceanbase/spark-connector-oceanbase-3.1/src/main/scala/com/oceanbase/spark/reader/v2/OBJdbcScanBuilder.scala

Lines changed: 10 additions & 14 deletions
@@ -16,25 +16,21 @@

 package com.oceanbase.spark.reader.v2

+import com.oceanbase.spark.config.OceanBaseConfig
 import com.oceanbase.spark.dialect.OceanBaseDialect

 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.ExprUtils.compileFilter
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
-import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType

 /**
  * This is for compatibility with Spark 3.1, which does not support the SupportsPushDownAggregates
  * feature.
  */
-case class OBJdbcScanBuilder(
-    schema: StructType,
-    jdbcOptions: JDBCOptions,
-    dialect: OceanBaseDialect)
+case class OBJdbcScanBuilder(schema: StructType, config: OceanBaseConfig, dialect: OceanBaseDialect)
   extends ScanBuilder
   with SupportsPushDownFilters
   with SupportsPushDownRequiredColumns
@@ -59,14 +55,14 @@ case class OBJdbcScanBuilder(
   override def build(): Scan =
     OBJdbcBatchScan(
       finalSchema: StructType,
-      jdbcOptions: JDBCOptions,
+      config: OceanBaseConfig,
       pushedFilter: Array[Filter],
       dialect: OceanBaseDialect)
 }

 case class OBJdbcBatchScan(
     schema: StructType,
-    jdbcOptions: JDBCOptions,
+    config: OceanBaseConfig,
     pushedFilter: Array[Filter],
     dialect: OceanBaseDialect)
   extends Scan {
@@ -76,40 +72,40 @@ case class OBJdbcBatchScan(
   override def toBatch: Batch =
     new OBJdbcBatch(
       schema: StructType,
-      jdbcOptions: JDBCOptions,
+      config: OceanBaseConfig,
       pushedFilter: Array[Filter],
       dialect: OceanBaseDialect)
 }

 class OBJdbcBatch(
     schema: StructType,
-    jdbcOptions: JDBCOptions,
+    config: OceanBaseConfig,
     pushedFilter: Array[Filter],
     dialect: OceanBaseDialect)
   extends Batch {
   private lazy val inputPartitions: Array[InputPartition] =
-    OBMySQLPartition.columnPartition(jdbcOptions)
+    OBMySQLPartition.columnPartition(config)

   override def planInputPartitions(): Array[InputPartition] = inputPartitions

   override def createReaderFactory(): PartitionReaderFactory = new OBJdbcReaderFactory(
     schema: StructType,
-    jdbcOptions: JDBCOptions,
+    config: OceanBaseConfig,
     pushedFilter: Array[Filter],
     dialect: OceanBaseDialect)
 }

 class OBJdbcReaderFactory(
     schema: StructType,
-    jdbcOptions: JDBCOptions,
+    config: OceanBaseConfig,
     pushedFilter: Array[Filter],
     dialect: OceanBaseDialect)
   extends PartitionReaderFactory {

   override def createReader(partition: InputPartition): PartitionReader[InternalRow] =
     new OBJdbcReader(
       schema: StructType,
-      jdbcOptions: JDBCOptions,
+      config: OceanBaseConfig,
       partition: InputPartition,
       pushedFilter: Array[Filter],
       dialect: OceanBaseDialect)
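The net effect here is that a single OceanBaseConfig instance is threaded through the whole Spark 3.1 V2 read path in place of JDBCOptions. A sketch of that flow, assuming only the constructors shown above:

    import com.oceanbase.spark.config.OceanBaseConfig
    import com.oceanbase.spark.dialect.OceanBaseDialect
    import com.oceanbase.spark.reader.v2.OBJdbcScanBuilder
    import org.apache.spark.sql.connector.read.PartitionReaderFactory
    import org.apache.spark.sql.types.StructType

    // ScanBuilder -> Scan -> Batch -> ReaderFactory, all carrying the same config.
    def planReadPath(
        schema: StructType,
        config: OceanBaseConfig,
        dialect: OceanBaseDialect): PartitionReaderFactory = {
      val scan = OBJdbcScanBuilder(schema, config, dialect).build() // OBJdbcBatchScan(schema, config, ...)
      scan.toBatch.createReaderFactory()                            // OBJdbcBatch -> OBJdbcReaderFactory
    }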

spark-connector-oceanbase/spark-connector-oceanbase-3.1/src/main/scala/com/oceanbase/spark/writer/v2/DirectLoadWriteBuilderV2.scala

Lines changed: 3 additions & 14 deletions
@@ -15,29 +15,18 @@
  */
 package com.oceanbase.spark.writer.v2

-import com.oceanbase.spark.catalog.OceanBaseCatalog
 import com.oceanbase.spark.config.OceanBaseConfig
 import com.oceanbase.spark.directload.{DirectLoader, DirectLoadUtils}

 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, PhysicalWriteInfo, WriteBuilder, WriterCommitMessage}
-import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
 import org.apache.spark.sql.types.StructType

-import scala.collection.JavaConverters.mapAsJavaMapConverter
-
 /** Direct-load writing implementation based on Spark DataSource V2 API. */
-case class DirectLoadWriteBuilderV2(schema: StructType, options: JDBCOptions) extends WriteBuilder {
+case class DirectLoadWriteBuilderV2(schema: StructType, config: OceanBaseConfig)
+  extends WriteBuilder {
   override def buildForBatch(): BatchWrite = {
-    val map = options.parameters ++ Map(
-      OceanBaseConfig.SCHEMA_NAME.getKey -> options.parameters(
-        OceanBaseCatalog.CURRENT_DATABASE
-      ),
-      OceanBaseConfig.TABLE_NAME.getKey -> options.parameters(
-        OceanBaseCatalog.CURRENT_TABLE
-      )
-    )
-    new DirectLoadBatchWrite(schema, new OceanBaseConfig(map.asJava))
+    new DirectLoadBatchWrite(schema, config)
   }
 }

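The builder no longer rebuilds an OceanBaseConfig from JDBCOptions plus the catalog's current database and table; it now receives the config directly, so the caller is presumably expected to populate those entries before planning the write. A hedged sketch of that contract (SCHEMA_NAME and TABLE_NAME are existing OceanBaseConfig entries referenced by the removed code; the values are illustrative):

    import com.oceanbase.spark.config.OceanBaseConfig
    import com.oceanbase.spark.writer.v2.DirectLoadWriteBuilderV2
    import org.apache.spark.sql.types.StructType

    // The caller now owns a fully populated config before the direct-load write is planned.
    def planDirectLoadWrite(schema: StructType, config: OceanBaseConfig): DirectLoadWriteBuilderV2 = {
      config.setProperty(OceanBaseConfig.SCHEMA_NAME.getKey, "test")  // example database name
      config.setProperty(OceanBaseConfig.TABLE_NAME.getKey, "orders") // example table name
      DirectLoadWriteBuilderV2(schema, config)
    }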

spark-connector-oceanbase/spark-connector-oceanbase-3.1/src/main/scala/com/oceanbase/spark/writer/v2/JDBCWriteBuilder.scala

Lines changed: 5 additions & 4 deletions
@@ -15,6 +15,7 @@
  */
 package com.oceanbase.spark.writer.v2

+import com.oceanbase.spark.config.OceanBaseConfig
 import com.oceanbase.spark.dialect.OceanBaseDialect

 import org.apache.spark.sql.catalyst.InternalRow
@@ -24,16 +25,16 @@ import org.apache.spark.sql.types.StructType

 class JDBCWriteBuilder(
     schema: StructType,
-    option: JDBCOptions,
+    config: OceanBaseConfig,
     dialect: OceanBaseDialect
 ) extends WriteBuilder {
   override def buildForBatch(): BatchWrite =
-    new JDBCBatchWrite(schema, option, dialect)
+    new JDBCBatchWrite(schema, config, dialect)
 }

 class JDBCBatchWrite(
     schema: StructType,
-    option: JDBCOptions,
+    config: OceanBaseConfig,
     dialect: OceanBaseDialect
 ) extends BatchWrite
   with DataWriterFactory {
@@ -50,6 +51,6 @@ class JDBCBatchWrite(
     partitionId: Int,
     taskId: Long
   ): DataWriter[InternalRow] = {
-    new JDBCWriter(schema: StructType, option: JDBCOptions, dialect)
+    new JDBCWriter(schema: StructType, config, dialect)
   }
 }
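The V2 JDBC write path follows the same pattern: OceanBaseConfig replaces JDBCOptions, and the new "jdbc.batch-size" entry (default 1024 in this commit) is the write-side tuning knob. A minimal sketch of planning a batch write with an overridden batch size (the override value is illustrative):

    import com.oceanbase.spark.config.OceanBaseConfig
    import com.oceanbase.spark.dialect.OceanBaseDialect
    import com.oceanbase.spark.writer.v2.JDBCWriteBuilder
    import org.apache.spark.sql.connector.write.BatchWrite
    import org.apache.spark.sql.types.StructType

    def planJdbcWrite(schema: StructType, config: OceanBaseConfig, dialect: OceanBaseDialect): BatchWrite = {
      config.setProperty("jdbc.batch-size", "2000") // consumed on the write side per the entry's doc
      new JDBCWriteBuilder(schema, config, dialect).buildForBatch()
    }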

spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/config/OceanBaseConfig.java

Lines changed: 84 additions & 1 deletion
@@ -172,11 +172,66 @@ public class OceanBaseConfig extends Config implements Serializable {
                     .booleanConf()
                     .createWithDefault(false);

+    // ======== JDBC Related =========
+    public static final ConfigEntry<String> DRIVER =
+            new ConfigBuilder("driver")
+                    .doc("The class name of the JDBC driver to use to connect to this URL.")
+                    .version(ConfigConstants.VERSION_1_1_0)
+                    .stringConf()
+                    .create();
+
+    public static final ConfigEntry<Integer> JDBC_QUERY_TIMEOUT =
+            new ConfigBuilder("jdbc.query-timeout")
+                    .doc(
+                            "The number of seconds the driver will wait for a Statement object to execute to the given number of seconds. Zero means there is no limit. In the write path, this option depends on how JDBC drivers implement the API setQueryTimeout.")
+                    .version(ConfigConstants.VERSION_1_1_0)
+                    .intConf()
+                    .checkValue(value -> value >= 0, ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
+                    .createWithDefault(0);
+
+    public static final ConfigEntry<Integer> JDBC_FETCH_SIZE =
+            new ConfigBuilder("jdbc.fetch-size")
+                    .doc(
+                            "The JDBC fetch size, which determines how many rows to fetch per round trip.")
+                    .version(ConfigConstants.VERSION_1_1_0)
+                    .intConf()
+                    .checkValue(value -> value >= 0, ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
+                    .createWithDefault(100);
+
+    public static final ConfigEntry<Integer> JDBC_BATCH_SIZE =
+            new ConfigBuilder("jdbc.batch-size")
+                    .doc(
+                            "The JDBC batch size, which determines how many rows to insert per round trip. This can help performance on JDBC drivers. This option applies only to writing.")
+                    .version(ConfigConstants.VERSION_1_1_0)
+                    .intConf()
+                    .checkValue(value -> value >= 0, ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
+                    .createWithDefault(1024);
+
+    public static final ConfigEntry<Boolean> JDBC_PUSH_DOWN_PREDICATE =
+            new ConfigBuilder("jdbc.pushDownPredicate")
+                    .doc(
+                            "The option to enable or disable predicate push-down into the JDBC data source. The default value is true, in which case Spark will push down filters to the JDBC data source as much as possible.")
+                    .version(ConfigConstants.VERSION_1_1_0)
+                    .booleanConf()
+                    .createWithDefault(false);
+
+    public static final String DB_TABLE = "dbTable";
+    public static final String TABLE_COMMENT = "tableComment";
+    public static final String ENABLE_LEGACY_BATCH_READER = "enable_legacy_batch_reader";
+
     public OceanBaseConfig(Map<String, String> properties) {
         super();
         loadFromMap(properties, k -> true);
     }

+    public Map<String, String> getProperties() {
+        return super.configMap;
+    }
+
+    public void setProperty(String key, String value) {
+        super.configMap.put(key, value);
+    }
+
     public String getURL() {
         return get(URL);
     }
@@ -217,7 +272,7 @@ public int getDirectLoadParallel() {
         return get(DIRECT_LOAD_PARALLEL);
     }

-    public int getBatchSize() {
+    public int getDirectLoadBatchSize() {
         return get(DIRECT_LOAD_BATCH_SIZE);
     }

@@ -252,4 +307,32 @@ public Integer getDirectLoadTaskPartitionSize() {
     public boolean getDirectLoadUseRepartition() {
         return get(DIRECT_LOAD_TASK_USE_REPARTITION);
     }
+
+    public Integer getJdbcFetchSize() {
+        return get(JDBC_FETCH_SIZE);
+    }
+
+    public Integer getJdbcBatchSize() {
+        return get(JDBC_BATCH_SIZE);
+    }
+
+    public Integer getJdbcQueryTimeout() {
+        return get(JDBC_QUERY_TIMEOUT);
+    }
+
+    public String getDriver() {
+        return get(DRIVER);
+    }
+
+    public String getDbTable() {
+        return getProperties().get(DB_TABLE);
+    }
+
+    public String getTableComment() {
+        return getProperties().get(TABLE_COMMENT);
+    }
+
+    public Boolean getPushDownPredicate() {
+        return get(JDBC_PUSH_DOWN_PREDICATE);
+    }
 }
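Taken together, the new entries give the connector its own JDBC option surface with explicit defaults. A usage sketch of the getters added above (the driver class and values are illustrative; unset entries fall back to the defaults defined in this commit):

    import com.oceanbase.spark.config.OceanBaseConfig

    import scala.collection.JavaConverters._

    val config = new OceanBaseConfig(Map(
      "driver"          -> "com.mysql.cj.jdbc.Driver", // OceanBaseConfig.DRIVER
      "jdbc.batch-size" -> "2048"                      // OceanBaseConfig.JDBC_BATCH_SIZE
    ).asJava)

    println(config.getDriver)            // com.mysql.cj.jdbc.Driver
    println(config.getJdbcBatchSize)     // 2048
    println(config.getJdbcFetchSize)     // 100   (default)
    println(config.getJdbcQueryTimeout)  // 0     (default: no limit)
    println(config.getPushDownPredicate) // false (default)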
