Skip to content

Commit c37f249

Browse files
SirOibafdavitbzh
authored and
davitbzh
committed
Add getOrCreateFeatureView methods
1 parent 27d69d9 commit c37f249

File tree

24 files changed

+375
-17
lines changed

24 files changed

+375
-17
lines changed

java/beam/src/main/java/com/logicalclocks/hsfs/beam/FeatureStore.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,4 +248,28 @@ public FeatureView getFeatureView(String name) throws FeatureStoreException, IOE
248248
public FeatureView.FeatureViewBuilder createFeatureView() {
249249
return new FeatureView.FeatureViewBuilder(this);
250250
}
251+
252+
/**
253+
* Get feature view metadata object or create a new one if it doesn't exist. This method doesn't update
254+
* existing feature view metadata.
255+
*
256+
* <pre>
257+
* {@code
258+
* // get feature store handle
259+
* FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
260+
* FeatureView fv = fs.getOrCreateFeatureView("fv_name", query, 1);
261+
* }
262+
* </pre>
263+
*
264+
* @param name Name of the feature view.
265+
* @param query Query object.
266+
* @param version Version of the feature view.
267+
* @return FeatureView The feature view metadata object.
268+
* @throws FeatureStoreException If unable to retrieve FeatureView from the feature store.
269+
* @throws IOException Generic IO exception.
270+
*/
271+
public FeatureView getOrCreateFeatureView(String name, Query query, Integer version)
272+
throws FeatureStoreException, IOException {
273+
return featureViewEngine.getOrCreateFeatureView(this, name, version, query, null, null);
274+
}
251275
}

java/beam/src/main/java/com/logicalclocks/hsfs/beam/FeatureView.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,7 @@ public FeatureView.FeatureViewBuilder labels(List<String> labels) {
8484
public FeatureView build() throws FeatureStoreException, IOException {
8585
FeatureView
8686
featureView = new FeatureView(name, version, query, description, featureStore, labels);
87-
featureViewEngine.save(featureView, FeatureView.class);
88-
return featureView;
87+
return featureViewEngine.save(featureView, FeatureView.class);
8988
}
9089
}
9190

java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
import com.logicalclocks.hsfs.FeatureStoreException;
2626
import com.logicalclocks.hsfs.JobConfiguration;
2727
import com.logicalclocks.hsfs.StorageConnector;
28+
import com.logicalclocks.hsfs.beam.constructor.Query;
29+
import org.apache.beam.sdk.values.PCollection;
30+
2831
import com.logicalclocks.hsfs.Feature;
2932
import com.logicalclocks.hsfs.FeatureGroupBase;
3033
import com.logicalclocks.hsfs.OnlineConfig;
@@ -154,4 +157,62 @@ public Object insertStream(PCollection<Object> featureData) throws Exception {
154157
public Object insertStream(PCollection<Object> featureData, Map<String, String> writeOptions) throws Exception {
155158
throw new UnsupportedOperationException("Not supported for Beam");
156159
}
160+
161+
/**
162+
* Select a subset of features of the feature group and return a query object. The query can be used to construct
163+
* joins of feature groups or create a feature view with a subset of features of the feature group.
164+
* @param features List of Feature meta data objects.
165+
* @return Query object.
166+
*/
167+
public Query selectFeatures(List<Feature> features) {
168+
return new Query(this, features);
169+
}
170+
171+
/**
172+
* Select a subset of features of the feature group and return a query object. The query can be used to construct
173+
* joins of feature groups or create a feature view with a subset of features of the feature group.
174+
* @param features List of Feature names.
175+
* @return Query object.
176+
*/
177+
public Query select(List<String> features) {
178+
// Create a feature object for each string feature given by the user.
179+
// For the query building each feature need only the name set.
180+
List<Feature> featureObjList = features.stream().map(Feature::new).collect(Collectors.toList());
181+
return selectFeatures(featureObjList);
182+
}
183+
184+
/**
185+
* Select all features of the feature group and return a query object. The query can be used to construct
186+
* joins of feature groups or create a feature view with a subset of features of the feature group.
187+
* @return Query object.
188+
*/
189+
public Query selectAll() {
190+
return new Query(this, getFeatures());
191+
}
192+
193+
/**
194+
* Select all features including primary key and event time feature of the feature group except provided `features`
195+
* and return a query object.
196+
* The query can be used to construct joins of feature groups or create a feature view with a subset of features of
197+
* the feature group.
198+
* @param features List of Feature meta data objects.
199+
* @return Query object.
200+
*/
201+
public Query selectExceptFeatures(List<Feature> features) {
202+
List<String> exceptFeatures = features.stream().map(Feature::getName).collect(Collectors.toList());
203+
return selectExcept(exceptFeatures);
204+
}
205+
206+
/**
207+
* Select all features including primary key and event time feature of the feature group except provided `features`
208+
* and return a query object.
209+
* The query can be used to construct joins of feature groups or create a feature view with a subset of features of
210+
* the feature group.
211+
* @param features List of Feature names.
212+
* @return Query object.
213+
*/
214+
public Query selectExcept(List<String> features) {
215+
return new Query(this,
216+
getFeatures().stream().filter(f -> !features.contains(f.getName())).collect(Collectors.toList()));
217+
}
157218
}

