Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FSTORE-1564] Managed feature group delta deltastreamer #359

Merged
merged 41 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
71df144
_deserialize_from_avro (no tests fixed)
bubriks Oct 11, 2024
4ca58ff
dont limit to HUDI
bubriks Oct 11, 2024
97d6e35
hsfs utils changes
bubriks Oct 11, 2024
c744892
initial_check_point_string
bubriks Oct 14, 2024
0e92376
save offsets?
bubriks Oct 14, 2024
cf1c325
add filter to kafka
bubriks Oct 14, 2024
ddf72c5
initialCheckPointString fix
bubriks Oct 15, 2024
aba04f7
includeHeaders and limit fix
bubriks Oct 15, 2024
5a7c938
small fix
bubriks Oct 15, 2024
8692113
java client fix
bubriks Oct 15, 2024
afe70f4
ruff fix
bubriks Oct 15, 2024
969dde8
Merge branch 'main' into FSTORE-1564-new
bubriks Oct 15, 2024
32e49e7
some test fixes
bubriks Oct 15, 2024
e4eaa45
Merge branch 'main' into FSTORE-1564-new
bubriks Oct 15, 2024
2e52a99
small test fix
bubriks Oct 15, 2024
55bf758
continue test fix
bubriks Oct 15, 2024
d910f92
_time_travel_format test
bubriks Oct 15, 2024
ea8aef6
test_serialize_deserialize_avro
bubriks Oct 16, 2024
2135c5a
ruff fix
bubriks Oct 16, 2024
fd63b16
work on test
bubriks Oct 16, 2024
1e162b6
ruff fix
bubriks Oct 16, 2024
ba1fcac
try setting spark session
bubriks Oct 16, 2024
575c010
try more _spark_session
bubriks Oct 16, 2024
ec4b848
small changes
bubriks Oct 16, 2024
6057362
remove SparkSession.builder from tests
bubriks Oct 16, 2024
d40b2d1
remove unused import
bubriks Oct 16, 2024
f9a4553
Merge branch 'main' into FSTORE-1564-new
bubriks Oct 16, 2024
854eaff
Set spark jars
bubriks Oct 16, 2024
35933ef
tmp test fix
bubriks Oct 16, 2024
7cf4193
remove datetime
bubriks Oct 16, 2024
25d2f92
change location for saving kafka_offsets
bubriks Oct 16, 2024
dea2e25
python engine add_cols_to_delta_table no-op
bubriks Oct 17, 2024
714981e
update to append_features
bubriks Oct 18, 2024
34c6963
ruff fix
bubriks Oct 18, 2024
4b64f18
update_table_schema also for delta table
bubriks Oct 18, 2024
0a0a560
ruff fix
bubriks Oct 18, 2024
b859e16
small fix
bubriks Oct 18, 2024
774d5d6
update table schema job
bubriks Oct 21, 2024
317ece0
add startingOffsets log
bubriks Oct 21, 2024
15fd0ce
Merge branch 'main' into FSTORE-1564-new
bubriks Oct 29, 2024
542b096
MERGE FIX
bubriks Oct 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver
@Override
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version, String description,
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey, boolean onlineEnabled,
StatisticsConfig statisticsConfig, String eventTime, OnlineConfig onlineConfig)
TimeTravelFormat timeTravelFormat, StatisticsConfig statisticsConfig, String eventTime, OnlineConfig onlineConfig)
throws IOException, FeatureStoreException {
throw new UnsupportedOperationException("Not supported for Beam");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@

package com.logicalclocks.hsfs.beam;

import java.io.IOException;
import java.text.ParseException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.beam.sdk.values.PCollection;

import com.logicalclocks.hsfs.Feature;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.FeatureStoreException;
Expand All @@ -26,19 +34,14 @@
import com.logicalclocks.hsfs.StatisticsConfig;
import com.logicalclocks.hsfs.Storage;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.beam.engine.FeatureGroupEngine;
import com.logicalclocks.hsfs.TimeTravelFormat;
import com.logicalclocks.hsfs.beam.engine.BeamProducer;
import com.logicalclocks.hsfs.beam.engine.FeatureGroupEngine;
import com.logicalclocks.hsfs.constructor.QueryBase;
import com.logicalclocks.hsfs.metadata.Statistics;

import lombok.Builder;
import lombok.NonNull;
import org.apache.beam.sdk.values.PCollection;

import java.io.IOException;
import java.text.ParseException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class StreamFeatureGroup extends FeatureGroupBase<PCollection<Object>> {

Expand All @@ -48,8 +51,9 @@ public class StreamFeatureGroup extends FeatureGroupBase<PCollection<Object>> {
@Builder
public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description,
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig, String onlineTopicName,
String eventTime, OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
boolean onlineEnabled, TimeTravelFormat timeTravelFormat, List<Feature> features,
StatisticsConfig statisticsConfig, String onlineTopicName, String eventTime,
OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
this();
this.featureStore = featureStore;
this.name = name;
Expand All @@ -61,6 +65,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
? partitionKeys.stream().map(String::toLowerCase).collect(Collectors.toList()) : null;
this.hudiPrecombineKey = hudiPrecombineKey != null ? hudiPrecombineKey.toLowerCase() : null;
this.onlineEnabled = onlineEnabled;
this.timeTravelFormat = timeTravelFormat != null ? timeTravelFormat : TimeTravelFormat.HUDI;
this.features = features;
this.statisticsConfig = statisticsConfig != null ? statisticsConfig : new StatisticsConfig();
this.onlineTopicName = onlineTopicName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,9 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version, String description,
List<String> primaryKeys, List<String> partitionKeys,
String hudiPrecombineKey, boolean onlineEnabled,
StatisticsConfig statisticsConfig, String eventTime,
OnlineConfig onlineConfig)
TimeTravelFormat timeTravelFormat,
StatisticsConfig statisticsConfig,
String eventTime, OnlineConfig onlineConfig)
throws IOException, FeatureStoreException {
throw new UnsupportedOperationException("Not supported for Flink");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@

package com.logicalclocks.hsfs.flink;

import java.io.IOException;
import java.text.ParseException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.logicalclocks.hsfs.Feature;
import com.logicalclocks.hsfs.FeatureGroupBase;
Expand All @@ -27,22 +36,14 @@
import com.logicalclocks.hsfs.StatisticsConfig;
import com.logicalclocks.hsfs.Storage;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.TimeTravelFormat;
import com.logicalclocks.hsfs.constructor.QueryBase;

import com.logicalclocks.hsfs.flink.engine.FeatureGroupEngine;
import com.logicalclocks.hsfs.metadata.Statistics;

import com.logicalclocks.hsfs.flink.engine.FeatureGroupEngine;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.NonNull;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;

import java.io.IOException;
import java.text.ParseException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
Expand All @@ -53,9 +54,9 @@ public class StreamFeatureGroup extends FeatureGroupBase<DataStream<?>> {
@Builder
public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description,
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig,
String onlineTopicName, String topicName, String notificationTopicName, String eventTime,
OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
boolean onlineEnabled, TimeTravelFormat timeTravelFormat, List<Feature> features,
StatisticsConfig statisticsConfig, String onlineTopicName, String topicName, String notificationTopicName,
String eventTime, OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
this();
this.featureStore = featureStore;
this.name = name;
Expand All @@ -67,6 +68,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
? partitionKeys.stream().map(String::toLowerCase).collect(Collectors.toList()) : null;
this.hudiPrecombineKey = hudiPrecombineKey != null ? hudiPrecombineKey.toLowerCase() : null;
this.onlineEnabled = onlineEnabled;
this.timeTravelFormat = timeTravelFormat != null ? timeTravelFormat : TimeTravelFormat.HUDI;
this.features = features;
this.statisticsConfig = statisticsConfig != null ? statisticsConfig : new StatisticsConfig();
this.onlineTopicName = onlineTopicName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ public abstract Object getOrCreateStreamFeatureGroup(String name, Integer versio
public abstract Object getOrCreateStreamFeatureGroup(String name, Integer version, String description,
List<String> primaryKeys, List<String> partitionKeys,
String hudiPrecombineKey, boolean onlineEnabled,
StatisticsConfig statisticsConfig, String eventTime,
OnlineConfig onlineConfig)
TimeTravelFormat timeTravelFormat, StatisticsConfig statisticsConfig,
String eventTime, OnlineConfig onlineConfig)
throws IOException, FeatureStoreException;

public abstract Object createExternalFeatureGroup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@

public enum TimeTravelFormat {
NONE,
HUDI
HUDI,
DELTA
}
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ public StreamFeatureGroup.StreamFeatureGroupBuilder createStreamFeatureGroup() {
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version)
throws IOException, FeatureStoreException {
return featureGroupEngine.getOrCreateStreamFeatureGroup(this, name, version, null,
null, null, null, false, null, null, null);
null, null, null, false, TimeTravelFormat.HUDI, null, null, null);
}

/**
Expand Down Expand Up @@ -438,7 +438,7 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver
boolean onlineEnabled, String eventTime)
throws IOException, FeatureStoreException {
return featureGroupEngine.getOrCreateStreamFeatureGroup(this, name, version, null,
primaryKeys, null, null, onlineEnabled, null, eventTime, null);
primaryKeys, null, null, onlineEnabled, TimeTravelFormat.HUDI, null, eventTime, null);
}

/**
Expand Down Expand Up @@ -477,7 +477,7 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver


return featureGroupEngine.getOrCreateStreamFeatureGroup(this, name, version, null,
primaryKeys, partitionKeys, null, onlineEnabled, null, eventTime, null);
primaryKeys, partitionKeys, null, onlineEnabled, TimeTravelFormat.HUDI, null, eventTime, null);
}

/**
Expand Down Expand Up @@ -506,6 +506,7 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver
* the first primary key of the feature group will be used as hudi precombine key.
* @param onlineEnabled Define whether the feature group should be made available also in the online feature store
* for low latency access.
* @param timeTravelFormat Format used for time travel, defaults to `"HUDI"`.
* @param statisticsConfig A configuration object, to generally enable descriptive statistics computation for
* this feature group, `"correlations`" to turn on feature correlation computation,
* `"histograms"` to compute feature value frequencies and `"exact_uniqueness"` to compute
Expand All @@ -523,13 +524,14 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version, String description,
List<String> primaryKeys, List<String> partitionKeys,
String hudiPrecombineKey, boolean onlineEnabled,
StatisticsConfig statisticsConfig, String eventTime,
OnlineConfig onlineConfig)
TimeTravelFormat timeTravelFormat,
StatisticsConfig statisticsConfig,
String eventTime, OnlineConfig onlineConfig)
throws IOException, FeatureStoreException {

return featureGroupEngine.getOrCreateStreamFeatureGroup(this, name, version, description,
primaryKeys, partitionKeys, hudiPrecombineKey, onlineEnabled, statisticsConfig, eventTime,
onlineConfig);
primaryKeys, partitionKeys, hudiPrecombineKey, onlineEnabled, timeTravelFormat,
statisticsConfig, eventTime, onlineConfig);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,40 @@

package com.logicalclocks.hsfs.spark;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.logicalclocks.hsfs.spark.constructor.Query;
import com.logicalclocks.hsfs.spark.engine.FeatureGroupEngine;
import com.logicalclocks.hsfs.spark.engine.StatisticsEngine;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.streaming.StreamingQuery;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.logicalclocks.hsfs.EntityEndpointType;
import com.logicalclocks.hsfs.Feature;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.HudiOperationType;
import com.logicalclocks.hsfs.JobConfiguration;
import com.logicalclocks.hsfs.OnlineConfig;
import com.logicalclocks.hsfs.StatisticsConfig;
import com.logicalclocks.hsfs.Storage;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.TimeTravelFormat;
import com.logicalclocks.hsfs.metadata.Statistics;
import com.logicalclocks.hsfs.spark.constructor.Query;
import com.logicalclocks.hsfs.spark.engine.FeatureGroupEngine;
import com.logicalclocks.hsfs.spark.engine.StatisticsEngine;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.NonNull;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.streaming.StreamingQuery;

import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class StreamFeatureGroup extends FeatureGroupBase<Dataset<Row>> {
Expand All @@ -61,9 +61,10 @@ public class StreamFeatureGroup extends FeatureGroupBase<Dataset<Row>> {
@Builder
public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description,
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig,
String onlineTopicName, String topicName, String notificationTopicName, String eventTime,
OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
boolean onlineEnabled, TimeTravelFormat timeTravelFormat, List<Feature> features,
StatisticsConfig statisticsConfig, String onlineTopicName, String topicName,
String notificationTopicName, String eventTime, OnlineConfig onlineConfig,
StorageConnector storageConnector, String path) {
this();
this.featureStore = featureStore;
this.name = name;
Expand All @@ -75,6 +76,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
? partitionKeys.stream().map(String::toLowerCase).collect(Collectors.toList()) : null;
this.hudiPrecombineKey = hudiPrecombineKey != null ? hudiPrecombineKey.toLowerCase() : null;
this.onlineEnabled = onlineEnabled;
this.timeTravelFormat = timeTravelFormat != null ? timeTravelFormat : TimeTravelFormat.HUDI;
this.features = features;
this.statisticsConfig = statisticsConfig != null ? statisticsConfig : new StatisticsConfig();
this.onlineTopicName = onlineTopicName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,8 @@ public List<FeatureGroup> getFeatureGroups(FeatureStore featureStore, String fgN
public StreamFeatureGroup getOrCreateStreamFeatureGroup(FeatureStore featureStore, String name, Integer version,
String description, List<String> primaryKeys,
List<String> partitionKeys, String hudiPrecombineKey,
boolean onlineEnabled, StatisticsConfig statisticsConfig,
boolean onlineEnabled, TimeTravelFormat timeTravelFormat,
StatisticsConfig statisticsConfig,
String eventTime, OnlineConfig onlineConfig)
throws IOException, FeatureStoreException {
StreamFeatureGroup featureGroup;
Expand All @@ -381,6 +382,7 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(FeatureStore featureStor
.partitionKeys(partitionKeys)
.hudiPrecombineKey(hudiPrecombineKey)
.onlineEnabled(onlineEnabled)
.timeTravelFormat(timeTravelFormat)
.statisticsConfig(statisticsConfig)
.eventTime(eventTime)
.onlineConfig(onlineConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.logicalclocks.hsfs.Feature;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.Project;
import com.logicalclocks.hsfs.TimeTravelFormat;
import com.logicalclocks.hsfs.metadata.FeatureGroupApi;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.metadata.HopsworksClient;
Expand Down Expand Up @@ -67,7 +68,7 @@ public void testFeatureGroupPrimaryKey() {

StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
Collections.singletonList("primaryKey"), Collections.singletonList("partitionKey"), "hudiPrecombineKey",
true, features, null, "onlineTopicName", null, null, null, null, null, null);
true, TimeTravelFormat.HUDI, features, null, "onlineTopicName", null, null, null, null, null, null);

Exception pkException = assertThrows(FeatureStoreException.class, () -> {
featureGroupEngine.saveFeatureGroupMetaData(featureGroup,
Expand All @@ -93,7 +94,7 @@ public void testFeatureGroupEventTimeFeature() {

StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
Collections.singletonList("featureA"), null, null,
true, features, null, "onlineTopicName", null, null, "eventTime", null, null, null);
true, TimeTravelFormat.HUDI, features, null, "onlineTopicName", null, null, "eventTime", null, null, null);

Exception eventTimeException = assertThrows(FeatureStoreException.class, () -> {
streamFeatureGroupEngine.saveFeatureGroupMetaData(featureGroup,
Expand All @@ -119,7 +120,7 @@ public void testFeatureGroupPartitionPrecombineKeys() {

StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
Collections.singletonList("featureA"), Collections.singletonList("partitionKey"), "hudiPrecombineKey",
true, features, null, "onlineTopicName", null, null, null, null, null, null);
true, TimeTravelFormat.HUDI, features, null, "onlineTopicName", null, null, null, null, null, null);

Exception partitionException = assertThrows(FeatureStoreException.class, () -> {
streamFeatureGroupEngine.saveFeatureGroupMetaData(featureGroup,
Expand Down Expand Up @@ -164,7 +165,7 @@ public void testFeatureGroupAppendFeaturesResetSubject() throws FeatureStoreExce

StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
Collections.singletonList("featureA"), null, null,
true, features, null, "onlineTopicName", null, null, "eventTime", null, null, null);
true, TimeTravelFormat.HUDI, features, null, "onlineTopicName", null, null, "eventTime", null, null, null);
featureGroup.featureGroupEngine = featureGroupEngine;

// Act
Expand Down
Loading
Loading