From fb872ddf59e8c9b782ff8cc2fb63387c95264e3b Mon Sep 17 00:00:00 2001 From: albertrodriguezin2 <166031280+albertrodriguezin2@users.noreply.github.com> Date: Tue, 11 Feb 2025 12:54:29 +0100 Subject: [PATCH] fix/replication (#72) * feat(infrastructure): exit application if cannot create broker subscription or blockchain subscription in the application runner * feat(infrastructure): exit application if cannot get access node public keys from external yaml * feat(infrastructure): exit application if cannot synchronize in startup * fix(service): remove consistency validation in p2p * build(gradle): change version --- CHANGELOG.md | 6 + build.gradle | 2 +- .../es/in2/desmos/DesmosApiApplication.java | 7 +- .../runners/ApplicationRunner.java | 23 +- ...cationJob.java => DataPublicationJob.java} | 2 +- .../jobs/impl/DataPublicationJobImpl.java | 56 ++++ .../jobs/impl/DataTransferJobImpl.java | 75 ++--- .../jobs/impl/DataVerificationJobImpl.java | 164 ---------- .../jobs/DataPublicationJobTest.java | 148 +++++++++ .../workflows/jobs/DataTransferJobTest.java | 28 +- .../jobs/DataVerificationJobTest.java | 306 ------------------ 11 files changed, 281 insertions(+), 536 deletions(-) rename src/main/java/es/in2/desmos/application/workflows/jobs/{DataVerificationJob.java => DataPublicationJob.java} (93%) create mode 100644 src/main/java/es/in2/desmos/application/workflows/jobs/impl/DataPublicationJobImpl.java delete mode 100644 src/main/java/es/in2/desmos/application/workflows/jobs/impl/DataVerificationJobImpl.java create mode 100644 src/test/java/es/in2/desmos/application/workflows/jobs/DataPublicationJobTest.java delete mode 100644 src/test/java/es/in2/desmos/application/workflows/jobs/DataVerificationJobTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 865dbbd6..57e3de2c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Retry mechanism for Blockchain Adapter and Context Broker. - Recover after a failure mechanism for Blockchain Adapter and Context Broker. +## [Released]: v1.0.7 +- Fix entity updates to Scorpio during the synchronization process. +- Terminate the application if startup fails to complete successfully. +- Remove hash link validation in P2P synchronization. +- Add a warning log when an entity fails to synchronize in P2P due to an invalid integrity check. + ## [Released]: v1.0.6 - Fix slow synchronization diff --git a/build.gradle b/build.gradle index b1751fcc..58ea4096 100644 --- a/build.gradle +++ b/build.gradle @@ -14,7 +14,7 @@ plugins { } group = 'es.in2' -version = '1.0.6' +version = '1.0.7' java { sourceCompatibility = '17' diff --git a/src/main/java/es/in2/desmos/DesmosApiApplication.java b/src/main/java/es/in2/desmos/DesmosApiApplication.java index 243b9899..257ef4c3 100644 --- a/src/main/java/es/in2/desmos/DesmosApiApplication.java +++ b/src/main/java/es/in2/desmos/DesmosApiApplication.java @@ -3,9 +3,11 @@ import com.fasterxml.jackson.databind.MapperFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.json.JsonMapper; +import lombok.Getter; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.ConfigurationPropertiesScan; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.scheduling.annotation.EnableScheduling; @@ -18,8 +20,11 @@ public class DesmosApiApplication { .configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true) .build(); + @Getter + private static ConfigurableApplicationContext context; + public static void main(String[] args) { - SpringApplication.run(DesmosApiApplication.class, args); + context = SpringApplication.run(DesmosApiApplication.class, args); } @Bean diff --git a/src/main/java/es/in2/desmos/application/runners/ApplicationRunner.java b/src/main/java/es/in2/desmos/application/runners/ApplicationRunner.java index b2ce9c55..7b975611 100644 --- a/src/main/java/es/in2/desmos/application/runners/ApplicationRunner.java +++ b/src/main/java/es/in2/desmos/application/runners/ApplicationRunner.java @@ -1,5 +1,6 @@ package es.in2.desmos.application.runners; +import es.in2.desmos.DesmosApiApplication; import es.in2.desmos.application.workflows.DataSyncWorkflow; import es.in2.desmos.application.workflows.PublishWorkflow; import es.in2.desmos.application.workflows.SubscribeWorkflow; @@ -14,6 +15,7 @@ import es.in2.desmos.infrastructure.configs.BrokerConfig; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.SpringApplication; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.annotation.Configuration; import org.springframework.context.event.EventListener; @@ -23,6 +25,7 @@ import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import java.util.ArrayList; import java.util.List; @@ -57,9 +60,13 @@ public Mono onApplicationReady() { String processId = UUID.randomUUID().toString(); log.info("ProcessID: {} - Setting initial configurations...", processId); return setBrokerSubscription(processId) - .then(setBlockchainSubscription(processId)) - .then(setAccessNodePublicKeysFromExternalYaml(processId)) - .thenMany(initializeDataSync(processId)) + .doOnError(error -> finishApplication("Broker Subscription", error)) + .then(setBlockchainSubscription(processId) + .doOnError(error -> finishApplication("Blockchain Subscription", error))) + .then(setAccessNodePublicKeysFromExternalYaml(processId) + .doOnError(error -> finishApplication("Access Node Public Keys Getting", error))) + .thenMany(initializeDataSync(processId) + .doOnError(error -> finishApplication("Initialize Data Sync", error))) .then(); } @@ -182,4 +189,14 @@ private void startBrokerEventProcessing(String processId) { () -> log.info("ProcessID: {} - Subscribe Workflow completed", processId) ); } + + private void finishApplication(String step, Throwable error) { + log.error("Error in {}: {}", step, error.getMessage(), error); + + Mono.fromRunnable(() -> { + int exitCode = SpringApplication.exit(DesmosApiApplication.getContext(), () -> 0); + log.info("Application exiting with code {}", exitCode); + System.exit(exitCode); + }).subscribeOn(Schedulers.boundedElastic()).subscribe(); + } } diff --git a/src/main/java/es/in2/desmos/application/workflows/jobs/DataVerificationJob.java b/src/main/java/es/in2/desmos/application/workflows/jobs/DataPublicationJob.java similarity index 93% rename from src/main/java/es/in2/desmos/application/workflows/jobs/DataVerificationJob.java rename to src/main/java/es/in2/desmos/application/workflows/jobs/DataPublicationJob.java index 8f106232..6a2f0592 100644 --- a/src/main/java/es/in2/desmos/application/workflows/jobs/DataVerificationJob.java +++ b/src/main/java/es/in2/desmos/application/workflows/jobs/DataPublicationJob.java @@ -9,6 +9,6 @@ import java.util.List; import java.util.Map; -public interface DataVerificationJob { +public interface DataPublicationJob { Mono verifyData(String processId, Mono issuer, Mono> entitiesByIdMono, Mono> allMVEntity4DataNegotiation, Mono> existingEntitiesOriginalValidationDataById); } diff --git a/src/main/java/es/in2/desmos/application/workflows/jobs/impl/DataPublicationJobImpl.java b/src/main/java/es/in2/desmos/application/workflows/jobs/impl/DataPublicationJobImpl.java new file mode 100644 index 00000000..c5376133 --- /dev/null +++ b/src/main/java/es/in2/desmos/application/workflows/jobs/impl/DataPublicationJobImpl.java @@ -0,0 +1,56 @@ +package es.in2.desmos.application.workflows.jobs.impl; + +import es.in2.desmos.application.workflows.jobs.DataPublicationJob; +import es.in2.desmos.domain.models.*; +import es.in2.desmos.domain.services.api.AuditRecordService; +import es.in2.desmos.domain.services.broker.BrokerPublisherService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.List; +import java.util.Map; + +@Slf4j +@Service +@RequiredArgsConstructor +public class DataPublicationJobImpl implements DataPublicationJob { + private final AuditRecordService auditRecordService; + private final BrokerPublisherService brokerPublisherService; + + public Mono verifyData(String processId, Mono issuer, Mono> entitiesByIdMono, Mono> allMVEntity4DataNegotiation, Mono> existingEntitiesOriginalValidationDataById) { + log.info("ProcessID: {} - Starting Data Verification Job", processId); + + return buildAndSaveAuditRecordFromDataSync(processId, issuer, entitiesByIdMono, allMVEntity4DataNegotiation, AuditRecordStatus.RETRIEVED) + .then(createEntitiesToContextBroker(processId, entitiesByIdMono)) + .then(buildAndSaveAuditRecordFromDataSync(processId, issuer, entitiesByIdMono, allMVEntity4DataNegotiation, AuditRecordStatus.PUBLISHED)); + } + + private Mono buildAndSaveAuditRecordFromDataSync(String processId, Mono issuerMono, Mono> rcvdEntitiesByIdMono, Mono> mvEntity4DataNegotiationListMono, AuditRecordStatus auditRecordStatus) { + return Mono.zip(rcvdEntitiesByIdMono, mvEntity4DataNegotiationListMono) + .flatMapMany(tuple -> { + Map rcvdEntitiesById = tuple.getT1(); + List mvEntity4DataNegotiationList = tuple.getT2(); + + return Flux.fromIterable(mvEntity4DataNegotiationList) + .filter(entity4DataNegotiation -> rcvdEntitiesById.containsKey(new Id(entity4DataNegotiation.id()))) + .concatMap(entity4DataNegotiation -> issuerMono + .flatMap(issuer -> { + MVAuditServiceEntity4DataNegotiation mvAuditServiceEntity4DataNegotiation = new MVAuditServiceEntity4DataNegotiation(entity4DataNegotiation.id(), entity4DataNegotiation.type(), entity4DataNegotiation.hash(), entity4DataNegotiation.hashlink()); + return auditRecordService.buildAndSaveAuditRecordFromDataSync(processId, issuer, mvAuditServiceEntity4DataNegotiation, auditRecordStatus); + })); + }) + .collectList() + .then(); + } + + private Mono createEntitiesToContextBroker(String processId, Mono> entitiesByIdMono) { + return entitiesByIdMono + .flatMapIterable(Map::entrySet) + .flatMap(x -> brokerPublisherService.publishDataToBroker(processId, x.getKey().id(), x.getValue().value())) + .collectList() + .then(); + } +} \ No newline at end of file diff --git a/src/main/java/es/in2/desmos/application/workflows/jobs/impl/DataTransferJobImpl.java b/src/main/java/es/in2/desmos/application/workflows/jobs/impl/DataTransferJobImpl.java index 5f35ab9e..68cb4edf 100644 --- a/src/main/java/es/in2/desmos/application/workflows/jobs/impl/DataTransferJobImpl.java +++ b/src/main/java/es/in2/desmos/application/workflows/jobs/impl/DataTransferJobImpl.java @@ -5,8 +5,8 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.JsonParser; +import es.in2.desmos.application.workflows.jobs.DataPublicationJob; import es.in2.desmos.application.workflows.jobs.DataTransferJob; -import es.in2.desmos.application.workflows.jobs.DataVerificationJob; import es.in2.desmos.domain.exceptions.InvalidSyncResponseException; import es.in2.desmos.domain.models.*; import es.in2.desmos.domain.services.api.AuditRecordService; @@ -16,7 +16,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.security.NoSuchAlgorithmException; @@ -32,7 +31,7 @@ public class DataTransferJobImpl implements DataTransferJob { public static final String INVALID_ENTITY_SYNC_RESPONSE = "Invalid EntitySync response."; private final EntitySyncWebClient entitySyncWebClient; - private final DataVerificationJob dataVerificationJob; + private final DataPublicationJob dataPublicationJob; private final ObjectMapper objectMapper; private final AuditRecordService auditRecordService; @@ -88,14 +87,14 @@ public Mono syncData(String processId, Mono dataNeg Mono.just(result.existingEntitiesToSync()) ); - return createReceivedAuditRecords(processId, issuer, decodedEntitySyncResponseMono) - .then(getInvalidIntegrityEntitiesIds(entitiesByIdMono, entitiesHashAndHashlinkById)) + return createReceivedAuditRecords(processId, issuer, mvEntities4DataNegotiation) + .then(getInvalidIntegrityEntitiesIds(processId, entitiesByIdMono, entitiesHashAndHashlinkById)) .flatMap(invalidIntegrityEntitiesIds -> { Mono> invalidIntegrityEntitiesIdsMono = Mono.just(invalidIntegrityEntitiesIds); return filterEntitiesById(entitiesByIdMono, invalidIntegrityEntitiesIdsMono) .flatMap(filteredEntitiesById -> { Mono> filteredEntitiesByIdMono = Mono.just(filteredEntitiesById); - return dataVerificationJob.verifyData(processId, issuer, filteredEntitiesByIdMono, mvEntities4DataNegotiation, existingEntitiesHashAndHashLinkById); + return dataPublicationJob.verifyData(processId, issuer, filteredEntitiesByIdMono, mvEntities4DataNegotiation, existingEntitiesHashAndHashLinkById); }); }); @@ -108,45 +107,26 @@ public Mono syncData(String processId, Mono dataNeg }); } - private Mono createReceivedAuditRecords(String processId, Mono issuerMono, Mono entitiesArrayJsonMono) { - return entitiesArrayJsonMono - .flatMap(entitiesArrayJson -> { - try { - JsonNode entitiesJsonNode = objectMapper.readTree(entitiesArrayJson); - - if (!entitiesJsonNode.isArray()) { - return Mono.error(new InvalidSyncResponseException(INVALID_ENTITY_SYNC_RESPONSE)); - } - - return Flux.fromIterable(entitiesJsonNode) - .concatMap(entityNode -> { - if (!entityNode.has("id") || !entityNode.has("type")) { - return Mono.error(new InvalidSyncResponseException(INVALID_ENTITY_SYNC_RESPONSE)); - } - - String entityId = entityNode.get("id").asText(); - String entityType = entityNode.get("type").asText(); - - var mvAuditService = new MVAuditServiceEntity4DataNegotiation( - entityId, - entityType, - "", - ""); - - return issuerMono.flatMap(issuer -> - auditRecordService - .buildAndSaveAuditRecordFromDataSync( - processId, - issuer, - mvAuditService, - AuditRecordStatus.RECEIVED)); - }) - .collectList() - .then(); - } catch (JsonProcessingException e) { - return Mono.error(e); - } - }); + private Mono createReceivedAuditRecords(String processId, Mono issuerMono, Mono> mvEntities4DataNegotiation) { + return mvEntities4DataNegotiation + .flatMapIterable(x -> x) + .concatMap(x -> { + var mvAuditService = new MVAuditServiceEntity4DataNegotiation( + x.id(), + x.type(), + "", + ""); + + return issuerMono.flatMap(issuer -> + auditRecordService + .buildAndSaveAuditRecordFromDataSync( + processId, + issuer, + mvAuditService, + AuditRecordStatus.RECEIVED)); + }) + .collectList() + .then(); } private Mono decodeEntitySyncResponse(Mono> entitySyncResponseMono) { @@ -222,7 +202,7 @@ private Mono> getEntitiesHashAndHashLinkById(Mono> getInvalidIntegrityEntitiesIds(Mono> entitiesByIdMono, Mono> allEntitiesExistingValidationDataById) { + private Mono> getInvalidIntegrityEntitiesIds(String processId, Mono> entitiesByIdMono, Mono> allEntitiesExistingValidationDataById) { return allEntitiesExistingValidationDataById .flatMapIterable(Map::entrySet) .flatMap(entry -> { @@ -236,6 +216,9 @@ private Mono> getInvalidIntegrityEntitiesIds(Mono> enti if (calculatedHash.equals(hashValue)) { return Mono.empty(); } else { + log.warn("ProcessID: {} - P2P replication attempt failed for entity '{}' " + + "due to hash mismatch: received hash does not match the " + + "calculated hash of the entity", processId, id); log.debug("Expected hash: {}\nCurrent hash: {}", hashValue, calculatedHash); return Mono.just(id); } diff --git a/src/main/java/es/in2/desmos/application/workflows/jobs/impl/DataVerificationJobImpl.java b/src/main/java/es/in2/desmos/application/workflows/jobs/impl/DataVerificationJobImpl.java deleted file mode 100644 index ab8a85f7..00000000 --- a/src/main/java/es/in2/desmos/application/workflows/jobs/impl/DataVerificationJobImpl.java +++ /dev/null @@ -1,164 +0,0 @@ -package es.in2.desmos.application.workflows.jobs.impl; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import es.in2.desmos.application.workflows.jobs.DataVerificationJob; -import es.in2.desmos.domain.exceptions.InvalidConsistencyException; -import es.in2.desmos.domain.models.*; -import es.in2.desmos.domain.services.api.AuditRecordService; -import es.in2.desmos.domain.services.broker.BrokerPublisherService; -import es.in2.desmos.domain.utils.ApplicationUtils; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; -import reactor.core.publisher.Mono; - -import java.security.NoSuchAlgorithmException; -import java.util.List; -import java.util.Map; -import java.util.Optional; - -@Slf4j -@Service -@RequiredArgsConstructor -public class DataVerificationJobImpl implements DataVerificationJob { - private final AuditRecordService auditRecordService; - private final BrokerPublisherService brokerPublisherService; - private final ObjectMapper objectMapper; - - public Mono verifyData(String processId, Mono issuer, Mono> entitiesByIdMono, Mono> allMVEntity4DataNegotiation, Mono> existingEntitiesOriginalValidationDataById) { - log.info("ProcessID: {} - Starting Data Verification Job", processId); - - return validateConsistency(processId, entitiesByIdMono, existingEntitiesOriginalValidationDataById) - .then(buildAndSaveAuditRecordFromDataSync(processId, issuer, entitiesByIdMono, allMVEntity4DataNegotiation, AuditRecordStatus.RETRIEVED)) - .then(createEntitiesToContextBroker(processId, entitiesByIdMono)) - .then(buildAndSaveAuditRecordFromDataSync(processId, issuer, entitiesByIdMono, allMVEntity4DataNegotiation, AuditRecordStatus.PUBLISHED)); - } - - private Mono validateConsistency(String processId, Mono> rcvdEntitiesByIdMono, Mono> existingEntitiesValidationDataByIdMono) { - return rcvdEntitiesByIdMono - .zipWith(existingEntitiesValidationDataByIdMono) - .flatMap(tuple -> { - Map rcvdEntity = tuple.getT1(); - Map existingEntitiesValidationDataById = tuple.getT2(); - - List commonEntities = rcvdEntity.keySet().stream() - .filter(recvdId -> existingEntitiesValidationDataById - .keySet() - .stream() - .anyMatch(existingId -> existingId.equals(recvdId))) - .toList(); - Mono> idMonoList = Mono.just(commonEntities); - return idMonoList - .flatMapIterable(list -> list) - .flatMap(id -> auditRecordService - .findLatestConsumerPublishedAuditRecordByEntityId(processId, id.id()) - .flatMap(auditRecord -> { - String entityData = rcvdEntity.get(id).value(); - Mono entityDataMono = Mono.just(entityData); - Mono currentEntityHashMono = calculateHash(entityDataMono); - return currentEntityHashMono.flatMap(currentEntityHash -> { - String existingHashLink = existingEntitiesValidationDataById.get(id).hashLink(); - - try { - String calculatedHashLink = ApplicationUtils.calculateHashLink(auditRecord.getEntityHashLink(), currentEntityHash); - if (calculatedHashLink.equals(existingHashLink)) { - return Mono.empty(); - } else { - log.debug("ProcessID: {} - Starting Data Verification Job\nId: {}\nEntity: {}\n OldHashlink: {}\n New Hashlink: {}\n Calculated hashlink: {}\n Expected hashlink: {}", processId, id.id(), entityData, auditRecord.getEntityHashLink(), currentEntityHash, calculatedHashLink, existingHashLink); - return Mono.error(new InvalidConsistencyException("The hashlink received does not correspond to that of the entity.")); - } - } catch (NoSuchAlgorithmException | JsonProcessingException e) { - log.warn("ProcessID: {} - Error Calculating hashlink: {}", processId, e.getMessage()); - log.debug("ProcessID: {} - Error Calculating hashlink:\nId: {}\nEntity: {}\n OldHashlink: {}\n New Hashlink: {}\n Expected hashlink: {}", processId, id.id(), entityData, auditRecord.getEntityHashLink(), currentEntityHash, existingHashLink); - return Mono.error(e); - } - }); - })) - .collectList() - .then(); - }); - } - - private Mono buildAndSaveAuditRecordFromDataSync(String processId, Mono issuerMono, Mono> rcvdEntitiesByIdMono, Mono> mvEntity4DataNegotiationListMono, AuditRecordStatus auditRecordStatus) { - return rcvdEntitiesByIdMono - .flatMapIterable(Map::entrySet) - .concatMap(rcvdEntityById -> { - String id = rcvdEntityById.getKey().id(); - - return mvEntity4DataNegotiationListMono - .flatMap(list -> { - Optional mvEntity4DataNegotiation = list.stream() - .filter(x -> x.id().equals(id)) - .findFirst(); - return mvEntity4DataNegotiation.map(entity4DataNegotiation -> issuerMono - .flatMap(issuer -> { - MVAuditServiceEntity4DataNegotiation mvAuditServiceEntity4DataNegotiation = new MVAuditServiceEntity4DataNegotiation(entity4DataNegotiation.id(), entity4DataNegotiation.type(), entity4DataNegotiation.hash(), entity4DataNegotiation.hashlink()); - return auditRecordService - .buildAndSaveAuditRecordFromDataSync(processId, issuer, mvAuditServiceEntity4DataNegotiation, auditRecordStatus); - })) - .orElseGet(() -> issuerMono - .flatMap(issuer -> getMVEntity4DataNegotiationForNewSubEntity(processId, Mono.just(rcvdEntityById), Mono.just(id)) - .flatMap(newMVEntity4DataNegotiation -> auditRecordService - .buildAndSaveAuditRecordFromDataSync(processId, issuer, newMVEntity4DataNegotiation, auditRecordStatus)))); - }); - - }) - .collectList() - .then(); - } - - // Canviar perquè retorni això. - private Mono getMVEntity4DataNegotiationForNewSubEntity(String processId, Mono> rcvdEntityByIdMono, Mono idMono) { - return rcvdEntityByIdMono.flatMap(rcvdEntityById -> { - String entity = rcvdEntityById.getValue().value(); - try { - JsonNode entityNode = objectMapper.readTree(entity); - - String type = entityNode.get("type").asText(); - - Mono calculatedHashMono = calculateHash(Mono.just(entity)); - Mono hashLinkMono = getHashLinkForNewSubEntity(processId, calculatedHashMono, idMono); - return idMono - .zipWith(hashLinkMono) - .flatMap(tuple -> { - String id = tuple.getT1(); - String hashLink = tuple.getT2(); - - return calculatedHashMono.map(hash -> - new MVAuditServiceEntity4DataNegotiation(id, type, hash, hashLink)); - }); - } catch (JsonProcessingException e) { - return Mono.error(e); - } - }); - } - - private Mono getHashLinkForNewSubEntity(String processId, Mono hashMono, Mono idMono) { - return idMono.flatMap(entityId -> - auditRecordService.findLatestConsumerPublishedAuditRecordByEntityId(processId, entityId) - .filter(auditRecord -> auditRecord.getEntityId().equals(entityId)) - .map(AuditRecord::getEntityHashLink) - .switchIfEmpty(hashMono) - ); - } - - private Mono createEntitiesToContextBroker(String processId, Mono> entitiesByIdMono) { - return entitiesByIdMono - .flatMapIterable(Map::values) - .flatMap(x -> brokerPublisherService.postEntity(processId, x.value())) - .collectList() - .then(); - } - - private Mono calculateHash(Mono retrievedBrokerEntityMono) { - return retrievedBrokerEntityMono.flatMap(sortedAttributesBrokerEntity -> { - try { - return Mono.just(ApplicationUtils.calculateSHA256(sortedAttributesBrokerEntity)); - } catch (NoSuchAlgorithmException | JsonProcessingException e) { - return Mono.error(e); - } - }); - } -} \ No newline at end of file diff --git a/src/test/java/es/in2/desmos/application/workflows/jobs/DataPublicationJobTest.java b/src/test/java/es/in2/desmos/application/workflows/jobs/DataPublicationJobTest.java new file mode 100644 index 00000000..1a19ae39 --- /dev/null +++ b/src/test/java/es/in2/desmos/application/workflows/jobs/DataPublicationJobTest.java @@ -0,0 +1,148 @@ +package es.in2.desmos.application.workflows.jobs; + +import com.fasterxml.jackson.core.JsonProcessingException; +import es.in2.desmos.application.workflows.jobs.impl.DataPublicationJobImpl; +import es.in2.desmos.domain.models.*; +import es.in2.desmos.domain.services.api.AuditRecordService; +import es.in2.desmos.domain.services.broker.BrokerPublisherService; +import es.in2.desmos.objectmothers.DataNegotiationResultMother; +import es.in2.desmos.objectmothers.EntityMother; +import es.in2.desmos.objectmothers.MVAuditServiceEntity4DataNegotiationMother; +import es.in2.desmos.objectmothers.MVEntity4DataNegotiationMother; +import org.json.JSONException; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +class DataPublicationJobTest { + + @InjectMocks + private DataPublicationJobImpl dataVerificationJob; + @Mock + private AuditRecordService auditRecordService; + @Mock + private BrokerPublisherService brokerPublisherService; + + @Captor + private ArgumentCaptor mvAuditServiceEntity4DataNegotiationArgumentCaptor; + + + @Test + void itShouldBuildAnSaveAuditRecord() throws JsonProcessingException, JSONException, NoSuchAlgorithmException { + DataNegotiationResult dataNegotiationResult = DataNegotiationResultMother.newToSync4AndExistingToSync2(); + + String processId = "0"; + + when(auditRecordService.buildAndSaveAuditRecordFromDataSync(any(), any(), any(), any())).thenReturn(Mono.empty()); + + when(brokerPublisherService.publishDataToBroker(any(), any(), any())).thenReturn(Mono.empty()); + + Mono issuer = Mono.just("http://example.org"); + + Map entitiesById = new HashMap<>(); + entitiesById.put(new Id(MVEntity4DataNegotiationMother.sample2().id()), new Entity(EntityMother.PRODUCT_OFFERING_2)); + entitiesById.put(new Id(MVEntity4DataNegotiationMother.sample4().id()), new Entity(EntityMother.PRODUCT_OFFERING_4)); + + List allMVEntity4DataNegotiation = new ArrayList<>(); + allMVEntity4DataNegotiation.add(MVEntity4DataNegotiationMother.sample2()); + allMVEntity4DataNegotiation.add(MVEntity4DataNegotiationMother.sample4()); + + Map existingEntitiesOriginalValidationDataById = new HashMap<>(); + existingEntitiesOriginalValidationDataById.put(new Id(MVEntity4DataNegotiationMother.sample2().id()), new HashAndHashLink(MVEntity4DataNegotiationMother.sample2().hash(), MVEntity4DataNegotiationMother.sample2().hashlink())); + + Mono result = dataVerificationJob.verifyData(processId, issuer, Mono.just(entitiesById), Mono.just(allMVEntity4DataNegotiation), Mono.just(existingEntitiesOriginalValidationDataById)); + + StepVerifier. + create(result) + .verifyComplete(); + + verify(auditRecordService, times(2)).buildAndSaveAuditRecordFromDataSync(eq(processId), eq(dataNegotiationResult.issuer()), any(), eq(AuditRecordStatus.RETRIEVED)); + verify(auditRecordService, times(2)).buildAndSaveAuditRecordFromDataSync(eq(processId), eq(dataNegotiationResult.issuer()), any(), eq(AuditRecordStatus.PUBLISHED)); + } + + @Test + void itShouldNotBuildAnSaveAuditRecordForSubEntity() throws JsonProcessingException, JSONException, NoSuchAlgorithmException { + + String processId = "0"; + + when(auditRecordService.buildAndSaveAuditRecordFromDataSync(any(), any(), any(), any())).thenReturn(Mono.empty()); + + when(brokerPublisherService.publishDataToBroker(any(), any(), any())).thenReturn(Mono.empty()); + + Mono issuer = Mono.just("http://example.org"); + + Map entitiesById = new HashMap<>(); + entitiesById.put(new Id(MVEntity4DataNegotiationMother.sample2().id()), new Entity(EntityMother.PRODUCT_OFFERING_2)); + entitiesById.put(new Id(MVEntity4DataNegotiationMother.sample4().id()), new Entity(EntityMother.PRODUCT_OFFERING_4)); + + List allMVEntity4DataNegotiation = new ArrayList<>(); + allMVEntity4DataNegotiation.add(MVEntity4DataNegotiationMother.sample4()); + + Map existingEntitiesOriginalValidationDataById = new HashMap<>(); + existingEntitiesOriginalValidationDataById.put(new Id(MVEntity4DataNegotiationMother.sample2().id()), new HashAndHashLink(MVEntity4DataNegotiationMother.sample2().hash(), MVEntity4DataNegotiationMother.sample2().hashlink())); + + Mono result = dataVerificationJob.verifyData(processId, issuer, Mono.just(entitiesById), Mono.just(allMVEntity4DataNegotiation), Mono.just(existingEntitiesOriginalValidationDataById)); + + StepVerifier. + create(result) + .verifyComplete(); + + verify(auditRecordService, times(1)).buildAndSaveAuditRecordFromDataSync(eq(processId), eq("http://example.org"), mvAuditServiceEntity4DataNegotiationArgumentCaptor.capture(), eq(AuditRecordStatus.RETRIEVED)); + verify(auditRecordService, times(1)).buildAndSaveAuditRecordFromDataSync(eq(processId), eq("http://example.org"), any(), eq(AuditRecordStatus.PUBLISHED)); + + var mvEntity4DataNegotiationSentToAuditRecord = mvAuditServiceEntity4DataNegotiationArgumentCaptor.getAllValues(); + + assertThat(mvEntity4DataNegotiationSentToAuditRecord.get(0)).isEqualTo(MVAuditServiceEntity4DataNegotiationMother.sample4()); + } + + @Test + void itShouldUpsertEntities() throws JsonProcessingException, JSONException, NoSuchAlgorithmException { + String processId = "0"; + + when(auditRecordService.buildAndSaveAuditRecordFromDataSync(any(), any(), any(), any())).thenReturn(Mono.empty()); + + when(brokerPublisherService.publishDataToBroker(any(), any(), any())).thenReturn(Mono.empty()); + + Mono issuer = Mono.just("http://example.org"); + + Map entitiesById = new HashMap<>(); + String productOffering2 = EntityMother.PRODUCT_OFFERING_2; + entitiesById.put(new Id(MVEntity4DataNegotiationMother.sample2().id()), new Entity(productOffering2)); + String productOffering4 = EntityMother.PRODUCT_OFFERING_4; + entitiesById.put(new Id(MVEntity4DataNegotiationMother.sample4().id()), new Entity(productOffering4)); + + List allMVEntity4DataNegotiation = new ArrayList<>(); + allMVEntity4DataNegotiation.add(MVEntity4DataNegotiationMother.sample2()); + allMVEntity4DataNegotiation.add(MVEntity4DataNegotiationMother.sample4()); + + Map existingEntitiesOriginalValidationDataById = new HashMap<>(); + existingEntitiesOriginalValidationDataById.put(new Id(MVEntity4DataNegotiationMother.sample2().id()), new HashAndHashLink(MVEntity4DataNegotiationMother.sample2().hash(), MVEntity4DataNegotiationMother.sample2().hashlink())); + + Mono result = dataVerificationJob.verifyData(processId, issuer, Mono.just(entitiesById), Mono.just(allMVEntity4DataNegotiation), Mono.just(existingEntitiesOriginalValidationDataById)); + + + StepVerifier. + create(result) + .verifyComplete(); + + verify(brokerPublisherService, times(1)).publishDataToBroker(processId, MVEntity4DataNegotiationMother.sample2().id(), productOffering2); + verify(brokerPublisherService, times(1)).publishDataToBroker(processId, MVEntity4DataNegotiationMother.sample4().id(), productOffering4); + verifyNoMoreInteractions(brokerPublisherService); + } +} \ No newline at end of file diff --git a/src/test/java/es/in2/desmos/application/workflows/jobs/DataTransferJobTest.java b/src/test/java/es/in2/desmos/application/workflows/jobs/DataTransferJobTest.java index 411088c8..410068a9 100644 --- a/src/test/java/es/in2/desmos/application/workflows/jobs/DataTransferJobTest.java +++ b/src/test/java/es/in2/desmos/application/workflows/jobs/DataTransferJobTest.java @@ -33,7 +33,7 @@ class DataTransferJobTest { private EntitySyncWebClient entitySyncWebClient; @Mock - private DataVerificationJob dataVerificationJob; + private DataPublicationJob dataPublicationJob; @Mock private AuditRecordService auditRecordService; @@ -88,7 +88,7 @@ void itShouldRequestEntitiesToExternalAccessNodeFromMultipleIssuers() throws IOE }); when(auditRecordService.buildAndSaveAuditRecordFromDataSync(any(), any(), any(), any())).thenReturn(Mono.empty()); - when(dataVerificationJob.verifyData(eq(processId), any(), any(), any(), any())).thenReturn(Mono.empty()); + when(dataPublicationJob.verifyData(eq(processId), any(), any(), any(), any())).thenReturn(Mono.empty()); Mono result = dataTransferJob.syncDataFromList(processId, Mono.just(dataNegotiationResults)); @@ -147,7 +147,7 @@ void itShouldRequestEntitiesToExternalAccessNode() throws IOException, JSONExcep when(auditRecordService.buildAndSaveAuditRecordFromDataSync(any(), any(), any(), any())).thenReturn(Mono.empty()); - when(dataVerificationJob.verifyData(eq(processId), any(), any(), any(), any())).thenReturn(Mono.empty()); + when(dataPublicationJob.verifyData(eq(processId), any(), any(), any(), any())).thenReturn(Mono.empty()); Mono result = dataTransferJob.syncData(processId, dataNegotiationResultMono); @@ -185,7 +185,7 @@ void itShouldPassOnlyValidEntitiesWhenHashIsIncorrect() throws IOException, JSON when(auditRecordService.buildAndSaveAuditRecordFromDataSync(any(), any(), any(), any())).thenReturn(Mono.empty()); - when(dataVerificationJob.verifyData(eq(processId), any(), any(), any(), any())).thenReturn(Mono.empty()); + when(dataPublicationJob.verifyData(eq(processId), any(), any(), any(), any())).thenReturn(Mono.empty()); List expectedEntitiesById = List.of( new Id(MVEntity4DataNegotiationMother.sample2().id()), @@ -199,7 +199,7 @@ void itShouldPassOnlyValidEntitiesWhenHashIsIncorrect() throws IOException, JSON .create(result) .verifyComplete(); - verify(dataVerificationJob, times(1)).verifyData(any(), any(), entitiesByIdCaptor.capture(), any(), any()); + verify(dataPublicationJob, times(1)).verifyData(any(), any(), entitiesByIdCaptor.capture(), any(), any()); Mono> entitiesByIdCaptured = entitiesByIdCaptor.getValue(); @@ -226,7 +226,7 @@ void itShouldBuildAllMVEntities4DataNegotiation() throws IOException, JSONExcept when(auditRecordService.buildAndSaveAuditRecordFromDataSync(any(), any(), any(), any())).thenReturn(Mono.empty()); - when(dataVerificationJob.verifyData(eq(processId), any(), any(), any(), any())).thenReturn(Mono.empty()); + when(dataPublicationJob.verifyData(eq(processId), any(), any(), any(), any())).thenReturn(Mono.empty()); Mono result = dataTransferJob.syncData(processId, dataNegotiationResultMono); @@ -234,8 +234,8 @@ void itShouldBuildAllMVEntities4DataNegotiation() throws IOException, JSONExcept create(result) .verifyComplete(); - verify(dataVerificationJob, times(1)).verifyData(eq(processId), any(), any(), mvEntities4DataNegotiationCaptor.capture(), any()); - verifyNoMoreInteractions(dataVerificationJob); + verify(dataPublicationJob, times(1)).verifyData(eq(processId), any(), any(), mvEntities4DataNegotiationCaptor.capture(), any()); + verifyNoMoreInteractions(dataPublicationJob); Mono> monoDataVerificationJobCaptured = mvEntities4DataNegotiationCaptor.getValue(); @@ -258,7 +258,7 @@ void itShouldNotDoDataTransferIfDataNegotiationResultIsEmpty() { .verifyComplete(); verifyNoInteractions(entitySyncWebClient); - verifyNoInteractions(dataVerificationJob); + verifyNoInteractions(dataPublicationJob); verifyNoInteractions(objectMapper); } @@ -272,7 +272,7 @@ void itShouldCreateReceivedAuditRecord() throws JSONException, NoSuchAlgorithmEx when(auditRecordService.buildAndSaveAuditRecordFromDataSync(any(), any(), any(), any())).thenReturn(Mono.empty()); - when(dataVerificationJob.verifyData(eq(processId), any(), any(), any(), any())).thenReturn(Mono.empty()); + when(dataPublicationJob.verifyData(eq(processId), any(), any(), any(), any())).thenReturn(Mono.empty()); DataNegotiationResult dataNegotiationResult = DataNegotiationResultMother.sample(); @@ -307,7 +307,7 @@ void itShouldFilterEntitiesWithBadHash() throws JSONException, NoSuchAlgorithmEx when(auditRecordService.buildAndSaveAuditRecordFromDataSync(any(), any(), any(), any())).thenReturn(Mono.empty()); - when(dataVerificationJob.verifyData(eq(processId), any(), any(), any(), any())).thenReturn(Mono.empty()); + when(dataPublicationJob.verifyData(eq(processId), any(), any(), any(), any())).thenReturn(Mono.empty()); DataNegotiationResult dataNegotiationResult = DataNegotiationResultMother.sampleBadHash2(); @@ -319,7 +319,7 @@ void itShouldFilterEntitiesWithBadHash() throws JSONException, NoSuchAlgorithmEx dataTransferJob.syncData(processId, Mono.just(dataNegotiationResult)).block(); - verify(dataVerificationJob, times(1)) + verify(dataPublicationJob, times(1)) .verifyData( any(), any(), @@ -356,7 +356,7 @@ void itShouldAddSubEntities() throws JSONException, NoSuchAlgorithmException, IO when(auditRecordService.buildAndSaveAuditRecordFromDataSync(any(), any(), any(), any())).thenReturn(Mono.empty()); - when(dataVerificationJob.verifyData(eq(processId), any(), any(), any(), any())).thenReturn(Mono.empty()); + when(dataPublicationJob.verifyData(eq(processId), any(), any(), any(), any())).thenReturn(Mono.empty()); DataNegotiationResult dataNegotiationResult = DataNegotiationResultMother.sample(); @@ -370,7 +370,7 @@ void itShouldAddSubEntities() throws JSONException, NoSuchAlgorithmException, IO dataTransferJob.syncData(processId, Mono.just(dataNegotiationResult)).block(); - verify(dataVerificationJob, times(1)) + verify(dataPublicationJob, times(1)) .verifyData( any(), any(), diff --git a/src/test/java/es/in2/desmos/application/workflows/jobs/DataVerificationJobTest.java b/src/test/java/es/in2/desmos/application/workflows/jobs/DataVerificationJobTest.java deleted file mode 100644 index 7be8928d..00000000 --- a/src/test/java/es/in2/desmos/application/workflows/jobs/DataVerificationJobTest.java +++ /dev/null @@ -1,306 +0,0 @@ -package es.in2.desmos.application.workflows.jobs; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import es.in2.desmos.application.workflows.jobs.impl.DataVerificationJobImpl; -import es.in2.desmos.domain.exceptions.InvalidConsistencyException; -import es.in2.desmos.domain.models.*; -import es.in2.desmos.domain.services.api.AuditRecordService; -import es.in2.desmos.domain.services.broker.BrokerPublisherService; -import es.in2.desmos.objectmothers.DataNegotiationResultMother; -import es.in2.desmos.objectmothers.EntityMother; -import es.in2.desmos.objectmothers.MVAuditServiceEntity4DataNegotiationMother; -import es.in2.desmos.objectmothers.MVEntity4DataNegotiationMother; -import org.json.JSONException; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.*; -import org.mockito.junit.jupiter.MockitoExtension; -import reactor.core.publisher.Mono; -import reactor.test.StepVerifier; - -import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.*; - -@ExtendWith(MockitoExtension.class) -class DataVerificationJobTest { - @SuppressWarnings("CanBeFinal") - @Spy - private static ObjectMapper objectMapper = new ObjectMapper(); - @InjectMocks - private DataVerificationJobImpl dataVerificationJob; - @Mock - private AuditRecordService auditRecordService; - @Mock - private BrokerPublisherService brokerPublisherService; - - @Captor - private ArgumentCaptor mvAuditServiceEntity4DataNegotiationArgumentCaptor; - - - @Test - void itShouldBuildAnSaveAuditRecord() throws JsonProcessingException, JSONException, NoSuchAlgorithmException { - DataNegotiationResult dataNegotiationResult = DataNegotiationResultMother.newToSync4AndExistingToSync2(); - - String processId = "0"; - - when(auditRecordService.findLatestConsumerPublishedAuditRecordByEntityId(processId, MVEntity4DataNegotiationMother.sample2().id())).thenReturn(Mono.just(AuditRecord.builder().entityHashLink(MVEntity4DataNegotiationMother.sample2VersionOld().hashlink()).build())); - - when(auditRecordService.buildAndSaveAuditRecordFromDataSync(any(), any(), any(), any())).thenReturn(Mono.empty()); - - when(brokerPublisherService.postEntity(any(), any())).thenReturn(Mono.empty()); - - Mono issuer = Mono.just("http://example.org"); - - Map entitiesById = new HashMap<>(); - entitiesById.put(new Id(MVEntity4DataNegotiationMother.sample2().id()), new Entity(EntityMother.PRODUCT_OFFERING_2)); - entitiesById.put(new Id(MVEntity4DataNegotiationMother.sample4().id()), new Entity(EntityMother.PRODUCT_OFFERING_4)); - - List allMVEntity4DataNegotiation = new ArrayList<>(); - allMVEntity4DataNegotiation.add(MVEntity4DataNegotiationMother.sample2()); - allMVEntity4DataNegotiation.add(MVEntity4DataNegotiationMother.sample4()); - - Map existingEntitiesOriginalValidationDataById = new HashMap<>(); - existingEntitiesOriginalValidationDataById.put(new Id(MVEntity4DataNegotiationMother.sample2().id()), new HashAndHashLink(MVEntity4DataNegotiationMother.sample2().hash(), MVEntity4DataNegotiationMother.sample2().hashlink())); - - Mono result = dataVerificationJob.verifyData(processId, issuer, Mono.just(entitiesById), Mono.just(allMVEntity4DataNegotiation), Mono.just(existingEntitiesOriginalValidationDataById)); - - StepVerifier. - create(result) - .verifyComplete(); - - verify(auditRecordService, times(2)).buildAndSaveAuditRecordFromDataSync(eq(processId), eq(dataNegotiationResult.issuer()), any(), eq(AuditRecordStatus.RETRIEVED)); - verify(auditRecordService, times(2)).buildAndSaveAuditRecordFromDataSync(eq(processId), eq(dataNegotiationResult.issuer()), any(), eq(AuditRecordStatus.PUBLISHED)); - } - - @Test - void itShouldBuildAnSaveAuditRecordForSubEntity() throws JsonProcessingException, JSONException, NoSuchAlgorithmException { - - String processId = "0"; - - when(auditRecordService.findLatestConsumerPublishedAuditRecordByEntityId(processId, MVEntity4DataNegotiationMother.sample2().id())).thenReturn(Mono.just(AuditRecord.builder().entityId(MVEntity4DataNegotiationMother.sample2().id()).entityHashLink(MVEntity4DataNegotiationMother.sample2VersionOld().hashlink()).build())); - - when(auditRecordService.buildAndSaveAuditRecordFromDataSync(any(), any(), any(), any())).thenReturn(Mono.empty()); - - when(brokerPublisherService.postEntity(any(), any())).thenReturn(Mono.empty()); - - Mono issuer = Mono.just("http://example.org"); - - Map entitiesById = new HashMap<>(); - entitiesById.put(new Id(MVEntity4DataNegotiationMother.sample2().id()), new Entity(EntityMother.PRODUCT_OFFERING_2)); - entitiesById.put(new Id(MVEntity4DataNegotiationMother.sample4().id()), new Entity(EntityMother.PRODUCT_OFFERING_4)); - - List allMVEntity4DataNegotiation = new ArrayList<>(); - allMVEntity4DataNegotiation.add(MVEntity4DataNegotiationMother.sample4()); - - Map existingEntitiesOriginalValidationDataById = new HashMap<>(); - existingEntitiesOriginalValidationDataById.put(new Id(MVEntity4DataNegotiationMother.sample2().id()), new HashAndHashLink(MVEntity4DataNegotiationMother.sample2().hash(), MVEntity4DataNegotiationMother.sample2().hashlink())); - - MVAuditServiceEntity4DataNegotiation expectedMVEntity4DataNegotiationSample2 = - new MVAuditServiceEntity4DataNegotiation( - MVEntity4DataNegotiationMother.sample2().id(), - MVEntity4DataNegotiationMother.sample2().type(), - MVEntity4DataNegotiationMother.sample2().hash(), - MVEntity4DataNegotiationMother.sample2VersionOld().hashlink()); - - Mono result = dataVerificationJob.verifyData(processId, issuer, Mono.just(entitiesById), Mono.just(allMVEntity4DataNegotiation), Mono.just(existingEntitiesOriginalValidationDataById)); - - StepVerifier. - create(result) - .verifyComplete(); - - verify(auditRecordService, times(2)).buildAndSaveAuditRecordFromDataSync(eq(processId), eq("http://example.org"), mvAuditServiceEntity4DataNegotiationArgumentCaptor.capture(), eq(AuditRecordStatus.RETRIEVED)); - verify(auditRecordService, times(2)).buildAndSaveAuditRecordFromDataSync(eq(processId), eq("http://example.org"), any(), eq(AuditRecordStatus.PUBLISHED)); - - var mvEntity4DataNegotiationSentToAuditRecord = mvAuditServiceEntity4DataNegotiationArgumentCaptor.getAllValues(); - - assertThat(mvEntity4DataNegotiationSentToAuditRecord.get(0)).isEqualTo(expectedMVEntity4DataNegotiationSample2); - assertThat(mvEntity4DataNegotiationSentToAuditRecord.get(1)).isEqualTo(MVAuditServiceEntity4DataNegotiationMother.sample4()); - } - - @Test - void itShouldBuildAnSaveAuditRecordForSubEntityWhenNotExistsInAuditRecordDB() throws JsonProcessingException, JSONException, NoSuchAlgorithmException { - - String processId = "0"; - - when(auditRecordService.findLatestConsumerPublishedAuditRecordByEntityId(processId, MVEntity4DataNegotiationMother.sample2().id())) - .thenReturn(Mono.just(AuditRecord.builder().entityId(MVEntity4DataNegotiationMother.sample2().id()).entityHashLink(MVEntity4DataNegotiationMother.sample2VersionOld().hashlink()).build())) - .thenReturn(Mono.just(AuditRecord.builder().entityId(MVEntity4DataNegotiationMother.sample2().id()).entityId("").entityHashLink(MVEntity4DataNegotiationMother.sample2VersionOld().hashlink()).build())); - - when(auditRecordService.buildAndSaveAuditRecordFromDataSync(any(), any(), any(), any())).thenReturn(Mono.empty()); - - when(brokerPublisherService.postEntity(any(), any())).thenReturn(Mono.empty()); - - Mono issuer = Mono.just("http://example.org"); - - Map entitiesById = new HashMap<>(); - entitiesById.put(new Id(MVEntity4DataNegotiationMother.sample2().id()), new Entity(EntityMother.PRODUCT_OFFERING_2)); - entitiesById.put(new Id(MVEntity4DataNegotiationMother.sample4().id()), new Entity(EntityMother.PRODUCT_OFFERING_4)); - - List allMVEntity4DataNegotiation = new ArrayList<>(); - allMVEntity4DataNegotiation.add(MVEntity4DataNegotiationMother.sample4()); - - Map existingEntitiesOriginalValidationDataById = new HashMap<>(); - existingEntitiesOriginalValidationDataById.put(new Id(MVEntity4DataNegotiationMother.sample2().id()), new HashAndHashLink(MVEntity4DataNegotiationMother.sample2().hash(), MVEntity4DataNegotiationMother.sample2().hashlink())); - - MVAuditServiceEntity4DataNegotiation expectedMVAuditServiceEntity4DataNegotiation2 = - new MVAuditServiceEntity4DataNegotiation( - MVEntity4DataNegotiationMother.sample2().id(), - MVEntity4DataNegotiationMother.sample2().type(), - MVEntity4DataNegotiationMother.sample2().hash(), - MVEntity4DataNegotiationMother.sample2().hash()); - - Mono result = dataVerificationJob.verifyData(processId, issuer, Mono.just(entitiesById), Mono.just(allMVEntity4DataNegotiation), Mono.just(existingEntitiesOriginalValidationDataById)); - - StepVerifier. - create(result) - .verifyComplete(); - - verify(auditRecordService, times(2)).buildAndSaveAuditRecordFromDataSync(eq(processId), eq("http://example.org"), mvAuditServiceEntity4DataNegotiationArgumentCaptor.capture(), eq(AuditRecordStatus.RETRIEVED)); - verify(auditRecordService, times(2)).buildAndSaveAuditRecordFromDataSync(eq(processId), eq("http://example.org"), any(), eq(AuditRecordStatus.PUBLISHED)); - - var mvEntity4DataNegotiationSentToAuditRecord = mvAuditServiceEntity4DataNegotiationArgumentCaptor.getAllValues(); - - assertThat(mvEntity4DataNegotiationSentToAuditRecord.get(0)).isEqualTo(expectedMVAuditServiceEntity4DataNegotiation2); - assertThat(mvEntity4DataNegotiationSentToAuditRecord.get(1)).isEqualTo(MVAuditServiceEntity4DataNegotiationMother.sample4()); - } - - @Test - void itShouldBuildAnSaveAuditRecordForSubEntityWhenNotExistsInAuditRecordDBWithNullLifecyclestatus() throws JsonProcessingException, JSONException, NoSuchAlgorithmException { - String processId = "0"; - - when(auditRecordService.findLatestConsumerPublishedAuditRecordByEntityId(processId, MVEntity4DataNegotiationMother.sample1NullLifecyclestatus().id())) - .thenReturn(Mono.just(AuditRecord.builder().entityId(MVEntity4DataNegotiationMother.sample1NullLifecyclestatus().id()).entityHashLink(MVEntity4DataNegotiationMother.sample1NullLifecyclestatus().hashlink()).build())); - - when(auditRecordService.buildAndSaveAuditRecordFromDataSync(any(), any(), any(), any())).thenReturn(Mono.empty()); - - when(brokerPublisherService.postEntity(any(), any())).thenReturn(Mono.empty()); - - Mono issuer = Mono.just("http://example.org"); - - Map entitiesById = new HashMap<>(); - entitiesById.put(new Id(MVEntity4DataNegotiationMother.sample1NullLifecyclestatus().id()), new Entity(EntityMother.PRODUCT_OFFERING_1_NULL_LIFECYCLESTATUS)); - - List allMVEntity4DataNegotiation = new ArrayList<>(); - - Map existingEntitiesOriginalValidationDataById = new HashMap<>(); - - MVAuditServiceEntity4DataNegotiation expectedMVAuditServiceEntity4DataNegotiationSample1 = MVAuditServiceEntity4DataNegotiationMother.sample1NullLifecyclestatus(); - - Mono result = dataVerificationJob.verifyData(processId, issuer, Mono.just(entitiesById), Mono.just(allMVEntity4DataNegotiation), Mono.just(existingEntitiesOriginalValidationDataById)); - - StepVerifier. - create(result) - .verifyComplete(); - - verify(auditRecordService, times(1)).buildAndSaveAuditRecordFromDataSync(eq(processId), eq("http://example.org"), mvAuditServiceEntity4DataNegotiationArgumentCaptor.capture(), eq(AuditRecordStatus.RETRIEVED)); - verify(auditRecordService, times(1)).buildAndSaveAuditRecordFromDataSync(eq(processId), eq("http://example.org"), any(), eq(AuditRecordStatus.PUBLISHED)); - - var mvEntity4DataNegotiationSentToAuditRecord = mvAuditServiceEntity4DataNegotiationArgumentCaptor.getAllValues(); - - assertThat(mvEntity4DataNegotiationSentToAuditRecord.get(0)).isEqualTo(expectedMVAuditServiceEntity4DataNegotiationSample1); - } - - @Test - void itShouldReturnInvalidJsonProcessingExceptionWhenEntityIsInvalid() throws JsonProcessingException, JSONException, NoSuchAlgorithmException { - - String processId = "0"; - - when(auditRecordService.findLatestConsumerPublishedAuditRecordByEntityId(processId, MVEntity4DataNegotiationMother.sample2().id())).thenReturn(Mono.just(AuditRecord.builder().entityId(MVEntity4DataNegotiationMother.sample2().id()).entityHashLink(MVEntity4DataNegotiationMother.sample2VersionOld().hashlink()).build())); - - when(objectMapper.readTree(anyString())).thenThrow(JsonProcessingException.class); - - Mono issuer = Mono.just("http://example.org"); - - Map entitiesById = new HashMap<>(); - entitiesById.put(new Id(MVEntity4DataNegotiationMother.sample2().id()), new Entity(EntityMother.PRODUCT_OFFERING_2)); - entitiesById.put(new Id(MVEntity4DataNegotiationMother.sample4().id()), new Entity(EntityMother.PRODUCT_OFFERING_4)); - - List allMVEntity4DataNegotiation = new ArrayList<>(); - allMVEntity4DataNegotiation.add(MVEntity4DataNegotiationMother.sample4()); - - Map existingEntitiesOriginalValidationDataById = new HashMap<>(); - existingEntitiesOriginalValidationDataById.put(new Id(MVEntity4DataNegotiationMother.sample2().id()), new HashAndHashLink(MVEntity4DataNegotiationMother.sample2().hash(), MVEntity4DataNegotiationMother.sample2().hashlink())); - - Mono result = dataVerificationJob.verifyData(processId, issuer, Mono.just(entitiesById), Mono.just(allMVEntity4DataNegotiation), Mono.just(existingEntitiesOriginalValidationDataById)); - - StepVerifier - .create(result) - .expectErrorMatches(throwable -> throwable instanceof JsonProcessingException) - .verify(); - } - - @Test - void itShouldReturnInvalidConsistencyException() throws JsonProcessingException, JSONException, NoSuchAlgorithmException { - - String processId = "0"; - - when(auditRecordService.findLatestConsumerPublishedAuditRecordByEntityId(processId, MVEntity4DataNegotiationMother.sample2().id())).thenReturn(Mono.just(AuditRecord.builder().entityHashLink("fa54fdsafdsadsfdsa").build())); - - Mono issuer = Mono.just("http://example.org"); - - Map entitiesById = new HashMap<>(); - entitiesById.put(new Id(MVEntity4DataNegotiationMother.sample2().id()), new Entity(EntityMother.getJson2())); - entitiesById.put(new Id(MVEntity4DataNegotiationMother.sample4().id()), new Entity(EntityMother.getJson4())); - - List allMVEntity4DataNegotiation = new ArrayList<>(); - allMVEntity4DataNegotiation.add(MVEntity4DataNegotiationMother.sample2()); - allMVEntity4DataNegotiation.add(MVEntity4DataNegotiationMother.sample4()); - - Map existingEntitiesOriginalValidationDataById = new HashMap<>(); - existingEntitiesOriginalValidationDataById.put(new Id(MVEntity4DataNegotiationMother.sample2().id()), new HashAndHashLink(MVEntity4DataNegotiationMother.sample2().hash(), MVEntity4DataNegotiationMother.sample2().hashlink())); - existingEntitiesOriginalValidationDataById.put(new Id(MVEntity4DataNegotiationMother.sample4().id()), new HashAndHashLink(MVEntity4DataNegotiationMother.sample4().hash(), MVEntity4DataNegotiationMother.sample4().hashlink())); - - Mono result = dataVerificationJob.verifyData(processId, issuer, Mono.just(entitiesById), Mono.just(allMVEntity4DataNegotiation), Mono.just(existingEntitiesOriginalValidationDataById)); - - StepVerifier. - create(result) - .expectErrorMatches(throwable -> throwable instanceof InvalidConsistencyException && - throwable.getMessage().equals("The hashlink received does not correspond to that of the entity.") - ) - .verify(); - } - - @Test - void itShouldUpsertEntities() throws JsonProcessingException, JSONException, NoSuchAlgorithmException { - String processId = "0"; - - when(auditRecordService.findLatestConsumerPublishedAuditRecordByEntityId(processId, MVEntity4DataNegotiationMother.sample2().id())).thenReturn(Mono.just(AuditRecord.builder().entityHashLink(MVEntity4DataNegotiationMother.sample2VersionOld().hashlink()).build())); - - when(auditRecordService.buildAndSaveAuditRecordFromDataSync(any(), any(), any(), any())).thenReturn(Mono.empty()); - - when(brokerPublisherService.postEntity(any(), any())).thenReturn(Mono.empty()); - - Mono issuer = Mono.just("http://example.org"); - - Map entitiesById = new HashMap<>(); - String productOffering2 = EntityMother.PRODUCT_OFFERING_2; - entitiesById.put(new Id(MVEntity4DataNegotiationMother.sample2().id()), new Entity(productOffering2)); - String productOffering4 = EntityMother.PRODUCT_OFFERING_4; - entitiesById.put(new Id(MVEntity4DataNegotiationMother.sample4().id()), new Entity(productOffering4)); - - List allMVEntity4DataNegotiation = new ArrayList<>(); - allMVEntity4DataNegotiation.add(MVEntity4DataNegotiationMother.sample2()); - allMVEntity4DataNegotiation.add(MVEntity4DataNegotiationMother.sample4()); - - Map existingEntitiesOriginalValidationDataById = new HashMap<>(); - existingEntitiesOriginalValidationDataById.put(new Id(MVEntity4DataNegotiationMother.sample2().id()), new HashAndHashLink(MVEntity4DataNegotiationMother.sample2().hash(), MVEntity4DataNegotiationMother.sample2().hashlink())); - - Mono result = dataVerificationJob.verifyData(processId, issuer, Mono.just(entitiesById), Mono.just(allMVEntity4DataNegotiation), Mono.just(existingEntitiesOriginalValidationDataById)); - - - StepVerifier. - create(result) - .verifyComplete(); - - verify(brokerPublisherService, times(1)).postEntity(processId, productOffering2); - verify(brokerPublisherService, times(1)).postEntity(processId, productOffering4); - verifyNoMoreInteractions(brokerPublisherService); - } -} \ No newline at end of file