Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FSTORE-1008] add java client to hsfs #448

Open
wants to merge 36 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
423b744
java engine
davitbzh Sep 4, 2023
9f21e7f
Featuregroup
davitbzh Sep 4, 2023
fc75a0b
remove abstact methods
davitbzh Sep 7, 2023
cc821c5
ken bug fix
davitbzh Sep 26, 2023
72b1798
handle null values for complex features
Jan 13, 2025
a451786
java_engine
Jan 15, 2025
a709877
Merge remote-tracking branch 'upstream/main' into java_engine
Jan 15, 2025
9b4e5b4
java write
Jan 20, 2025
0c7ca07
licence headers
Jan 20, 2025
1e54219
java write
Jan 21, 2025
ac0b1db
Copyright
Jan 26, 2025
cff0e25
fix rebase
Jan 27, 2025
e683026
headerMap
Jan 27, 2025
baf09be
headerMap
Jan 27, 2025
871fe0b
Merge remote-tracking branch 'upstream/main' into java_engine
Feb 12, 2025
217a02d
Merge remote-tracking branch 'upstream/main' into java_engine
Feb 17, 2025
dfdf692
Update MySQL JDBC driver name to avoid warning
SirOibaf Feb 12, 2025
b4c585b
Initialize Datumreader only when setting up the vector server
SirOibaf Feb 12, 2025
59a7c19
Parallelize retrieval of feature vectors (no batching)
SirOibaf Feb 13, 2025
2ce4b2c
Add unit tests
SirOibaf Feb 13, 2025
c40da60
rebase with 3.9 changes
Feb 17, 2025
fd7d658
[FSTORE-1008] add java client to hsfs (#1413)
davitbzh Feb 14, 2025
a6e0685
Add Unit Tests for VectorServer.java
SirOibaf Feb 14, 2025
06c6f3e
Add support for creating feature groups from Java client (#1418)
SirOibaf Feb 20, 2025
baa1936
cherry-pick
Mar 3, 2025
c1a7a9c
rebase
Mar 3, 2025
6f16243
Merge remote-tracking branch 'upstream/main' into java_engine
Mar 4, 2025
df3a6c5
docs
Mar 4, 2025
d349d01
numentries
Mar 5, 2025
3d505c4
Add POJO and GenericRecords support when inserting data from the Java…
SirOibaf Mar 6, 2025
66caf4a
ignore complexFieldSchema.getType() == Schema.Type.NULL
Mar 10, 2025
27d69d9
Initial work on supporting creating feature views
SirOibaf Mar 6, 2025
c37f249
Add getOrCreateFeatureView methods
SirOibaf Mar 6, 2025
ffa786c
Fix bug with deserialization of complex features with prefix
SirOibaf Mar 10, 2025
61c1d3b
unused import
Mar 11, 2025
011c30e
revert pojo to generic and remove getTransformation
Mar 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions java/beam/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

Expand Down
306 changes: 128 additions & 178 deletions java/beam/src/main/java/com/logicalclocks/hsfs/beam/FeatureStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

package com.logicalclocks.hsfs.beam;

import com.logicalclocks.hsfs.Feature;
import com.logicalclocks.hsfs.FeatureStoreBase;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.OnlineConfig;
import com.logicalclocks.hsfs.StatisticsConfig;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.TimeTravelFormat;
import com.logicalclocks.hsfs.TrainingDatasetBase;
import com.logicalclocks.hsfs.beam.constructor.Query;
import com.logicalclocks.hsfs.beam.engine.FeatureGroupEngine;
import com.logicalclocks.hsfs.beam.engine.FeatureViewEngine;
Expand All @@ -39,45 +39,93 @@ public class FeatureStore extends FeatureStoreBase<Query> {
private FeatureViewEngine featureViewEngine;

public FeatureStore() {
storageConnectorApi = new StorageConnectorApi();
featureViewEngine = new FeatureViewEngine();
featureGroupEngine = new FeatureGroupEngine();
storageConnectorApi = new StorageConnectorApi();
}

@Override
public Object createFeatureGroup() {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getFeatureGroups(@NonNull String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getOrCreateFeatureGroup(String name, Integer version) throws IOException, FeatureStoreException {
throw new UnsupportedOperationException("Not supported for Beam");
}


@Override
public Object getOrCreateFeatureGroup(String name, Integer version, List<String> primaryKeys,
boolean onlineEnabled, String eventTime) throws IOException, FeatureStoreException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getOrCreateFeatureGroup(String name, Integer version, List<String> primaryKeys,
List<String> partitionKeys, boolean onlineEnabled, String eventTime) throws IOException, FeatureStoreException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getOrCreateFeatureGroup(String name, Integer version, String description, List<String> primaryKeys,
List<String> partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, TimeTravelFormat timeTravelFormat,
StatisticsConfig statisticsConfig, String topicName, String notificationTopicName, String eventTime,
OnlineConfig onlineConfig)
throws IOException, FeatureStoreException {
throw new UnsupportedOperationException("Not supported for Beam");
/**
* Create a feature group builder object.
*
* <pre>
* {@code
* // get feature store handle
* FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
*
* // create feature group metadata object
* StreamFeatureGroup streamFeatureGroup = fs.createStreamFeatureGroup()
* .name("documentation_example")
* .version(1)
* .primaryKeys(Collections.singletonList("pk"))
* .eventTime("event_time")
* .onlineEnabled(true)
* .features(features)
* .build();
*
* // save the feature group metadata object on the feature store
* streamFeatureGroup.save()
* }
* </pre>
*
* @return StreamFeatureGroup.StreamFeatureGroupBuilder a StreamFeatureGroup builder object.
*/
public StreamFeatureGroup.StreamFeatureGroupBuilder createStreamFeatureGroup() {
return StreamFeatureGroup.builder().featureStore(this);
}

@Override
public StreamFeatureGroup createStreamFeatureGroup(@NonNull String name,
Integer version,
String description,
Boolean onlineEnabled,
TimeTravelFormat timeTravelFormat,
List<String> primaryKeys,
List<String> partitionKeys,
String eventTime,
String hudiPrecombineKey,
List<Feature> features,
StatisticsConfig statisticsConfig,
StorageConnector storageConnector,
String path) {

return new StreamFeatureGroup.StreamFeatureGroupBuilder()
.featureStore(this)
.name(name)
.version(version)
.description(description)
.onlineEnabled(onlineEnabled)
.timeTravelFormat(timeTravelFormat)
.primaryKeys(primaryKeys)
.partitionKeys(partitionKeys)
.eventTime(eventTime)
.hudiPrecombineKey(hudiPrecombineKey)
.features(features)
.statisticsConfig(statisticsConfig)
.storageConnector(storageConnector)
.path(path)
.build();
}

@Override
public StreamFeatureGroup getOrCreateStreamFeatureGroup(@NonNull String name,
Integer version,
String description,
Boolean onlineEnabled,
TimeTravelFormat timeTravelFormat,
List<String> primaryKeys,
List<String> partitionKeys,
String eventTime,
String hudiPrecombineKey,
List<Feature> features,
StatisticsConfig statisticsConfig,
StorageConnector storageConnector,
String path,
OnlineConfig onlineConfig)
throws IOException, FeatureStoreException {

return featureGroupEngine.getOrCreateFeatureGroup(this, name, version, description, onlineEnabled,
timeTravelFormat, primaryKeys, partitionKeys, eventTime, hudiPrecombineKey, features, statisticsConfig,
storageConnector, path, onlineConfig);
}

/**
Expand Down Expand Up @@ -134,47 +182,6 @@ public StreamFeatureGroup getStreamFeatureGroup(@NonNull String name, @NonNull I
return featureGroupEngine.getStreamFeatureGroup(this, name, version);
}

@Override
public Object createStreamFeatureGroup() {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version)
throws IOException, FeatureStoreException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version, List<String> primaryKeys,
boolean onlineEnabled, String eventTime) throws IOException, FeatureStoreException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version, List<String> primaryKeys,
List<String> partitionKeys, boolean onlineEnabled, String eventTime) throws IOException, FeatureStoreException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version, String description,
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey, boolean onlineEnabled,
TimeTravelFormat timeTravelFormat, StatisticsConfig statisticsConfig, String eventTime, OnlineConfig onlineConfig)
throws IOException, FeatureStoreException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object createExternalFeatureGroup() {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object createFeatureView() {
throw new UnsupportedOperationException("Not supported for Beam");
}

/**
* Get a feature view object from the selected feature store.
*
Expand Down Expand Up @@ -208,7 +215,7 @@ public FeatureView getFeatureView(@NonNull String name, @NonNull Integer version
* }
* </pre>
*
* @param name Name of the feature view.
* @param name Name of the feature view.
* @return FeatureView The feature view metadata object.
* @throws FeatureStoreException If unable to retrieve FeatureView from the feature store.
* @throws IOException Generic IO exception.
Expand All @@ -219,107 +226,50 @@ public FeatureView getFeatureView(String name) throws FeatureStoreException, IOE
return getFeatureView(name, DEFAULT_VERSION);
}

@Override
public Object getOrCreateFeatureView(String name, Query query, Integer version)
throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getOrCreateFeatureView(String name, Query query, Integer version, String description,
List<String> labels) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getExternalFeatureGroup(@NonNull String name, @NonNull Integer version)
throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getExternalFeatureGroup(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public StorageConnector getStorageConnector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getHopsFsConnector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getExternalFeatureGroups(@NonNull String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object sql(String name) {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getJdbcConnector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getS3Connector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getRedshiftConnector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getSnowflakeConnector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getAdlsConnector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for beam");
}

@Override
public Object getKafkaConnector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getBigqueryConnector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getOnlineStorageConnector() throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getGcsConnector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
/**
* Create a new feature view metadata object.
*
* <pre>
* {@code
* // get feature store handle
* FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
* FeatureView fv = fs.createFeatureView
* .name("fv_name")
* .version(1)
* .query(query)
* .build() // The build method also save the feature view metadata to Hopsworks
* }
* </pre>
*
* @return FeatureView.FeatureViewBuilder Feature View Builder object to build the feature view metadata object
* @throws FeatureStoreException If unable to retrieve FeatureView from the feature store.
* @throws IOException Generic IO exception.
*/
public FeatureView.FeatureViewBuilder createFeatureView() {
return new FeatureView.FeatureViewBuilder(this);
}

@Override
public TrainingDatasetBase getTrainingDataset(@NonNull String name, @NonNull Integer version)
/**
* Get feature view metadata object or create a new one if it doesn't exist. This method doesn't update
* existing feature view metadata.
*
* <pre>
* {@code
* // get feature store handle
* FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
* FeatureView fv = fs.getOrCreateFeatureView("fv_name", query, 1);
* }
* </pre>
*
* @param name Name of the feature view.
* @param query Query object.
* @param version Version of the feature view.
* @return FeatureView The feature view metadata object.
* @throws FeatureStoreException If unable to retrieve FeatureView from the feature store.
* @throws IOException Generic IO exception.
*/
public FeatureView getOrCreateFeatureView(String name, Query query, Integer version)
throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public TrainingDatasetBase getTrainingDataset(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getTrainingDatasets(@NonNull String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
return featureViewEngine.getOrCreateFeatureView(this, name, version, query, null, null);
}
}
Loading
Loading