Skip to content

Commit 3117e4c

Browse files
committed
Revert "[FSTORE-1556] Managed feature group (#351)"
This reverts commit 5e04e8a.
1 parent 8820e25 commit 3117e4c

File tree

26 files changed

+130
-382
lines changed

26 files changed

+130
-382
lines changed

java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java

+1-4
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import com.logicalclocks.hsfs.OnlineConfig;
2626
import com.logicalclocks.hsfs.StatisticsConfig;
2727
import com.logicalclocks.hsfs.Storage;
28-
import com.logicalclocks.hsfs.StorageConnector;
2928
import com.logicalclocks.hsfs.beam.engine.FeatureGroupEngine;
3029
import com.logicalclocks.hsfs.beam.engine.BeamProducer;
3130
import com.logicalclocks.hsfs.constructor.QueryBase;
@@ -49,7 +48,7 @@ public class StreamFeatureGroup extends FeatureGroupBase<PCollection<Object>> {
4948
public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description,
5049
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
5150
boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig, String onlineTopicName,
52-
String eventTime, OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
51+
String eventTime, OnlineConfig onlineConfig) {
5352
this();
5453
this.featureStore = featureStore;
5554
this.name = name;
@@ -66,8 +65,6 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
6665
this.onlineTopicName = onlineTopicName;
6766
this.eventTime = eventTime;
6867
this.onlineConfig = onlineConfig;
69-
this.storageConnector = storageConnector;
70-
this.path = path;
7168
}
7269

7370
public StreamFeatureGroup() {

java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java

+1-4
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import com.logicalclocks.hsfs.OnlineConfig;
2727
import com.logicalclocks.hsfs.StatisticsConfig;
2828
import com.logicalclocks.hsfs.Storage;
29-
import com.logicalclocks.hsfs.StorageConnector;
3029
import com.logicalclocks.hsfs.constructor.QueryBase;
3130

3231
import com.logicalclocks.hsfs.metadata.Statistics;
@@ -55,7 +54,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
5554
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
5655
boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig,
5756
String onlineTopicName, String topicName, String notificationTopicName, String eventTime,
58-
OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
57+
OnlineConfig onlineConfig) {
5958
this();
6059
this.featureStore = featureStore;
6160
this.name = name;
@@ -74,8 +73,6 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
7473
this.notificationTopicName = notificationTopicName;
7574
this.eventTime = eventTime;
7675
this.onlineConfig = onlineConfig;
77-
this.storageConnector = storageConnector;
78-
this.path = path;
7976
}
8077

8178
public StreamFeatureGroup() {

java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java

-8
Original file line numberDiff line numberDiff line change
@@ -132,14 +132,6 @@ public abstract class FeatureGroupBase<T> {
132132
@Setter
133133
protected OnlineConfig onlineConfig;
134134

135-
@Getter
136-
@Setter
137-
protected StorageConnector storageConnector;
138-
139-
@Getter
140-
@Setter
141-
protected String path;
142-
143135
@JsonIgnore
144136
// These are only used in the client. In the server they are aggregated in the `features` field
145137
protected List<String> partitionKeys;

java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java

-3
Original file line numberDiff line numberDiff line change
@@ -124,9 +124,6 @@ public static class S3Connector extends StorageConnector {
124124
@Getter @Setter
125125
protected String bucket;
126126

127-
@Getter @Setter
128-
protected String region;
129-
130127
@Getter @Setter
131128
protected String sessionToken;
132129

java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupUtils.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public static Long getTimeStampFromDateString(String inputDate) throws FeatureSt
128128
}
129129

130130
SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatPattern);
131-
Long commitTimeStamp = dateFormat.parse(tempDate).getTime();
131+
Long commitTimeStamp = dateFormat.parse(tempDate).getTime();
132132

133133
return commitTimeStamp;
134134
}

java/spark/src/main/java/com/logicalclocks/hsfs/spark/ExternalFeatureGroup.java

