Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FSTORE-1008] add java client to hsfs #448

Open
wants to merge 36 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
423b744
java engine
davitbzh Sep 4, 2023
9f21e7f
Featuregroup
davitbzh Sep 4, 2023
fc75a0b
remove abstact methods
davitbzh Sep 7, 2023
cc821c5
ken bug fix
davitbzh Sep 26, 2023
72b1798
handle null values for complex features
Jan 13, 2025
a451786
java_engine
Jan 15, 2025
a709877
Merge remote-tracking branch 'upstream/main' into java_engine
Jan 15, 2025
9b4e5b4
java write
Jan 20, 2025
0c7ca07
licence headers
Jan 20, 2025
1e54219
java write
Jan 21, 2025
ac0b1db
Copyright
Jan 26, 2025
cff0e25
fix rebase
Jan 27, 2025
e683026
headerMap
Jan 27, 2025
baf09be
headerMap
Jan 27, 2025
871fe0b
Merge remote-tracking branch 'upstream/main' into java_engine
Feb 12, 2025
217a02d
Merge remote-tracking branch 'upstream/main' into java_engine
Feb 17, 2025
dfdf692
Update MySQL JDBC driver name to avoid warning
SirOibaf Feb 12, 2025
b4c585b
Initialize Datumreader only when setting up the vector server
SirOibaf Feb 12, 2025
59a7c19
Parallelize retrieval of feature vectors (no batching)
SirOibaf Feb 13, 2025
2ce4b2c
Add unit tests
SirOibaf Feb 13, 2025
c40da60
rebase with 3.9 changes
Feb 17, 2025
fd7d658
[FSTORE-1008] add java client to hsfs (#1413)
davitbzh Feb 14, 2025
a6e0685
Add Unit Tests for VectorServer.java
SirOibaf Feb 14, 2025
06c6f3e
Add support for creating feature groups from Java client (#1418)
SirOibaf Feb 20, 2025
baa1936
cherry-pick
Mar 3, 2025
c1a7a9c
rebase
Mar 3, 2025
6f16243
Merge remote-tracking branch 'upstream/main' into java_engine
Mar 4, 2025
df3a6c5
docs
Mar 4, 2025
d349d01
numentries
Mar 5, 2025
3d505c4
Add POJO and GenericRecords support when inserting data from the Java…
SirOibaf Mar 6, 2025
66caf4a
ignore complexFieldSchema.getType() == Schema.Type.NULL
Mar 10, 2025
27d69d9
Initial work on supporting creating feature views
SirOibaf Mar 6, 2025
c37f249
Add getOrCreateFeatureView methods
SirOibaf Mar 6, 2025
ffa786c
Fix bug with deserialization of complex features with prefix
SirOibaf Mar 10, 2025
61c1d3b
unused import
Mar 11, 2025
011c30e
revert pojo to generic and remove getTransformation
Mar 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions java/beam/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

Expand Down
193 changes: 2 additions & 191 deletions java/beam/src/main/java/com/logicalclocks/hsfs/beam/FeatureStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,67 +19,23 @@

import com.logicalclocks.hsfs.FeatureStoreBase;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.OnlineConfig;
import com.logicalclocks.hsfs.StatisticsConfig;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.TimeTravelFormat;
import com.logicalclocks.hsfs.TrainingDatasetBase;
import com.logicalclocks.hsfs.beam.constructor.Query;
import com.logicalclocks.hsfs.beam.engine.FeatureGroupEngine;
import com.logicalclocks.hsfs.beam.engine.FeatureViewEngine;
import com.logicalclocks.hsfs.metadata.StorageConnectorApi;
import lombok.NonNull;

import java.io.IOException;
import java.util.List;

public class FeatureStore extends FeatureStoreBase<Query> {

private FeatureGroupEngine featureGroupEngine;
private FeatureViewEngine featureViewEngine;

public FeatureStore() {
storageConnectorApi = new StorageConnectorApi();
featureViewEngine = new FeatureViewEngine();
featureGroupEngine = new FeatureGroupEngine();
}

@Override
public Object createFeatureGroup() {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getFeatureGroups(@NonNull String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getOrCreateFeatureGroup(String name, Integer version) throws IOException, FeatureStoreException {
throw new UnsupportedOperationException("Not supported for Beam");
}


@Override
public Object getOrCreateFeatureGroup(String name, Integer version, List<String> primaryKeys,
boolean onlineEnabled, String eventTime) throws IOException, FeatureStoreException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getOrCreateFeatureGroup(String name, Integer version, List<String> primaryKeys,
List<String> partitionKeys, boolean onlineEnabled, String eventTime) throws IOException, FeatureStoreException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getOrCreateFeatureGroup(String name, Integer version, String description, List<String> primaryKeys,
List<String> partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, TimeTravelFormat timeTravelFormat,
StatisticsConfig statisticsConfig, String topicName, String notificationTopicName, String eventTime,
OnlineConfig onlineConfig)
throws IOException, FeatureStoreException {
throw new UnsupportedOperationException("Not supported for Beam");
}

/**
* Get a stream feature group object from the feature store.
*
Expand Down Expand Up @@ -134,47 +90,6 @@ public StreamFeatureGroup getStreamFeatureGroup(@NonNull String name, @NonNull I
return featureGroupEngine.getStreamFeatureGroup(this, name, version);
}

@Override
public Object createStreamFeatureGroup() {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version)
throws IOException, FeatureStoreException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version, List<String> primaryKeys,
boolean onlineEnabled, String eventTime) throws IOException, FeatureStoreException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version, List<String> primaryKeys,
List<String> partitionKeys, boolean onlineEnabled, String eventTime) throws IOException, FeatureStoreException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version, String description,
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey, boolean onlineEnabled,
TimeTravelFormat timeTravelFormat, StatisticsConfig statisticsConfig, String eventTime, OnlineConfig onlineConfig)
throws IOException, FeatureStoreException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object createExternalFeatureGroup() {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object createFeatureView() {
throw new UnsupportedOperationException("Not supported for Beam");
}

/**
* Get a feature view object from the selected feature store.
*
Expand Down Expand Up @@ -208,7 +123,7 @@ public FeatureView getFeatureView(@NonNull String name, @NonNull Integer version
* }
* </pre>
*
* @param name Name of the feature view.
* @param name Name of the feature view.
* @return FeatureView The feature view metadata object.
* @throws FeatureStoreException If unable to retrieve FeatureView from the feature store.
* @throws IOException Generic IO exception.
Expand All @@ -218,108 +133,4 @@ public FeatureView getFeatureView(String name) throws FeatureStoreException, IOE
+ DEFAULT_VERSION + "`.");
return getFeatureView(name, DEFAULT_VERSION);
}

@Override
public Object getOrCreateFeatureView(String name, Query query, Integer version)
throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getOrCreateFeatureView(String name, Query query, Integer version, String description,
List<String> labels) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getExternalFeatureGroup(@NonNull String name, @NonNull Integer version)
throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getExternalFeatureGroup(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public StorageConnector getStorageConnector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getHopsFsConnector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getExternalFeatureGroups(@NonNull String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object sql(String name) {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getJdbcConnector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getS3Connector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getRedshiftConnector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getSnowflakeConnector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getAdlsConnector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for beam");
}

@Override
public Object getKafkaConnector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getBigqueryConnector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getOnlineStorageConnector() throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getGcsConnector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public TrainingDatasetBase getTrainingDataset(@NonNull String name, @NonNull Integer version)
throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public TrainingDatasetBase getTrainingDataset(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getTrainingDatasets(@NonNull String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}
}
127 changes: 0 additions & 127 deletions java/beam/src/main/java/com/logicalclocks/hsfs/beam/FeatureView.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,136 +17,9 @@

package com.logicalclocks.hsfs.beam;

import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.FeatureViewBase;
import com.logicalclocks.hsfs.beam.constructor.Query;
import org.apache.beam.sdk.values.PCollection;

import java.io.IOException;
import java.text.ParseException;
import java.util.Map;

public class FeatureView extends FeatureViewBase<FeatureView, FeatureStore, Query, PCollection<Object>> {
@Override
public void addTag(String name, Object value) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Map<String, Object> getTags() throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getTag(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public void deleteTag(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public void addTrainingDatasetTag(Integer version, String name, Object value)
throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Map<String, Object> getTrainingDatasetTags(Integer version) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getTrainingDatasetTag(Integer version, String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public void deleteTrainingDatasetTag(Integer version, String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public void delete() throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public void clean(FeatureStore featureStore, String featureViewName, Integer featureViewVersion)
throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public FeatureView update(FeatureView other) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public String getBatchQuery() throws FeatureStoreException, IOException, ParseException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public String getBatchQuery(String startTime, String endTime)
throws FeatureStoreException, IOException, ParseException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public PCollection<Object> getBatchData() throws FeatureStoreException, IOException, ParseException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public PCollection<Object> getBatchData(String startTime, String endTime)
throws FeatureStoreException, IOException, ParseException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public PCollection<Object> getBatchData(String startTime, String endTime, Map<String, String> readOptions)
throws FeatureStoreException, IOException, ParseException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getTrainingData(Integer version, Map<String, String> readOptions)
throws IOException, FeatureStoreException, ParseException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getTrainTestSplit(Integer version, Map<String, String> readOptions)
throws IOException, FeatureStoreException, ParseException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getTrainValidationTestSplit(Integer version, Map<String, String> readOptions)
throws IOException, FeatureStoreException, ParseException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public void purgeTrainingData(Integer version) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public void purgeAllTrainingData() throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public void deleteTrainingDataset(Integer version) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public void deleteAllTrainingDatasets() throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}
}
Loading
Loading