java/beam/src/main/java/com/logicalclocks/hsfs/beam/constructor/Query.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,21 @@
1717

1818
package com.logicalclocks.hsfs.beam.constructor;
1919

20+
import com.logicalclocks.hsfs.Feature;
21+
import com.logicalclocks.hsfs.FeatureGroupBase;
2022
import com.logicalclocks.hsfs.beam.StreamFeatureGroup;
2123
import com.logicalclocks.hsfs.constructor.QueryBase;
2224

25+
import lombok.NoArgsConstructor;
2326
import org.apache.beam.sdk.values.PCollection;
2427

28+
import java.util.List;
29+
30+
@NoArgsConstructor
2531
public class Query extends QueryBase<Query, StreamFeatureGroup, PCollection<Object>> {
32+
33+
public Query(FeatureGroupBase leftFeatureGroup, List<Feature> leftFeatures) {
34+
super(leftFeatureGroup, leftFeatures);
35+
}
36+
2637
}

java/beam/src/main/java/com/logicalclocks/hsfs/beam/engine/FeatureViewEngine.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.logicalclocks.hsfs.engine.FeatureViewEngineBase;
2626
import org.apache.beam.sdk.values.PCollection;
2727
import java.io.IOException;
28+
import java.util.List;
2829

2930
public class FeatureViewEngine extends FeatureViewEngineBase<Query, FeatureView, FeatureStore, StreamFeatureGroup,
3031
PCollection<Object>> {
@@ -42,4 +43,26 @@ public FeatureView get(FeatureStore featureStore, String name, Integer version)
4243
featureView.setFeatureStore(featureStore);
4344
return featureView;
4445
}
46+
47+
public FeatureView getOrCreateFeatureView(FeatureStore featureStore, String name, Integer version, Query query,
48+
String description, List<String> labels)
49+
throws FeatureStoreException, IOException {
50+
FeatureView featureView;
51+
try {
52+
featureView = get(featureStore, name, version, FeatureView.class);
53+
} catch (IOException | FeatureStoreException e) {
54+
if (e.getMessage().contains("Error: 404") && e.getMessage().contains("\"errorCode\":270181")) {
55+
featureView = new FeatureView.FeatureViewBuilder(featureStore)
56+
.name(name)
57+
.version(version)
58+
.query(query)
59+
.description(description)
60+
.labels(labels)
61+
.build();
62+
} else {
63+
throw e;
64+
}
65+
}
66+
return featureView;
67+
}
4568
}

java/flink/src/main/java/com/logicalclocks/hsfs/flink/FeatureStore.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,4 +241,28 @@ public FeatureView getFeatureView(String name) throws FeatureStoreException, IOE
241241
public FeatureView.FeatureViewBuilder createFeatureView() {
242242
return new FeatureView.FeatureViewBuilder(this);
243243
}
244+
245+
/**
246+
* Get feature view metadata object or create a new one if it doesn't exist. This method doesn't update
247+
* existing feature view metadata.
248+
*
249+
* <pre>
250+
* {@code
251+
* // get feature store handle
252+
* FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
253+
* FeatureView fv = fs.getOrCreateFeatureView("fv_name", query, 1);
254+
* }
255+
* </pre>
256+
*
257+
* @param name Name of the feature view.
258+
* @param query Query object.
259+
* @param version Version of the feature view.
260+
* @return FeatureView The feature view metadata object.
261+
* @throws FeatureStoreException If unable to retrieve FeatureView from the feature store.
262+
* @throws IOException Generic IO exception.
263+
*/
264+
public FeatureView getOrCreateFeatureView(String name, Query query, Integer version)
265+
throws FeatureStoreException, IOException {
266+
return featureViewEngine.getOrCreateFeatureView(this, name, version, query, null, null);
267+
}
244268
}

