Skip to content

Commit e383460

Browse files
committed
Merge remote-tracking branch 'origin/main' into fix-spark-java-endpoint
2 parents 86764a6 + 36ee228 commit e383460

File tree

29 files changed

+399
-135
lines changed

29 files changed

+399
-135
lines changed

java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.logicalclocks.hsfs.OnlineConfig;
2626
import com.logicalclocks.hsfs.StatisticsConfig;
2727
import com.logicalclocks.hsfs.Storage;
28+
import com.logicalclocks.hsfs.StorageConnector;
2829
import com.logicalclocks.hsfs.beam.engine.FeatureGroupEngine;
2930
import com.logicalclocks.hsfs.beam.engine.BeamProducer;
3031
import com.logicalclocks.hsfs.constructor.QueryBase;
@@ -48,7 +49,7 @@ public class StreamFeatureGroup extends FeatureGroupBase<PCollection<Object>> {
4849
public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description,
4950
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
5051
boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig, String onlineTopicName,
51-
String eventTime, OnlineConfig onlineConfig) {
52+
String eventTime, OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
5253
this();
5354
this.featureStore = featureStore;
5455
this.name = name;
@@ -65,6 +66,8 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
6566
this.onlineTopicName = onlineTopicName;
6667
this.eventTime = eventTime;
6768
this.onlineConfig = onlineConfig;
69+
this.storageConnector = storageConnector;
70+
this.path = path;
6871
}
6972

7073
public StreamFeatureGroup() {

java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.logicalclocks.hsfs.OnlineConfig;
2727
import com.logicalclocks.hsfs.StatisticsConfig;
2828
import com.logicalclocks.hsfs.Storage;
29+
import com.logicalclocks.hsfs.StorageConnector;
2930
import com.logicalclocks.hsfs.constructor.QueryBase;
3031

3132
import com.logicalclocks.hsfs.metadata.Statistics;
@@ -54,7 +55,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
5455
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
5556
boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig,
5657
String onlineTopicName, String topicName, String notificationTopicName, String eventTime,
57-
OnlineConfig onlineConfig) {
58+
OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
5859
this();
5960
this.featureStore = featureStore;
6061
this.name = name;
@@ -73,6 +74,8 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
7374
this.notificationTopicName = notificationTopicName;
7475
this.eventTime = eventTime;
7576
this.onlineConfig = onlineConfig;
77+
this.storageConnector = storageConnector;
78+
this.path = path;
7679
}
7780

7881
public StreamFeatureGroup() {

java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java

+8
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,14 @@ public abstract class FeatureGroupBase<T> {
132132
@Setter
133133
protected OnlineConfig onlineConfig;
134134

135+
@Getter
136+
@Setter
137+
protected StorageConnector storageConnector;
138+
139+
@Getter
140+
@Setter
141+
protected String path;
142+
135143
@JsonIgnore
136144
// These are only used in the client. In the server they are aggregated in the `features` field
137145
protected List<String> partitionKeys;

java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java

+3
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,9 @@ public static class S3Connector extends StorageConnector {
124124
@Getter @Setter
125125
protected String bucket;
126126

127+
@Getter @Setter
128+
protected String region;
129+
127130
@Getter @Setter
128131
protected String sessionToken;
129132

java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupUtils.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public static Long getTimeStampFromDateString(String inputDate) throws FeatureSt
128128
}
129129

130130
SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatPattern);
131-
Long commitTimeStamp = dateFormat.parse(tempDate).getTime();;
131+
Long commitTimeStamp = dateFormat.parse(tempDate).getTime();
132132

133133
return commitTimeStamp;
134134
}

java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/HopsworksHttpClient.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222
import org.apache.http.HttpRequest;
2323
import org.apache.http.client.ResponseHandler;
2424
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
2526

2627
import java.io.File;
2728
import java.io.IOException;
29+
import java.nio.charset.Charset;
2830

