Skip to content

Commit 4558e8c

Browse files
authored
Merge branch 'master' into FSTORE-1023
2 parents f7c1842 + 16a688a commit 4558e8c

File tree

74 files changed

+4993
-1563
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

74 files changed

+4993
-1563
lines changed

java/beam/src/main/java/com/logicalclocks/hsfs/beam/HopsworksConnection.java

+2-2
Original file line number | Diff line number | Diff line change
@@ -55,7 +55,7 @@ public HopsworksConnection(String host, int port, String project, Region region,
5555
hostnameVerification, trustStorePath, this.apiKeyFilePath, this.apiKeyValue);
5656
this.projectObj = getProject();
5757
HopsworksClient.getInstance().setProject(this.projectObj);
58-
Credentials credentials = HopsworksClient.getInstance().getCredentials(this.projectObj);
58+
Credentials credentials = HopsworksClient.getInstance().getCredentials();
5959
HopsworksHttpClient hopsworksHttpClient = HopsworksClient.getInstance().getHopsworksHttpClient();
6060
hopsworksHttpClient.setTrustStorePath(credentials.gettStore());
6161
hopsworksHttpClient.setKeyStorePath(credentials.getkStore());
@@ -84,6 +84,6 @@ public FeatureStore getFeatureStore() throws IOException, FeatureStoreException
8484
* @throws FeatureStoreException If client is not connected to Hopsworks
8585
*/
8686
public FeatureStore getFeatureStore(String name) throws IOException, FeatureStoreException {
87-
return featureStoreApi.get(projectObj.getProjectId(), rewriteFeatureStoreName(name), FeatureStore.class);
87+
return featureStoreApi.get(rewriteFeatureStoreName(name), FeatureStore.class);
8888
}
8989
}

java/beam/src/main/java/com/logicalclocks/hsfs/beam/engine/BeamEngine.java

+1-3
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,6 @@
2424
import com.logicalclocks.hsfs.beam.StreamFeatureGroup;
2525
import com.logicalclocks.hsfs.metadata.DatasetApi;
2626
import com.logicalclocks.hsfs.engine.EngineBase;
27-
import com.logicalclocks.hsfs.metadata.HopsworksClient;
2827
import com.logicalclocks.hsfs.metadata.HopsworksInternalClient;
2928
import org.apache.avro.Schema;
3029

@@ -72,8 +71,7 @@ public String addFile(String filePath) throws IOException, FeatureStoreException
7271
}
7372
String targetPath = System.getProperty("java.io.tmpdir") + filePath.substring(filePath.lastIndexOf("/"));
7473
try (FileOutputStream outputStream = new FileOutputStream(targetPath)) {
75-
outputStream.write(DatasetApi.readContent(HopsworksClient.getInstance().getProject().getProjectId(),
76-
filePath, "HIVEDB"));
74+
outputStream.write(DatasetApi.readContent(filePath, "HIVEDB"));
7775
}
7876
return targetPath;
7977
}

java/flink/src/main/java/com/logicalclocks/hsfs/flink/HopsworksConnection.java

+1-1
Original file line number | Diff line number | Diff line change
@@ -83,6 +83,6 @@ public FeatureStore getFeatureStore() throws IOException, FeatureStoreException
8383
* @throws FeatureStoreException If client is not connected to Hopsworks
8484
*/
8585
public FeatureStore getFeatureStore(String name) throws IOException, FeatureStoreException {
86-
return featureStoreApi.get(projectObj.getProjectId(), rewriteFeatureStoreName(name), FeatureStore.class);
86+
return featureStoreApi.get(rewriteFeatureStoreName(name), FeatureStore.class);
8787
}
8888
}

java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java

+9
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,8 @@
3232
import lombok.NoArgsConstructor;
3333
import lombok.Setter;
3434
import lombok.ToString;
35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
3537
import software.amazon.awssdk.utils.CollectionUtils;
3638

