Commit f7c1842

Merge branch 'master' into FSTORE-1023
2 parents: 98ecfdd + fbfc07a


47 files changed, +2068 -384 lines

.github/workflows/python-lint.yml (+16 -6)
@@ -11,7 +11,7 @@ jobs:
       - uses: actions/checkout@v3
       - uses: actions/setup-python@v4
         with:
-          python-version: '3.8'
+          python-version: '3.10'
       - name: install deps
         run: pip install flake8==3.9.0 black==22.3.0 pre-commit-hooks==2.4.0

@@ -33,7 +33,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        python-version: ["3.7", "3.8", "3.9", "3.10"]
+        python-version: ["3.8", "3.9", "3.10"]

     steps:
       - name: Set Timezone

@@ -55,6 +55,8 @@ jobs:
         run: python --version

       - name: Run Pytest suite
+        env:
+          ENABLE_HOPSWORKS_USAGE: "false"
         run: pytest python/tests

   unit_tests_ubuntu_pandas:

@@ -73,7 +75,7 @@ jobs:
       - uses: actions/setup-python@v4
         name: Setup Python
         with:
-          python-version: "3.9"
+          python-version: "3.10"
           cache: "pip"
           cache-dependency-path: "python/setup.py"
       - run: pip install -e python[python,dev-pandas1,docs]

@@ -82,6 +84,8 @@ jobs:
         run: python --version

       - name: Run Pytest suite
+        env:
+          ENABLE_HOPSWORKS_USAGE: "false"
         run: pytest python/tests

   unit_tests_ubuntu_local:

@@ -100,7 +104,7 @@ jobs:
       - uses: actions/setup-python@v4
         name: Setup Python
         with:
-          python-version: "3.9"
+          python-version: "3.10"
           cache: "pip"
           cache-dependency-path: "python/setup.py"
       - run: pip install -e python[python,dev,docs]

@@ -109,6 +113,8 @@ jobs:
         run: python --version

       - name: Run Pytest suite
+        env:
+          ENABLE_HOPSWORKS_USAGE: "false"
         run: pytest python/tests

   unit_tests_windows_utc:

@@ -127,7 +133,7 @@ jobs:
      - uses: actions/setup-python@v4
        name: Setup Python
        with:
-          python-version: "3.9"
+          python-version: "3.10"
          cache: "pip"
          cache-dependency-path: "python/setup.py"
      - run: pip install -e python[python,dev,docs]

@@ -136,6 +142,8 @@ jobs:
         run: python --version

       - name: Run Pytest suite
+        env:
+          ENABLE_HOPSWORKS_USAGE: "false"
         run: pytest python/tests


@@ -155,7 +163,7 @@ jobs:
       - uses: actions/setup-python@v4
         name: Setup Python
         with:
-          python-version: "3.9"
+          python-version: "3.10"
           cache: "pip"
           cache-dependency-path: "python/setup.py"
       - run: pip install -e python[python,dev,docs]

@@ -164,4 +172,6 @@ jobs:
         run: python --version

       - name: Run Pytest suite
+        env:
+          ENABLE_HOPSWORKS_USAGE: "false"
         run: pytest python/tests

java/beam/pom.xml (+1 -1)
@@ -5,7 +5,7 @@
     <parent>
         <artifactId>hsfs-parent</artifactId>
         <groupId>com.logicalclocks</groupId>
-        <version>3.5.0-SNAPSHOT</version>
+        <version>3.7.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>


java/flink/pom.xml (+1 -1)
@@ -5,7 +5,7 @@
     <parent>
         <artifactId>hsfs-parent</artifactId>
         <groupId>com.logicalclocks</groupId>
-        <version>3.5.0-SNAPSHOT</version>
+        <version>3.7.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>


java/hsfs/pom.xml (+1 -1)
@@ -5,7 +5,7 @@
     <parent>
         <artifactId>hsfs-parent</artifactId>
         <groupId>com.logicalclocks</groupId>
-        <version>3.5.0-SNAPSHOT</version>
+        <version>3.7.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>


java/pom.xml (+16 -3)
@@ -7,7 +7,7 @@
     <groupId>com.logicalclocks</groupId>
     <artifactId>hsfs-parent</artifactId>
     <packaging>pom</packaging>
-    <version>3.5.0-SNAPSHOT</version>
+    <version>3.7.0-SNAPSHOT</version>
     <modules>
         <module>hsfs</module>
         <module>spark</module>

@@ -27,18 +27,20 @@
         <lombok.version>1.18.10</lombok.version>
         <fasterxml.jackson.databind.version>2.10.0</fasterxml.jackson.databind.version>
         <deequ.version>1.1.0.6-SNAPSHOT</deequ.version>
-        <hudi.version>0.10.0.3</hudi.version>
+        <hudi.version>0.12.3.0</hudi.version>
         <awssdk.version>2.10.40</awssdk.version>
         <scala.version>2.12.10</scala.version>
         <scala-short.version>2.12</scala-short.version>
         <dbutils.version>0.0.5</dbutils.version>
-        <json.version>20230227</json.version>
+        <json.version>20231013</json.version>
         <hoverfly.version>0.12.2</hoverfly.version>
         <junit.version>5.9.1</junit.version>
         <surefire-plugin.version>2.22.0</surefire-plugin.version>
         <mockito.version>4.3.1</mockito.version>
         <avro.version>1.8.2</avro.version>

+        <artifact.spark.version>spark3.1</artifact.spark.version>
+
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <delombok.output>${project.basedir}/delombok</delombok.output>
     </properties>

@@ -271,6 +273,17 @@
            </testResource>
        </testResources>
    </build>
+
+    <profiles>
+        <profile>
+            <id>spark-3.3</id>
+            <properties>
+                <deequ.version>2.0.4.0-spark-3.3</deequ.version>
+                <artifact.spark.version>spark3.3</artifact.spark.version>
+            </properties>
+        </profile>
+    </profiles>
+
    <repositories>
        <repository>
            <id>Hops</id>
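Alongside the version bump to 3.7.0-SNAPSHOT, the parent POM now pins a default Spark flavor (artifact.spark.version = spark3.1) and adds a spark-3.3 profile that swaps in a Spark 3.3-compatible deequ build. Presumably the profile is activated in the usual Maven way, e.g. mvn -Pspark-3.3 clean install, which also changes the hsfs-spark artifact id below.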

java/spark/pom.xml (+3 -3)
@@ -22,11 +22,11 @@
     <parent>
         <artifactId>hsfs-parent</artifactId>
         <groupId>com.logicalclocks</groupId>
-        <version>3.5.0-SNAPSHOT</version>
+        <version>3.7.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>

-    <artifactId>hsfs-spark</artifactId>
+    <artifactId>hsfs-spark-${artifact.spark.version}</artifactId>

     <properties>
         <spark.version>3.1.1.3</spark.version>

@@ -123,7 +123,7 @@
         <!-- https://mvnrepository.com/artifact/org.apache.hudi/hudi-spark-bundle -->
         <dependency>
             <groupId>io.hops.hudi</groupId>
-            <artifactId>hudi-spark3-bundle_${scala-short.version}</artifactId>
+            <artifactId>hudi-spark3.1-bundle_${scala-short.version}</artifactId>
             <version>${hudi.version}</version>
             <scope>provided</scope>
             <exclusions>

java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java (+16 -5)
@@ -577,6 +577,7 @@ public <S> StreamingQuery writeStreamDataframe(FeatureGroupBase featureGroupBase
     byte[] featureGroupId = String.valueOf(featureGroupBase.getId()).getBytes(StandardCharsets.UTF_8);
     byte[] schemaId = String.valueOf(featureGroupBase.getSubject().getSchemaId()).getBytes(StandardCharsets.UTF_8);

+    queryName = makeQueryName(queryName, featureGroupBase);
     DataStreamWriter<Row> writer =
         onlineFeatureGroupToAvro(featureGroupBase, encodeComplexFeatures(featureGroupBase, dataset))
             .withColumn("headers", array(

@@ -596,8 +597,9 @@ public <S> StreamingQuery writeStreamDataframe(FeatureGroupBase featureGroupBase
         .writeStream()
         .format(Constants.KAFKA_FORMAT)
         .outputMode(outputMode)
+        .queryName(queryName)
         .option("checkpointLocation", checkpointLocation == null
-            ? checkpointDirPath(queryName, featureGroupBase.getOnlineTopicName())
+            ? checkpointDirPath(queryName)
             : checkpointLocation)
         .options(writeOptions)
         .option("topic", featureGroupBase.getOnlineTopicName());

@@ -1082,12 +1084,21 @@ public String constructCheckpointPath(FeatureGroupBase featureGroup, String quer
         + "/Resources/" + queryName + "-checkpoint";
   }

-  public String checkpointDirPath(String queryName, String onlineTopicName) throws FeatureStoreException {
-    if (Strings.isNullOrEmpty(queryName)) {
-      queryName = "insert_stream_" + onlineTopicName;
-    }
+  private String checkpointDirPath(String queryName) throws FeatureStoreException {
     return "/Projects/" + HopsworksClient.getInstance().getProject().getProjectName()
         + "/Resources/" + queryName + "-checkpoint";
   }

+  protected String makeQueryName(String queryName, FeatureGroupBase featureGroup) {
+    if (Strings.isNullOrEmpty(queryName)) {
+      queryName = String.format("insert_stream_%d_%d_%s_%d_onlinefs",
+          featureGroup.getFeatureStore().getProjectId(),
+          featureGroup.getId(),
+          featureGroup.getName(),
+          featureGroup.getVersion()
+      );
+    }
+    return queryName;
+  }
+
 }
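Taken together, these SparkEngine changes give every streaming insert a deterministic default query name of the form insert_stream_<projectId>_<featureGroupId>_<featureGroupName>_<version>_onlinefs (previously derived from the online topic name), attach it to the writer via queryName(), and derive the checkpoint directory from that same name.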

java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/hudi/HudiEngine.java (+4 -3)
@@ -264,7 +264,9 @@ private Map<String, String> setupHudiWriteOpts(FeatureGroupBase featureGroup, Hu
     hudiArgs.put(HUDI_HIVE_SYNC_DB, featureGroup.getFeatureStore().getName());
     hudiArgs.put(HIVE_AUTO_CREATE_DATABASE_OPT_KEY, HIVE_AUTO_CREATE_DATABASE_OPT_VAL);
     hudiArgs.put(HUDI_HIVE_SYNC_SUPPORT_TIMESTAMP, "true");
-    hudiArgs.put(HUDI_TABLE_OPERATION, operation.getValue());
+    if (operation != null) {
+      hudiArgs.put(HUDI_TABLE_OPERATION, operation.getValue());
+    }
     hudiArgs.putAll(HUDI_DEFAULT_PARALLELISM);

     // Overwrite with user provided options if any

@@ -307,8 +309,7 @@ private void createEmptyTable(SparkSession sparkSession, StreamFeatureGroup stre
       throws IOException, FeatureStoreException {
     Configuration configuration = sparkSession.sparkContext().hadoopConfiguration();
     Properties properties = new Properties();
-    properties.putAll(setupHudiWriteOpts(streamFeatureGroup,
-        HudiOperationType.BULK_INSERT, null));
+    properties.putAll(setupHudiWriteOpts(streamFeatureGroup, null, null));
     HoodieTableMetaClient.initTableAndGetMetaClient(configuration, streamFeatureGroup.getLocation(), properties);
   }
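With the null check in place, createEmptyTable() can reuse setupHudiWriteOpts() without forcing a BULK_INSERT operation into the table properties when it only needs to initialize the Hoodie metadata.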

java/spark/src/test/java/com/logicalclocks/hsfs/spark/engine/TestSparkEngine.java (+28 -2)
@@ -28,9 +28,7 @@
 import com.logicalclocks.hsfs.metadata.StorageConnectorApi;
 import com.logicalclocks.hsfs.spark.FeatureGroup;
 import com.logicalclocks.hsfs.spark.FeatureStore;
-import org.apache.spark.SparkContext;
 import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;

@@ -227,4 +225,32 @@ public void testGetKafkaConfigExternalClientInternalKafka() throws FeatureStoreE
     Mockito.verify(storageConnectorApi).getKafkaStorageConnector(Mockito.any(), externalArg.capture());
     Assertions.assertEquals(Boolean.FALSE, externalArg.getValue());
   }
+
+  @Test
+  public void testMakeQueryName() {
+    SparkEngine sparkEngine = SparkEngine.getInstance();
+    FeatureGroup featureGroup = new FeatureGroup();
+    Integer fgId = 1;
+    String fgName = "test_fg";
+    Integer fgVersion = 1;
+    Integer projectId = 99;
+    featureGroup.setId(fgId);
+    featureGroup.setName(fgName);
+    featureGroup.setVersion(fgVersion);
+    FeatureStore featureStore = new FeatureStore();
+    featureStore.setProjectId(projectId);
+    featureGroup.setFeatureStore(featureStore);
+    String queryName = String.format("insert_stream_%d_%d_%s_%d_onlinefs",
+        featureGroup.getFeatureStore().getProjectId(),
+        featureGroup.getId(),
+        featureGroup.getName(),
+        featureGroup.getVersion()
+    );
+    // query name is null
+    Assertions.assertEquals(queryName, sparkEngine.makeQueryName(null, featureGroup));
+    // query name is empty
+    Assertions.assertEquals(queryName, sparkEngine.makeQueryName("", featureGroup));
+    // query name is not empty
+    Assertions.assertEquals("test_qn", sparkEngine.makeQueryName("test_qn", featureGroup));
+  }
 }

python/hsfs/client/auth.py (+2 -2)
@@ -21,7 +21,7 @@ class BearerAuth(requests.auth.AuthBase):
     """Class to encapsulate a Bearer token."""

     def __init__(self, token):
-        self._token = token
+        self._token = token.strip()

     def __call__(self, r):
         r.headers["Authorization"] = "Bearer " + self._token

@@ -32,7 +32,7 @@ class ApiKeyAuth(requests.auth.AuthBase):
     """Class to encapsulate an API key."""

     def __init__(self, token):
-        self._token = token
+        self._token = token.strip()

     def __call__(self, r):
         r.headers["Authorization"] = "ApiKey " + self._token
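The strip() guards against whitespace picked up when a credential is read from a file or pasted from a clipboard. A minimal sketch of the effect (the key value and URL are made up):

```python
import requests

from hsfs.client.auth import ApiKeyAuth

# A key read from a file often carries a trailing newline; without the
# strip() it would end up inside the Authorization header value.
token = "my-api-key\n"  # hypothetical key, as read from a file
request = requests.Request(
    "GET", "https://example.com", auth=ApiKeyAuth(token)
).prepare()
assert request.headers["Authorization"] == "ApiKey my-api-key"
```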

python/hsfs/client/external.py (+9 -5)
@@ -106,10 +106,12 @@ def __init__(
             with open(os.path.join(self._cert_folder, "material_passwd"), "w") as f:
                 f.write(str(credentials["password"]))

-        elif engine == "spark":
+        elif engine == "spark" or engine == "spark-no-metastore":
             _spark_session = SparkSession.builder.getOrCreate()

-            self.validate_spark_configuration(_spark_session)
+            self.validate_spark_configuration(
+                _spark_session, engine == "spark-no-metastore"
+            )
             with open(
                 _spark_session.conf.get("spark.hadoop.hops.ssl.keystores.passwd.name"),
                 "r",

@@ -123,7 +125,7 @@ def __init__(
                 "spark.hadoop.hops.ssl.keystore.name"
             )

-    def validate_spark_configuration(self, _spark_session):
+    def validate_spark_configuration(self, _spark_session, no_metastore):
         exception_text = "Spark is misconfigured for communication with Hopsworks, missing or invalid property: "

         configuration_dict = {

@@ -135,11 +137,13 @@ def validate_spark_configuration(self, _spark_session):
             "spark.hadoop.fs.hopsfs.impl": "io.hops.hopsfs.client.HopsFileSystem",
             "spark.hadoop.hops.ssl.keystores.passwd.name": None,
             "spark.hadoop.hops.ipc.server.ssl.enabled": "true",
-            "spark.sql.hive.metastore.jars": None,
             "spark.hadoop.client.rpc.ssl.enabled.protocol": "TLSv1.2",
-            "spark.hadoop.hive.metastore.uris": None,
         }

+        if not no_metastore:
+            configuration_dict["spark.hadoop.hive.metastore.uris"] = None
+            configuration_dict["spark.sql.hive.metastore.jars"] = None
+
         for key, value in configuration_dict.items():
             if not (
                 _spark_session.conf.get(key, "not_found") != "not_found"

python/hsfs/connection.py (+7)
@@ -20,6 +20,7 @@
 from requests.exceptions import ConnectionError

 from hsfs.decorators import connected, not_connected
+from hsfs.core.opensearch import OpenSearchClientSingleton
 from hsfs import engine, client, util, usage
 from hsfs.core import (
     feature_store_api,

@@ -216,6 +217,11 @@ def connect(self):
             self._engine = "python"
         elif self._engine is not None and self._engine.lower() == "training":
             self._engine = "training"
+        elif (
+            self._engine is not None
+            and self._engine.lower() == "spark-no-metastore"
+        ):
+            self._engine = "spark-no-metastore"
         else:
             raise ConnectionError(
                 "Engine you are trying to initialize is unknown. "

@@ -271,6 +277,7 @@ def close(self):
             conn.close()
             ```
         """
+        OpenSearchClientSingleton().close()
         client.stop()
         self._feature_store_api = None
         engine.stop()