
Commit eb7c05c

Merge remote-tracking branch 'origin/main' into prefix-msg
2 parents: c45e315 + 147af3e

11 files changed: +60, -18 lines

11 files changed

+60
-18
lines changed

java/beam/pom.xml (+1, -1)

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>hsfs-parent</artifactId>
         <groupId>com.logicalclocks</groupId>
-        <version>4.0.0-SNAPSHOT</version>
+        <version>4.1.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>

java/flink/pom.xml (+1, -1)

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>hsfs-parent</artifactId>
         <groupId>com.logicalclocks</groupId>
-        <version>4.0.0-SNAPSHOT</version>
+        <version>4.1.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>

java/hsfs/pom.xml (+1, -1)

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>hsfs-parent</artifactId>
         <groupId>com.logicalclocks</groupId>
-        <version>4.0.0-SNAPSHOT</version>
+        <version>4.1.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>

java/pom.xml (+1, -1)

@@ -7,7 +7,7 @@
     <groupId>com.logicalclocks</groupId>
     <artifactId>hsfs-parent</artifactId>
     <packaging>pom</packaging>
-    <version>4.0.0-SNAPSHOT</version>
+    <version>4.1.0-SNAPSHOT</version>
     <modules>
         <module>hsfs</module>
         <module>spark</module>

java/spark/pom.xml (+1, -1)

@@ -22,7 +22,7 @@
     <parent>
         <artifactId>hsfs-parent</artifactId>
         <groupId>com.logicalclocks</groupId>
-        <version>4.0.0-SNAPSHOT</version>
+        <version>4.1.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>

python/hopsworks_common/version.py (+1, -1)

@@ -14,4 +14,4 @@
 # limitations under the License.
 #

-__version__ = "4.0.0.dev1"
+__version__ = "4.1.0.dev1"

python/hsfs/core/delta_engine.py (+10, -2)

@@ -172,6 +172,8 @@ def _generate_merge_query(self, source_alias, updates_alias):
     @staticmethod
     def _get_last_commit_metadata(spark_context, base_path):
         fg_source_table = DeltaTable.forPath(spark_context, base_path)
+
+        # Get info about the latest commit
         last_commit = fg_source_table.history(1).first().asDict()
         version = last_commit["version"]
         commit_timestamp = util.convert_event_time_to_timestamp(
@@ -180,6 +182,12 @@ def _get_last_commit_metadata(spark_context, base_path):
         commit_date_string = util.get_hudi_datestr_from_timestamp(commit_timestamp)
         operation_metrics = last_commit["operationMetrics"]

+        # Get info about the oldest remaining commit
+        oldest_commit = fg_source_table.history().orderBy("version").first().asDict()
+        oldest_commit_timestamp = util.convert_event_time_to_timestamp(
+            oldest_commit["timestamp"]
+        )
+
         if version == 0:
             fg_commit = feature_group_commit.FeatureGroupCommit(
                 commitid=None,
@@ -188,7 +196,7 @@ def _get_last_commit_metadata(spark_context, base_path):
                 rows_inserted=operation_metrics["numOutputRows"],
                 rows_updated=0,
                 rows_deleted=0,
-                last_active_commit_time=commit_timestamp,
+                last_active_commit_time=oldest_commit_timestamp,
             )
         else:
             fg_commit = feature_group_commit.FeatureGroupCommit(
@@ -198,7 +206,7 @@ def _get_last_commit_metadata(spark_context, base_path):
                 rows_inserted=operation_metrics["numTargetRowsInserted"],
                 rows_updated=operation_metrics["numTargetRowsUpdated"],
                 rows_deleted=operation_metrics["numTargetRowsDeleted"],
-                last_active_commit_time=commit_timestamp,
+                last_active_commit_time=oldest_commit_timestamp,
             )

         return fg_commit
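
For context, this change bases last_active_commit_time on the oldest commit still present in the Delta log rather than on the latest one. A minimal standalone sketch of the same history lookup, assuming delta-spark is installed, spark is an active SparkSession, and "/tmp/example_fg" is a placeholder table path:

# Sketch: read the latest and the oldest retained commit from a Delta table's history.
from delta.tables import DeltaTable

table = DeltaTable.forPath(spark, "/tmp/example_fg")  # placeholder path

latest = table.history(1).first().asDict()                      # most recent commit
oldest = table.history().orderBy("version").first().asDict()    # oldest commit still retained

print(latest["version"], latest["timestamp"], latest["operationMetrics"])
print(oldest["version"], oldest["timestamp"])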

python/hsfs/storage_connector.py (+1, -2)

@@ -1575,9 +1575,8 @@ def connector_options(self) -> Dict[str, Any]:
         """Return options to be passed to an external BigQuery connector library"""
         props = {
             "key_path": self._key_path,
-            "project_id": self._query_project,
+            "project_id": self._parent_project,
             "dataset_id": self._dataset,
-            "parent_project": self._parent_project,
         }
         return props
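
After this change, connector_options() exposes the parent project as project_id and drops the separate parent_project key. A hypothetical consumer of that dict might look like the sketch below, assuming the google-cloud-bigquery client library, that sc is an existing hsfs BigQuery storage connector, and that key_path points to a local service-account JSON file; this is not necessarily how hsfs wires these options internally:

# Sketch of consuming the options dict returned by connector_options().
from google.cloud import bigquery
from google.oauth2 import service_account

options = sc.connector_options()  # e.g. {"key_path": ..., "project_id": ..., "dataset_id": ...}

credentials = service_account.Credentials.from_service_account_file(options["key_path"])
client = bigquery.Client(project=options["project_id"], credentials=credentials)

dataset = client.get_dataset(options["dataset_id"])  # dataset resolved in the parent project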

python/tests/fixtures/storage_connector_fixtures.json (+27, -1)

@@ -495,7 +495,7 @@
         },
         "headers": null
     },
-    "get_big_query": {
+    "get_big_query_table": {
         "response": {
             "type": "featurestoreBigQueryConnectorDTO",
             "description": "BigQuery connector description",
@@ -508,6 +508,32 @@
             "dataset": "test_dataset",
             "query_table": "test_query_table",
             "query_project": "test_query_project",
+            "arguments": [{"name": "test_name", "value": "test_value"}]
+        },
+        "method": "GET",
+        "path_params": [
+            "project",
+            "119",
+            "featurestores",
+            67,
+            "storageconnectors",
+            "test_big_query"
+        ],
+        "query_params": {
+            "temporaryCredentials": true
+        },
+        "headers": null
+    },
+    "get_big_query_query": {
+        "response": {
+            "type": "featurestoreBigQueryConnectorDTO",
+            "description": "BigQuery connector description",
+            "featurestoreId": 67,
+            "id": 1,
+            "name": "test_big_query",
+            "storageConnectorType": "BIGQUERY",
+            "key_path": "test_key_path",
+            "parent_project": "test_parent_project",
             "materialization_dataset": "test_materialization_dataset",
             "arguments": [{"name": "test_name", "value": "test_value"}]
         },

python/tests/test_storage_connector.py (+15, -6)

@@ -800,7 +800,7 @@ def test_default_path(self, mocker):
 class TestBigQueryConnector:
     def test_from_response_json(self, backend_fixtures):
         # Arrange
-        json = backend_fixtures["storage_connector"]["get_big_query"]["response"]
+        json = backend_fixtures["storage_connector"]["get_big_query_table"]["response"]

         # Act
         sc = storage_connector.StorageConnector.from_response_json(json)
@@ -815,7 +815,6 @@ def test_from_response_json(self, backend_fixtures):
         assert sc.dataset == "test_dataset"
         assert sc.query_table == "test_query_table"
         assert sc.query_project == "test_query_project"
-        assert sc.materialization_dataset == "test_materialization_dataset"
         assert sc.arguments == {"test_name": "test_value"}

     def test_from_response_json_basic_info(self, backend_fixtures):
@@ -850,7 +849,7 @@ def test_credentials_base64_encoded(self, mocker, backend_fixtures, tmp_path):
         credentialsFile = tmp_path / "bigquery.json"
         credentialsFile.write_text(credentials)

-        json = backend_fixtures["storage_connector"]["get_big_query"]["response"]
+        json = backend_fixtures["storage_connector"]["get_big_query_table"]["response"]
         if isinstance(tmp_path, WindowsPath):
             json["key_path"] = "file:///" + str(credentialsFile.resolve()).replace(
                 "\\", "/"
@@ -891,9 +890,7 @@ def test_query_validation(self, mocker, backend_fixtures, tmp_path):
         credentials = '{"type": "service_account", "project_id": "test"}'
         credentialsFile = tmp_path / "bigquery.json"
         credentialsFile.write_text(credentials)
-        json = backend_fixtures["storage_connector"]["get_big_query"]["response"]
-        # remove property for query
-        json.pop("materialization_dataset")
+        json = backend_fixtures["storage_connector"]["get_big_query_table"]["response"]
         if isinstance(tmp_path, WindowsPath):
             json["key_path"] = "file:///" + str(credentialsFile.resolve()).replace(
                 "\\", "/"
@@ -905,3 +902,15 @@
         # Assert
         with pytest.raises(ValueError):
             sc.read(query="select * from")
+
+    def test_connector_options(self, backend_fixtures):
+        # Arrange
+        engine.set_instance("python", python.Engine())
+        json = backend_fixtures["storage_connector"]["get_big_query_query"]["response"]
+        sc = storage_connector.StorageConnector.from_response_json(json)
+
+        # Act
+        options = sc.connector_options()
+
+        # Assert
+        assert options["project_id"] == "test_parent_project"

utils/java/pom.xml (+1, -1)

@@ -5,7 +5,7 @@

     <groupId>com.logicalclocks</groupId>
     <artifactId>hsfs-utils</artifactId>
-    <version>4.0.0-SNAPSHOT</version>
+    <version>4.1.0-SNAPSHOT</version>

     <properties>
         <hops.version>3.2.0.0-SNAPSHOT</hops.version>