3739
import java.io.IOException;
@@ -76,6 +78,8 @@ public abstract class StorageConnector {
7678

7779
protected StorageConnectorApi storageConnectorApi = new StorageConnectorApi();
7880

81+
protected static final Logger LOGGER = LoggerFactory.getLogger(StorageConnector.class);
82+
7983
public StorageConnector refetch() throws FeatureStoreException, IOException {
8084
return storageConnectorApi.get(getFeaturestoreId(), getName(), StorageConnector.class);
8185
}
@@ -449,6 +453,11 @@ public Map<String, String> kafkaOptions() throws FeatureStoreException {
449453
config.put(Constants.KAFKA_SSL_KEY_PASSWORD, sslKeyPassword);
450454
}
451455

456+
if (externalKafka) {
457+
LOGGER.info("Getting connection details to externally managed Kafka cluster. "
458+
+ "Make sure that the topic being used exists.");
459+
}
460+
452461
return config;
453462
}
454463

java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/CodeApi.java

+4-4
Original file line number | Diff line number | Diff line change
@@ -45,25 +45,25 @@ public CodeApi(@NonNull EntityEndpointType entityType) {
4545

4646
public void post(FeatureGroupBase featureGroup, Code code, String entityId, Code.RunType type, String browserHostName)
4747
throws FeatureStoreException, IOException {
48-
post(featureGroup.getFeatureStore().getProjectId(), featureGroup.getFeatureStore().getId(),
48+
post(featureGroup.getFeatureStore().getId(),
4949
featureGroup.getId(), code, entityId, type, browserHostName);
5050
}
5151

5252
public void post(TrainingDatasetBase trainingDatasetBase, Code code, String entityId, Code.RunType type,
5353
String browserHostName)
5454
throws FeatureStoreException, IOException {
55-
post(trainingDatasetBase.getFeatureStore().getProjectId(), trainingDatasetBase.getFeatureStore().getId(),
55+
post(trainingDatasetBase.getFeatureStore().getId(),
5656
trainingDatasetBase.getId(), code, entityId, type, browserHostName);
5757
}
5858

59-
private void post(Integer projectId, Integer featureStoreId, Integer dataSetId, Code code,
59+
private void post(Integer featureStoreId, Integer dataSetId, Code code,
6060
String entityId, Code.RunType type, String browserHostName)
6161
throws FeatureStoreException, IOException {
6262
HopsworksClient hopsworksClient = HopsworksClient.getInstance();
6363
String pathTemplate = HopsworksClient.PROJECT_PATH + FeatureStoreApi.FEATURE_STORE_PATH + CODE_PATH;
6464

6565
String uri = UriTemplate.fromTemplate(pathTemplate)
66-
.set("projectId", projectId)
66+
.set("projectId", hopsworksClient.getProject().getProjectId())
6767
.set("fsId", featureStoreId)
6868
.set("dataSetType", entityType.getValue())
6969
.set("dataSetId", dataSetId)

java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/DatasetApi.java

+4-3
Original file line number | Diff line number | Diff line change
@@ -34,7 +34,7 @@ public class DatasetApi {
3434
public DatasetApi() {
3535
}
3636

37-
public static byte[] readContent(Integer projectId, String path, String datasetType) throws FeatureStoreException,
37+
public static byte[] readContent(String path, String datasetType) throws FeatureStoreException,
3838
IOException {
3939
if (Strings.isNullOrEmpty(datasetType)) {
4040
datasetType = "DATASET";
@@ -45,13 +45,14 @@ public static byte[] readContent(Integer projectId, String path, String datasetT
4545
.append("{/path}")
4646
.append("{?type}");
4747

48+
HopsworksClient hopsworksClient = HopsworksClient.getInstance();
4849
UriTemplate uri = UriTemplate.fromTemplate(pathBuilder.toString())
49-
.set("projectId", projectId)
50+
.set("projectId", hopsworksClient.getProject().getProjectId())
5051
.set("path",path)
5152
.set("type",datasetType);
5253
String uriString = uri.expand();
5354

54-
return HopsworksClient.getInstance().handleRequest(new HttpGet(uriString),
55+
return hopsworksClient.handleRequest(new HttpGet(uriString),
5556
response -> EntityUtils.toByteArray(response.getEntity()));
5657
}
5758
}
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,147 @@
1+
/*
2+
* Copyright (c) 2020-2024. Hopsworks AB
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
*
14+
* See the License for the specific language governing permissions and limitations under the License.
15+
*
16+
*/
17+
18+
package com.logicalclocks.hsfs.metadata;
19+
20+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
21+
import lombok.AllArgsConstructor;
22+
import lombok.Getter;
23+
import lombok.NoArgsConstructor;
24+
import lombok.Setter;
25+
import org.json.JSONObject;
26+
27+
import java.util.Map;
28+
29+
@JsonIgnoreProperties(ignoreUnknown = true)
30+
@NoArgsConstructor
31+
@AllArgsConstructor
32+
@Getter
33+
@Setter
34+
public class FeatureDescriptiveStatistics extends RestDto<FeatureDescriptiveStatistics> {
35+
private Integer id;
36+
private String featureType;
37+
private String featureName;
38+
39+
// for any feature type
40+
private Long count;
41+
private Double completeness;
42+
private Long numNonNullValues;
43+
private Long numNullValues;
44+
private Long approxNumDistinctValues;
45+
46+
// for numerical features
47+
private Double min;
48+
private Double max;
49+
private Double sum;
50+
private Double mean;
51+
private Double stddev;
52+
private Map<String, Double> percentiles;
53+
54+
// with exact uniqueness
55+
private Double distinctness;
56+
private Double entropy;
57+
private Double uniqueness;
58+
private Long exactNumDistinctValues;
59+
60+
// histogram, correlations, kll <- from hdfs file
61+
private String extendedStatistics;
62+
63+
public static FeatureDescriptiveStatistics fromDeequStatisticsJson(JSONObject statsJson) {
64+
FeatureDescriptiveStatistics fds = new FeatureDescriptiveStatistics();
65+
fds.setFeatureName(statsJson.getString("column"));
66+
67+
if (statsJson.has("dataType")) {
68+
fds.setFeatureType(statsJson.getString("dataType"));
69+
}
70+
71+
if (statsJson.has("count") && statsJson.getLong("count") == 0) {
72+
// if empty data, ignore the rest of statistics
73+
fds.setCount(0L);
74+
return fds;
75+
}
76+
77+
// common for all data types
78+
if (statsJson.has("numRecordsNull")) {
79+
fds.setNumNullValues(statsJson.getLong("numRecordsNull"));
80+
}
81+
if (statsJson.has("numRecordsNonNull")) {
82+
fds.setNumNonNullValues(statsJson.getLong("numRecordsNonNull"));
83+
}
84+
if (statsJson.has("numRecordsNull") && statsJson.has("numRecordsNonNull")) {
85+
fds.setCount(Long.valueOf(statsJson.getInt("numRecordsNull") + statsJson.getInt("numRecordsNonNull")));
86+
}
87+
if (statsJson.has("count")) {
88+
fds.setCount(statsJson.getLong("count"));
89+
}
90+
if (statsJson.has("completeness")) {
91+
fds.setCompleteness(statsJson.getDouble("completeness"));
92+
}
93+
if (statsJson.has("approximateNumDistinctValues")) {
94+
fds.setApproxNumDistinctValues(statsJson.getLong("approximateNumDistinctValues"));
95+
}
96+
97+
// commmon for all data types if exact_uniqueness is enabled
98+
if (statsJson.has("uniqueness")) {
99+
fds.setUniqueness(statsJson.getDouble("uniqueness"));
100+
}
101+
if (statsJson.has("entropy")) {
102+
fds.setEntropy(statsJson.getDouble("entropy"));
103+
}
104+
if (statsJson.has("distinctness")) {
105+
fds.setDistinctness(statsJson.getDouble("distinctness"));
106+
}
107+
if (statsJson.has("exactNumDistinctValues")) {
108+
fds.setExactNumDistinctValues(statsJson.getLong("exactNumDistinctValues"));
109+
}
110+
111+
// fractional / integral features
112+
if (statsJson.has("minimum")) {
113+
fds.setMin(statsJson.getDouble("minimum"));
114+
}
115+
if (statsJson.has("maximum")) {
116+
fds.setMax(statsJson.getDouble("maximum"));
117+
}
118+
if (statsJson.has("sum")) {
119+
fds.setSum(statsJson.getDouble("sum"));
120+
}
121+
if (statsJson.has("mean")) {
122+
fds.setMean(statsJson.getDouble("mean"));
123+
}
124+
if (statsJson.has("stdDev")) {
125+
fds.setStddev(statsJson.getDouble("stdDev"));
126+
}
127+
128+
JSONObject extendedStatistics = new JSONObject();
129+
if (statsJson.has("correlations")) {
130+
extendedStatistics.put("correlations", statsJson.getJSONArray("correlations"));
131+
}
132+
if (statsJson.has("histogram")) {
133+
extendedStatistics.put("histogram", statsJson.getJSONArray("histogram"));
134+
}
135+
if (statsJson.has("kll")) {
136+
extendedStatistics.put("kll", statsJson.get("kll"));
137+
}
138+
if (statsJson.has("unique_values")) {
139+
extendedStatistics.put("unique_values", statsJson.getJSONArray("unique_values"));
140+
}
141+
if (extendedStatistics.length() > 0) {
142+
fds.setExtendedStatistics(extendedStatistics.toString());
143+
}
144+
145+
return fds;
146+
}
147+
}

java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/FeatureGroupApi.java

+9-8
Original file line number | Diff line number | Diff line change
@@ -60,7 +60,7 @@ public <T extends FeatureGroupBase> T[] getInternal(FeatureStoreBase featureStor
6060
+ FEATURE_GROUP_PATH;
6161

6262
UriTemplate uri = UriTemplate.fromTemplate(pathTemplate)
63-
.set("projectId", featureStoreBase.getProjectId())
63+
.set("projectId", hopsworksClient.getProject().getProjectId())
6464
.set("fsId", featureStoreBase.getId())
6565
.set("fgName", fgName);
6666

@@ -83,12 +83,13 @@ public <U extends FeatureGroupBase> FeatureGroupBase save(FeatureGroupBase featu
8383

8484
public <U extends FeatureGroupBase> FeatureGroupBase saveInternal(FeatureGroupBase featureGroupBase,
8585
StringEntity entity, Class<U> fgType) throws FeatureStoreException, IOException {
86+
HopsworksClient hopsworksClient = HopsworksClient.getInstance();
8687
String pathTemplate = HopsworksClient.PROJECT_PATH
8788
+ FeatureStoreApi.FEATURE_STORE_PATH
8889
+ FEATURE_GROUP_ROOT_PATH;
8990

9091
String uri = UriTemplate.fromTemplate(pathTemplate)
91-
.set("projectId", featureGroupBase.getFeatureStore().getProjectId())
92+
.set("projectId", hopsworksClient.getProject().getProjectId())
9293
.set("fsId", featureGroupBase.getFeatureStore().getId())
9394
.expand();
9495

@@ -98,7 +99,7 @@ public <U extends FeatureGroupBase> FeatureGroupBase saveInternal(FeatureGroupBa
9899

99100
LOGGER.info("Sending metadata request: " + uri);
100101

101-
return HopsworksClient.getInstance().handleRequest(postRequest, fgType);
102+
return hopsworksClient.handleRequest(postRequest, fgType);
102103
}
103104

104105
public void delete(FeatureGroupBase featureGroupBase) throws FeatureStoreException, IOException {
@@ -108,7 +109,7 @@ public void delete(FeatureGroupBase featureGroupBase) throws FeatureStoreExcepti
108109
+ FEATURE_GROUP_ID_PATH;
109110

110111
String uri = UriTemplate.fromTemplate(pathTemplate)
111-
.set("projectId", featureGroupBase.getFeatureStore().getProjectId())
112+
.set("projectId", hopsworksClient.getProject().getProjectId())
112113
.set("fsId", featureGroupBase.getFeatureStore().getId())
113114
.set("fgId", featureGroupBase.getId())
114115
.expand();
@@ -126,7 +127,7 @@ public void deleteContent(FeatureGroupBase featureGroup) throws FeatureStoreExce
126127
+ FEATURE_GROUP_CLEAR_PATH;
127128

128129
String uri = UriTemplate.fromTemplate(pathTemplate)
129-
.set("projectId", featureGroup.getFeatureStore().getProjectId())
130+
.set("projectId", hopsworksClient.getProject().getProjectId())
130131
.set("fsId", featureGroup.getFeatureStore().getId())
131132
.set("fgId", featureGroup.getId())
132133
.expand();
@@ -151,7 +152,7 @@ public <T extends FeatureGroupBase> T updateMetadata(FeatureGroupBase featureGro
151152
+ FEATURE_GROUP_ID_PATH;
152153

153154
String uri = UriTemplate.fromTemplate(pathTemplate)
154-
.set("projectId", featureGroup.getFeatureStore().getProjectId())
155+
.set("projectId", hopsworksClient.getProject().getProjectId())
155156
.set("fsId", featureGroup.getFeatureStore().getId())
156157
.set("fgId", featureGroup.getId())
157158
.set(queryParameter, value)
@@ -173,7 +174,7 @@ public FeatureGroupCommit featureGroupCommit(FeatureGroupBase featureGroup, Feat
173174
+ FEATURE_GROUP_COMMIT_PATH;
174175

175176
String uri = UriTemplate.fromTemplate(pathTemplate)
176-
.set("projectId", featureGroup.getFeatureStore().getProjectId())
177+
.set("projectId", hopsworksClient.getProject().getProjectId())
177178
.set("fsId", featureGroup.getFeatureStore().getId())
178179
.set("fgId", featureGroup.getId())
179180
.expand();
@@ -194,7 +195,7 @@ public List<FeatureGroupCommit> getCommitDetails(FeatureGroupBase featureGroupBa
194195
+ FEATURE_GROUP_COMMIT_PATH;
195196

196197
UriTemplate uriTemplate = UriTemplate.fromTemplate(pathTemplate)
197-
.set("projectId", featureGroupBase.getFeatureStore().getProjectId())
198+
.set("projectId", hopsworksClient.getProject().getProjectId())
198199
.set("fsId", featureGroupBase.getFeatureStore().getId())
199200
.set("fgId", featureGroupBase.getId())
200201
.set("sort_by", "committed_on:desc")

java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/FeatureStoreApi.java

+2-2
Original file line number | Diff line number | Diff line change
@@ -33,14 +33,14 @@ public class FeatureStoreApi {
3333

3434
private static final Logger LOGGER = LoggerFactory.getLogger(FeatureStoreApi.class);
3535

36-
public <T> T get(int projectId, String name, Class<T> fsType)
36+
public <T> T get(String name, Class<T> fsType)
3737
throws IOException, FeatureStoreException {
3838
HopsworksClient hopsworksClient = HopsworksClient.getInstance();
3939
String pathTemplate = HopsworksClient.PROJECT_PATH
4040
+ FEATURE_STORE_NAME_PATH;
4141

4242
String uri = UriTemplate.fromTemplate(pathTemplate)
43-
.set("projectId", projectId)
43+
.set("projectId", hopsworksClient.getProject().getProjectId())
4444
.set("fsName", name)
4545
.expand();
4646

0 commit comments

Comments
 (0)