Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FSTORE-1537] Managed feature group metadata changes #346

Merged
merged 35 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
11ed246
init
bubriks Sep 11, 2024
ec6aa89
add region
bubriks Sep 11, 2024
6bc8e79
fix ruff
bubriks Sep 11, 2024
14363dc
Merge branch 'main' into FSTORE-1537
bubriks Sep 16, 2024
6b66989
temp
bubriks Sep 18, 2024
b3fa7e3
s3 -> s3a
bubriks Sep 18, 2024
07f3a35
time_travel_format
bubriks Sep 18, 2024
5d7dff6
s3 -> s3a
bubriks Sep 18, 2024
23670c7
s3 issue continues
bubriks Sep 18, 2024
8922c81
prepare_spark
bubriks Sep 18, 2024
6b1a53c
path fix?
bubriks Sep 18, 2024
150f836
reconcile_hudi_schema
bubriks Sep 18, 2024
d7ea12f
small undo
bubriks Sep 18, 2024
6db0d1c
remove textFile
bubriks Sep 19, 2024
600b726
register_temporary_table
bubriks Sep 19, 2024
d0738d4
add uri to FG
bubriks Sep 19, 2024
f6da8a3
hive_sync
bubriks Sep 19, 2024
d54c370
fix s3 replace
bubriks Sep 19, 2024
da9d1a4
fix ruff
bubriks Sep 19, 2024
4ea0141
fix some tests
bubriks Sep 19, 2024
91bea60
get_uri()
bubriks Sep 19, 2024
3447281
setup_storage_connector
bubriks Sep 19, 2024
3f048fe
read write from managed FG
bubriks Sep 19, 2024
993649e
Merge branch 'main' into FSTORE-1537
bubriks Sep 19, 2024
700e2a8
fix merge
bubriks Sep 19, 2024
59f2c98
test fix
bubriks Sep 19, 2024
9ddd618
small fixes
bubriks Sep 23, 2024
44d2f6f
fix reconcile and prepare_spark
bubriks Sep 23, 2024
8aeeb03
fix add_cols_to_delta_table
bubriks Sep 23, 2024
d852a34
fix schema evolution hudi
bubriks Sep 24, 2024
c92a1d5
reduce number of get_uri calls
bubriks Sep 24, 2024
4447e3b
get_uri -> prepare_spark_location
bubriks Sep 25, 2024
1cb68ff
Merge branch 'main' into FSTORE-1537
bubriks Sep 25, 2024
0303ad9
add clean fg (#7)
bubriks Sep 30, 2024
2a22b56
Merge branch 'main' into FSTORE-1537
bubriks Oct 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.logicalclocks.hsfs.OnlineConfig;
import com.logicalclocks.hsfs.StatisticsConfig;
import com.logicalclocks.hsfs.Storage;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.beam.engine.FeatureGroupEngine;
import com.logicalclocks.hsfs.beam.engine.BeamProducer;
import com.logicalclocks.hsfs.constructor.QueryBase;
Expand All @@ -48,7 +49,7 @@ public class StreamFeatureGroup extends FeatureGroupBase<PCollection<Object>> {
public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description,
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig, String onlineTopicName,
String eventTime, OnlineConfig onlineConfig) {
String eventTime, OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
this();
this.featureStore = featureStore;
this.name = name;
Expand All @@ -65,6 +66,8 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
this.onlineTopicName = onlineTopicName;
this.eventTime = eventTime;
this.onlineConfig = onlineConfig;
this.storageConnector = storageConnector;
this.path = path;
}

public StreamFeatureGroup() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.logicalclocks.hsfs.OnlineConfig;
import com.logicalclocks.hsfs.StatisticsConfig;
import com.logicalclocks.hsfs.Storage;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.constructor.QueryBase;

import com.logicalclocks.hsfs.metadata.Statistics;
Expand Down Expand Up @@ -54,7 +55,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig,
String onlineTopicName, String topicName, String notificationTopicName, String eventTime,
OnlineConfig onlineConfig) {
OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
this();
this.featureStore = featureStore;
this.name = name;
Expand All @@ -73,6 +74,8 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
this.notificationTopicName = notificationTopicName;
this.eventTime = eventTime;
this.onlineConfig = onlineConfig;
this.storageConnector = storageConnector;
this.path = path;
}

public StreamFeatureGroup() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ public abstract class FeatureGroupBase<T> {
@Setter
protected OnlineConfig onlineConfig;

@Getter
@Setter
protected StorageConnector storageConnector;

@Getter
@Setter
protected String path;

@JsonIgnore
// These are only used in the client. In the server they are aggregated in the `features` field
protected List<String> partitionKeys;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ public static class S3Connector extends StorageConnector {
@Getter @Setter
protected String bucket;

@Getter @Setter
protected String region;

@Getter @Setter
protected String sessionToken;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public static Long getTimeStampFromDateString(String inputDate) throws FeatureSt
}

SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatPattern);
Long commitTimeStamp = dateFormat.parse(tempDate).getTime();;
Long commitTimeStamp = dateFormat.parse(tempDate).getTime();

