Skip to content

Commit 1004fe5

Browse files
committed
[FSTORE-1564] Managed feature group delta deltastreamer (logicalclocks#359)
1 parent c9d6d74 commit 1004fe5

File tree

22 files changed

+517
-280
lines changed

22 files changed

+517
-280
lines changed

java/beam/src/main/java/com/logicalclocks/hsfs/beam/FeatureStore.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver
160160
@Override
161161
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version, String description,
162162
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey, boolean onlineEnabled,
163-
StatisticsConfig statisticsConfig, String eventTime, OnlineConfig onlineConfig)
163+
TimeTravelFormat timeTravelFormat, StatisticsConfig statisticsConfig, String eventTime, OnlineConfig onlineConfig)
164164
throws IOException, FeatureStoreException {
165165
throw new UnsupportedOperationException("Not supported for Beam");
166166
}

java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java

+15-10
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,14 @@
1717

1818
package com.logicalclocks.hsfs.beam;
1919

20+
import java.io.IOException;
21+
import java.text.ParseException;
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.stream.Collectors;
25+
26+
import org.apache.beam.sdk.values.PCollection;
27+
2028
import com.logicalclocks.hsfs.Feature;
2129
import com.logicalclocks.hsfs.FeatureGroupBase;
2230
import com.logicalclocks.hsfs.FeatureStoreException;
@@ -26,19 +34,14 @@
2634
import com.logicalclocks.hsfs.StatisticsConfig;
2735
import com.logicalclocks.hsfs.Storage;
2836
import com.logicalclocks.hsfs.StorageConnector;
29-
import com.logicalclocks.hsfs.beam.engine.FeatureGroupEngine;
37+
import com.logicalclocks.hsfs.TimeTravelFormat;
3038
import com.logicalclocks.hsfs.beam.engine.BeamProducer;
39+
import com.logicalclocks.hsfs.beam.engine.FeatureGroupEngine;
3140
import com.logicalclocks.hsfs.constructor.QueryBase;
3241
import com.logicalclocks.hsfs.metadata.Statistics;
42+
3343
import lombok.Builder;
3444
import lombok.NonNull;
35-
import org.apache.beam.sdk.values.PCollection;
36-
37-
import java.io.IOException;
38-
import java.text.ParseException;
39-
import java.util.List;
40-
import java.util.Map;
41-
import java.util.stream.Collectors;
4245

4346
public class StreamFeatureGroup extends FeatureGroupBase<PCollection<Object>> {
4447

@@ -48,8 +51,9 @@ public class StreamFeatureGroup extends FeatureGroupBase<PCollection<Object>> {
4851
@Builder
4952
public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description,
5053
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
51-
boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig, String onlineTopicName,
52-
String eventTime, OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
54+
boolean onlineEnabled, TimeTravelFormat timeTravelFormat, List<Feature> features,
55+
StatisticsConfig statisticsConfig, String onlineTopicName, String eventTime,
56+
OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
5357
this();
5458
this.featureStore = featureStore;
5559
this.name = name;
@@ -61,6 +65,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
6165
? partitionKeys.stream().map(String::toLowerCase).collect(Collectors.toList()) : null;
6266
this.hudiPrecombineKey = hudiPrecombineKey != null ? hudiPrecombineKey.toLowerCase() : null;
6367
this.onlineEnabled = onlineEnabled;
68+
this.timeTravelFormat = timeTravelFormat != null ? timeTravelFormat : TimeTravelFormat.HUDI;
6469
this.features = features;
6570
this.statisticsConfig = statisticsConfig != null ? statisticsConfig : new StatisticsConfig();
6671
this.onlineTopicName = onlineTopicName;

java/flink/src/main/java/com/logicalclocks/hsfs/flink/FeatureStore.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,9 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver
165165
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version, String description,
166166
List<String> primaryKeys, List<String> partitionKeys,
167167
String hudiPrecombineKey, boolean onlineEnabled,
168-
StatisticsConfig statisticsConfig, String eventTime,
169-
OnlineConfig onlineConfig)
168+
TimeTravelFormat timeTravelFormat,
169+
StatisticsConfig statisticsConfig,
170+
String eventTime, OnlineConfig onlineConfig)
170171
throws IOException, FeatureStoreException {
171172
throw new UnsupportedOperationException("Not supported for Flink");
172173
}