+8
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@
5757
@JsonIgnoreProperties(ignoreUnknown = true)
5858
public class ExternalFeatureGroup extends FeatureGroupBase<Dataset<Row>> {
5959

60+
@Getter
61+
@Setter
62+
private StorageConnector storageConnector;
63+
6064
@Getter
6165
@Setter
6266
private String query;
@@ -65,6 +69,10 @@ public class ExternalFeatureGroup extends FeatureGroupBase<Dataset<Row>> {
6569
@Setter
6670
private ExternalDataFormat dataFormat;
6771

72+
@Getter
73+
@Setter
74+
private String path;
75+
6876
@Getter
6977
@Setter
7078
private List<OnDemandOptions> options;

java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureGroup.java

+1-5
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import com.logicalclocks.hsfs.OnlineConfig;
3232
import com.logicalclocks.hsfs.StatisticsConfig;
3333
import com.logicalclocks.hsfs.Storage;
34-
import com.logicalclocks.hsfs.StorageConnector;
3534
import com.logicalclocks.hsfs.TimeTravelFormat;
3635
import com.logicalclocks.hsfs.FeatureGroupBase;
3736
import com.logicalclocks.hsfs.metadata.Statistics;
@@ -65,8 +64,7 @@ public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer ver
6564
String description, List<String> primaryKeys, List<String> partitionKeys,
6665
String hudiPrecombineKey, boolean onlineEnabled, TimeTravelFormat timeTravelFormat,
6766
List<Feature> features, StatisticsConfig statisticsConfig, String onlineTopicName,
68-
String topicName, String notificationTopicName, String eventTime, OnlineConfig onlineConfig,
69-
StorageConnector storageConnector, String path) {
67+
String topicName, String notificationTopicName, String eventTime, OnlineConfig onlineConfig) {
7068
this();
7169
this.featureStore = featureStore;
7270
this.name = name;
@@ -87,8 +85,6 @@ public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer ver
8785
this.notificationTopicName = notificationTopicName;
8886
this.eventTime = eventTime;
8987
this.onlineConfig = onlineConfig;
90-
this.storageConnector = storageConnector;
91-
this.path = path;
9288
}
9389

9490
public FeatureGroup() {

java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java

+1-4
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import com.logicalclocks.hsfs.OnlineConfig;
3131
import com.logicalclocks.hsfs.StatisticsConfig;
3232
import com.logicalclocks.hsfs.Storage;
33-
import com.logicalclocks.hsfs.StorageConnector;
3433
import com.logicalclocks.hsfs.FeatureGroupBase;
3534
import com.logicalclocks.hsfs.metadata.Statistics;
3635

@@ -63,7 +62,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
6362
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
6463
boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig,
6564
String onlineTopicName, String topicName, String notificationTopicName, String eventTime,
66-
OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
65+
OnlineConfig onlineConfig) {
6766
this();
6867
this.featureStore = featureStore;
6968
this.name = name;
@@ -82,8 +81,6 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
8281
this.notificationTopicName = notificationTopicName;
8382
this.eventTime = eventTime;
8483
this.onlineConfig = onlineConfig;
85-
this.storageConnector = storageConnector;
86-
this.path = path;
8784
}
8885

8986
public StreamFeatureGroup() {

java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java

+4
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,10 @@ public Dataset<Row> registerOnDemandTemporaryTable(ExternalFeatureGroup onDemand
219219
? onDemandFeatureGroup.getDataFormat().toString() : null, getOnDemandOptions(onDemandFeatureGroup),
220220
onDemandFeatureGroup.getStorageConnector().getPath(onDemandFeatureGroup.getPath()));
221221

222+
if (!Strings.isNullOrEmpty(onDemandFeatureGroup.getLocation())) {
223+
sparkSession.sparkContext().textFile(onDemandFeatureGroup.getLocation(), 0).collect();
224+
}
225+
222226
dataset.createOrReplaceTempView(alias);
223227
return dataset;
224228
}

java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestFeatureGroup.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,11 @@ public void testFeatureGroupPrimaryKey() {
6767

6868
StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
6969
Collections.singletonList("primaryKey"), Collections.singletonList("partitionKey"), "hudiPrecombineKey",
70-
true, features, null, "onlineTopicName", null, null, null, null, null, null);
70+
true, features, null, "onlineTopicName", null, null, null, null);
7171

7272
Exception pkException = assertThrows(FeatureStoreException.class, () -> {
7373
featureGroupEngine.saveFeatureGroupMetaData(featureGroup,
74-
null, null, null, null, null);
74+
null, null, null, null, null);
7575
});
7676

7777
// Assert
@@ -93,11 +93,11 @@ public void testFeatureGroupEventTimeFeature() {
9393

9494
StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
9595
Collections.singletonList("featureA"), null, null,
96-
true, features, null, "onlineTopicName", null, null, "eventTime", null, null, null);
96+
true, features, null, "onlineTopicName", null, null, "eventTime", null);
9797