2931
public interface HopsworksHttpClient {
30-
static final Logger LOGGER = null;
32+
Logger LOGGER = LoggerFactory.getLogger(HopsworksHttpClient.class);
3133

3234
<T> T handleRequest(HttpRequest request, ResponseHandler<T> responseHandler)
3335
throws IOException, FeatureStoreException;
@@ -46,7 +48,7 @@ <T> T handleRequest(HttpRequest request, ResponseHandler<T> responseHandler)
4648

4749
static String readCertKey(String materialPwd) {
4850
try {
49-
return FileUtils.readFileToString(new File(materialPwd));
51+
return FileUtils.readFileToString(new File(materialPwd), Charset.defaultCharset());
5052
} catch (IOException ex) {
5153
LOGGER.warn("Failed to get cert password.", ex);
5254
}

java/hsfs/src/test/java/com/logicalclocks/hsfs/metadata/TestHopsworksClient.java

+8
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,12 @@ public void testStringEntitySerialization() throws IOException {
4646
Assertions.assertEquals("{\"email\":\"test@test.com\",\"firstName\":\"test\",\"lastName\":\"de la Rúa Martínez\"}",
4747
json);
4848
}
49+
50+
// FSTORE-1562: readCertKey threw NullPointerException if reading the file failed,
51+
@Test
52+
public void testReadCertKey_failure() {
53+
String key = HopsworksHttpClient.readCertKey("/this/path/does/not/exists");
54+
Assertions.assertNull(key);
55+
}
56+
4957
}

java/spark/src/main/java/com/logicalclocks/hsfs/spark/ExternalFeatureGroup.java

-8
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,6 @@
5757
@JsonIgnoreProperties(ignoreUnknown = true)
5858
public class ExternalFeatureGroup extends FeatureGroupBase<Dataset<Row>> {
5959

60-
@Getter
61-
@Setter
62-
private StorageConnector storageConnector;
63-
6460
@Getter
6561
@Setter
6662
private String query;
@@ -69,10 +65,6 @@ public class ExternalFeatureGroup extends FeatureGroupBase<Dataset<Row>> {
6965
@Setter
7066
private ExternalDataFormat dataFormat;
7167

72-
@Getter
73-
@Setter
74-
private String path;
75-
7668
@Getter
7769
@Setter
7870
private List<OnDemandOptions> options;

java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureGroup.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.logicalclocks.hsfs.OnlineConfig;
3232
import com.logicalclocks.hsfs.StatisticsConfig;
3333
import com.logicalclocks.hsfs.Storage;
34+
import com.logicalclocks.hsfs.StorageConnector;
3435
import com.logicalclocks.hsfs.TimeTravelFormat;
3536
import com.logicalclocks.hsfs.FeatureGroupBase;
3637
import com.logicalclocks.hsfs.metadata.Statistics;
@@ -64,7 +65,8 @@ public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer ver
6465
String description, List<String> primaryKeys, List<String> partitionKeys,
6566
String hudiPrecombineKey, boolean onlineEnabled, TimeTravelFormat timeTravelFormat,
6667
List<Feature> features, StatisticsConfig statisticsConfig, String onlineTopicName,
67-
String topicName, String notificationTopicName, String eventTime, OnlineConfig onlineConfig) {
68+
String topicName, String notificationTopicName, String eventTime, OnlineConfig onlineConfig,
69+
StorageConnector storageConnector, String path) {
6870
this();
6971
this.featureStore = featureStore;
7072
this.name = name;
@@ -85,6 +87,8 @@ public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer ver
8587
this.notificationTopicName = notificationTopicName;
8688
this.eventTime = eventTime;
8789
this.onlineConfig = onlineConfig;
90+
this.storageConnector = storageConnector;
91+
this.path = path;
8892
}
8993

9094
public FeatureGroup() {

java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.logicalclocks.hsfs.OnlineConfig;
3131
import com.logicalclocks.hsfs.StatisticsConfig;
3232
import com.logicalclocks.hsfs.Storage;
33+
import com.logicalclocks.hsfs.StorageConnector;
3334
import com.logicalclocks.hsfs.FeatureGroupBase;
3435
import com.logicalclocks.hsfs.metadata.Statistics;
3536

@@ -62,7 +63,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
6263
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
6364
boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig,
6465
String onlineTopicName, String topicName, String notificationTopicName, String eventTime,
65-
OnlineConfig onlineConfig) {
66+
OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
6667
this();
6768
this.featureStore = featureStore;
6869
this.name = name;
@@ -81,6 +82,8 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
8182
this.notificationTopicName = notificationTopicName;
8283
this.eventTime = eventTime;
8384
this.onlineConfig = onlineConfig;
85+
this.storageConnector = storageConnector;
86+
this.path = path;
8487
}
8588

8689
public StreamFeatureGroup() {

java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java

-4
Original file line numberDiff line numberDiff line change
@@ -219,10 +219,6 @@ public Dataset<Row> registerOnDemandTemporaryTable(ExternalFeatureGroup onDemand
219219
? onDemandFeatureGroup.getDataFormat().toString() : null, getOnDemandOptions(onDemandFeatureGroup),
220220
onDemandFeatureGroup.getStorageConnector().getPath(onDemandFeatureGroup.getPath()));
221221

222-
if (!Strings.isNullOrEmpty(onDemandFeatureGroup.getLocation())) {
223-
sparkSession.sparkContext().textFile(onDemandFeatureGroup.getLocation(), 0).collect();
224-
}
225-
226222
dataset.createOrReplaceTempView(alias);
227223
return dataset;
228224
}

java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestFeatureGroup.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,11 @@ public void testFeatureGroupPrimaryKey() {
6767

6868
StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
6969
Collections.singletonList("primaryKey"), Collections.singletonList("partitionKey"), "hudiPrecombineKey",
70-
true, features, null, "onlineTopicName", null, null, null, null);
70+
true, features, null, "onlineTopicName", null, null, null, null, null, null);
7171

7272
Exception pkException = assertThrows(FeatureStoreException.class, () -> {
7373
featureGroupEngine.saveFeatureGroupMetaData(featureGroup,
74-
null, null, null, null, null);;;
74+
null, null, null, null, null);
7575
});
7676