java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java

+15-13
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,15 @@
1717

1818
package com.logicalclocks.hsfs.flink;
1919

20+
import java.io.IOException;
21+
import java.text.ParseException;
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.stream.Collectors;
25+
26+
import org.apache.flink.streaming.api.datastream.DataStream;
27+
import org.apache.flink.streaming.api.datastream.DataStreamSink;
28+
2029
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
2130
import com.logicalclocks.hsfs.Feature;
2231
import com.logicalclocks.hsfs.FeatureGroupBase;
@@ -27,22 +36,14 @@
2736
import com.logicalclocks.hsfs.StatisticsConfig;
2837
import com.logicalclocks.hsfs.Storage;
2938
import com.logicalclocks.hsfs.StorageConnector;
39+
import com.logicalclocks.hsfs.TimeTravelFormat;
3040
import com.logicalclocks.hsfs.constructor.QueryBase;
31-
41+
import com.logicalclocks.hsfs.flink.engine.FeatureGroupEngine;
3242
import com.logicalclocks.hsfs.metadata.Statistics;
3343

34-
import com.logicalclocks.hsfs.flink.engine.FeatureGroupEngine;
3544
import lombok.AllArgsConstructor;
3645
import lombok.Builder;
3746
import lombok.NonNull;
38-
import org.apache.flink.streaming.api.datastream.DataStream;
39-
import org.apache.flink.streaming.api.datastream.DataStreamSink;
40-
41-
import java.io.IOException;
42-
import java.text.ParseException;
43-
import java.util.List;
44-
import java.util.Map;
45-
import java.util.stream.Collectors;
4647

4748
@AllArgsConstructor
4849
@JsonIgnoreProperties(ignoreUnknown = true)
@@ -53,9 +54,9 @@ public class StreamFeatureGroup extends FeatureGroupBase<DataStream<?>> {
5354
@Builder
5455
public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description,
5556
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
56-
boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig,
57-
String onlineTopicName, String topicName, String notificationTopicName, String eventTime,
58-
OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
57+
boolean onlineEnabled, TimeTravelFormat timeTravelFormat, List<Feature> features,
58+
StatisticsConfig statisticsConfig, String onlineTopicName, String topicName, String notificationTopicName,
59+
String eventTime, OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
5960
this();
6061
this.featureStore = featureStore;
6162
this.name = name;
@@ -67,6 +68,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
6768
? partitionKeys.stream().map(String::toLowerCase).collect(Collectors.toList()) : null;
6869
this.hudiPrecombineKey = hudiPrecombineKey != null ? hudiPrecombineKey.toLowerCase() : null;
6970
this.onlineEnabled = onlineEnabled;
71+
this.timeTravelFormat = timeTravelFormat != null ? timeTravelFormat : TimeTravelFormat.HUDI;
7072
this.features = features;
7173
this.statisticsConfig = statisticsConfig != null ? statisticsConfig : new StatisticsConfig();
7274
this.onlineTopicName = onlineTopicName;

java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureStoreBase.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,8 @@ public abstract Object getOrCreateStreamFeatureGroup(String name, Integer versio
122122
public abstract Object getOrCreateStreamFeatureGroup(String name, Integer version, String description,
123123
List<String> primaryKeys, List<String> partitionKeys,
124124
String hudiPrecombineKey, boolean onlineEnabled,
125-
StatisticsConfig statisticsConfig, String eventTime,
126-
OnlineConfig onlineConfig)
125+
TimeTravelFormat timeTravelFormat, StatisticsConfig statisticsConfig,
126+
String eventTime, OnlineConfig onlineConfig)
127127
throws IOException, FeatureStoreException;
128128

129129
public abstract Object createExternalFeatureGroup();

java/hsfs/src/main/java/com/logicalclocks/hsfs/TimeTravelFormat.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,6 @@
1919

2020
public enum TimeTravelFormat {
2121
NONE,
22-
HUDI
22+
HUDI,
23+
DELTA
2324
}

java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureStore.java

+9-7
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ public StreamFeatureGroup.StreamFeatureGroupBuilder createStreamFeatureGroup() {
404404
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version)
405405
throws IOException, FeatureStoreException {
406406
return featureGroupEngine.getOrCreateStreamFeatureGroup(this, name, version, null,
407-
null, null, null, false, null, null, null);
407+
null, null, null, false, TimeTravelFormat.HUDI, null, null, null);
408408
}
409409