return commitTimeStamp;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,6 @@
@JsonIgnoreProperties(ignoreUnknown = true)
public class ExternalFeatureGroup extends FeatureGroupBase<Dataset<Row>> {

@Getter
@Setter
private StorageConnector storageConnector;

@Getter
@Setter
private String query;
Expand All @@ -69,10 +65,6 @@ public class ExternalFeatureGroup extends FeatureGroupBase<Dataset<Row>> {
@Setter
private ExternalDataFormat dataFormat;

@Getter
@Setter
private String path;

@Getter
@Setter
private List<OnDemandOptions> options;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.logicalclocks.hsfs.OnlineConfig;
import com.logicalclocks.hsfs.StatisticsConfig;
import com.logicalclocks.hsfs.Storage;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.TimeTravelFormat;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.metadata.Statistics;
Expand Down Expand Up @@ -64,7 +65,8 @@ public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer ver
String description, List<String> primaryKeys, List<String> partitionKeys,
String hudiPrecombineKey, boolean onlineEnabled, TimeTravelFormat timeTravelFormat,
List<Feature> features, StatisticsConfig statisticsConfig, String onlineTopicName,
String topicName, String notificationTopicName, String eventTime, OnlineConfig onlineConfig) {
String topicName, String notificationTopicName, String eventTime, OnlineConfig onlineConfig,
StorageConnector storageConnector, String path) {
this();
this.featureStore = featureStore;
this.name = name;
Expand All @@ -85,6 +87,8 @@ public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer ver
this.notificationTopicName = notificationTopicName;
this.eventTime = eventTime;
this.onlineConfig = onlineConfig;
this.storageConnector = storageConnector;
this.path = path;
}

public FeatureGroup() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.logicalclocks.hsfs.OnlineConfig;
import com.logicalclocks.hsfs.StatisticsConfig;
import com.logicalclocks.hsfs.Storage;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.metadata.Statistics;

Expand Down Expand Up @@ -62,7 +63,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig,
String onlineTopicName, String topicName, String notificationTopicName, String eventTime,
OnlineConfig onlineConfig) {
OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
this();
this.featureStore = featureStore;
this.name = name;
Expand All @@ -81,6 +82,8 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
this.notificationTopicName = notificationTopicName;
this.eventTime = eventTime;
this.onlineConfig = onlineConfig;
this.storageConnector = storageConnector;
this.path = path;
}

public StreamFeatureGroup() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,6 @@ public Dataset<Row> registerOnDemandTemporaryTable(ExternalFeatureGroup onDemand
? onDemandFeatureGroup.getDataFormat().toString() : null, getOnDemandOptions(onDemandFeatureGroup),
onDemandFeatureGroup.getStorageConnector().getPath(onDemandFeatureGroup.getPath()));

if (!Strings.isNullOrEmpty(onDemandFeatureGroup.getLocation())) {
sparkSession.sparkContext().textFile(onDemandFeatureGroup.getLocation(), 0).collect();
}

dataset.createOrReplaceTempView(alias);
return dataset;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ public void testFeatureGroupPrimaryKey() {

StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
Collections.singletonList("primaryKey"), Collections.singletonList("partitionKey"), "hudiPrecombineKey",
true, features, null, "onlineTopicName", null, null, null, null);
true, features, null, "onlineTopicName", null, null, null, null, null, null);

Exception pkException = assertThrows(FeatureStoreException.class, () -> {
featureGroupEngine.saveFeatureGroupMetaData(featureGroup,
null, null, null, null, null);;;
null, null, null, null, null);
});

// Assert
Expand All @@ -93,11 +93,11 @@ public void testFeatureGroupEventTimeFeature() {

StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
Collections.singletonList("featureA"), null, null,
true, features, null, "onlineTopicName", null, null, "eventTime", null);
true, features, null, "onlineTopicName", null, null, "eventTime", null, null, null);

Exception eventTimeException = assertThrows(FeatureStoreException.class, () -> {
streamFeatureGroupEngine.saveFeatureGroupMetaData(featureGroup,
null, null, null, null, null);;;
null, null, null, null, null);
});

// Assert
Expand All @@ -119,7 +119,7 @@ public void testFeatureGroupPartitionPrecombineKeys() {

StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
Collections.singletonList("featureA"), Collections.singletonList("partitionKey"), "hudiPrecombineKey",
true, features, null, "onlineTopicName", null, null, null, null);
true, features, null, "onlineTopicName", null, null, null, null, null, null);

