Skip to content

Commit 7131721

Browse files
authored
[FSTORE-1407] Add possibility to affect number of partitions in Feature Group tables (logicalclocks#257)
* add onlineComments and onlineStorageType
* add onlineStorageType to feature
* use OnlineConfig instead
* mini fix
* mini fix 2
* add to_dict
* fix to dict
* null pointer issue fix
* move OnlineConfig
* ruff fix
* add tableSpace
* address feedback
1 parent 2308296 commit 7131721

File tree

18 files changed

+236
-38
lines changed

18 files changed

+236
-38
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ dmypy.json
127127
.vscode
128128
*.iml
129129
target/
130+
delombok/
130131

131132
# Mac
132133
.DS_Store

java/beam/src/main/java/com/logicalclocks/hsfs/beam/FeatureStore.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.logicalclocks.hsfs.FeatureStoreBase;
2121
import com.logicalclocks.hsfs.FeatureStoreException;
22+
import com.logicalclocks.hsfs.OnlineConfig;
2223
import com.logicalclocks.hsfs.StatisticsConfig;
2324
import com.logicalclocks.hsfs.StorageConnector;
2425
import com.logicalclocks.hsfs.TimeTravelFormat;
@@ -73,7 +74,8 @@ public Object getOrCreateFeatureGroup(String name, Integer version, List<String>
7374
@Override
7475
public Object getOrCreateFeatureGroup(String name, Integer version, String description, List<String> primaryKeys,
7576
List<String> partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, TimeTravelFormat timeTravelFormat,
76-
StatisticsConfig statisticsConfig, String topicName, String notificationTopicName, String eventTime)
77+
StatisticsConfig statisticsConfig, String topicName, String notificationTopicName, String eventTime,
78+
OnlineConfig onlineConfig)
7779
throws IOException, FeatureStoreException {
7880
throw new UnsupportedOperationException("Not supported for Beam");
7981
}
@@ -158,7 +160,8 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver
158160
@Override
159161
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version, String description,
160162
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey, boolean onlineEnabled,
161-
StatisticsConfig statisticsConfig, String eventTime) throws IOException, FeatureStoreException {
163+
StatisticsConfig statisticsConfig, String eventTime, OnlineConfig onlineConfig)
164+
throws IOException, FeatureStoreException {
162165
throw new UnsupportedOperationException("Not supported for Beam");
163166
}
164167

java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.logicalclocks.hsfs.FeatureStoreException;
2323
import com.logicalclocks.hsfs.HudiOperationType;
2424
import com.logicalclocks.hsfs.JobConfiguration;
25+
import com.logicalclocks.hsfs.OnlineConfig;
2526
import com.logicalclocks.hsfs.StatisticsConfig;
2627
import com.logicalclocks.hsfs.Storage;
2728
import com.logicalclocks.hsfs.beam.engine.FeatureGroupEngine;
@@ -46,8 +47,8 @@ public class StreamFeatureGroup extends FeatureGroupBase<PCollection<Object>> {
4647
@Builder
4748
public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description,
4849
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
49-
boolean onlineEnabled, List<Feature> features,
50-
StatisticsConfig statisticsConfig, String onlineTopicName, String eventTime) {
50+
boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig, String onlineTopicName,
51+
String eventTime, OnlineConfig onlineConfig) {
5152
this();
5253
this.featureStore = featureStore;
5354
this.name = name;
@@ -63,6 +64,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
6364
this.statisticsConfig = statisticsConfig != null ? statisticsConfig : new StatisticsConfig();
6465
this.onlineTopicName = onlineTopicName;
6566
this.eventTime = eventTime;
67+
this.onlineConfig = onlineConfig;
6668
}
6769

6870
public StreamFeatureGroup() {

java/flink/src/main/java/com/logicalclocks/hsfs/flink/FeatureStore.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.logicalclocks.hsfs.FeatureStoreBase;
2121
import com.logicalclocks.hsfs.FeatureStoreException;
22+
import com.logicalclocks.hsfs.OnlineConfig;
2223
import com.logicalclocks.hsfs.StatisticsConfig;
2324
import com.logicalclocks.hsfs.StorageConnector;
2425
import com.logicalclocks.hsfs.TimeTravelFormat;
@@ -76,7 +77,8 @@ public Object getOrCreateFeatureGroup(String name, Integer version, List<String>
7677
@Override
7778
public Object getOrCreateFeatureGroup(String name, Integer version, String description, List<String> primaryKeys,
7879
List<String> partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, TimeTravelFormat timeTravelFormat,
79-
StatisticsConfig statisticsConfig, String topicName, String notificationTopicName, String eventTime) {
80+
StatisticsConfig statisticsConfig, String topicName, String notificationTopicName, String eventTime,
81+
OnlineConfig onlineConfig) {
8082
throw new UnsupportedOperationException("Not supported for Flink");
8183
}
8284

@@ -163,8 +165,8 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver
163165
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version, String description,
164166
List<String> primaryKeys, List<String> partitionKeys,
165167
String hudiPrecombineKey, boolean onlineEnabled,
166-
StatisticsConfig statisticsConfig,
167-
String eventTime)
168+
StatisticsConfig statisticsConfig, String eventTime,
169+
OnlineConfig onlineConfig)
168170
throws IOException, FeatureStoreException {
169171
throw new UnsupportedOperationException("Not supported for Flink");
170172
}

