Skip to content

Commit

Permalink
fix/replication (#72)
Browse files Browse the repository at this point in the history
* feat(infrastructure): exit application if cannot create broker subscription or blockchain subscription in the application runner

* feat(infrastructure): exit application if cannot get access node public keys from external yaml

* feat(infrastructure): exit application if cannot synchronize in startup

* fix(service): remove consistency validation in p2p

* build(gradle): change version
  • Loading branch information
albertrodriguezin2 authored Feb 11, 2025
1 parent 68aae65 commit fb872dd
Show file tree
Hide file tree
Showing 11 changed files with 281 additions and 536 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Retry mechanism for Blockchain Adapter and Context Broker.
- Recover after a failure mechanism for Blockchain Adapter and Context Broker.

## [Released]: v1.0.7
- Fix entity updates to Scorpio during the synchronization process.
- Terminate the application if startup fails to complete successfully.
- Remove hash link validation in P2P synchronization.
- Add a warning log when an entity fails to synchronize in P2P due to an invalid integrity check.

## [Released]: v1.0.6
- Fix slow synchronization

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ plugins {
}

group = 'es.in2'
version = '1.0.6'
version = '1.0.7'

java {
sourceCompatibility = '17'
Expand Down
7 changes: 6 additions & 1 deletion src/main/java/es/in2/desmos/DesmosApiApplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import lombok.Getter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;

Expand All @@ -18,8 +20,11 @@ public class DesmosApiApplication {
.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true)
.build();

@Getter
private static ConfigurableApplicationContext context;

public static void main(String[] args) {
SpringApplication.run(DesmosApiApplication.class, args);
context = SpringApplication.run(DesmosApiApplication.class, args);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package es.in2.desmos.application.runners;

import es.in2.desmos.DesmosApiApplication;
import es.in2.desmos.application.workflows.DataSyncWorkflow;
import es.in2.desmos.application.workflows.PublishWorkflow;
import es.in2.desmos.application.workflows.SubscribeWorkflow;
Expand All @@ -14,6 +15,7 @@
import es.in2.desmos.infrastructure.configs.BrokerConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
Expand All @@ -23,6 +25,7 @@
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -57,9 +60,13 @@ public Mono<Void> onApplicationReady() {
String processId = UUID.randomUUID().toString();
log.info("ProcessID: {} - Setting initial configurations...", processId);
return setBrokerSubscription(processId)
.then(setBlockchainSubscription(processId))
.then(setAccessNodePublicKeysFromExternalYaml(processId))
.thenMany(initializeDataSync(processId))
.doOnError(error -> finishApplication("Broker Subscription", error))
.then(setBlockchainSubscription(processId)
.doOnError(error -> finishApplication("Blockchain Subscription", error)))
.then(setAccessNodePublicKeysFromExternalYaml(processId)
.doOnError(error -> finishApplication("Access Node Public Keys Getting", error)))
.thenMany(initializeDataSync(processId)
.doOnError(error -> finishApplication("Initialize Data Sync", error)))
.then();
}

Expand Down Expand Up @@ -182,4 +189,14 @@ private void startBrokerEventProcessing(String processId) {
() -> log.info("ProcessID: {} - Subscribe Workflow completed", processId)
);
}

private void finishApplication(String step, Throwable error) {
log.error("Error in {}: {}", step, error.getMessage(), error);

Mono.fromRunnable(() -> {
int exitCode = SpringApplication.exit(DesmosApiApplication.getContext(), () -> 0);
log.info("Application exiting with code {}", exitCode);
System.exit(exitCode);
}).subscribeOn(Schedulers.boundedElastic()).subscribe();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@
import java.util.List;
import java.util.Map;

public interface DataVerificationJob {
public interface DataPublicationJob {
Mono<Void> verifyData(String processId, Mono<String> issuer, Mono<Map<Id, Entity>> entitiesByIdMono, Mono<List<MVEntity4DataNegotiation>> allMVEntity4DataNegotiation, Mono<Map<Id, HashAndHashLink>> existingEntitiesOriginalValidationDataById);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package es.in2.desmos.application.workflows.jobs.impl;

import es.in2.desmos.application.workflows.jobs.DataPublicationJob;
import es.in2.desmos.domain.models.*;
import es.in2.desmos.domain.services.api.AuditRecordService;
import es.in2.desmos.domain.services.broker.BrokerPublisherService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Map;

@Slf4j
@Service
@RequiredArgsConstructor
public class DataPublicationJobImpl implements DataPublicationJob {
private final AuditRecordService auditRecordService;
private final BrokerPublisherService brokerPublisherService;

public Mono<Void> verifyData(String processId, Mono<String> issuer, Mono<Map<Id, Entity>> entitiesByIdMono, Mono<List<MVEntity4DataNegotiation>> allMVEntity4DataNegotiation, Mono<Map<Id, HashAndHashLink>> existingEntitiesOriginalValidationDataById) {
log.info("ProcessID: {} - Starting Data Verification Job", processId);

return buildAndSaveAuditRecordFromDataSync(processId, issuer, entitiesByIdMono, allMVEntity4DataNegotiation, AuditRecordStatus.RETRIEVED)
.then(createEntitiesToContextBroker(processId, entitiesByIdMono))
.then(buildAndSaveAuditRecordFromDataSync(processId, issuer, entitiesByIdMono, allMVEntity4DataNegotiation, AuditRecordStatus.PUBLISHED));
}

private Mono<Void> buildAndSaveAuditRecordFromDataSync(String processId, Mono<String> issuerMono, Mono<Map<Id, Entity>> rcvdEntitiesByIdMono, Mono<List<MVEntity4DataNegotiation>> mvEntity4DataNegotiationListMono, AuditRecordStatus auditRecordStatus) {
return Mono.zip(rcvdEntitiesByIdMono, mvEntity4DataNegotiationListMono)
.flatMapMany(tuple -> {
Map<Id, Entity> rcvdEntitiesById = tuple.getT1();
List<MVEntity4DataNegotiation> mvEntity4DataNegotiationList = tuple.getT2();

return Flux.fromIterable(mvEntity4DataNegotiationList)
.filter(entity4DataNegotiation -> rcvdEntitiesById.containsKey(new Id(entity4DataNegotiation.id())))
.concatMap(entity4DataNegotiation -> issuerMono
.flatMap(issuer -> {
MVAuditServiceEntity4DataNegotiation mvAuditServiceEntity4DataNegotiation = new MVAuditServiceEntity4DataNegotiation(entity4DataNegotiation.id(), entity4DataNegotiation.type(), entity4DataNegotiation.hash(), entity4DataNegotiation.hashlink());
return auditRecordService.buildAndSaveAuditRecordFromDataSync(processId, issuer, mvAuditServiceEntity4DataNegotiation, auditRecordStatus);
}));
})
.collectList()
.then();
}

private Mono<Void> createEntitiesToContextBroker(String processId, Mono<Map<Id, Entity>> entitiesByIdMono) {
return entitiesByIdMono
.flatMapIterable(Map::entrySet)
.flatMap(x -> brokerPublisherService.publishDataToBroker(processId, x.getKey().id(), x.getValue().value()))
.collectList()
.then();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.JsonParser;
import es.in2.desmos.application.workflows.jobs.DataPublicationJob;
import es.in2.desmos.application.workflows.jobs.DataTransferJob;
import es.in2.desmos.application.workflows.jobs.DataVerificationJob;
import es.in2.desmos.domain.exceptions.InvalidSyncResponseException;
import es.in2.desmos.domain.models.*;
import es.in2.desmos.domain.services.api.AuditRecordService;
Expand All @@ -16,7 +16,6 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.security.NoSuchAlgorithmException;
Expand All @@ -32,7 +31,7 @@
public class DataTransferJobImpl implements DataTransferJob {
public static final String INVALID_ENTITY_SYNC_RESPONSE = "Invalid EntitySync response.";
private final EntitySyncWebClient entitySyncWebClient;
private final DataVerificationJob dataVerificationJob;
private final DataPublicationJob dataPublicationJob;
private final ObjectMapper objectMapper;
private final AuditRecordService auditRecordService;

Expand Down Expand Up @@ -88,14 +87,14 @@ public Mono<Void> syncData(String processId, Mono<DataNegotiationResult> dataNeg
Mono.just(result.existingEntitiesToSync())
);

return createReceivedAuditRecords(processId, issuer, decodedEntitySyncResponseMono)
.then(getInvalidIntegrityEntitiesIds(entitiesByIdMono, entitiesHashAndHashlinkById))
return createReceivedAuditRecords(processId, issuer, mvEntities4DataNegotiation)
.then(getInvalidIntegrityEntitiesIds(processId, entitiesByIdMono, entitiesHashAndHashlinkById))
.flatMap(invalidIntegrityEntitiesIds -> {
Mono<List<Id>> invalidIntegrityEntitiesIdsMono = Mono.just(invalidIntegrityEntitiesIds);
return filterEntitiesById(entitiesByIdMono, invalidIntegrityEntitiesIdsMono)
.flatMap(filteredEntitiesById -> {
Mono<Map<Id, Entity>> filteredEntitiesByIdMono = Mono.just(filteredEntitiesById);
return dataVerificationJob.verifyData(processId, issuer, filteredEntitiesByIdMono, mvEntities4DataNegotiation, existingEntitiesHashAndHashLinkById);
return dataPublicationJob.verifyData(processId, issuer, filteredEntitiesByIdMono, mvEntities4DataNegotiation, existingEntitiesHashAndHashLinkById);

});
});
Expand All @@ -108,45 +107,26 @@ public Mono<Void> syncData(String processId, Mono<DataNegotiationResult> dataNeg
});
}

private Mono<Void> createReceivedAuditRecords(String processId, Mono<String> issuerMono, Mono<String> entitiesArrayJsonMono) {
return entitiesArrayJsonMono
.flatMap(entitiesArrayJson -> {
try {
JsonNode entitiesJsonNode = objectMapper.readTree(entitiesArrayJson);

if (!entitiesJsonNode.isArray()) {
return Mono.error(new InvalidSyncResponseException(INVALID_ENTITY_SYNC_RESPONSE));
}

return Flux.fromIterable(entitiesJsonNode)
.concatMap(entityNode -> {
if (!entityNode.has("id") || !entityNode.has("type")) {
return Mono.error(new InvalidSyncResponseException(INVALID_ENTITY_SYNC_RESPONSE));
}

String entityId = entityNode.get("id").asText();
String entityType = entityNode.get("type").asText();

var mvAuditService = new MVAuditServiceEntity4DataNegotiation(
entityId,
entityType,
"",
"");

return issuerMono.flatMap(issuer ->
auditRecordService
.buildAndSaveAuditRecordFromDataSync(
processId,
issuer,
mvAuditService,
AuditRecordStatus.RECEIVED));
})
.collectList()
.then();
} catch (JsonProcessingException e) {
return Mono.error(e);
}
});
private Mono<Void> createReceivedAuditRecords(String processId, Mono<String> issuerMono, Mono<List<MVEntity4DataNegotiation>> mvEntities4DataNegotiation) {
return mvEntities4DataNegotiation
.flatMapIterable(x -> x)
.concatMap(x -> {
var mvAuditService = new MVAuditServiceEntity4DataNegotiation(
x.id(),
x.type(),
"",
"");

return issuerMono.flatMap(issuer ->
auditRecordService
.buildAndSaveAuditRecordFromDataSync(
processId,
issuer,
mvAuditService,
AuditRecordStatus.RECEIVED));
})
.collectList()
.then();
}

private Mono<String> decodeEntitySyncResponse(Mono<List<String>> entitySyncResponseMono) {
Expand Down Expand Up @@ -222,7 +202,7 @@ private Mono<Map<Id, HashAndHashLink>> getEntitiesHashAndHashLinkById(Mono<List<
});
}

private Mono<List<Id>> getInvalidIntegrityEntitiesIds(Mono<Map<Id, Entity>> entitiesByIdMono, Mono<Map<Id, HashAndHashLink>> allEntitiesExistingValidationDataById) {
private Mono<List<Id>> getInvalidIntegrityEntitiesIds(String processId, Mono<Map<Id, Entity>> entitiesByIdMono, Mono<Map<Id, HashAndHashLink>> allEntitiesExistingValidationDataById) {
return allEntitiesExistingValidationDataById
.flatMapIterable(Map::entrySet)
.flatMap(entry -> {
Expand All @@ -236,6 +216,9 @@ private Mono<List<Id>> getInvalidIntegrityEntitiesIds(Mono<Map<Id, Entity>> enti
if (calculatedHash.equals(hashValue)) {
return Mono.empty();
} else {
log.warn("ProcessID: {} - P2P replication attempt failed for entity '{}' " +
"due to hash mismatch: received hash does not match the " +
"calculated hash of the entity", processId, id);
log.debug("Expected hash: {}\nCurrent hash: {}", hashValue, calculatedHash);
return Mono.just(id);
}
Expand Down
Loading

0 comments on commit fb872dd

Please sign in to comment.