java/flink/src/main/java/com/logicalclocks/hsfs/flink/FeatureView.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,7 @@ public FeatureViewBuilder labels(List<String> labels) {
8787

8888
public FeatureView build() throws FeatureStoreException, IOException {
8989
FeatureView featureView = new FeatureView(name, version, query, description, featureStore, labels);
90-
featureViewEngine.save(featureView, FeatureView.class);
91-
return featureView;
90+
return featureViewEngine.save(featureView, FeatureView.class);
9291
}
9392
}
9493

java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.logicalclocks.hsfs.FeatureStoreException;
3131
import com.logicalclocks.hsfs.JobConfiguration;
3232
import com.logicalclocks.hsfs.TimeTravelFormat;
33+
import com.logicalclocks.hsfs.flink.constructor.Query;
3334
import com.logicalclocks.hsfs.flink.engine.FeatureGroupEngine;
3435

3536
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -157,4 +158,62 @@ public DataStreamSink<?> insertStream(DataStream<?> featureData) throws Exceptio
157158
public DataStreamSink<?> insertStream(DataStream<?> featureData, Map<String, String> writeOptions) throws Exception {
158159
return featureGroupEngine.insertStream(this, featureData, writeOptions);
159160
}
161+
162+
/**
163+
* Select a subset of features of the feature group and return a query object. The query can be used to construct
164+
* joins of feature groups or create a feature view with a subset of features of the feature group.
165+
* @param features List of Feature meta data objects.
166+
* @return Query object.
167+
*/
168+
public Query selectFeatures(List<Feature> features) {
169+
return new Query(this, features);
170+
}
171+
172+
/**
173+
* Select a subset of features of the feature group and return a query object. The query can be used to construct
174+
* joins of feature groups or create a feature view with a subset of features of the feature group.
175+
* @param features List of Feature names.
176+
* @return Query object.
177+
*/
178+
public Query select(List<String> features) {
179+
// Create a feature object for each string feature given by the user.
180+
// For the query building each feature need only the name set.
181+
List<Feature> featureObjList = features.stream().map(Feature::new).collect(Collectors.toList());
182+
return selectFeatures(featureObjList);
183+
}
184+
185+
/**
186+
* Select all features of the feature group and return a query object. The query can be used to construct
187+
* joins of feature groups or create a feature view with a subset of features of the feature group.
188+
* @return Query object.
189+
*/
190+
public Query selectAll() {
191+
return new Query(this, getFeatures());
192+
}
193+
194+
/**
195+
* Select all features including primary key and event time feature of the feature group except provided `features`
196+
* and return a query object.
197+
* The query can be used to construct joins of feature groups or create a feature view with a subset of features of
198+
* the feature group.
199+
* @param features List of Feature meta data objects.
200+
* @return Query object.
201+
*/
202+
public Query selectExceptFeatures(List<Feature> features) {
203+
List<String> exceptFeatures = features.stream().map(Feature::getName).collect(Collectors.toList());
204+
return selectExcept(exceptFeatures);
205+
}
206+
207+
/**
208+
* Select all features including primary key and event time feature of the feature group except provided `features`
209+
* and return a query object.
210+
* The query can be used to construct joins of feature groups or create a feature view with a subset of features of
211+
* the feature group.
212+
* @param features List of Feature names.
213+
* @return Query object.
214+
*/
215+
public Query selectExcept(List<String> features) {
216+
return new Query(this,
217+
getFeatures().stream().filter(f -> !features.contains(f.getName())).collect(Collectors.toList()));
218+
}
160219
}

java/flink/src/main/java/com/logicalclocks/hsfs/flink/constructor/Query.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,20 @@
1717

1818
package com.logicalclocks.hsfs.flink.constructor;
1919

20+
import com.logicalclocks.hsfs.Feature;
21+
import com.logicalclocks.hsfs.FeatureGroupBase;
2022
import com.logicalclocks.hsfs.constructor.QueryBase;
2123
import com.logicalclocks.hsfs.flink.StreamFeatureGroup;
2224

