Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nod 883 async riversamento #15

Merged
merged 6 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
384 changes: 181 additions & 203 deletions openapi/openapi.json

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ public ResponseEntity<List<SyncStatusResponse>> standin() {
produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<List<SyncStatusResponse>> cache() {
log.debug("[NODE-CFG-SYNC] Force {} configuration to update", TargetRefreshEnum.cache.label);

Map<String, SyncStatusEnum> syncStatusEnumMap = apiConfigCacheService.syncCache();
apiConfigCacheService.syncRiversamento();

List<SyncStatusResponse> syncStatusResponseList = syncStatusEnumMap.entrySet()
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
public enum TargetRefreshEnum {

cache("api-config-cache"),
standin("stand-in-manager");
standin("stand-in-manager"),
riversamento("riversamento");

public final String label;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import it.gov.pagopa.node.cfgsync.model.SyncStatusEnum;
import it.gov.pagopa.node.cfgsync.model.TargetRefreshEnum;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -16,7 +15,6 @@
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.Map;

@Slf4j
@Service
Expand Down Expand Up @@ -54,11 +52,20 @@ public void processEvent(EventContext eventContext) {
TargetRefreshEnum.cache.label,
eventContext.getPartitionContext().getPartitionId(), eventContext.getEventData().getSequenceNumber(),
eventContext.getEventData().getBodyAsString());

try {
apiConfigCacheService.syncCache();
} catch (Exception ex) {
log.error("[{}][ALERT] Generic Error on consumer: {}", TargetRefreshEnum.cache.label, ex.getMessage(), ex);
}

try {
apiConfigCacheService.syncRiversamento();
} catch (Exception ex) {
log.error("[{}][ALERT] Generic Error on consumer: {}", TargetRefreshEnum.riversamento.label, ex.getMessage(), ex);
}


}

public void processError(ErrorContext errorContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,6 @@ public Map<String, SyncStatusEnum> syncCache() {
saveNexiPostgres(syncStatusMap, configCache);
saveNexiOracle(syncStatusMap, configCache);

if(riversamentoEnabled) {
riversamentoElencoServizi();
riversamentoCdiPreferences();
}

return composeSyncStatusMapResult(TargetRefreshEnum.cache.label, syncStatusMap);
} catch (FeignException fEx) {
log.error("[{}] error: {}", TargetRefreshEnum.cache.label, fEx.getMessage(), fEx);
Expand All @@ -160,6 +155,15 @@ public Map<String, SyncStatusEnum> syncCache() {
}
}

@Transactional
public void syncRiversamento() {
if(riversamentoEnabled) {
log.info("riversamento elenco servizi e cdi preferences abilitato");
riversamentoElencoServizi();
riversamentoCdiPreferences();
}
}

private void savePagoPA(Map<String, SyncStatusEnum> syncStatusMap, ConfigCache configCache) {
try {
if(apiConfigCacheWritePagoPa) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class CacheSyncNexiOracleTest {
}

@Test
void nexioracle() {
void nexioracle() throws InterruptedException {

ReflectionTestUtils.setField(cacheManagerService, "riversamentoSource", "nexi-oracle");
ReflectionTestUtils.setField(cacheManagerService, "riversamentoTarget", "nexi-oracle");
Expand Down Expand Up @@ -124,6 +124,7 @@ void nexioracle() {
assertThat(response.getBody().get(1).getStatus()).isEqualTo(SyncStatusEnum.DONE);
assertThat(response.getBody().get(2).getServiceIdentifier()).isEqualTo(NEXIORACLE_SI);
assertThat(response.getBody().get(2).getStatus()).isEqualTo(SyncStatusEnum.DONE);
Thread.sleep(5000);
List<CDIPreferences> all = nexiCdiPreferencesOracleRepository.findAll();
assertThat(all.size()).isEqualTo(size);
assertThat(all.get(0).getSeller()).isEqualTo(arrayList.get(0).getSeller());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class CacheSyncNexiPostgresTest {
}

@Test
void nexipostgres() {
void nexipostgres() throws InterruptedException {

ReflectionTestUtils.setField(cacheManagerService, "riversamentoSource", "nexi-postgres");
ReflectionTestUtils.setField(cacheManagerService, "riversamentoTarget", "nexi-postgres");
Expand Down Expand Up @@ -117,8 +117,154 @@ void nexipostgres() {
assertThat(response.getBody().get(1).getStatus()).isEqualTo(SyncStatusEnum.DONE);
assertThat(response.getBody().get(2).getServiceIdentifier()).isEqualTo(NEXIORACLE_SI);
assertThat(response.getBody().get(2).getStatus()).isEqualTo(SyncStatusEnum.DONE);
Thread.sleep(5000);
List<CDIPreferences> all = nexiCdiPreferencesPostgresRepository.findAll();
assertThat(all.size()).isEqualTo(size);

}

@Test
void nexipostgres2() throws InterruptedException {

ReflectionTestUtils.setField(cacheManagerService, "riversamentoEnabled", false);

long size = Math.round(Math.random()*500);
ArrayList<CDIPreferencesView> arrayList = new ArrayList();
for(long i = 0;i<size;i++){
arrayList.add(new CDIPreferencesView(new Long(i),"","", BigDecimal.ZERO,new Long(i)));
}
long originalcount = nexiCdiPreferencesPostgresRepository.count();
nexiCdiPreferencesViewPostgresRepository.deleteAll();
nexiCdiPreferencesViewPostgresRepository.saveAll(arrayList);

Map<String, Collection<String>> headersCustom =
Map.of(
HEADER_CACHE_ID, List.of(String.valueOf(System.currentTimeMillis())),
HEADER_CACHE_TIMESTAMP, List.of(Instant.now().toString()),
HEADER_CACHE_VERSION, List.of(StringUtils.repeat("*", 50))
);
when(apiConfigCacheClient.getCache(anyString())).thenReturn(Response
.builder()
.status(200)
.reason("Mocked")
.headers(headersCustom)
.request(mock(Request.class))
.body(new byte[0])
.build());
cacheManagerService.setApiConfigCacheClient(apiConfigCacheClient);

ResponseEntity<List<SyncStatusResponse>> response = restTemplate.exchange(CACHE_URL, HttpMethod.PUT, null, new ParameterizedTypeReference<>() {});

assertThat(response.getBody()).isNotNull();
assertFalse(response.getHeaders().isEmpty());
assertFalse(response.getBody().isEmpty());
assertEquals(3, response.getBody().size());
assertThat(response.getBody().get(0).getServiceIdentifier()).isEqualTo(PAGOPAPOSTGRES_SI);
assertThat(response.getBody().get(0).getStatus()).isEqualTo(SyncStatusEnum.DONE);
assertThat(response.getBody().get(1).getServiceIdentifier()).isEqualTo(NEXIPOSTGRES_SI);
assertThat(response.getBody().get(1).getStatus()).isEqualTo(SyncStatusEnum.DONE);
assertThat(response.getBody().get(2).getServiceIdentifier()).isEqualTo(NEXIORACLE_SI);
assertThat(response.getBody().get(2).getStatus()).isEqualTo(SyncStatusEnum.DONE);
Thread.sleep(5000);
Long afterCount = nexiCdiPreferencesPostgresRepository.count();
assertThat(afterCount).isEqualTo(originalcount);

}

@Test
void nexipostgres3() throws InterruptedException {

ReflectionTestUtils.setField(cacheManagerService, "riversamentoSource", "exception");
ReflectionTestUtils.setField(cacheManagerService, "riversamentoTarget", "nexi-postgres");

long size = Math.round(Math.random()*500);
ArrayList<CDIPreferencesView> arrayList = new ArrayList();
for(long i = 0;i<size;i++){
arrayList.add(new CDIPreferencesView(new Long(i),"","", BigDecimal.ZERO,new Long(i)));
}
long originalcount = nexiCdiPreferencesPostgresRepository.count();
nexiCdiPreferencesViewPostgresRepository.saveAll(arrayList);

Map<String, Collection<String>> headersCustom =
Map.of(
HEADER_CACHE_ID, List.of(String.valueOf(System.currentTimeMillis())),
HEADER_CACHE_TIMESTAMP, List.of(Instant.now().toString()),
HEADER_CACHE_VERSION, List.of(StringUtils.repeat("*", 50))
);
when(apiConfigCacheClient.getCache(anyString())).thenReturn(Response
.builder()
.status(200)
.reason("Mocked")
.headers(headersCustom)
.request(mock(Request.class))
.body(new byte[0])
.build());
cacheManagerService.setApiConfigCacheClient(apiConfigCacheClient);

ResponseEntity<List<SyncStatusResponse>> response = restTemplate.exchange(CACHE_URL, HttpMethod.PUT, null, new ParameterizedTypeReference<>() {});

assertThat(response.getBody()).isNotNull();
assertFalse(response.getHeaders().isEmpty());
assertFalse(response.getBody().isEmpty());
assertEquals(3, response.getBody().size());
assertThat(response.getBody().get(0).getServiceIdentifier()).isEqualTo(PAGOPAPOSTGRES_SI);
assertThat(response.getBody().get(0).getStatus()).isEqualTo(SyncStatusEnum.DONE);
assertThat(response.getBody().get(1).getServiceIdentifier()).isEqualTo(NEXIPOSTGRES_SI);
assertThat(response.getBody().get(1).getStatus()).isEqualTo(SyncStatusEnum.DONE);
assertThat(response.getBody().get(2).getServiceIdentifier()).isEqualTo(NEXIORACLE_SI);
assertThat(response.getBody().get(2).getStatus()).isEqualTo(SyncStatusEnum.DONE);
Thread.sleep(1000);
long all = nexiCdiPreferencesPostgresRepository.count();
assertThat(all).isEqualTo(originalcount);

}

@Test
void nexipostgres4() throws InterruptedException {

ReflectionTestUtils.setField(cacheManagerService, "riversamentoSource", "nexi-postgres");
ReflectionTestUtils.setField(cacheManagerService, "riversamentoTarget", "exception");

long size = Math.round(Math.random()*500);
ArrayList<CDIPreferencesView> arrayList = new ArrayList();
for(long i = 0;i<size;i++){
arrayList.add(new CDIPreferencesView(new Long(i),"","", BigDecimal.ZERO,new Long(i)));
}
long originalcount = nexiCdiPreferencesPostgresRepository.count();
nexiCdiPreferencesViewPostgresRepository.saveAll(arrayList);

Map<String, Collection<String>> headersCustom =
Map.of(
HEADER_CACHE_ID, List.of(String.valueOf(System.currentTimeMillis())),
HEADER_CACHE_TIMESTAMP, List.of(Instant.now().toString()),
HEADER_CACHE_VERSION, List.of(StringUtils.repeat("*", 50))
);
when(apiConfigCacheClient.getCache(anyString())).thenReturn(Response
.builder()
.status(200)
.reason("Mocked")
.headers(headersCustom)
.request(mock(Request.class))
.body(new byte[0])
.build());
cacheManagerService.setApiConfigCacheClient(apiConfigCacheClient);

ResponseEntity<List<SyncStatusResponse>> response = restTemplate.exchange(CACHE_URL, HttpMethod.PUT, null, new ParameterizedTypeReference<>() {});

assertThat(response.getBody()).isNotNull();
assertFalse(response.getHeaders().isEmpty());
assertFalse(response.getBody().isEmpty());
assertEquals(3, response.getBody().size());
assertThat(response.getBody().get(0).getServiceIdentifier()).isEqualTo(PAGOPAPOSTGRES_SI);
assertThat(response.getBody().get(0).getStatus()).isEqualTo(SyncStatusEnum.DONE);
assertThat(response.getBody().get(1).getServiceIdentifier()).isEqualTo(NEXIPOSTGRES_SI);
assertThat(response.getBody().get(1).getStatus()).isEqualTo(SyncStatusEnum.DONE);
assertThat(response.getBody().get(2).getServiceIdentifier()).isEqualTo(NEXIORACLE_SI);
assertThat(response.getBody().get(2).getStatus()).isEqualTo(SyncStatusEnum.DONE);
Thread.sleep(1000);
long all = nexiCdiPreferencesPostgresRepository.count();
assertThat(all).isEqualTo(originalcount);

}

}
3 changes: 2 additions & 1 deletion src/test/java/it/gov/pagopa/node/cfgsync/CacheSyncTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ void error500ConnectionRefused() {
}

@Test
void trimCacheVersionOnDb() {
void trimCacheVersionOnDb() throws InterruptedException {

long size = Math.round(Math.random()*500);
ArrayList<CDIPreferencesView> arrayList = new ArrayList();
Expand Down Expand Up @@ -205,6 +205,7 @@ void trimCacheVersionOnDb() {
assertThat(response.getBody().get(1).getStatus()).isEqualTo(SyncStatusEnum.DONE);
assertThat(response.getBody().get(2).getServiceIdentifier()).isEqualTo(NEXIORACLE_SI);
assertThat(response.getBody().get(2).getStatus()).isEqualTo(SyncStatusEnum.DONE);
Thread.sleep(5000);
List<CDIPreferences> all = pagoPaCdiPreferencesPostgresRepository.findAll();
assertThat(all.size()).isEqualTo(size);
}
Expand Down
Loading