9898
Exception eventTimeException = assertThrows(FeatureStoreException.class, () -> {
9999
streamFeatureGroupEngine.saveFeatureGroupMetaData(featureGroup,
100-
null, null, null, null, null);
100+
null, null, null, null, null);
101101
});
102102

103103
// Assert
@@ -119,7 +119,7 @@ public void testFeatureGroupPartitionPrecombineKeys() {
119119

120120
StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
121121
Collections.singletonList("featureA"), Collections.singletonList("partitionKey"), "hudiPrecombineKey",
122-
true, features, null, "onlineTopicName", null, null, null, null, null, null);
122+
true, features, null, "onlineTopicName", null, null, null, null);
123123

124124
Exception partitionException = assertThrows(FeatureStoreException.class, () -> {
125125
streamFeatureGroupEngine.saveFeatureGroupMetaData(featureGroup,
@@ -164,7 +164,7 @@ public void testFeatureGroupAppendFeaturesResetSubject() throws FeatureStoreExce
164164

165165
StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
166166
Collections.singletonList("featureA"), null, null,
167-
true, features, null, "onlineTopicName", null, null, "eventTime", null, null, null);
167+
true, features, null, "onlineTopicName", null, null, "eventTime", null);
168168
featureGroup.featureGroupEngine = featureGroupEngine;
169169

170170
// Act

python/hsfs/constructor/fs_query.py

+1-10
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ def __init__(
3535
expand: Optional[List[str]] = None,
3636
items: Optional[List[Dict[str, Any]]] = None,
3737
type: Optional[str] = None,
38-
delta_cached_feature_groups: Optional[List[Dict[str, Any]]] = None,
3938
**kwargs,
4039
) -> None:
4140
self._query = query
@@ -61,14 +60,6 @@ def __init__(
6160
else:
6261
self._hudi_cached_feature_groups = []
6362

64-
if delta_cached_feature_groups is not None:
65-
self._delta_cached_feature_groups = [
66-
hudi_feature_group_alias.HudiFeatureGroupAlias.from_response_json(fg)
67-
for fg in delta_cached_feature_groups
68-
]
69-
else:
70-
self._delta_cached_feature_groups = []
71-
7263
@classmethod
7364
def from_response_json(cls, json_dict: Dict[str, Any]) -> "FsQuery":
7465
json_decamelized = humps.decamelize(json_dict)
@@ -136,7 +127,7 @@ def register_delta_tables(
136127
feature_store_name: str,
137128
read_options: Optional[Dict[str, Any]],
138129
) -> None:
139-
for hudi_fg in self._delta_cached_feature_groups:
130+
for hudi_fg in self._hudi_cached_feature_groups:
140131
engine.get_instance().register_delta_temporary_table(
141132
hudi_fg, feature_store_id, feature_store_name, read_options
142133
)

python/hsfs/core/arrow_flight_client.py

+2-26
Original file line numberDiff line numberDiff line change
@@ -124,13 +124,7 @@ def _is_query_supported_rec(query: query.Query):
124124
and query._left_feature_group.storage_connector.type
125125
in ArrowFlightClient.SUPPORTED_EXTERNAL_CONNECTORS
126126
)
127-
delta_s3 = (
128-
isinstance(query._left_feature_group, feature_group.FeatureGroup)
129-
and query._left_feature_group.time_travel_format == "DELTA"
130-
and query._left_feature_group.storage_connector
131-
and query._left_feature_group.storage_connector.type == StorageConnector.S3
132-
)
133-
supported = hudi_no_time_travel or supported_connector or delta_s3
127+
supported = hudi_no_time_travel or supported_connector
134128
for j in query._joins:
135129
supported &= _is_query_supported_rec(j._query)
136130
return supported
@@ -555,7 +549,6 @@ def enabled_on_cluster(self) -> bool:
555549
def _serialize_featuregroup_connector(fg, query, on_demand_fg_aliases):
556550
connector = {}
557551
if isinstance(fg, feature_group.ExternalFeatureGroup):
558-
connector["time_travel_type"] = None
559552
connector["type"] = fg.storage_connector.type
560553
connector["options"] = fg.storage_connector.connector_options()
561554
connector["query"] = fg.query[:-1] if fg.query.endswith(";") else fg.query
@@ -573,25 +566,8 @@ def _serialize_featuregroup_connector(fg, query, on_demand_fg_aliases):
573566
connector["filters"] = _serialize_filter_expression(
574567
join_obj._query._filter, join_obj._query, True
575568
)
576-
elif fg.time_travel_format == "DELTA":
577-
connector["time_travel_type"] = "delta"
578-
connector["type"] = fg.storage_connector.type
579-
connector["options"] = fg.storage_connector.connector_options()
580-
if fg.storage_connector.type == StorageConnector.S3:
581-
connector["options"]["path"] = fg.location
582-
connector["query"] = ""
583-
if query._left_feature_group == fg:
584-
connector["filters"] = _serialize_filter_expression(
585-
query._filter, query, True
586-
)
587-
else:
588-
for join_obj in query._joins:
589-
if join_obj._query._left_feature_group == fg:
590-
connector["filters"] = _serialize_filter_expression(
591-
join_obj._query._filter, join_obj._query, True
592-
)
593569
else:
594-
connector["time_travel_type"] = "hudi"
570+
connector["type"] = "hudi"
595571
return connector
596572

597573

python/hsfs/core/delta_engine.py

+8-19
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,10 @@ def save_delta_fg(self, dataset, write_options, validation_id=None):
5252
return self._feature_group_api.commit(self._feature_group, fg_commit)
5353

5454
def register_temporary_table(self, delta_fg_alias, read_options):
55-
location = self._feature_group.prepare_spark_location()
56-
5755
delta_options = self._setup_delta_read_opts(delta_fg_alias, read_options)
5856
self._spark_session.read.format(self.DELTA_SPARK_FORMAT).options(
5957
**delta_options
60-
).load(location).createOrReplaceTempView(
58+
).load(self._feature_group.location).createOrReplaceTempView(
6159
delta_fg_alias.alias
6260
)
6361

@@ -87,17 +85,15 @@ def _setup_delta_read_opts(self, delta_fg_alias, read_options):
8785
return delta_options
8886

8987
def delete_record(self, delete_df):
90-
location = self._feature_group.prepare_spark_location()
91-
9288
if not DeltaTable.isDeltaTable(
93-
self._spark_session, location
89+
self._spark_session, self._feature_group.location
9490
):
9591
raise FeatureStoreException(
9692
f"This is no data available in Feature group {self._feature_group.name}, or it not DELTA enabled "
9793
)
9894
else:
9995
fg_source_table = DeltaTable.forPath(
100-
self._spark_session, location
96+
self._spark_session, self._feature_group.location
10197
)
10298

10399
source_alias = (
@@ -113,18 +109,16 @@ def delete_record(self, delete_df):
113109
).whenMatchedDelete().execute()
114110

115111
fg_commit = self._get_last_commit_metadata(
116-
self._spark_session, location
112+
self._spark_session, self._feature_group.location
117113
)
118114
return self._feature_group_api.commit(self._feature_group, fg_commit)
119115

120116
def _write_delta_dataset(self, dataset, write_options):
121-
location = self._feature_group.prepare_spark_location()
122-
123117
if write_options is None:
124118
write_options = {}
125119

126120
if not DeltaTable.isDeltaTable(
127-
self._spark_session, location
121+
self._spark_session, self._feature_group.location
128122
):
129123
(
130124
dataset.write.format(DeltaEngine.DELTA_SPARK_FORMAT)
@@ -135,11 +129,11 @@ def _write_delta_dataset(self, dataset, write_options):
135129
else []
136130
)
137131
.mode("append")
138-
.save(location)
132+
.save(self._feature_group.location)
139133
)
140134
else:
141135
fg_source_table = DeltaTable.forPath(
142-
self._spark_session, location
136+
self._spark_session, self._feature_group.location
143137
)
144138

145139
source_alias = (
@@ -155,14 +149,9 @@ def _write_delta_dataset(self, dataset, write_options):
155149
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
156150

157151
return self._get_last_commit_metadata(
158-
self._spark_session, location
152+
self._spark_session, self._feature_group.location
159153
)
160154

161-
def vacuum(self, retention_hours: int):
162-
location = self._feature_group.prepare_spark_location()
163-
retention = f"RETAIN {retention_hours} HOURS" if retention_hours is not None else ""
164-
self._spark_session.sql(f"VACUUM '{location}' {retention}")
165-
166155
def _generate_merge_query(self, source_alias, updates_alias):
167156
merge_query_list = []
168157
primary_key = self._feature_group.primary_key

0 commit comments

Comments
 (0)