Exception partitionException = assertThrows(FeatureStoreException.class, () -> {
streamFeatureGroupEngine.saveFeatureGroupMetaData(featureGroup,
Expand Down Expand Up @@ -164,7 +164,7 @@ public void testFeatureGroupAppendFeaturesResetSubject() throws FeatureStoreExce

StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
Collections.singletonList("featureA"), null, null,
true, features, null, "onlineTopicName", null, null, "eventTime", null);
true, features, null, "onlineTopicName", null, null, "eventTime", null, null, null);
featureGroup.featureGroupEngine = featureGroupEngine;

// Act
Expand Down
31 changes: 23 additions & 8 deletions python/hsfs/core/delta_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@ def save_delta_fg(self, dataset, write_options, validation_id=None):
return self._feature_group_api.commit(self._feature_group, fg_commit)

def register_temporary_table(self, delta_fg_alias, read_options):
location = self._feature_group.prepare_spark_location()

delta_options = self._setup_delta_read_opts(delta_fg_alias, read_options)
self._spark_session.read.format(self.DELTA_SPARK_FORMAT).options(
**delta_options
).load(self._feature_group.location).createOrReplaceTempView(
).load(location).createOrReplaceTempView(
delta_fg_alias.alias
)

Expand Down Expand Up @@ -85,15 +87,17 @@ def _setup_delta_read_opts(self, delta_fg_alias, read_options):
return delta_options

def delete_record(self, delete_df):
location = self._feature_group.prepare_spark_location()

if not DeltaTable.isDeltaTable(
self._spark_session, self._feature_group.location
self._spark_session, location
):
raise FeatureStoreException(
f"This is no data available in Feature group {self._feature_group.name}, or it not DELTA enabled "
)
else:
fg_source_table = DeltaTable.forPath(
self._spark_session, self._feature_group.location
self._spark_session, location
)

source_alias = (
Expand All @@ -109,16 +113,18 @@ def delete_record(self, delete_df):
).whenMatchedDelete().execute()

fg_commit = self._get_last_commit_metadata(
self._spark_session, self._feature_group.location
self._spark_session, location
)
return self._feature_group_api.commit(self._feature_group, fg_commit)

def _write_delta_dataset(self, dataset, write_options):
location = self._feature_group.prepare_spark_location()

if write_options is None:
write_options = {}

if not DeltaTable.isDeltaTable(
self._spark_session, self._feature_group.location
self._spark_session, location
):
(
dataset.write.format(DeltaEngine.DELTA_SPARK_FORMAT)
Expand All @@ -129,11 +135,11 @@ def _write_delta_dataset(self, dataset, write_options):
else []
)
.mode("append")
.save(self._feature_group.location)
.save(location)
)
else:
fg_source_table = DeltaTable.forPath(
self._spark_session, self._feature_group.location
self._spark_session, location
)

source_alias = (
Expand All @@ -149,9 +155,18 @@ def _write_delta_dataset(self, dataset, write_options):
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

return self._get_last_commit_metadata(
self._spark_session, self._feature_group.location
self._spark_session, location
)

def vacuum(self, retention_hours):
    """Delete files no longer referenced by this feature group's Delta table.

    Resolves the feature group's Spark storage location, opens the Delta
    table at that path, and runs VACUUM with the supplied retention window.
    https://docs.delta.io/1.0.1/api/python/index.html#delta.tables.DeltaTable.vacuum

    # Arguments
        retention_hours: retention threshold forwarded to
            `DeltaTable.vacuum`; files older than this are removed.
    """
    table_path = self._feature_group.prepare_spark_location()
    DeltaTable.forPath(self._spark_session, table_path).vacuum(retention_hours)

def _generate_merge_query(self, source_alias, updates_alias):
merge_query_list = []
primary_key = self._feature_group.primary_key
Expand Down
16 changes: 15 additions & 1 deletion python/hsfs/core/feature_group_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,20 @@ def commit_delete(feature_group, delete_df, write_options):
)
return hudi_engine_instance.delete_record(delete_df, write_options)

@staticmethod
def clean(feature_group, write_options):
if feature_group.time_travel_format == "DELTA":
delta_engine_instance = delta_engine.DeltaEngine(
feature_group.feature_store_id,
feature_group.feature_store_name,
feature_group,
engine.get_instance()._spark_session,
engine.get_instance()._spark_context,
)
return delta_engine_instance.vacuum(write_options.get("retention_hours", None))
else:
return None

def sql(self, query, feature_store_name, dataframe_type, online, read_options):
if online and self._online_conn is None:
self._online_conn = self._storage_connector_api.get_online_connector(
Expand Down Expand Up @@ -285,7 +299,7 @@ def append_features(self, feature_group, new_features):
if feature_group.time_travel_format == "DELTA":
engine.get_instance().add_cols_to_delta_table(feature_group, new_features)
else:
engine.get_instance().save_empty_dataframe(feature_group)
engine.get_instance().save_empty_dataframe(feature_group, new_features=new_features)

def update_description(self, feature_group, description):
"""Updates the description of a feature group."""
Expand Down
Loading
Loading