7777
// Assert
@@ -93,11 +93,11 @@ public void testFeatureGroupEventTimeFeature() {
9393

9494
StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
9595
Collections.singletonList("featureA"), null, null,
96-
true, features, null, "onlineTopicName", null, null, "eventTime", null);
96+
true, features, null, "onlineTopicName", null, null, "eventTime", null, null, null);
9797

9898
Exception eventTimeException = assertThrows(FeatureStoreException.class, () -> {
9999
streamFeatureGroupEngine.saveFeatureGroupMetaData(featureGroup,
100-
null, null, null, null, null);;;
100+
null, null, null, null, null);
101101
});
102102

103103
// Assert
@@ -119,7 +119,7 @@ public void testFeatureGroupPartitionPrecombineKeys() {
119119

120120
StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
121121
Collections.singletonList("featureA"), Collections.singletonList("partitionKey"), "hudiPrecombineKey",
122-
true, features, null, "onlineTopicName", null, null, null, null);
122+
true, features, null, "onlineTopicName", null, null, null, null, null, null);
123123

124124
Exception partitionException = assertThrows(FeatureStoreException.class, () -> {
125125
streamFeatureGroupEngine.saveFeatureGroupMetaData(featureGroup,
@@ -164,7 +164,7 @@ public void testFeatureGroupAppendFeaturesResetSubject() throws FeatureStoreExce
164164

165165
StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
166166
Collections.singletonList("featureA"), null, null,
167-
true, features, null, "onlineTopicName", null, null, "eventTime", null);
167+
true, features, null, "onlineTopicName", null, null, "eventTime", null, null, null);
168168
featureGroup.featureGroupEngine = featureGroupEngine;
169169

170170
// Act

python/hsfs/constructor/fs_query.py

+10-1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ def __init__(
3535
expand: Optional[List[str]] = None,
3636
items: Optional[List[Dict[str, Any]]] = None,
3737
type: Optional[str] = None,
38+
delta_cached_feature_groups: Optional[List[Dict[str, Any]]] = None,
3839
**kwargs,
3940
) -> None:
4041
self._query = query
@@ -60,6 +61,14 @@ def __init__(
6061
else:
6162
self._hudi_cached_feature_groups = []
6263

64+
if delta_cached_feature_groups is not None:
65+
self._delta_cached_feature_groups = [
66+
hudi_feature_group_alias.HudiFeatureGroupAlias.from_response_json(fg)
67+
for fg in delta_cached_feature_groups
68+
]
69+
else:
70+
self._delta_cached_feature_groups = []
71+
6372
@classmethod
6473
def from_response_json(cls, json_dict: Dict[str, Any]) -> "FsQuery":
6574
json_decamelized = humps.decamelize(json_dict)
@@ -127,7 +136,7 @@ def register_delta_tables(
127136
feature_store_name: str,
128137
read_options: Optional[Dict[str, Any]],
129138
) -> None:
130-
for hudi_fg in self._hudi_cached_feature_groups:
139+
for hudi_fg in self._delta_cached_feature_groups:
131140
engine.get_instance().register_delta_temporary_table(
132141
hudi_fg, feature_store_id, feature_store_name, read_options
133142
)

python/hsfs/core/arrow_flight_client.py

+26-2
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,13 @@ def _is_query_supported_rec(query: query.Query):
124124
and query._left_feature_group.storage_connector.type
125125
in ArrowFlightClient.SUPPORTED_EXTERNAL_CONNECTORS
126126
)
127-
supported = hudi_no_time_travel or supported_connector
127+
delta_s3 = (
128+
isinstance(query._left_feature_group, feature_group.FeatureGroup)
129+
and query._left_feature_group.time_travel_format == "DELTA"
130+
and query._left_feature_group.storage_connector
131+
and query._left_feature_group.storage_connector.type == StorageConnector.S3
132+
)
133+
supported = hudi_no_time_travel or supported_connector or delta_s3
128134
for j in query._joins:
129135
supported &= _is_query_supported_rec(j._query)
130136
return supported
@@ -549,6 +555,7 @@ def enabled_on_cluster(self) -> bool:
549555
def _serialize_featuregroup_connector(fg, query, on_demand_fg_aliases):
550556
connector = {}
551557
if isinstance(fg, feature_group.ExternalFeatureGroup):
558+
connector["time_travel_type"] = None
552559
connector["type"] = fg.storage_connector.type
553560
connector["options"] = fg.storage_connector.connector_options()
554561
connector["query"] = fg.query[:-1] if fg.query.endswith(";") else fg.query
@@ -566,8 +573,25 @@ def _serialize_featuregroup_connector(fg, query, on_demand_fg_aliases):
566573
connector["filters"] = _serialize_filter_expression(
567574
join_obj._query._filter, join_obj._query, True
568575
)
576+
elif fg.time_travel_format == "DELTA":
577+
connector["time_travel_type"] = "delta"
578+
connector["type"] = fg.storage_connector.type
579+
connector["options"] = fg.storage_connector.connector_options()
580+
if fg.storage_connector.type == StorageConnector.S3:
581+
connector["options"]["path"] = fg.location
582+
connector["query"] = ""
583+
if query._left_feature_group == fg:
584+
connector["filters"] = _serialize_filter_expression(
585+
query._filter, query, True
586+
)
587+
else:
588+
for join_obj in query._joins:
589+
if join_obj._query._left_feature_group == fg:
590+
connector["filters"] = _serialize_filter_expression(
591+
join_obj._query._filter, join_obj._query, True
592+
)
569593
else:
570-
connector["type"] = "hudi"
594+
connector["time_travel_type"] = "hudi"
571595
return connector
572596

573597

0 commit comments

Comments
 (0)