Commit 5270a2c

[FSTORE-1537] Managed feature group metadata changes (#346)
1 parent 147af3e commit 5270a2c

File tree

20 files changed: +235 -119 lines

java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java

+4 -1

@@ -25,6 +25,7 @@
 import com.logicalclocks.hsfs.OnlineConfig;
 import com.logicalclocks.hsfs.StatisticsConfig;
 import com.logicalclocks.hsfs.Storage;
+import com.logicalclocks.hsfs.StorageConnector;
 import com.logicalclocks.hsfs.beam.engine.FeatureGroupEngine;
 import com.logicalclocks.hsfs.beam.engine.BeamProducer;
 import com.logicalclocks.hsfs.constructor.QueryBase;
@@ -48,7 +49,7 @@ public class StreamFeatureGroup extends FeatureGroupBase<PCollection<Object>> {
   public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description,
       List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
       boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig, String onlineTopicName,
-      String eventTime, OnlineConfig onlineConfig) {
+      String eventTime, OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
     this();
     this.featureStore = featureStore;
     this.name = name;
@@ -65,6 +66,8 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
     this.onlineTopicName = onlineTopicName;
     this.eventTime = eventTime;
     this.onlineConfig = onlineConfig;
+    this.storageConnector = storageConnector;
+    this.path = path;
   }
 
   public StreamFeatureGroup() {

java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java

+4 -1

@@ -26,6 +26,7 @@
 import com.logicalclocks.hsfs.OnlineConfig;
 import com.logicalclocks.hsfs.StatisticsConfig;
 import com.logicalclocks.hsfs.Storage;
+import com.logicalclocks.hsfs.StorageConnector;
 import com.logicalclocks.hsfs.constructor.QueryBase;
 
 import com.logicalclocks.hsfs.metadata.Statistics;
@@ -54,7 +55,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
       List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
       boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig,
       String onlineTopicName, String topicName, String notificationTopicName, String eventTime,
-      OnlineConfig onlineConfig) {
+      OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
     this();
     this.featureStore = featureStore;
     this.name = name;
@@ -73,6 +74,8 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
     this.notificationTopicName = notificationTopicName;
     this.eventTime = eventTime;
     this.onlineConfig = onlineConfig;
+    this.storageConnector = storageConnector;
+    this.path = path;
   }
 
   public StreamFeatureGroup() {

java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java

+8

@@ -132,6 +132,14 @@ public abstract class FeatureGroupBase<T> {
   @Setter
   protected OnlineConfig onlineConfig;
 
+  @Getter
+  @Setter
+  protected StorageConnector storageConnector;
+
+  @Getter
+  @Setter
+  protected String path;
+
   @JsonIgnore
   // These are only used in the client. In the server they are aggregated in the `features` field
   protected List<String> partitionKeys;
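
The storageConnector and path fields moved onto FeatureGroupBase, so managed (non-external) feature groups can now record where their data lives. A hypothetical usage sketch from the Python client, assuming create_feature_group exposes matching storage_connector and path arguments (they are not part of this diff); connector, project, and column names are placeholders:

    import hopsworks

    # Hypothetical example: connector name, feature group name, and the
    # storage_connector/path arguments below are illustrative assumptions.
    project = hopsworks.login()
    fs = project.get_feature_store()

    s3_conn = fs.get_storage_connector("my_s3_connector")  # assumed existing S3 connector

    fg = fs.create_feature_group(
        name="transactions",
        version=1,
        primary_key=["tx_id"],
        time_travel_format="DELTA",
        storage_connector=s3_conn,    # assumption: stored in the new storageConnector field
        path="managed/transactions",  # assumption: stored in the new path field
    )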

java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java

+3

@@ -124,6 +124,9 @@ public static class S3Connector extends StorageConnector {
   @Getter @Setter
   protected String bucket;
 
+  @Getter @Setter
+  protected String region;
+
   @Getter @Setter
   protected String sessionToken;

java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupUtils.java

+1 -1

@@ -128,7 +128,7 @@ public static Long getTimeStampFromDateString(String inputDate) throws FeatureSt
     }
 
     SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatPattern);
-    Long commitTimeStamp = dateFormat.parse(tempDate).getTime();;
+    Long commitTimeStamp = dateFormat.parse(tempDate).getTime();
 
     return commitTimeStamp;
   }

java/spark/src/main/java/com/logicalclocks/hsfs/spark/ExternalFeatureGroup.java

-8

@@ -57,10 +57,6 @@
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class ExternalFeatureGroup extends FeatureGroupBase<Dataset<Row>> {
 
-  @Getter
-  @Setter
-  private StorageConnector storageConnector;
-
   @Getter
   @Setter
   private String query;
@@ -69,10 +65,6 @@ public class ExternalFeatureGroup extends FeatureGroupBase<Dataset<Row>> {
   @Setter
   private ExternalDataFormat dataFormat;
 
-  @Getter
-  @Setter
-  private String path;
-
   @Getter
   @Setter
   private List<OnDemandOptions> options;

java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureGroup.java

+5 -1

@@ -31,6 +31,7 @@
 import com.logicalclocks.hsfs.OnlineConfig;
 import com.logicalclocks.hsfs.StatisticsConfig;
 import com.logicalclocks.hsfs.Storage;
+import com.logicalclocks.hsfs.StorageConnector;
 import com.logicalclocks.hsfs.TimeTravelFormat;
 import com.logicalclocks.hsfs.FeatureGroupBase;
 import com.logicalclocks.hsfs.metadata.Statistics;
@@ -64,7 +65,8 @@ public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer ver
       String description, List<String> primaryKeys, List<String> partitionKeys,
       String hudiPrecombineKey, boolean onlineEnabled, TimeTravelFormat timeTravelFormat,
       List<Feature> features, StatisticsConfig statisticsConfig, String onlineTopicName,
-      String topicName, String notificationTopicName, String eventTime, OnlineConfig onlineConfig) {
+      String topicName, String notificationTopicName, String eventTime, OnlineConfig onlineConfig,
+      StorageConnector storageConnector, String path) {
     this();
     this.featureStore = featureStore;
     this.name = name;
@@ -85,6 +87,8 @@ public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer ver
     this.notificationTopicName = notificationTopicName;
     this.eventTime = eventTime;
     this.onlineConfig = onlineConfig;
+    this.storageConnector = storageConnector;
+    this.path = path;
   }
 
   public FeatureGroup() {

java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java

+4 -1

@@ -30,6 +30,7 @@
 import com.logicalclocks.hsfs.OnlineConfig;
 import com.logicalclocks.hsfs.StatisticsConfig;
 import com.logicalclocks.hsfs.Storage;
+import com.logicalclocks.hsfs.StorageConnector;
 import com.logicalclocks.hsfs.FeatureGroupBase;
 import com.logicalclocks.hsfs.metadata.Statistics;
 
@@ -62,7 +63,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
       List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
       boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig,
       String onlineTopicName, String topicName, String notificationTopicName, String eventTime,
-      OnlineConfig onlineConfig) {
+      OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
     this();
     this.featureStore = featureStore;
     this.name = name;
@@ -81,6 +82,8 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
     this.notificationTopicName = notificationTopicName;
     this.eventTime = eventTime;
     this.onlineConfig = onlineConfig;
+    this.storageConnector = storageConnector;
+    this.path = path;
   }
 
   public StreamFeatureGroup() {

java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java

-4

@@ -219,10 +219,6 @@ public Dataset<Row> registerOnDemandTemporaryTable(ExternalFeatureGroup onDemand
         ? onDemandFeatureGroup.getDataFormat().toString() : null, getOnDemandOptions(onDemandFeatureGroup),
         onDemandFeatureGroup.getStorageConnector().getPath(onDemandFeatureGroup.getPath()));
 
-    if (!Strings.isNullOrEmpty(onDemandFeatureGroup.getLocation())) {
-      sparkSession.sparkContext().textFile(onDemandFeatureGroup.getLocation(), 0).collect();
-    }
-
     dataset.createOrReplaceTempView(alias);
     return dataset;
   }

java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestFeatureGroup.java

+6 -6

@@ -67,11 +67,11 @@ public void testFeatureGroupPrimaryKey() {
 
     StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
         Collections.singletonList("primaryKey"), Collections.singletonList("partitionKey"), "hudiPrecombineKey",
-        true, features, null, "onlineTopicName", null, null, null, null);
+        true, features, null, "onlineTopicName", null, null, null, null, null, null);
 
     Exception pkException = assertThrows(FeatureStoreException.class, () -> {
       featureGroupEngine.saveFeatureGroupMetaData(featureGroup,
-          null, null, null, null, null);;;
+          null, null, null, null, null);
     });
 
     // Assert
@@ -93,11 +93,11 @@ public void testFeatureGroupEventTimeFeature() {
 
     StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
         Collections.singletonList("featureA"), null, null,
-        true, features, null, "onlineTopicName", null, null, "eventTime", null);
+        true, features, null, "onlineTopicName", null, null, "eventTime", null, null, null);
 
     Exception eventTimeException = assertThrows(FeatureStoreException.class, () -> {
       streamFeatureGroupEngine.saveFeatureGroupMetaData(featureGroup,
-          null, null, null, null, null);;;
+          null, null, null, null, null);
     });
 
     // Assert
@@ -119,7 +119,7 @@ public void testFeatureGroupPartitionPrecombineKeys() {
 
     StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
         Collections.singletonList("featureA"), Collections.singletonList("partitionKey"), "hudiPrecombineKey",
-        true, features, null, "onlineTopicName", null, null, null, null);
+        true, features, null, "onlineTopicName", null, null, null, null, null, null);
 
     Exception partitionException = assertThrows(FeatureStoreException.class, () -> {
       streamFeatureGroupEngine.saveFeatureGroupMetaData(featureGroup,
@@ -164,7 +164,7 @@ public void testFeatureGroupAppendFeaturesResetSubject() throws FeatureStoreExce
 
     StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
         Collections.singletonList("featureA"), null, null,
-        true, features, null, "onlineTopicName", null, null, "eventTime", null);
+        true, features, null, "onlineTopicName", null, null, "eventTime", null, null, null);
     featureGroup.featureGroupEngine = featureGroupEngine;
 
     // Act

python/hsfs/core/delta_engine.py

+23 -8

@@ -52,10 +52,12 @@ def save_delta_fg(self, dataset, write_options, validation_id=None):
         return self._feature_group_api.commit(self._feature_group, fg_commit)
 
     def register_temporary_table(self, delta_fg_alias, read_options):
+        location = self._feature_group.prepare_spark_location()
+
         delta_options = self._setup_delta_read_opts(delta_fg_alias, read_options)
         self._spark_session.read.format(self.DELTA_SPARK_FORMAT).options(
             **delta_options
-        ).load(self._feature_group.location).createOrReplaceTempView(
+        ).load(location).createOrReplaceTempView(
             delta_fg_alias.alias
         )
 
@@ -85,15 +87,17 @@ def _setup_delta_read_opts(self, delta_fg_alias, read_options):
         return delta_options
 
     def delete_record(self, delete_df):
+        location = self._feature_group.prepare_spark_location()
+
         if not DeltaTable.isDeltaTable(
-            self._spark_session, self._feature_group.location
+            self._spark_session, location
         ):
             raise FeatureStoreException(
                 f"This is no data available in Feature group {self._feature_group.name}, or it not DELTA enabled "
             )
         else:
             fg_source_table = DeltaTable.forPath(
-                self._spark_session, self._feature_group.location
+                self._spark_session, location
             )
 
             source_alias = (
@@ -109,16 +113,18 @@ def delete_record(self, delete_df):
             ).whenMatchedDelete().execute()
 
             fg_commit = self._get_last_commit_metadata(
-                self._spark_session, self._feature_group.location
+                self._spark_session, location
             )
             return self._feature_group_api.commit(self._feature_group, fg_commit)
 
     def _write_delta_dataset(self, dataset, write_options):
+        location = self._feature_group.prepare_spark_location()
+
         if write_options is None:
             write_options = {}
 
         if not DeltaTable.isDeltaTable(
-            self._spark_session, self._feature_group.location
+            self._spark_session, location
         ):
             (
                 dataset.write.format(DeltaEngine.DELTA_SPARK_FORMAT)
@@ -129,11 +135,11 @@ def _write_delta_dataset(self, dataset, write_options):
                     else []
                 )
                 .mode("append")
-                .save(self._feature_group.location)
+                .save(location)
             )
         else:
             fg_source_table = DeltaTable.forPath(
-                self._spark_session, self._feature_group.location
+                self._spark_session, location
             )
 
             source_alias = (
@@ -149,9 +155,18 @@ def _write_delta_dataset(self, dataset, write_options):
             ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
 
             return self._get_last_commit_metadata(
-                self._spark_session, self._feature_group.location
+                self._spark_session, location
             )
 
+    def vacuum(self, retention_hours):
+        location = self._feature_group.prepare_spark_location()
+
+        delta_table = DeltaTable.forPath(self._spark_session, location)
+
+        # Vacuum the table
+        # https://docs.delta.io/1.0.1/api/python/index.html#delta.tables.DeltaTable.vacuum
+        delta_table.vacuum(retention_hours)
+
     def _generate_merge_query(self, source_alias, updates_alias):
         merge_query_list = []
         primary_key = self._feature_group.primary_key

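The new vacuum method simply delegates to Delta Lake's own table maintenance on the feature group's resolved Spark location. A minimal standalone sketch of the same delta-spark call, assuming an existing SparkSession named spark with the Delta extensions enabled and a placeholder table path:

    from delta.tables import DeltaTable

    # Placeholder path; in the engine this comes from prepare_spark_location().
    table_path = "s3a://my-bucket/managed/transactions"  # assumption

    delta_table = DeltaTable.forPath(spark, table_path)

    # Remove files no longer referenced by the table and older than the
    # retention threshold (in hours). Delta enforces a minimum retention
    # (default 168 hours) unless that safety check is explicitly disabled.
    delta_table.vacuum(168)
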
python/hsfs/core/feature_group_engine.py

+15 -1

@@ -246,6 +246,20 @@ def commit_delete(feature_group, delete_df, write_options):
         )
         return hudi_engine_instance.delete_record(delete_df, write_options)
 
+    @staticmethod
+    def clean(feature_group, write_options):
+        if feature_group.time_travel_format == "DELTA":
+            delta_engine_instance = delta_engine.DeltaEngine(
+                feature_group.feature_store_id,
+                feature_group.feature_store_name,
+                feature_group,
+                engine.get_instance()._spark_session,
+                engine.get_instance()._spark_context,
+            )
+            return delta_engine_instance.vacuum(write_options.get("retention_hours", None))
+        else:
+            return None
+
     def sql(self, query, feature_store_name, dataframe_type, online, read_options):
         if online and self._online_conn is None:
             self._online_conn = self._storage_connector_api.get_online_connector(
@@ -285,7 +299,7 @@ def append_features(self, feature_group, new_features):
         if feature_group.time_travel_format == "DELTA":
             engine.get_instance().add_cols_to_delta_table(feature_group, new_features)
         else:
-            engine.get_instance().save_empty_dataframe(feature_group)
+            engine.get_instance().save_empty_dataframe(feature_group, new_features=new_features)
 
     def update_description(self, feature_group, description):
         """Updates the description of a feature group."""

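The new clean helper only acts on DELTA feature groups, forwarding retention_hours from write_options to DeltaEngine.vacuum, and is a no-op for other time-travel formats. A minimal sketch of calling it at the engine level as defined in this diff, assuming a Spark-backed client session and an already retrieved feature group object fg; the retention value is illustrative:

    from hsfs.core import feature_group_engine

    # fg is assumed to be an existing, DELTA-enabled feature group object.
    # For non-DELTA feature groups the helper returns None without doing anything.
    feature_group_engine.FeatureGroupEngine.clean(
        fg,
        write_options={"retention_hours": 168},  # illustrative retention threshold
    )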