2325
import lombok.NoArgsConstructor;
2426
import org.apache.flink.streaming.api.datastream.DataStream;
2527

28+
import java.util.List;
29+
2630
@NoArgsConstructor
2731
public class Query extends QueryBase<Query, StreamFeatureGroup, DataStream<?>> {
32+
33+
public Query(FeatureGroupBase leftFeatureGroup, List<Feature> leftFeatures) {
34+
super(leftFeatureGroup, leftFeatures);
35+
}
2836
}

java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FeatureViewEngine.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.flink.streaming.api.datastream.DataStream;
2929

3030
import java.io.IOException;
31+
import java.util.List;
3132

3233
public class FeatureViewEngine extends FeatureViewEngineBase<Query, FeatureView, FeatureStore, StreamFeatureGroup,
3334
DataStream<?>> {
@@ -45,4 +46,26 @@ public FeatureView get(FeatureStore featureStore, String name, Integer version)
4546
featureView.setFeatureStore(featureStore);
4647
return featureView;
4748
}
49+
50+
public FeatureView getOrCreateFeatureView(FeatureStore featureStore, String name, Integer version, Query query,
51+
String description, List<String> labels)
52+
throws FeatureStoreException, IOException {
53+
FeatureView featureView;
54+
try {
55+
featureView = get(featureStore, name, version, FeatureView.class);
56+
} catch (IOException | FeatureStoreException e) {
57+
if (e.getMessage().contains("Error: 404") && e.getMessage().contains("\"errorCode\":270181")) {
58+
featureView = new FeatureView.FeatureViewBuilder(featureStore)
59+
.name(name)
60+
.version(version)
61+
.query(query)
62+
.description(description)
63+
.labels(labels)
64+
.build();
65+
} else {
66+
throw e;
67+
}
68+
}
69+
return featureView;
70+
}
4871
}

java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ public abstract class FeatureGroupBase<T> {
5151
@JsonIgnore
5252
protected FeatureStoreBase featureStore;
5353

54+
@Getter
55+
@Setter
56+
protected Integer featurestoreId;
57+
5458
@Getter
5559
@Setter
5660
protected String type = "featuregroupDTO";

java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureStore.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,4 +241,28 @@ public FeatureView getFeatureView(String name) throws FeatureStoreException, IOE
241241
public FeatureView.FeatureViewBuilder createFeatureView() {
242242
return new FeatureView.FeatureViewBuilder(this);
243243
}
244+
245+
/**
246+
* Get feature view metadata object or create a new one if it doesn't exist. This method doesn't update
247+
* existing feature view metadata.
248+
*
249+
* <pre>
250+
* {@code
251+
* // get feature store handle
252+
* FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
253+
* FeatureView fv = fs.getOrCreateFeatureView("fv_name", query, 1);
254+
* }
255+
* </pre>
256+
*
257+
* @param name Name of the feature view.
258+
* @param query Query object.
259+
* @param version Version of the feature view.
260+
* @return FeatureView The feature view metadata object.
261+
* @throws FeatureStoreException If unable to retrieve FeatureView from the feature store.
262+
* @throws IOException Generic IO exception.
263+
*/
264+
public FeatureView getOrCreateFeatureView(String name, Query query, Integer version)
265+
throws FeatureStoreException, IOException {
266+
return featureViewEngine.getOrCreateFeatureView(this, name, version, query, null, null);
267+
}
244268
}

java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureView.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,7 @@ public FeatureViewBuilder labels(List<String> labels) {
8282

8383
public FeatureView build() throws FeatureStoreException, IOException {
8484
FeatureView featureView = new FeatureView(name, version, query, description, featureStore, labels);
85-
featureViewEngine.save(featureView, FeatureView.class);
86-
return featureView;
85+
return featureViewEngine.save(featureView, FeatureView.class);
8786
}
8887
}
8988

java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureViewBase.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,10 @@ public abstract class FeatureViewBase<T extends FeatureViewBase, T3 extends Feat
4242

4343
@Getter
4444
@Setter
45-
@JsonIgnore
4645
protected T3 featureStore;
4746

4847
@Getter
4948
@Setter
50-
@JsonIgnore
5149
protected Integer id;
5250

5351
@Getter

0 commit comments

Comments
 (0)