Skip to content

Commit

Permalink
fix: added update stmt for DataRequest (#1710)
Browse files Browse the repository at this point in the history
* fix: added update stmt

* also update DataRequest's ID
  • Loading branch information
paullatzelsperger authored Jul 20, 2022
1 parent 5875eb3 commit 492c8a1
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 43 deletions.
7 changes: 5 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ in the detailed section referring to by linking pull requests or issues.
* Old RegistrationService (was used for a PoC) (#164)
* Deprecate `InlineDataFlowController` (replaced by the Data Plane) (#1464)
* Unused classes and interfaces at `ids.spi.policy` (#1471)
* Remove modules `:extensions:transfer-functions:transfer-functions-spi` and `:extensions:transfer-functions:transfer-functions-core` (#1482)
* Remove modules `:extensions:transfer-functions:transfer-functions-spi`
and `:extensions:transfer-functions:transfer-functions-core` (#1482)
* Remove `ConnectorVersionProvider`, provide version as static string (#1470)
* Remove `samples/other/run-from-junit` (#1456)

Expand All @@ -71,6 +72,7 @@ in the detailed section referring to by linking pull requests or issues.
* Adapt logs to the logging guide (#1425)
* Fix incompatibility `DecentralizedIdentityServiceExtension` and `FsPrivateKeyResolver` (#1696)
* Add support for domain port domain in Web Did resolver (#1652)
* Fixed persistence update bug with `DataRequest` (#1707)

## [milestone-4] - 2022-06-07

Expand Down Expand Up @@ -110,7 +112,8 @@ in the detailed section referring to by linking pull requests or issues.

* Restructure sql extension folder tree (#1154)
* Extract single `PolicyArchive` implementation (#1158)
* Replace `accessPolicy` and `contractPolicy` with `accessPolicyId` and `contractPolicyId` on `ContractDefinition` (#1144)
* Replace `accessPolicy` and `contractPolicy` with `accessPolicyId` and `contractPolicyId` on `ContractDefinition` (
#1144)
* Replace `policy` with `policyId` on `ContractAgreement` (#1220)
* All DMgmt Api methods now produce and consume `APPLICATION_JSON` (#1175)
* Make data-plane public api controller asynchronous (#1228)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,17 @@ public SqlTransferProcessStore(DataSourceRegistry dataSourceRegistry, String dat
});
}

/**
* Creates a new {@link TransferProcess}, or updates if one already exists.
*
* @param process The new TransferProcess.
* @throws IllegalArgumentException if the TransferProcess does not have a {@link DataRequest}.
*/
@Override
public void create(TransferProcess process) {
if (process.getDataRequest() == null) {
throw new IllegalArgumentException("Cannot store TransferProcess without a DataRequest");
}
transactionContext.execute(() -> {
if (find(process.getId()) != null) {
update(process);
Expand All @@ -123,6 +132,12 @@ public void create(TransferProcess process) {

}

/**
* Updates a TransferProcess overwriting all properties. The {@link DataRequest} that is associated with the {@link TransferProcess}
* will get updated including its ID (primary key).
*
* @param process The new TransferProcess
*/
@Override
public void update(TransferProcess process) {
transactionContext.execute(() -> {
Expand All @@ -133,9 +148,8 @@ public void update(TransferProcess process) {
insert(process);
} else {
try (var conn = getConnection()) {

leaseContext.by(leaseHolderName).withConnection(conn).breakLease(id);
update(conn, id, process);
update(conn, process, existing.getDataRequest().getId());
} catch (SQLException e) {
throw new EdcPersistenceException(e);
}
Expand Down Expand Up @@ -171,16 +185,32 @@ public Stream<TransferProcess> findAll(QuerySpec querySpec) {
return transactionContext.execute(() -> {
try (var conn = getConnection()) {
var statement = statements.createQuery(querySpec);
return executeQuery(conn, this::mapTransferProcess, statement.getQueryAsString(), statement.getParameters()).stream();
return executeQuery(conn, this::mapTransferProcess, statement.getQueryAsString(), statement.getParameters()).stream().distinct();
} catch (SQLException e) {
throw new EdcPersistenceException(e);
}
});
}

private void update(Connection conn, String transferProcessId, TransferProcess process) {
var stmt = statements.getUpdateTransferProcessTemplate();
executeQuery(conn, stmt, process.getState(),
public DataRequest mapDataRequest(ResultSet resultSet) throws SQLException {
return DataRequest.Builder.newInstance()
.id(resultSet.getString("edc_data_request_id"))
.assetId(resultSet.getString(statements.getAssetIdColumn()))
.protocol(resultSet.getString(statements.getProtocolColumn()))
.dataDestination(fromJson(resultSet.getString(statements.getDataDestinationColumn()), DataAddress.class))
.connectorId(resultSet.getString(statements.getConnectorIdColumn()))
.connectorAddress(resultSet.getString(statements.getConnectorAddressColumn()))
.contractId(resultSet.getString(statements.getContractIdColumn()))
.managedResources(resultSet.getBoolean(statements.getManagedResourcesColumn()))
.transferType(fromJson(resultSet.getString(statements.getTransferTypeColumn()), TransferType.class))
.processId(resultSet.getString(statements.getProcessIdColumn()))
.properties(fromJson(resultSet.getString(statements.getPropertiesColumn()), getTypeRef()))
.build();
}

private void update(Connection conn, TransferProcess process, String existingDataRequestId) {
var updateStmt = statements.getUpdateTransferProcessTemplate();
executeQuery(conn, updateStmt, process.getState(),
process.getStateCount(),
process.getStateTimestamp(),
toJson(process.getTraceContext()),
Expand All @@ -189,7 +219,28 @@ private void update(Connection conn, String transferProcessId, TransferProcess p
toJson(process.getProvisionedResourceSet()),
toJson(process.getContentDataAddress()),
toJson(process.getDeprovisionedResources()),
transferProcessId);
process.getId());

var newDr = process.getDataRequest();
updateDataRequest(conn, newDr, existingDataRequestId);
}

private void updateDataRequest(Connection conn, DataRequest dataRequest, String existingDataRequestId) {
var updateDrStmt = statements.getUpdateDataRequestTemplate();

executeQuery(conn, updateDrStmt,
dataRequest.getId(),
dataRequest.getProcessId(),
dataRequest.getConnectorAddress(),
dataRequest.getProtocol(),
dataRequest.getConnectorId(),
dataRequest.getAssetId(),
dataRequest.getContractId(),
toJson(dataRequest.getDataDestination()),
dataRequest.isManagedResources(),
toJson(dataRequest.getProperties()),
toJson(dataRequest.getTransferType()),
existingDataRequestId);
}

/**
Expand Down Expand Up @@ -229,26 +280,32 @@ private void insert(TransferProcess process) {

//insert DataRequest
var dr = process.getDataRequest();
var insertDrStmt = statements.getInsertDataRequestTemplate();
executeQuery(conn, insertDrStmt,
dr.getId(),
dr.getProcessId(),
dr.getConnectorAddress(),
dr.getConnectorId(),
dr.getAssetId(),
dr.getContractId(),
toJson(dr.getDataDestination()),
toJson(dr.getProperties()),
toJson(dr.getTransferType()),
process.getId(),
dr.getProtocol(),
dr.isManagedResources());
if (dr != null) {
insertDataRequest(process.getId(), dr, conn);
}
} catch (SQLException e) {
throw new EdcPersistenceException(e);
}
});
}

private void insertDataRequest(String processId, DataRequest dr, Connection conn) {
var insertDrStmt = statements.getInsertDataRequestTemplate();
executeQuery(conn, insertDrStmt,
dr.getId(),
dr.getProcessId(),
dr.getConnectorAddress(),
dr.getConnectorId(),
dr.getAssetId(),
dr.getContractId(),
toJson(dr.getDataDestination()),
toJson(dr.getProperties()),
toJson(dr.getTransferType()),
processId,
dr.getProtocol(),
dr.isManagedResources());
}

private TransferProcess mapTransferProcess(ResultSet resultSet) throws SQLException {
return TransferProcess.Builder.newInstance()
.id(resultSet.getString(statements.getIdColumn()))
Expand All @@ -268,24 +325,6 @@ private TransferProcess mapTransferProcess(ResultSet resultSet) throws SQLExcept
.build();
}


private DataRequest mapDataRequest(ResultSet resultSet) throws SQLException {
return DataRequest.Builder.newInstance()
.id(resultSet.getString("edc_data_request_id"))
.assetId(resultSet.getString(statements.getAssetIdColumn()))
.protocol(resultSet.getString(statements.getProtocolColumn()))
.dataDestination(fromJson(resultSet.getString(statements.getDataDestinationColumn()), DataAddress.class))
.connectorId(resultSet.getString(statements.getConnectorIdColumn()))
.connectorAddress(resultSet.getString(statements.getConnectorAddressColumn()))
.contractId(resultSet.getString(statements.getContractIdColumn()))
.managedResources(resultSet.getBoolean(statements.getManagedResourcesColumn()))
.transferType(fromJson(resultSet.getString(statements.getTransferTypeColumn()), TransferType.class))
.processId(resultSet.getString(statements.getProcessIdColumn()))
.properties(fromJson(resultSet.getString(statements.getPropertiesColumn()), getTypeRef()))

.build();
}

@NotNull
private <T> TypeReference<T> getTypeRef() {
return new TypeReference<>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ public String getSelectTemplate() {
getTransferProcessTableName(), getDataRequestTable(), getTransferProcessTableName(), getIdColumn(), getProcessIdColumn());
}

@Override
public String getUpdateDataRequestTemplate() {
return format("UPDATE %s SET %s=?, %s=?, %s=?, %s=?, %s=?, %s=?, %s=?, %s=?%s, %s=?, %s=?%s, %s=?%s WHERE %s=?",
getDataRequestTable(),
getDataRequestIdColumn(), getProcessIdColumn(), getConnectorAddressColumn(), getProtocolColumn(), getConnectorIdColumn(), getAssetIdColumn(), getContractIdColumn(),
getDataDestinationColumn(), getFormatAsJsonOperator(), getManagedResourcesColumn(), getPropertiesColumn(), getFormatAsJsonOperator(), getTransferTypeColumn(), getFormatAsJsonOperator(),
getDataRequestIdColumn());
}

@Override
public SqlQueryStatement createQuery(QuerySpec querySpec) {
return new SqlQueryStatement(getSelectTemplate(), querySpec, new TransferProcessMapping(this));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public interface TransferProcessStoreStatements extends LeaseStatements {

String getSelectTemplate();

String getUpdateDataRequestTemplate();

default String getIdColumn() {
return "transferprocess_id";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,14 @@
import javax.sql.DataSource;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
import static org.eclipse.dataspaceconnector.sql.SqlQueryExecutor.executeQuery;
import static org.eclipse.dataspaceconnector.sql.transferprocess.store.TestFunctions.createDataRequest;
import static org.eclipse.dataspaceconnector.sql.transferprocess.store.TestFunctions.createDataRequestBuilder;
import static org.eclipse.dataspaceconnector.sql.transferprocess.store.TestFunctions.createTransferProcess;
import static org.eclipse.dataspaceconnector.sql.transferprocess.store.TestFunctions.createTransferProcessBuilder;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doNothing;
Expand Down Expand Up @@ -275,6 +278,24 @@ void processIdForTransferId_notExist() {
assertThat(store.processIdForTransferId("not-exist")).isNull();
}

@Test
void update_shouldPersistDataRequest() {
var t1 = createTransferProcess("id1", TransferProcessStates.IN_PROGRESS);
store.create(t1);

t1.getDataRequest().getProperties().put("newKey", "newValue");
store.update(t1);

var all = store.findAll(QuerySpec.none()).collect(Collectors.toList());
assertThat(all)
.hasSize(1)
.usingRecursiveFieldByFieldElementComparator()
.containsExactly(t1);

assertThat(all.get(0).getDataRequest().getProperties()).containsEntry("newKey", "newValue");
}


@Test
void update_exists_shouldUpdate() {
var t1 = createTransferProcess("id1", TransferProcessStates.IN_PROGRESS);
Expand Down Expand Up @@ -339,7 +360,7 @@ void delete() {
store.create(t1);

store.delete("id1");
assertThat(count()).isEqualTo(0);
assertThat(countTransferProcesses()).isEqualTo(0);
assertThat(store.findAll(QuerySpec.none())).isEmpty();
}

Expand Down Expand Up @@ -419,6 +440,43 @@ void findAll_verifyPaging_pageSizeOutsideCollection() {

}

@Test
void create_withoutDataRequest_throwsException() {
var t1 = createTransferProcessBuilder("id1")
.dataRequest(null)
.build();
assertThatIllegalArgumentException().isThrownBy(() -> store.create(t1));
}

@Test
void update_dataRequestWithNewId_replacesOld() {
var t1 = createTransferProcess("id1", TransferProcessStates.IN_PROGRESS);
store.create(t1);

var t2 = createTransferProcessBuilder("id1")
.dataRequest(createDataRequestBuilder()
.id("new-dr-id")
.assetId("new-asset")
.contractId("new-contract")
.protocol("test-protocol")
.connectorId("new-connector")
.build())
.build();
store.update(t2);

var all = store.findAll(QuerySpec.none()).collect(Collectors.toList());
assertThat(all)
.hasSize(1)
.usingRecursiveFieldByFieldElementComparator()
.containsExactly(t2);


var drs = all.stream().map(TransferProcess::getDataRequest).collect(Collectors.toList());
assertThat(drs).hasSize(1)
.usingRecursiveFieldByFieldElementComparator()
.containsOnly(t2.getDataRequest());
}

private Connection getConnection() {
try {
return dataSourceRegistry.resolve(DATASOURCE_NAME).getConnection();
Expand All @@ -427,14 +485,15 @@ private Connection getConnection() {
}
}

private int count() {
private int countTransferProcesses() {
try (var conn = dataSourceRegistry.resolve(DATASOURCE_NAME).getConnection()) {
return executeQuery(conn, "SELECT COUNT(*) FROM edc_transfer_process");
} catch (SQLException e) {
throw new RuntimeException(e);
}
}


private static class H2DialectStatements extends BaseSqlDialectStatements {
}
}

0 comments on commit 492c8a1

Please sign in to comment.