java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.logicalclocks.hsfs.FeatureStoreException;
2424
import com.logicalclocks.hsfs.HudiOperationType;
2525
import com.logicalclocks.hsfs.JobConfiguration;
26+
import com.logicalclocks.hsfs.OnlineConfig;
2627
import com.logicalclocks.hsfs.StatisticsConfig;
2728
import com.logicalclocks.hsfs.Storage;
2829
import com.logicalclocks.hsfs.constructor.QueryBase;
@@ -52,7 +53,8 @@ public class StreamFeatureGroup extends FeatureGroupBase<DataStream<?>> {
5253
public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description,
5354
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
5455
boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig,
55-
String onlineTopicName, String topicName, String notificationTopicName, String eventTime) {
56+
String onlineTopicName, String topicName, String notificationTopicName, String eventTime,
57+
OnlineConfig onlineConfig) {
5658
this();
5759
this.featureStore = featureStore;
5860
this.name = name;
@@ -70,6 +72,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
7072
this.topicName = topicName;
7173
this.notificationTopicName = notificationTopicName;
7274
this.eventTime = eventTime;
75+
this.onlineConfig = onlineConfig;
7376
}
7477

7578
public StreamFeatureGroup() {

java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java

+4
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ public abstract class FeatureGroupBase<T> {
128128
@Getter
129129
protected Boolean deprecated;
130130

131+
@Getter
132+
@Setter
133+
protected OnlineConfig onlineConfig;
134+
131135
@JsonIgnore
132136
// These are only used in the client. In the server they are aggregated in the `features` field
133137
protected List<String> partitionKeys;

java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureStoreBase.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ public abstract Object getOrCreateFeatureGroup(String name, Integer version, Str
7878
List<String> primaryKeys, List<String> partitionKeys,
7979
String hudiPrecombineKey, boolean onlineEnabled,
8080
TimeTravelFormat timeTravelFormat, StatisticsConfig statisticsConfig,
81-
String topicName, String notificationTopicName, String eventTime)
81+
String topicName, String notificationTopicName, String eventTime,
82+
OnlineConfig onlineConfig)
8283
throws IOException, FeatureStoreException;
8384

8485
/**
@@ -121,7 +122,8 @@ public abstract Object getOrCreateStreamFeatureGroup(String name, Integer versio
121122
public abstract Object getOrCreateStreamFeatureGroup(String name, Integer version, String description,
122123
List<String> primaryKeys, List<String> partitionKeys,
123124
String hudiPrecombineKey, boolean onlineEnabled,
124-
StatisticsConfig statisticsConfig, String eventTime)
125+
StatisticsConfig statisticsConfig, String eventTime,
126+
OnlineConfig onlineConfig)
125127
throws IOException, FeatureStoreException;
126128

127129
public abstract Object createExternalFeatureGroup();
java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineConfig.java

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright (c) 2024. Hopsworks AB
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
*
14+
* See the License for the specific language governing permissions and limitations under the License.
15+
*
16+
*/
17+
18+
package com.logicalclocks.hsfs;
19+
20+
import lombok.AllArgsConstructor;
21+
import lombok.Getter;
22+
import lombok.NoArgsConstructor;
23+
import lombok.Setter;
24+
25+
import java.util.List;
26+
27+
@NoArgsConstructor
28+
@AllArgsConstructor
29+
public class OnlineConfig {
30+
31+
@Getter
32+
@Setter
33+
private List<String> onlineComments;
34+
35+
@Getter
36+
@Setter
37+
private String tableSpace;
38+
}

java/spark/src/main/java/com/logicalclocks/hsfs/spark/ExternalFeatureGroup.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.logicalclocks.hsfs.FeatureStoreException;
2828
import com.logicalclocks.hsfs.HudiOperationType;
2929
import com.logicalclocks.hsfs.JobConfiguration;
30+
import com.logicalclocks.hsfs.OnlineConfig;
3031
import com.logicalclocks.hsfs.StatisticsConfig;
3132
import com.logicalclocks.hsfs.Storage;
3233

@@ -87,7 +88,7 @@ public ExternalFeatureGroup(FeatureStore featureStore, @NonNull String name, Int
8788
@NonNull StorageConnector storageConnector, String description, List<String> primaryKeys,
8889
List<Feature> features, StatisticsConfig statisticsConfig, String eventTime,
8990
boolean onlineEnabled, String onlineTopicName, String topicName,
90-
String notificationTopicName) {
91+
String notificationTopicName, OnlineConfig onlineConfig) {
9192
this();
9293
this.timeTravelFormat = null;
9394
this.featureStore = featureStore;
@@ -111,6 +112,7 @@ public ExternalFeatureGroup(FeatureStore featureStore, @NonNull String name, Int
111112
this.onlineTopicName = onlineTopicName;
112113
this.topicName = topicName;
113114
this.notificationTopicName = notificationTopicName;
115+
this.onlineConfig = onlineConfig;
114116
}
115117

116118
public ExternalFeatureGroup() {

java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureGroup.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.logicalclocks.hsfs.FeatureStoreException;
2929
import com.logicalclocks.hsfs.HudiOperationType;
3030
import com.logicalclocks.hsfs.JobConfiguration;
31+
import com.logicalclocks.hsfs.OnlineConfig;
3132
import com.logicalclocks.hsfs.StatisticsConfig;
3233
import com.logicalclocks.hsfs.Storage;
3334
import com.logicalclocks.hsfs.TimeTravelFormat;
@@ -63,7 +64,7 @@ public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer ver
6364
String description, List<String> primaryKeys, List<String> partitionKeys,
6465
String hudiPrecombineKey, boolean onlineEnabled, TimeTravelFormat timeTravelFormat,
6566
List<Feature> features, StatisticsConfig statisticsConfig, String onlineTopicName,
66-
String topicName, String notificationTopicName, String eventTime) {
67+
String topicName, String notificationTopicName, String eventTime, OnlineConfig onlineConfig) {
6768
this();
6869
this.featureStore = featureStore;
6970
this.name = name;
@@ -83,6 +84,7 @@ public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer ver
8384
this.topicName = topicName;
8485
this.notificationTopicName = notificationTopicName;
8586
this.eventTime = eventTime;
87+
this.onlineConfig = onlineConfig;
8688
}
8789

8890
public FeatureGroup() {

java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureStore.java

+16-11
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.logicalclocks.hsfs.spark.engine.SparkEngine;
2424
import com.logicalclocks.hsfs.FeatureStoreBase;
2525
import com.logicalclocks.hsfs.FeatureStoreException;
26+
import com.logicalclocks.hsfs.OnlineConfig;
2627
import com.logicalclocks.hsfs.StatisticsConfig;
2728
import com.logicalclocks.hsfs.StorageConnector;
2829
import com.logicalclocks.hsfs.TimeTravelFormat;
@@ -176,7 +177,7 @@ public FeatureGroup.FeatureGroupBuilder createFeatureGroup() {
176177
@Override
177178
public FeatureGroup getOrCreateFeatureGroup(String name, Integer version) throws IOException, FeatureStoreException {
178179
return featureGroupEngine.getOrCreateFeatureGroup(this, name, version, null, null,
179-
null, null, false, null, null, null, null, null);
180+
null, null, false, null, null, null, null, null, null);
180181
}
181182

182183
/**
@@ -210,7 +211,7 @@ public FeatureGroup getOrCreateFeatureGroup(String name, Integer version, List<S
210211
boolean onlineEnabled, String eventTime)
211212
throws IOException, FeatureStoreException {
212213
return featureGroupEngine.getOrCreateFeatureGroup(this, name, version, null, primaryKeys,
213-
null, null, onlineEnabled, null, null, null, null, eventTime);
214+
null, null, onlineEnabled, null, null, null, null, eventTime, null);
214215
}
215216

216217
/**
@@ -249,7 +250,7 @@ public FeatureGroup getOrCreateFeatureGroup(String name, Integer version,
249250
String eventTime) throws IOException, FeatureStoreException {
250251

251252
return featureGroupEngine.getOrCreateFeatureGroup(this, name, version, null, primaryKeys,
252-
partitionKeys, null, onlineEnabled, null, null, null, null, eventTime);
253+
partitionKeys, null, onlineEnabled, null, null, null, null, eventTime, null);
253254
}
254255

255256
/**
@@ -290,6 +291,7 @@ public FeatureGroup getOrCreateFeatureGroup(String name, Integer version,
290291
* @param eventTime Name of the feature containing the event time for the features in this feature group. If
291292
* eventTime is set the feature group can be used for point-in-time joins.
292293
* The supported data types for the eventTime column are: timestamp, date and bigint
294+
* @param onlineConfig Optional configuration for the online table of the feature group; may be null.
293295
* @return FeatureGroup: The feature group metadata object.
294296
* @throws IOException Generic IO exception.
295297
* @throws FeatureStoreException If unable to retrieve FeatureGroup from the feature store.
@@ -299,12 +301,13 @@ public FeatureGroup getOrCreateFeatureGroup(String name, Integer version, String
299301
List<String> primaryKeys, List<String> partitionKeys,
300302
String hudiPrecombineKey, boolean onlineEnabled,
301303
TimeTravelFormat timeTravelFormat, StatisticsConfig statisticsConfig,
302-
String topicName, String notificationTopicName, String eventTime)
304+
String topicName, String notificationTopicName, String eventTime,
305+
OnlineConfig onlineConfig)
303306
throws IOException, FeatureStoreException {
304307

305308
return featureGroupEngine.getOrCreateFeatureGroup(this, name, version, description, primaryKeys,
306309
partitionKeys, hudiPrecombineKey, onlineEnabled, timeTravelFormat, statisticsConfig, topicName,
307-
notificationTopicName, eventTime);
310+
notificationTopicName, eventTime, onlineConfig);
308311
}
309312

310313
/**
@@ -401,7 +404,7 @@ public StreamFeatureGroup.StreamFeatureGroupBuilder createStreamFeatureGroup() {
401404
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version)
402405
throws IOException, FeatureStoreException {
403406
return featureGroupEngine.getOrCreateStreamFeatureGroup(this, name, version, null,
404-
null, null, null, false, null, null);
407+
null, null, null, false, null, null, null);
405408
}
406409

407410
/**
@@ -435,7 +438,7 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver
435438
boolean onlineEnabled, String eventTime)
436439
throws IOException, FeatureStoreException {
437440
return featureGroupEngine.getOrCreateStreamFeatureGroup(this, name, version, null,
438-
primaryKeys, null, null, onlineEnabled, null, eventTime);
441+
primaryKeys, null, null, onlineEnabled, null, eventTime, null);
439442
}
440443

441444
/**
@@ -474,7 +477,7 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver
474477

475478

476479
return featureGroupEngine.getOrCreateStreamFeatureGroup(this, name, version, null,
477-
primaryKeys, partitionKeys, null, onlineEnabled, null, eventTime);
480+
primaryKeys, partitionKeys, null, onlineEnabled, null, eventTime, null);
478481
}
479482

480483
/**
@@ -511,6 +514,7 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver
511514
* @param eventTime Name of the feature containing the event
512515
* time for the features in this feature group. If eventTime is set
513516
* the feature group can be used for point-in-time joins.
517+
* @param onlineConfig Optional configuration for the online table of the feature group; may be null.
514518
* @return FeatureGroup: The feature group metadata object.
515519
* @throws IOException Generic IO exception.
516520
* @throws FeatureStoreException If unable to retrieve FeatureGroup from the feature store.
@@ -519,12 +523,13 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver
519523
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version, String description,
520524
List<String> primaryKeys, List<String> partitionKeys,
521525
String hudiPrecombineKey, boolean onlineEnabled,
522-
StatisticsConfig statisticsConfig,
523-
String eventTime)
526+
StatisticsConfig statisticsConfig, String eventTime,
527+
OnlineConfig onlineConfig)
524528
throws IOException, FeatureStoreException {
525529

526530
return featureGroupEngine.getOrCreateStreamFeatureGroup(this, name, version, description,
527-
primaryKeys, partitionKeys, hudiPrecombineKey, onlineEnabled, statisticsConfig, eventTime);
531+
primaryKeys, partitionKeys, hudiPrecombineKey, onlineEnabled, statisticsConfig, eventTime,
532+
onlineConfig);
528533
}
529534

530535
/**

java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.logicalclocks.hsfs.FeatureStoreException;
2828
import com.logicalclocks.hsfs.HudiOperationType;
2929
import com.logicalclocks.hsfs.JobConfiguration;
30+
import com.logicalclocks.hsfs.OnlineConfig;
3031
import com.logicalclocks.hsfs.StatisticsConfig;
3132
import com.logicalclocks.hsfs.Storage;
3233
import com.logicalclocks.hsfs.FeatureGroupBase;
@@ -60,7 +61,8 @@ public class StreamFeatureGroup extends FeatureGroupBase<Dataset<Row>> {
6061
public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description,
6162
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
6263
boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig,
63-
String onlineTopicName, String topicName, String notificationTopicName, String eventTime) {
64+
String onlineTopicName, String topicName, String notificationTopicName, String eventTime,
65+
OnlineConfig onlineConfig) {
6466
this();
6567
this.featureStore = featureStore;
6668
this.name = name;
@@ -78,6 +80,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
7880
this.topicName = topicName;
7981
this.notificationTopicName = notificationTopicName;
8082
this.eventTime = eventTime;
83+
this.onlineConfig = onlineConfig;
8184
}
8285

8386
public StreamFeatureGroup() {

0 commit comments

Comments
 (0)