Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FSTORE-1015] Add option to add Keytab to Kafka connector #481

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -20,6 +20,8 @@
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.HopsworksConnectionBase;
import com.logicalclocks.hsfs.SecretStore;
import com.logicalclocks.hsfs.beam.engine.BeamEngine;
import com.logicalclocks.hsfs.engine.EngineBase;
import com.logicalclocks.hsfs.metadata.Credentials;
import com.logicalclocks.hsfs.metadata.HopsworksClient;
import com.logicalclocks.hsfs.metadata.HopsworksHttpClient;
@@ -55,6 +57,7 @@ public HopsworksConnection(String host, int port, String project, Region region,
hostnameVerification, trustStorePath, this.apiKeyFilePath, this.apiKeyValue);
this.projectObj = getProject();
HopsworksClient.getInstance().setProject(this.projectObj);
EngineBase.setInstance(BeamEngine.getInstance());
Credentials credentials = HopsworksClient.getInstance().getCredentials();
HopsworksHttpClient hopsworksHttpClient = HopsworksClient.getInstance().getHopsworksHttpClient();
hopsworksHttpClient.setTrustStorePath(credentials.gettStore());
Original file line number Diff line number Diff line change
@@ -37,14 +37,14 @@ public class BeamEngine extends EngineBase {
private static BeamEngine INSTANCE = null;
private FeatureGroupUtils featureGroupUtils = new FeatureGroupUtils();

public static synchronized BeamEngine getInstance() throws FeatureStoreException {
public static synchronized BeamEngine getInstance() {
if (INSTANCE == null) {
INSTANCE = new BeamEngine();
}
return INSTANCE;
}

private BeamEngine() throws FeatureStoreException {
private BeamEngine() {
}

public BeamProducer insertStream(StreamFeatureGroup streamFeatureGroup, Map<String, String> writeOptions)
@@ -87,8 +87,6 @@ public Map<String, String> getKafkaConfig(FeatureGroupBase featureGroup, Map<Str

StorageConnector.KafkaConnector storageConnector =
storageConnectorApi.getKafkaStorageConnector(featureGroup.getFeatureStore(), external);
storageConnector.setSslTruststoreLocation(addFile(storageConnector.getSslTruststoreLocation()));
storageConnector.setSslKeystoreLocation(addFile(storageConnector.getSslKeystoreLocation()));

Map<String, String> config = storageConnector.kafkaOptions();

Original file line number Diff line number Diff line change
@@ -20,6 +20,8 @@
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.HopsworksConnectionBase;
import com.logicalclocks.hsfs.SecretStore;
import com.logicalclocks.hsfs.engine.EngineBase;
import com.logicalclocks.hsfs.flink.engine.FlinkEngine;
import com.logicalclocks.hsfs.metadata.Credentials;
import com.logicalclocks.hsfs.metadata.HopsworksClient;

@@ -56,6 +58,7 @@ public HopsworksConnection(String host, int port, String project, Region region,
hostnameVerification, trustStorePath, this.apiKeyFilePath, this.apiKeyValue);
this.projectObj = getProject();
HopsworksClient.getInstance().setProject(this.projectObj);
EngineBase.setInstance(FlinkEngine.getInstance());
if (!System.getProperties().containsKey(HopsworksInternalClient.REST_ENDPOINT_SYS)) {
Credentials credentials = HopsworksClient.getInstance().getCredentials();
HopsworksHttpClient hopsworksHttpClient = HopsworksClient.getInstance().getHopsworksHttpClient();
Original file line number Diff line number Diff line change
@@ -128,8 +128,6 @@ public Map<String, String> getKafkaConfig(FeatureGroupBase featureGroup, Map<Str

StorageConnector.KafkaConnector storageConnector =
storageConnectorApi.getKafkaStorageConnector(featureGroup.getFeatureStore(), external);
storageConnector.setSslTruststoreLocation(addFile(storageConnector.getSslTruststoreLocation()));
storageConnector.setSslKeystoreLocation(addFile(storageConnector.getSslKeystoreLocation()));

Map<String, String> config = storageConnector.kafkaOptions();

Original file line number Diff line number Diff line change
@@ -17,32 +17,36 @@

package com.logicalclocks.hsfs;

import java.io.IOException;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Strings;
import com.logicalclocks.hsfs.engine.EngineBase;
import com.logicalclocks.hsfs.metadata.HopsworksClient;
import com.logicalclocks.hsfs.metadata.HopsworksHttpClient;
import com.logicalclocks.hsfs.metadata.Option;
import com.logicalclocks.hsfs.metadata.StorageConnectorApi;
import com.logicalclocks.hsfs.metadata.HopsworksClient;
import com.logicalclocks.hsfs.util.Constants;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.utils.CollectionUtils;

import java.io.IOException;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@AllArgsConstructor
@NoArgsConstructor
@ToString
@@ -395,13 +399,13 @@ public static class KafkaConnector extends StorageConnector {
@Getter @Setter
protected SecurityProtocol securityProtocol;

@Getter @Setter
@Getter
protected String sslTruststoreLocation;

@Getter @Setter
protected String sslTruststorePassword;

@Getter @Setter
@Getter
protected String sslKeystoreLocation;

@Getter @Setter
@@ -413,12 +417,36 @@ public static class KafkaConnector extends StorageConnector {
@Getter @Setter
protected SslEndpointIdentificationAlgorithm sslEndpointIdentificationAlgorithm;

@Getter @Setter
@Getter
protected List<Option> options;

@Getter @Setter
protected Boolean externalKafka;

public void setSslTruststoreLocation(String sslTruststoreLocation) throws IOException, FeatureStoreException {
this.sslTruststoreLocation = EngineBase.getInstance().addFile(sslTruststoreLocation);
}

public void setSslKeystoreLocation(String sslKeystoreLocation) throws IOException, FeatureStoreException {
this.sslKeystoreLocation = EngineBase.getInstance().addFile(sslKeystoreLocation);
}

public void setOptions(List<Option> options) throws IOException, FeatureStoreException {
// add keytab file
for (Option option: options) {
if (option.getName().equals("sasl.jaas.config")) {
Pattern pattern = Pattern.compile("keyTab=[\"'](.+?)[\"']");
Matcher matcher = pattern.matcher(option.getValue());
while (matcher.find()) {
String originalKeytabLocation = matcher.group(1);
String newKeytabLocation = EngineBase.getInstance().addFile(originalKeytabLocation);
option.setValue(option.getValue().replace(originalKeytabLocation, newKeytabLocation));
}
}
}
this.options = options;
}

public Map<String, String> kafkaOptions() throws FeatureStoreException {
HopsworksHttpClient client = HopsworksClient.getInstance().getHopsworksHttpClient();
Map<String, String> config = new HashMap<>();
Original file line number Diff line number Diff line change
@@ -28,6 +28,16 @@

public abstract class EngineBase {

private static EngineBase instance;

public static synchronized void setInstance(EngineBase instance) {
EngineBase.instance = instance;
}

public static synchronized EngineBase getInstance() {
return instance;
}

protected static final Logger LOGGER = LoggerFactory.getLogger(EngineBase.class);

public StorageConnectorApi storageConnectorApi = new StorageConnectorApi();
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2025. Hopsworks AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
* See the License for the specific language governing permissions and limitations under the License.
*
*/

package com.logicalclocks.hsfs;

import com.logicalclocks.hsfs.engine.EngineBase;
import com.logicalclocks.hsfs.metadata.Option;

import java.io.IOException;
import java.util.Collections;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;


class TestStorageConnector {
@Test
public void testOptions() throws FeatureStoreException, IOException {
// Arrange
EngineBase engineBase = Mockito.mock(EngineBase.class);
Mockito.when(engineBase.addFile(Mockito.anyString())).thenReturn("result_from_add_file");
EngineBase.setInstance(engineBase);

StorageConnector.KafkaConnector kafkaConnector = new StorageConnector.KafkaConnector();

// Act
kafkaConnector.setOptions(Collections.singletonList(new Option("test", "test")));

// Assert
Mockito.verify(engineBase, Mockito.times(0)).addFile(Mockito.anyString());
}

@Test
public void testOptionsSASLAuthentication() throws FeatureStoreException, IOException {
// Arrange
EngineBase engineBase = Mockito.mock(EngineBase.class);
Mockito.when(engineBase.addFile(Mockito.anyString())).thenReturn("result_from_add_file");
EngineBase.setInstance(engineBase);

StorageConnector.KafkaConnector kafkaConnector = new StorageConnector.KafkaConnector();

// Act
kafkaConnector.setOptions(Collections.singletonList(new Option("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab=\"/home/laurent/my.keytab\" storeKey=true useTicketCache=false serviceName=\"kafka\" principal=\"laurent@kafka.com\";")));

// Assert
Mockito.verify(engineBase, Mockito.times(1)).addFile(Mockito.anyString());
Assertions.assertEquals("com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab=\"result_from_add_file\" storeKey=true useTicketCache=false serviceName=\"kafka\" principal=\"laurent@kafka.com\";", kafkaConnector.getOptions().get(0).getValue());
}
}

Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.HopsworksConnectionBase;
import com.logicalclocks.hsfs.SecretStore;
import com.logicalclocks.hsfs.engine.EngineBase;
import com.logicalclocks.hsfs.metadata.HopsworksClient;

import com.logicalclocks.hsfs.metadata.HopsworksHttpClient;
@@ -53,6 +54,7 @@ public HopsworksConnection(String host, int port, String project, Region region,
hostnameVerification, trustStorePath, this.apiKeyFilePath, this.apiKeyValue);
this.projectObj = getProject();
HopsworksClient.getInstance().setProject(this.projectObj);
EngineBase.setInstance(SparkEngine.getInstance());
if (!System.getProperties().containsKey(HopsworksInternalClient.REST_ENDPOINT_SYS)) {
SparkEngine.getInstance().validateSparkConfiguration();
HopsworksHttpClient hopsworksHttpClient = HopsworksClient.getInstance().getHopsworksHttpClient();
Original file line number Diff line number Diff line change
@@ -41,7 +41,6 @@
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.metadata.HopsworksClient;
import com.logicalclocks.hsfs.metadata.OnDemandOptions;
import com.logicalclocks.hsfs.metadata.OnlineIngestionApi;
import com.logicalclocks.hsfs.metadata.Option;
import com.logicalclocks.hsfs.util.Constants;
import com.logicalclocks.hsfs.spark.ExternalFeatureGroup;
@@ -125,7 +124,6 @@ public class SparkEngine extends EngineBase {

private final StorageConnectorUtils storageConnectorUtils = new StorageConnectorUtils();
private FeatureGroupUtils featureGroupUtils = new FeatureGroupUtils();
private OnlineIngestionApi onlineIngestionApi = new OnlineIngestionApi();

private static SparkEngine INSTANCE = null;

@@ -923,8 +921,6 @@ public Map<String, String> getKafkaConfig(FeatureGroupBase featureGroup, Map<Str

StorageConnector.KafkaConnector storageConnector =
storageConnectorApi.getKafkaStorageConnector(featureGroup.getFeatureStore(), external);
storageConnector.setSslTruststoreLocation(addFile(storageConnector.getSslTruststoreLocation()));
storageConnector.setSslKeystoreLocation(addFile(storageConnector.getSslKeystoreLocation()));

Map<String, String> config = storageConnector.sparkOptions();

52 changes: 46 additions & 6 deletions python/hsfs/storage_connector.py
Original file line number Diff line number Diff line change
@@ -1124,6 +1124,19 @@ def __init__(
self._external_kafka = external_kafka
self._pem_files_created = False

# add keytab file
sasl_jaas_config = self._options.get("sasl.jaas.config")
if sasl_jaas_config:
for option in re.findall("keyTab=[\"'](.+?)[\"']", sasl_jaas_config):
original_keytab_location = option
new_keytab_location = engine.get_instance().add_file(
original_keytab_location
)
sasl_jaas_config = sasl_jaas_config.replace(
original_keytab_location, new_keytab_location
)
self._options["sasl.jaas.config"] = sasl_jaas_config

@property
def bootstrap_servers(self) -> Optional[List[str]]:
"""Bootstrap servers string."""
@@ -1245,11 +1258,17 @@ def confluent_options(self) -> Dict[str, Any]:
config["ssl.key.location"] = client_key_path
elif key == "sasl.jaas.config":
groups = re.search(
"(.+?) .*username=[\"'](.+?)[\"'] .*password=[\"'](.+?)[\"']",
value,
"(.+) (required|requisite|sufficient|optional)(.*)", value
)
mechanism = groups.group(1)
# flag = groups.group(2)
options = groups.group(3)

option_dict = {}
for option in re.findall(r"\s(\w+)=[\"'](.+?)[\"']", options):
option_dict[option[0]] = option[1]

if "sasl.mechanisms" not in config:
mechanism = groups.group(1)
mechanism_value = None
if (
mechanism
@@ -1266,9 +1285,30 @@ def confluent_options(self) -> Dict[str, Any]:
== "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule"
):
mechanism_value = "OAUTHBEARER"
config["sasl.mechanisms"] = mechanism_value
config["sasl.username"] = groups.group(2)
config["sasl.password"] = groups.group(3)
else:
mechanism_value = "GSSAPI"
config["sasl.mechanisms"] = mechanism_value

if mechanism_value == "GSSAPI":
service_name = option_dict.get("serviceName")
if service_name:
config["sasl.kerberos.service.name"] = service_name

principal = option_dict.get("principal")
if principal:
config["sasl.kerberos.principal"] = principal

key_tab = option_dict.get("keyTab")
if key_tab:
config["sasl.kerberos.keytab"] = key_tab
else:
username = option_dict.get("username")
if username:
config["sasl.username"] = username

password = option_dict.get("password")
if password:
config["sasl.password"] = password
elif key == "ssl.endpoint.identification.algorithm":
config[key] = "none" if value == "" else value
elif key == "queued.max.requests":
Loading
Loading