410410
/**
@@ -438,7 +438,7 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver
438438
boolean onlineEnabled, String eventTime)
439439
throws IOException, FeatureStoreException {
440440
return featureGroupEngine.getOrCreateStreamFeatureGroup(this, name, version, null,
441-
primaryKeys, null, null, onlineEnabled, null, eventTime, null);
441+
primaryKeys, null, null, onlineEnabled, TimeTravelFormat.HUDI, null, eventTime, null);
442442
}
443443

444444
/**
@@ -477,7 +477,7 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver
477477

478478

479479
return featureGroupEngine.getOrCreateStreamFeatureGroup(this, name, version, null,
480-
primaryKeys, partitionKeys, null, onlineEnabled, null, eventTime, null);
480+
primaryKeys, partitionKeys, null, onlineEnabled, TimeTravelFormat.HUDI, null, eventTime, null);
481481
}
482482

483483
/**
@@ -506,6 +506,7 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver
506506
* the first primary key of the feature group will be used as hudi precombine key.
507507
* @param onlineEnabled Define whether the feature group should be made available also in the online feature store
508508
* for low latency access.
509+
* @param timeTravelFormat Format used for time travel, defaults to `"HUDI"`.
509510
* @param statisticsConfig A configuration object, to generally enable descriptive statistics computation for
510511
* this feature group, `"correlations`" to turn on feature correlation computation,
511512
* `"histograms"` to compute feature value frequencies and `"exact_uniqueness"` to compute
@@ -523,13 +524,14 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver
523524
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version, String description,
524525
List<String> primaryKeys, List<String> partitionKeys,
525526
String hudiPrecombineKey, boolean onlineEnabled,
526-
StatisticsConfig statisticsConfig, String eventTime,
527-
OnlineConfig onlineConfig)
527+
TimeTravelFormat timeTravelFormat,
528+
StatisticsConfig statisticsConfig,
529+
String eventTime, OnlineConfig onlineConfig)
528530
throws IOException, FeatureStoreException {
529531

530532
return featureGroupEngine.getOrCreateStreamFeatureGroup(this, name, version, description,
531-
primaryKeys, partitionKeys, hudiPrecombineKey, onlineEnabled, statisticsConfig, eventTime,
532-
onlineConfig);
533+
primaryKeys, partitionKeys, hudiPrecombineKey, onlineEnabled, timeTravelFormat,
534+
statisticsConfig, eventTime, onlineConfig);
533535
}
534536

535537
/**

java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java

+23-21
Original file line numberDiff line numberDiff line change
@@ -17,40 +17,40 @@
1717

1818
package com.logicalclocks.hsfs.spark;
1919

20-
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
20+
import java.io.IOException;
21+
import java.text.ParseException;
22+
import java.util.ArrayList;
23+
import java.util.Collections;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.stream.Collectors;
2127

22-
import com.logicalclocks.hsfs.spark.constructor.Query;
23-
import com.logicalclocks.hsfs.spark.engine.FeatureGroupEngine;
24-
import com.logicalclocks.hsfs.spark.engine.StatisticsEngine;
28+
import org.apache.spark.sql.Dataset;
29+
import org.apache.spark.sql.Row;
30+
import org.apache.spark.sql.SaveMode;
31+
import org.apache.spark.sql.streaming.StreamingQuery;
32+
33+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
2534
import com.logicalclocks.hsfs.EntityEndpointType;
2635
import com.logicalclocks.hsfs.Feature;
36+
import com.logicalclocks.hsfs.FeatureGroupBase;
2737
import com.logicalclocks.hsfs.FeatureStoreException;
2838
import com.logicalclocks.hsfs.HudiOperationType;
2939
import com.logicalclocks.hsfs.JobConfiguration;
3040
import com.logicalclocks.hsfs.OnlineConfig;
3141
import com.logicalclocks.hsfs.StatisticsConfig;
3242
import com.logicalclocks.hsfs.Storage;
3343
import com.logicalclocks.hsfs.StorageConnector;
34-
import com.logicalclocks.hsfs.FeatureGroupBase;
44+
import com.logicalclocks.hsfs.TimeTravelFormat;
3545
import com.logicalclocks.hsfs.metadata.Statistics;
46+
import com.logicalclocks.hsfs.spark.constructor.Query;
47+
import com.logicalclocks.hsfs.spark.engine.FeatureGroupEngine;
48+
import com.logicalclocks.hsfs.spark.engine.StatisticsEngine;
3649

3750
import lombok.AllArgsConstructor;
3851
import lombok.Builder;
3952
import lombok.NonNull;
4053

41-
import org.apache.spark.sql.Dataset;
42-
import org.apache.spark.sql.Row;
43-
import org.apache.spark.sql.SaveMode;
44-
import org.apache.spark.sql.streaming.StreamingQuery;
45-
46-
import java.io.IOException;
47-
import java.text.ParseException;
48-
import java.util.ArrayList;
49-
import java.util.Collections;
50-
import java.util.List;
51-
import java.util.Map;
52-
import java.util.stream.Collectors;
53-
5454
@AllArgsConstructor
5555
@JsonIgnoreProperties(ignoreUnknown = true)
5656
public class StreamFeatureGroup extends FeatureGroupBase<Dataset<Row>> {
@@ -61,9 +61,10 @@ public class StreamFeatureGroup extends FeatureGroupBase<Dataset<Row>> {
6161
@Builder
6262
public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description,
6363
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
64-
boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig,
65-
String onlineTopicName, String topicName, String notificationTopicName, String eventTime,
66-
OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
64+
boolean onlineEnabled, TimeTravelFormat timeTravelFormat, List<Feature> features,
65+
StatisticsConfig statisticsConfig, String onlineTopicName, String topicName,
66+
String notificationTopicName, String eventTime, OnlineConfig onlineConfig,
67+
StorageConnector storageConnector, String path) {
6768
this();
6869
this.featureStore = featureStore;
6970
this.name = name;
@@ -75,6 +76,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
7576
? partitionKeys.stream().map(String::toLowerCase).collect(Collectors.toList()) : null;
7677
this.hudiPrecombineKey = hudiPrecombineKey != null ? hudiPrecombineKey.toLowerCase() : null;
7778
this.onlineEnabled = onlineEnabled;
79+
this.timeTravelFormat = timeTravelFormat != null ? timeTravelFormat : TimeTravelFormat.HUDI;
7880
this.features = features;
7981
this.statisticsConfig = statisticsConfig != null ? statisticsConfig : new StatisticsConfig();
8082
this.onlineTopicName = onlineTopicName;

java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/FeatureGroupEngine.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,8 @@ public List<FeatureGroup> getFeatureGroups(FeatureStore featureStore, String fgN
364364
public StreamFeatureGroup getOrCreateStreamFeatureGroup(FeatureStore featureStore, String name, Integer version,
365365
String description, List<String> primaryKeys,
366366
List<String> partitionKeys, String hudiPrecombineKey,
367-
boolean onlineEnabled, StatisticsConfig statisticsConfig,
367+
boolean onlineEnabled, TimeTravelFormat timeTravelFormat,
368+
StatisticsConfig statisticsConfig,
368369
String eventTime, OnlineConfig onlineConfig)
369370
throws IOException, FeatureStoreException {
370371
StreamFeatureGroup featureGroup;
@@ -381,6 +382,7 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(FeatureStore featureStor
381382
.partitionKeys(partitionKeys)
382383
.hudiPrecombineKey(hudiPrecombineKey)
383384
.onlineEnabled(onlineEnabled)
385+
.timeTravelFormat(timeTravelFormat)
384386
.statisticsConfig(statisticsConfig)
385387
.eventTime(eventTime)
386388
.onlineConfig(onlineConfig)

java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestFeatureGroup.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.logicalclocks.hsfs.Feature;
2121
import com.logicalclocks.hsfs.FeatureStoreException;
2222
import com.logicalclocks.hsfs.Project;
23+
import com.logicalclocks.hsfs.TimeTravelFormat;
2324
import com.logicalclocks.hsfs.metadata.FeatureGroupApi;
2425
import com.logicalclocks.hsfs.FeatureGroupBase;
2526
import com.logicalclocks.hsfs.metadata.HopsworksClient;
@@ -67,7 +68,7 @@ public void testFeatureGroupPrimaryKey() {
6768

6869
StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
6970
Collections.singletonList("primaryKey"), Collections.singletonList("partitionKey"), "hudiPrecombineKey",
70-
true, features, null, "onlineTopicName", null, null, null, null, null, null);
71+
true, TimeTravelFormat.HUDI, features, null, "onlineTopicName", null, null, null, null, null, null);
7172

7273
Exception pkException = assertThrows(FeatureStoreException.class, () -> {
7374
featureGroupEngine.saveFeatureGroupMetaData(featureGroup,
@@ -93,7 +94,7 @@ public void testFeatureGroupEventTimeFeature() {
9394

9495
StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
9596
Collections.singletonList("featureA"), null, null,
96-
true, features, null, "onlineTopicName", null, null, "eventTime", null, null, null);
97+
true, TimeTravelFormat.HUDI, features, null, "onlineTopicName", null, null, "eventTime", null, null, null);
9798

9899
Exception eventTimeException = assertThrows(FeatureStoreException.class, () -> {
99100
streamFeatureGroupEngine.saveFeatureGroupMetaData(featureGroup,
@@ -119,7 +120,7 @@ public void testFeatureGroupPartitionPrecombineKeys() {
119120

120121
StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
121122
Collections.singletonList("featureA"), Collections.singletonList("partitionKey"), "hudiPrecombineKey",
122-
true, features, null, "onlineTopicName", null, null, null, null, null, null);
123+
true, TimeTravelFormat.HUDI, features, null, "onlineTopicName", null, null, null, null, null, null);
123124

124125
Exception partitionException = assertThrows(FeatureStoreException.class, () -> {
125126
streamFeatureGroupEngine.saveFeatureGroupMetaData(featureGroup,
@@ -164,7 +165,7 @@ public void testFeatureGroupAppendFeaturesResetSubject() throws FeatureStoreExce
164165

165166
StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
166167
Collections.singletonList("featureA"), null, null,
167-
true, features, null, "onlineTopicName", null, null, "eventTime", null, null, null);
168+
true, TimeTravelFormat.HUDI, features, null, "onlineTopicName", null, null, "eventTime", null, null, null);
168169
featureGroup.featureGroupEngine = featureGroupEngine;
169170

170171
// Act

0 commit comments

Comments
 (0)