Skip to content

Commit 55f4ebe

Browse files
authored
Enhancement: Add direct-load.username configuration option to be compatible with OceanBase on the cloud. (#38)
* Enhancement: Added direct-load.username configuration option to be compatible with OceanBase on the cloud.
1 parent 2cea04f commit 55f4ebe

File tree

6 files changed

+33
-2
lines changed

6 files changed

+33
-2
lines changed

docs/spark-catalog-oceanbase.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,13 @@ select * from spark_catalog.default.orders;
371371
<td>Integer</td>
372372
<td>Rpc port used in direct-load.</td>
373373
</tr>
374+
<tr>
375+
<td>spark.sql.catalog.your_catalog_name.direct-load.username</td>
376+
<td>否</td>
377+
<td></td>
378+
<td>String</td>
379+
<td>The direct-load's username. If this configuration is not specified, the jdbc username is used.</td>
380+
</tr>
374381
<tr>
375382
<td>spark.sql.catalog.your_catalog_name.direct-load.parallel</td>
376383
<td>No</td>

docs/spark-catalog-oceanbase_cn.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,13 @@ select * from spark_catalog.default.orders;
370370
<td>Integer</td>
371371
<td>旁路导入用到的rpc端口。</td>
372372
</tr>
373+
<tr>
374+
<td>spark.sql.catalog.your_catalog_name.direct-load.username</td>
375+
<td>否</td>
376+
<td></td>
377+
<td>String</td>
378+
<td>旁路导入用户名。如果不指定该配置,则使用jdbc用户名。</td>
379+
</tr>
373380
<tr>
374381
<td>spark.sql.catalog.your_catalog_name.direct-load.parallel</td>
375382
<td>否</td>

spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/config/OceanBaseConfig.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,13 @@ public class OceanBaseConfig extends Config implements Serializable {
8888
.checkValue(port -> port > 0, ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
8989
.createWithDefault(2882);
9090

91+
public static final ConfigEntry<String> DIRECT_LOAD_USERNAME =
92+
new ConfigBuilder("direct-load.username")
93+
.doc("The username used in direct-load")
94+
.version(ConfigConstants.VERSION_1_1_0)
95+
.stringConf()
96+
.create();
97+
9198
public static final ConfigEntry<Integer> DIRECT_LOAD_PARALLEL =
9299
new ConfigBuilder("direct-load.parallel")
93100
.doc(
@@ -159,7 +166,7 @@ public class OceanBaseConfig extends Config implements Serializable {
159166
.createWithDefault(0);
160167

161168
public static final ConfigEntry<Integer> DIRECT_LOAD_BATCH_SIZE =
162-
new ConfigBuilder("direct-load-batch-size")
169+
new ConfigBuilder("direct-load.batch-size")
163170
.doc("The batch size write to OceanBase one time")
164171
.version(ConfigConstants.VERSION_1_0_0)
165172
.intConf()
@@ -289,6 +296,10 @@ public int getDirectLoadPort() {
289296
return get(DIRECT_LOAD_RPC_PORT);
290297
}
291298

299+
public String getDirectLoadUserName() {
300+
return get(DIRECT_LOAD_USERNAME);
301+
}
302+
292303
public String getDirectLoadExecutionId() {
293304
return get(DIRECT_LOAD_EXECUTION_ID);
294305
}

spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/directload/DirectLoadUtils.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,14 @@ public class DirectLoadUtils {
2525
public static DirectLoader buildDirectLoaderFromSetting(OceanBaseConfig oceanBaseConfig) {
2626
try {
2727
OceanBaseUserInfo userInfo = OceanBaseUserInfo.parse(oceanBaseConfig);
28+
String directLoadUserName = oceanBaseConfig.getDirectLoadUserName();
29+
if (directLoadUserName == null || directLoadUserName.isEmpty()) {
30+
directLoadUserName = userInfo.getUser();
31+
}
2832
return new DirectLoaderBuilder()
2933
.host(oceanBaseConfig.getDirectLoadHost())
3034
.port(oceanBaseConfig.getDirectLoadPort())
31-
.user(userInfo.getUser())
35+
.user(directLoadUserName)
3236
.password(oceanBaseConfig.getPassword())
3337
.tenant(userInfo.getTenant())
3438
.schema(oceanBaseConfig.getSchemaName())

spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/scala/com/oceanbase/spark/OBCatalogMySQLITCase.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ class OBCatalogMySQLITCase extends OceanBaseMySQLTestBase {
153153
.config("spark.sql.catalog.ob.direct-load.enabled", "true")
154154
.config("spark.sql.catalog.ob.direct-load.host", getHost)
155155
.config("spark.sql.catalog.ob.direct-load.rpc-port", getRpcPort)
156+
.config("spark.sql.catalog.ob.direct-load.username", getUsername.split("@").head)
156157
.getOrCreate()
157158

158159
insertTestData(session, "products")

spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/scala/com/oceanbase/spark/OceanBaseMySQLConnectorITCase.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ class OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase {
7575
| "direct-load.enabled"=true,
7676
| "direct-load.host"="$getHost",
7777
| "direct-load.rpc-port"=$getRpcPort,
78+
| "direct-load.username"="${getUsername.split("@").head}",
7879
| "direct-load.write-thread-num"="1"
7980
|);
8081
|""".stripMargin)

0 commit comments

Comments
 (0)