Test Actions #362

Closed · wants to merge 4 commits

@@ -25,7 +25,6 @@
 import com.logicalclocks.hsfs.OnlineConfig;
 import com.logicalclocks.hsfs.StatisticsConfig;
 import com.logicalclocks.hsfs.Storage;
-import com.logicalclocks.hsfs.StorageConnector;
 import com.logicalclocks.hsfs.beam.engine.FeatureGroupEngine;
 import com.logicalclocks.hsfs.beam.engine.BeamProducer;
 import com.logicalclocks.hsfs.constructor.QueryBase;
@@ -49,7 +48,7 @@ public class StreamFeatureGroup extends FeatureGroupBase<PCollection<Object>> {
   public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description,
       List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
       boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig, String onlineTopicName,
-      String eventTime, OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
+      String eventTime, OnlineConfig onlineConfig) {
     this();
     this.featureStore = featureStore;
     this.name = name;
@@ -66,8 +65,6 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
     this.onlineTopicName = onlineTopicName;
     this.eventTime = eventTime;
     this.onlineConfig = onlineConfig;
-    this.storageConnector = storageConnector;
-    this.path = path;
   }

   public StreamFeatureGroup() {
@@ -26,7 +26,6 @@
 import com.logicalclocks.hsfs.OnlineConfig;
 import com.logicalclocks.hsfs.StatisticsConfig;
 import com.logicalclocks.hsfs.Storage;
-import com.logicalclocks.hsfs.StorageConnector;
 import com.logicalclocks.hsfs.constructor.QueryBase;

 import com.logicalclocks.hsfs.metadata.Statistics;
@@ -55,7 +54,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
       List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
       boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig,
       String onlineTopicName, String topicName, String notificationTopicName, String eventTime,
-      OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
+      OnlineConfig onlineConfig) {
     this();
     this.featureStore = featureStore;
     this.name = name;
@@ -74,8 +73,6 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
     this.notificationTopicName = notificationTopicName;
     this.eventTime = eventTime;
     this.onlineConfig = onlineConfig;
-    this.storageConnector = storageConnector;
-    this.path = path;
   }

   public StreamFeatureGroup() {
@@ -132,14 +132,6 @@ public abstract class FeatureGroupBase<T> {
   @Setter
   protected OnlineConfig onlineConfig;

-  @Getter
-  @Setter
-  protected StorageConnector storageConnector;
-
-  @Getter
-  @Setter
-  protected String path;
-
   @JsonIgnore
   // These are only used in the client. In the server they are aggregated in the `features` field
   protected List<String> partitionKeys;
@@ -124,9 +124,6 @@ public static class S3Connector extends StorageConnector {
   @Getter @Setter
   protected String bucket;

-  @Getter @Setter
-  protected String region;
-
   @Getter @Setter
   protected String sessionToken;
@@ -128,7 +128,7 @@ public static Long getTimeStampFromDateString(String inputDate) throws FeatureSt
   }

   SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatPattern);
-  Long commitTimeStamp = dateFormat.parse(tempDate).getTime();
+  Long commitTimeStamp = dateFormat.parse(tempDate).getTime();;

   return commitTimeStamp;
 }
@@ -57,6 +57,10 @@
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class ExternalFeatureGroup extends FeatureGroupBase<Dataset<Row>> {

+  @Getter
+  @Setter
+  private StorageConnector storageConnector;
+
   @Getter
   @Setter
   private String query;
@@ -65,6 +69,10 @@ public class ExternalFeatureGroup extends FeatureGroupBase<Dataset<Row>> {
   @Setter
   private ExternalDataFormat dataFormat;

+  @Getter
+  @Setter
+  private String path;
+
   @Getter
   @Setter
   private List<OnDemandOptions> options;
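
Review note: after this change, `storageConnector` and `path` live only on `ExternalFeatureGroup` instead of the shared `FeatureGroupBase`. A minimal sketch of how that surfaces in the Python client — the connector name, feature group name, keys, and path below are hypothetical, and the exact signature may differ:

```python
# Sketch only: shows where storage_connector/path now belong
# (external feature groups), not an exact reproduction of this PR's API.
import hsfs

connection = hsfs.connection()  # assumes a configured Hopsworks connection
fs = connection.get_feature_store()

# Hypothetical S3 connector already registered in the feature store.
s3_connector = fs.get_storage_connector("my_s3_connector")

# storage_connector and path are properties of the *external* feature group.
external_fg = fs.create_external_feature_group(
    name="transactions_external",  # hypothetical name
    version=1,
    data_format="parquet",
    path="transactions/2024/",     # hypothetical path under the connector's bucket
    storage_connector=s3_connector,
    primary_key=["tx_id"],         # hypothetical key
)
external_fg.save()
```
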
@@ -31,7 +31,6 @@
 import com.logicalclocks.hsfs.OnlineConfig;
 import com.logicalclocks.hsfs.StatisticsConfig;
 import com.logicalclocks.hsfs.Storage;
-import com.logicalclocks.hsfs.StorageConnector;
 import com.logicalclocks.hsfs.TimeTravelFormat;
 import com.logicalclocks.hsfs.FeatureGroupBase;
 import com.logicalclocks.hsfs.metadata.Statistics;
@@ -65,8 +64,7 @@ public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer ver
       String description, List<String> primaryKeys, List<String> partitionKeys,
       String hudiPrecombineKey, boolean onlineEnabled, TimeTravelFormat timeTravelFormat,
       List<Feature> features, StatisticsConfig statisticsConfig, String onlineTopicName,
-      String topicName, String notificationTopicName, String eventTime, OnlineConfig onlineConfig,
-      StorageConnector storageConnector, String path) {
+      String topicName, String notificationTopicName, String eventTime, OnlineConfig onlineConfig) {
     this();
     this.featureStore = featureStore;
     this.name = name;
@@ -87,8 +85,6 @@ public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer ver
     this.notificationTopicName = notificationTopicName;
     this.eventTime = eventTime;
     this.onlineConfig = onlineConfig;
-    this.storageConnector = storageConnector;
-    this.path = path;
   }

   public FeatureGroup() {
@@ -30,7 +30,6 @@
 import com.logicalclocks.hsfs.OnlineConfig;
 import com.logicalclocks.hsfs.StatisticsConfig;
 import com.logicalclocks.hsfs.Storage;
-import com.logicalclocks.hsfs.StorageConnector;
 import com.logicalclocks.hsfs.FeatureGroupBase;
 import com.logicalclocks.hsfs.metadata.Statistics;

@@ -63,7 +62,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
       List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
       boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig,
       String onlineTopicName, String topicName, String notificationTopicName, String eventTime,
-      OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
+      OnlineConfig onlineConfig) {
     this();
     this.featureStore = featureStore;
     this.name = name;
@@ -82,8 +81,6 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
     this.notificationTopicName = notificationTopicName;
     this.eventTime = eventTime;
     this.onlineConfig = onlineConfig;
-    this.storageConnector = storageConnector;
-    this.path = path;
   }

   public StreamFeatureGroup() {
@@ -219,6 +219,10 @@ public Dataset<Row> registerOnDemandTemporaryTable(ExternalFeatureGroup onDemand
         ? onDemandFeatureGroup.getDataFormat().toString() : null, getOnDemandOptions(onDemandFeatureGroup),
         onDemandFeatureGroup.getStorageConnector().getPath(onDemandFeatureGroup.getPath()));

+    if (!Strings.isNullOrEmpty(onDemandFeatureGroup.getLocation())) {
+      sparkSession.sparkContext().textFile(onDemandFeatureGroup.getLocation(), 0).collect();
+    }
+
     dataset.createOrReplaceTempView(alias);
     return dataset;
   }
@@ -67,11 +67,11 @@ public void testFeatureGroupPrimaryKey() {

     StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
         Collections.singletonList("primaryKey"), Collections.singletonList("partitionKey"), "hudiPrecombineKey",
-        true, features, null, "onlineTopicName", null, null, null, null, null, null);
+        true, features, null, "onlineTopicName", null, null, null, null);

     Exception pkException = assertThrows(FeatureStoreException.class, () -> {
       featureGroupEngine.saveFeatureGroupMetaData(featureGroup,
-          null, null, null, null, null);
+          null, null, null, null, null);;;
     });

     // Assert
@@ -93,11 +93,11 @@ public void testFeatureGroupEventTimeFeature() {

     StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
         Collections.singletonList("featureA"), null, null,
-        true, features, null, "onlineTopicName", null, null, "eventTime", null, null, null);
+        true, features, null, "onlineTopicName", null, null, "eventTime", null);

     Exception eventTimeException = assertThrows(FeatureStoreException.class, () -> {
       streamFeatureGroupEngine.saveFeatureGroupMetaData(featureGroup,
-          null, null, null, null, null);
+          null, null, null, null, null);;;
     });

     // Assert
@@ -119,7 +119,7 @@ public void testFeatureGroupPartitionPrecombineKeys() {

     StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
         Collections.singletonList("featureA"), Collections.singletonList("partitionKey"), "hudiPrecombineKey",
-        true, features, null, "onlineTopicName", null, null, null, null, null, null);
+        true, features, null, "onlineTopicName", null, null, null, null);

     Exception partitionException = assertThrows(FeatureStoreException.class, () -> {
       streamFeatureGroupEngine.saveFeatureGroupMetaData(featureGroup,
@@ -164,7 +164,7 @@ public void testFeatureGroupAppendFeaturesResetSubject() throws FeatureStoreExce

     StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
         Collections.singletonList("featureA"), null, null,
-        true, features, null, "onlineTopicName", null, null, "eventTime", null, null, null);
+        true, features, null, "onlineTopicName", null, null, "eventTime", null);
     featureGroup.featureGroupEngine = featureGroupEngine;

     // Act
11 changes: 1 addition & 10 deletions python/hsfs/constructor/fs_query.py
@@ -35,7 +35,6 @@ def __init__(
         expand: Optional[List[str]] = None,
         items: Optional[List[Dict[str, Any]]] = None,
         type: Optional[str] = None,
-        delta_cached_feature_groups: Optional[List[Dict[str, Any]]] = None,
         **kwargs,
     ) -> None:
         self._query = query
@@ -61,14 +60,6 @@ def __init__(
         else:
             self._hudi_cached_feature_groups = []

-        if delta_cached_feature_groups is not None:
-            self._delta_cached_feature_groups = [
-                hudi_feature_group_alias.HudiFeatureGroupAlias.from_response_json(fg)
-                for fg in delta_cached_feature_groups
-            ]
-        else:
-            self._delta_cached_feature_groups = []
-
     @classmethod
     def from_response_json(cls, json_dict: Dict[str, Any]) -> "FsQuery":
         json_decamelized = humps.decamelize(json_dict)
@@ -136,7 +127,7 @@ def register_delta_tables(
         feature_store_name: str,
         read_options: Optional[Dict[str, Any]],
     ) -> None:
-        for hudi_fg in self._delta_cached_feature_groups:
+        for hudi_fg in self._hudi_cached_feature_groups:
             engine.get_instance().register_delta_temporary_table(
                 hudi_fg, feature_store_id, feature_store_name, read_options
            )
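
Review note: with `delta_cached_feature_groups` dropped from the constructor, delta-backed feature groups now have to arrive in the single hudi alias list, and `register_delta_tables` iterates `_hudi_cached_feature_groups` instead. A rough sketch of the downstream call after this change — the response payload and identifiers are hypothetical:

```python
# Sketch, not taken from this PR: a deserialized FsQuery registering its
# delta tables from the single (hudi) alias list.
from hsfs.constructor.fs_query import FsQuery

fs_query = FsQuery.from_response_json(response_json)  # response_json is hypothetical
fs_query.register_delta_tables(
    feature_store_id=67,                      # hypothetical id
    feature_store_name="demo_featurestore",   # hypothetical name
    read_options=None,
)
```
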
28 changes: 2 additions & 26 deletions python/hsfs/core/arrow_flight_client.py
@@ -124,13 +124,7 @@ def _is_query_supported_rec(query: query.Query):
             and query._left_feature_group.storage_connector.type
             in ArrowFlightClient.SUPPORTED_EXTERNAL_CONNECTORS
         )
-        delta_s3 = (
-            isinstance(query._left_feature_group, feature_group.FeatureGroup)
-            and query._left_feature_group.time_travel_format == "DELTA"
-            and query._left_feature_group.storage_connector
-            and query._left_feature_group.storage_connector.type == StorageConnector.S3
-        )
-        supported = hudi_no_time_travel or supported_connector or delta_s3
+        supported = hudi_no_time_travel or supported_connector
         for j in query._joins:
             supported &= _is_query_supported_rec(j._query)
         return supported
@@ -555,7 +549,6 @@ def enabled_on_cluster(self) -> bool:
 def _serialize_featuregroup_connector(fg, query, on_demand_fg_aliases):
     connector = {}
     if isinstance(fg, feature_group.ExternalFeatureGroup):
-        connector["time_travel_type"] = None
         connector["type"] = fg.storage_connector.type
         connector["options"] = fg.storage_connector.connector_options()
         connector["query"] = fg.query[:-1] if fg.query.endswith(";") else fg.query
@@ -573,25 +566,8 @@ def _serialize_featuregroup_connector(fg, query, on_demand_fg_aliases):
             connector["filters"] = _serialize_filter_expression(
                 join_obj._query._filter, join_obj._query, True
             )
-    elif fg.time_travel_format == "DELTA":
-        connector["time_travel_type"] = "delta"
-        connector["type"] = fg.storage_connector.type
-        connector["options"] = fg.storage_connector.connector_options()
-        if fg.storage_connector.type == StorageConnector.S3:
-            connector["options"]["path"] = fg.location
-        connector["query"] = ""
-        if query._left_feature_group == fg:
-            connector["filters"] = _serialize_filter_expression(
-                query._filter, query, True
-            )
-        else:
-            for join_obj in query._joins:
-                if join_obj._query._left_feature_group == fg:
-                    connector["filters"] = _serialize_filter_expression(
-                        join_obj._query._filter, join_obj._query, True
-                    )
     else:
-        connector["time_travel_type"] = "hudi"
+        connector["type"] = "hudi"
     return connector
27 changes: 8 additions & 19 deletions python/hsfs/core/delta_engine.py
@@ -52,12 +52,10 @@ def save_delta_fg(self, dataset, write_options, validation_id=None):
         return self._feature_group_api.commit(self._feature_group, fg_commit)

     def register_temporary_table(self, delta_fg_alias, read_options):
-        location = self._feature_group.prepare_spark_location()
-
         delta_options = self._setup_delta_read_opts(delta_fg_alias, read_options)
         self._spark_session.read.format(self.DELTA_SPARK_FORMAT).options(
             **delta_options
-        ).load(location).createOrReplaceTempView(
+        ).load(self._feature_group.location).createOrReplaceTempView(
             delta_fg_alias.alias
         )

@@ -87,17 +85,15 @@ def _setup_delta_read_opts(self, delta_fg_alias, read_options):
         return delta_options

     def delete_record(self, delete_df):
-        location = self._feature_group.prepare_spark_location()
-
         if not DeltaTable.isDeltaTable(
-            self._spark_session, location
+            self._spark_session, self._feature_group.location
         ):
             raise FeatureStoreException(
                 f"This is no data available in Feature group {self._feature_group.name}, or it not DELTA enabled "
             )
         else:
             fg_source_table = DeltaTable.forPath(
-                self._spark_session, location
+                self._spark_session, self._feature_group.location
             )

             source_alias = (
@@ -113,18 +109,16 @@ def delete_record(self, delete_df):
             ).whenMatchedDelete().execute()

         fg_commit = self._get_last_commit_metadata(
-            self._spark_session, location
+            self._spark_session, self._feature_group.location
         )
         return self._feature_group_api.commit(self._feature_group, fg_commit)

     def _write_delta_dataset(self, dataset, write_options):
-        location = self._feature_group.prepare_spark_location()
-
         if write_options is None:
             write_options = {}

         if not DeltaTable.isDeltaTable(
-            self._spark_session, location
+            self._spark_session, self._feature_group.location
         ):
             (
                 dataset.write.format(DeltaEngine.DELTA_SPARK_FORMAT)
@@ -135,11 +129,11 @@ def _write_delta_dataset(self, dataset, write_options):
                     else []
                 )
                 .mode("append")
-                .save(location)
+                .save(self._feature_group.location)
             )
         else:
             fg_source_table = DeltaTable.forPath(
-                self._spark_session, location
+                self._spark_session, self._feature_group.location
             )

             source_alias = (
@@ -155,14 +149,9 @@ def _write_delta_dataset(self, dataset, write_options):
             ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

         return self._get_last_commit_metadata(
-            self._spark_session, location
+            self._spark_session, self._feature_group.location
         )

-    def vacuum(self, retention_hours: int):
-        location = self._feature_group.prepare_spark_location()
-        retention = f"RETAIN {retention_hours} HOURS" if retention_hours is not None else ""
-        self._spark_session.sql(f"VACUUM '{location}' {retention}")
-
     def _generate_merge_query(self, source_alias, updates_alias):
         merge_query_list = []
         primary_key = self._feature_group.primary_key
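
Review note: every helper in this engine now reads the table location straight from `self._feature_group.location`, and `vacuum` is deleted outright rather than ported. If retention cleanup is still needed, a standalone equivalent mirroring the removed body might look like the following — an untested sketch that assumes `feature_group.location` holds the Delta path:

```python
# Hedged sketch of a vacuum equivalent after this removal. Mirrors the
# deleted DeltaEngine.vacuum(), but reads the feature group's location
# attribute directly instead of the removed prepare_spark_location().
def vacuum_feature_group(spark_session, feature_group, retention_hours=None):
    # Delta Lake's VACUUM accepts an optional RETAIN <n> HOURS clause.
    retention = (
        f"RETAIN {retention_hours} HOURS" if retention_hours is not None else ""
    )
    spark_session.sql(f"VACUUM '{feature_group.location}' {retention}")
```
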