
Commit 10726a9

[FSTORE-1534] Add support for working with multiple S3 connectors within the same application
1 parent 6e21906 commit 10726a9

File tree

2 files changed: +85 -12 lines changed

python/hsfs/engine/spark.py
python/tests/engine/test_spark.py

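For context, Hadoop's S3A filesystem supports per-bucket configuration: any fs.s3a.* option can be repeated under fs.s3a.bucket.<bucket-name>.*, and the bucket-scoped value takes precedence over the global one for paths in that bucket. That is the mechanism this commit relies on: each connector keeps its credentials under its own bucket prefix instead of fighting over the global keys. A minimal sketch of the resulting keys, with made-up bucket names and credential values:

    from pyspark.sql import SparkSession

    # Illustrative only: bucket names and key values below are made up.
    spark = SparkSession.builder.getOrCreate()
    hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()

    # Global (legacy) scope: a second connector would overwrite this value.
    hadoop_conf.set("fs.s3a.access.key", "AKIA...FIRST")

    # Bucket scope: each value applies only to s3a://<bucket>/... paths,
    # so two connectors can coexist in the same Spark application.
    hadoop_conf.set("fs.s3a.bucket.raw-events.access.key", "AKIA...FIRST")
    hadoop_conf.set("fs.s3a.bucket.ml-features.access.key", "AKIA...SECOND")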

python/hsfs/engine/spark.py (+23 -10)
@@ -1188,41 +1188,54 @@ def setup_storage_connector(self, storage_connector, path=None):
         return path
 
     def _setup_s3_hadoop_conf(self, storage_connector, path):
-        FS_S3_ENDPOINT = "fs.s3a.endpoint"
+        # For legacy behaviour set the S3 values at global level
+        self._set_s3_hadoop_conf(storage_connector, "fs.s3a")
+
+        # Set credentials at bucket level as well to allow users to use multiple
+        # storage connector in the same application.
+        self._set_s3_hadoop_conf(
+            storage_connector, f"fs.s3a.bucket.{storage_connector.bucket}"
+        )
+        return path.replace("s3", "s3a", 1) if path is not None else None
+
+    def _set_s3_hadoop_conf(self, storage_connector, prefix):
         if storage_connector.access_key:
             self._spark_context._jsc.hadoopConfiguration().set(
-                "fs.s3a.access.key", storage_connector.access_key
+                f"{prefix}.access.key", storage_connector.access_key
             )
         if storage_connector.secret_key:
             self._spark_context._jsc.hadoopConfiguration().set(
-                "fs.s3a.secret.key", storage_connector.secret_key
+                f"{prefix}.secret.key", storage_connector.secret_key
             )
         if storage_connector.server_encryption_algorithm:
             self._spark_context._jsc.hadoopConfiguration().set(
-                "fs.s3a.server-side-encryption-algorithm",
+                f"{prefix}.server-side-encryption-algorithm",
                 storage_connector.server_encryption_algorithm,
             )
         if storage_connector.server_encryption_key:
             self._spark_context._jsc.hadoopConfiguration().set(
-                "fs.s3a.server-side-encryption-key",
+                f"{prefix}.server-side-encryption-key",
                 storage_connector.server_encryption_key,
             )
         if storage_connector.session_token:
+            print(f"session token set for {prefix}")
             self._spark_context._jsc.hadoopConfiguration().set(
-                "fs.s3a.aws.credentials.provider",
+                f"{prefix}.aws.credentials.provider",
                 "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider",
             )
             self._spark_context._jsc.hadoopConfiguration().set(
-                "fs.s3a.session.token",
+                f"{prefix}.session.token",
                 storage_connector.session_token,
             )
+
+        # This is the name of the property as expected from the user, without the bucket name.
+        FS_S3_ENDPOINT = "fs.s3a.endpoint"
         if FS_S3_ENDPOINT in storage_connector.arguments:
             self._spark_context._jsc.hadoopConfiguration().set(
-                FS_S3_ENDPOINT, storage_connector.spark_options().get(FS_S3_ENDPOINT)
+                f"{prefix}.endpoint",
+                storage_connector.spark_options().get(FS_S3_ENDPOINT),
             )
 
-        return path.replace("s3", "s3a", 1) if path is not None else None
-
     def _setup_adls_hadoop_conf(self, storage_connector, path):
         for k, v in storage_connector.spark_options().items():
             self._spark_context._jsc.hadoopConfiguration().set(k, v)

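In practice this means two S3 connectors can now be used in the same Spark application without the second one clobbering the first one's credentials. A hedged usage sketch against the HSFS client API; the connector names, bucket names, and paths below are hypothetical, and a configured Hopsworks connection is assumed:

    # Hypothetical connector names; assumes two S3 connectors are defined
    # in the feature store, each pointing at a different bucket.
    import hsfs

    fs = hsfs.connection().get_feature_store()

    raw = fs.get_storage_connector("raw_events_s3")
    features = fs.get_storage_connector("ml_features_s3")

    # Each read sets credentials both globally (legacy behaviour) and under
    # fs.s3a.bucket.<bucket>, so both reads below resolve to the right
    # credentials even though they run in the same application.
    df_raw = raw.read(data_format="parquet", path="s3://raw-events/landing/")
    df_features = features.read(data_format="parquet", path="s3://ml-features/daily/")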
python/tests/engine/test_spark.py (+62 -2)
@@ -4372,7 +4372,7 @@ def test_setup_storage_connector_jdbc(self, mocker):
         assert mock_spark_engine_setup_adls_hadoop_conf.call_count == 0
         assert mock_spark_engine_setup_gcp_hadoop_conf.call_count == 0
 
-    def test_setup_s3_hadoop_conf(self, mocker):
+    def test_setup_s3_hadoop_conf_legacy(self, mocker):
         # Arrange
         mock_pyspark_getOrCreate = mocker.patch(
             "pyspark.sql.session.SparkSession.builder.getOrCreate"
@@ -4384,6 +4384,7 @@ def test_setup_s3_hadoop_conf(self, mocker):
             id=1,
             name="test_connector",
             featurestore_id=99,
+            bucket="bucket-name",
             access_key="1",
             secret_key="2",
             server_encryption_algorithm="3",
@@ -4402,7 +4403,7 @@ def test_setup_s3_hadoop_conf(self, mocker):
         assert result == "s3a_test_path"
         assert (
             mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.call_count
-            == 7
+            == 14
         )
         mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call(
             "fs.s3a.access.key", s3_connector.access_key
@@ -4428,6 +4429,65 @@ def test_setup_s3_hadoop_conf(self, mocker):
             "fs.s3a.endpoint", s3_connector.arguments.get("fs.s3a.endpoint")
         )
 
+    def test_setup_s3_hadoop_conf_bucket_scope(self, mocker):
+        # Arrange
+        mock_pyspark_getOrCreate = mocker.patch(
+            "pyspark.sql.session.SparkSession.builder.getOrCreate"
+        )
+
+        spark_engine = spark.Engine()
+
+        s3_connector = storage_connector.S3Connector(
+            id=1,
+            name="test_connector",
+            featurestore_id=99,
+            bucket="bucket-name",
+            access_key="1",
+            secret_key="2",
+            server_encryption_algorithm="3",
+            server_encryption_key="4",
+            session_token="5",
+            arguments=[{"name": "fs.s3a.endpoint", "value": "testEndpoint"}],
+        )
+
+        # Act
+        result = spark_engine._setup_s3_hadoop_conf(
+            storage_connector=s3_connector,
+            path="s3_test_path",
+        )
+
+        # Assert
+        assert result == "s3a_test_path"
+        assert (
+            mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.call_count
+            == 14
+        )
+        mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call(
+            "fs.s3a.bucket.bucket-name.access.key", s3_connector.access_key
+        )
+        mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call(
+            "fs.s3a.bucket.bucket-name.secret.key", s3_connector.secret_key
+        )
+        mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call(
+            "fs.s3a.bucket.bucket-name.server-side-encryption-algorithm",
+            s3_connector.server_encryption_algorithm,
+        )
+        mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call(
+            "fs.s3a.bucket.bucket-name.server-side-encryption-key",
+            s3_connector.server_encryption_key,
+        )
+        mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call(
+            "fs.s3a.bucket.bucket-name.aws.credentials.provider",
+            "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider",
+        )
+        mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call(
+            "fs.s3a.bucket.bucket-name.session.token", s3_connector.session_token
+        )
+        mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call(
+            "fs.s3a.bucket.bucket-name.endpoint",
+            s3_connector.arguments.get("fs.s3a.endpoint"),
+        )
+
     def test_setup_adls_hadoop_conf(self, mocker):
         # Arrange
         mock_pyspark_getOrCreate = mocker.patch(

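The expected call count in the legacy test doubles from 7 to 14 because every property is now set twice: once at the global fs.s3a scope and once under the bucket prefix. To check both variants locally, assuming a standard dev environment with pytest and pytest-mock installed, something like the following should work from the repo root:

    # Runs the legacy and bucket-scope variants of the S3 Hadoop-conf tests.
    import pytest

    pytest.main([
        "python/tests/engine/test_spark.py",
        "-k", "test_setup_s3_hadoop_conf",
    ])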