From d8afcc1a4b79e8d2d9fe220adeccc7fb2c7a88cf Mon Sep 17 00:00:00 2001 From: bubriks Date: Tue, 11 Feb 2025 14:36:02 +0200 Subject: [PATCH 1/3] init --- .../hsfs/beam/HopsworksConnection.java | 3 + .../hsfs/beam/engine/BeamEngine.java | 2 - .../hsfs/flink/HopsworksConnection.java | 3 + .../hsfs/flink/engine/FlinkEngine.java | 2 - .../logicalclocks/hsfs/StorageConnector.java | 54 +++++++--- .../logicalclocks/hsfs/engine/EngineBase.java | 10 ++ .../hsfs/TestStorageConnector.java | 65 +++++++++++ .../hsfs/spark/HopsworksConnection.java | 2 + .../hsfs/spark/engine/SparkEngine.java | 5 +- python/hsfs/storage_connector.py | 52 +++++++-- python/tests/test_storage_connector.py | 102 ++++++++++++++++++ 11 files changed, 273 insertions(+), 27 deletions(-) create mode 100644 java/hsfs/src/test/java/com/logicalclocks/hsfs/TestStorageConnector.java diff --git a/java/beam/src/main/java/com/logicalclocks/hsfs/beam/HopsworksConnection.java b/java/beam/src/main/java/com/logicalclocks/hsfs/beam/HopsworksConnection.java index 8b19103f5..4cf340ae4 100644 --- a/java/beam/src/main/java/com/logicalclocks/hsfs/beam/HopsworksConnection.java +++ b/java/beam/src/main/java/com/logicalclocks/hsfs/beam/HopsworksConnection.java @@ -20,6 +20,8 @@ import com.logicalclocks.hsfs.FeatureStoreException; import com.logicalclocks.hsfs.HopsworksConnectionBase; import com.logicalclocks.hsfs.SecretStore; +import com.logicalclocks.hsfs.beam.engine.BeamEngine; +import com.logicalclocks.hsfs.engine.EngineBase; import com.logicalclocks.hsfs.metadata.Credentials; import com.logicalclocks.hsfs.metadata.HopsworksClient; import com.logicalclocks.hsfs.metadata.HopsworksHttpClient; @@ -55,6 +57,7 @@ public HopsworksConnection(String host, int port, String project, Region region, hostnameVerification, trustStorePath, this.apiKeyFilePath, this.apiKeyValue); this.projectObj = getProject(); HopsworksClient.getInstance().setProject(this.projectObj); + EngineBase.setInstance(BeamEngine.getInstance()); Credentials credentials = HopsworksClient.getInstance().getCredentials(); HopsworksHttpClient hopsworksHttpClient = HopsworksClient.getInstance().getHopsworksHttpClient(); hopsworksHttpClient.setTrustStorePath(credentials.gettStore()); diff --git a/java/beam/src/main/java/com/logicalclocks/hsfs/beam/engine/BeamEngine.java b/java/beam/src/main/java/com/logicalclocks/hsfs/beam/engine/BeamEngine.java index 13ff573a1..63c68147f 100644 --- a/java/beam/src/main/java/com/logicalclocks/hsfs/beam/engine/BeamEngine.java +++ b/java/beam/src/main/java/com/logicalclocks/hsfs/beam/engine/BeamEngine.java @@ -87,8 +87,6 @@ public Map getKafkaConfig(FeatureGroupBase featureGroup, Map config = storageConnector.kafkaOptions(); diff --git a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/HopsworksConnection.java b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/HopsworksConnection.java index 68195c77f..d981ed57f 100644 --- a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/HopsworksConnection.java +++ b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/HopsworksConnection.java @@ -20,6 +20,8 @@ import com.logicalclocks.hsfs.FeatureStoreException; import com.logicalclocks.hsfs.HopsworksConnectionBase; import com.logicalclocks.hsfs.SecretStore; +import com.logicalclocks.hsfs.engine.EngineBase; +import com.logicalclocks.hsfs.flink.engine.FlinkEngine; import com.logicalclocks.hsfs.metadata.Credentials; import com.logicalclocks.hsfs.metadata.HopsworksClient; @@ -56,6 +58,7 @@ public HopsworksConnection(String host, int port, String project, Region region, hostnameVerification, trustStorePath, this.apiKeyFilePath, this.apiKeyValue); this.projectObj = getProject(); HopsworksClient.getInstance().setProject(this.projectObj); + EngineBase.setInstance(FlinkEngine.getInstance()); if (!System.getProperties().containsKey(HopsworksInternalClient.REST_ENDPOINT_SYS)) { Credentials credentials = HopsworksClient.getInstance().getCredentials(); HopsworksHttpClient hopsworksHttpClient = HopsworksClient.getInstance().getHopsworksHttpClient(); diff --git a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java index b8e43908e..e748f567f 100644 --- a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java +++ b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java @@ -128,8 +128,6 @@ public Map getKafkaConfig(FeatureGroupBase featureGroup, Map config = storageConnector.kafkaOptions(); diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java index 3f6a64c62..b5c20cb3b 100644 --- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java +++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java @@ -17,14 +17,27 @@ package com.logicalclocks.hsfs; +import java.io.IOException; +import java.time.Instant; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.google.common.base.Strings; +import com.logicalclocks.hsfs.engine.EngineBase; +import com.logicalclocks.hsfs.metadata.HopsworksClient; import com.logicalclocks.hsfs.metadata.HopsworksHttpClient; import com.logicalclocks.hsfs.metadata.Option; import com.logicalclocks.hsfs.metadata.StorageConnectorApi; -import com.logicalclocks.hsfs.metadata.HopsworksClient; import com.logicalclocks.hsfs.util.Constants; import lombok.AllArgsConstructor; @@ -32,17 +45,8 @@ import lombok.NoArgsConstructor; import lombok.Setter; import lombok.ToString; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import software.amazon.awssdk.utils.CollectionUtils; -import java.io.IOException; -import java.time.Instant; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - @AllArgsConstructor @NoArgsConstructor @ToString @@ -395,13 +399,13 @@ public static class KafkaConnector extends StorageConnector { @Getter @Setter protected SecurityProtocol securityProtocol; - @Getter @Setter + @Getter protected String sslTruststoreLocation; @Getter @Setter protected String sslTruststorePassword; - @Getter @Setter + @Getter protected String sslKeystoreLocation; @Getter @Setter @@ -413,12 +417,36 @@ public static class KafkaConnector extends StorageConnector { @Getter @Setter protected SslEndpointIdentificationAlgorithm sslEndpointIdentificationAlgorithm; - @Getter @Setter + @Getter protected List