Skip to content

Commit 885a5ad

Browse files
authored
[SELC-4898] feat: Added logic to determine queue event type (#312)
1 parent 801c8fa commit 885a5ad

File tree

13 files changed

+169
-20
lines changed

13 files changed

+169
-20
lines changed

apps/onboarding-functions/src/main/java/it/pagopa/selfcare/onboarding/config/NotificationConfig.java

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
@ConfigMapping(prefix = "notification")
99
public interface NotificationConfig {
1010
Map<String, Consumer> consumers();
11+
Integer minutesThresholdForUpdateNotification();
1112
interface Consumer {
1213
String topic();
1314
String name();

apps/onboarding-functions/src/main/java/it/pagopa/selfcare/onboarding/functions/NotificationFunctions.java

+15-14
Original file line numberDiff line numberDiff line change
@@ -44,28 +44,29 @@ public HttpResponseMessage sendNotification(
4444
context.getLogger().info("sendNotifications trigger processed a request");
4545

4646
final String queueEventString = request.getQueryParameters().get("queueEvent");
47-
final QueueEvent queueEvent = Objects.isNull(queueEventString) ? QueueEvent.ADD : QueueEvent.valueOf(queueEventString);
47+
final QueueEvent queueEvent = Objects.isNull(queueEventString) ? null : QueueEvent.valueOf(queueEventString);
4848

4949
// Check request body
5050
if (request.getBody().isEmpty()) {
5151
return request.createResponseBuilder(HttpStatus.BAD_REQUEST)
5252
.body("Request body cannot be empty.")
5353
.build();
54-
}
54+
} else {
55+
final Onboarding onboarding;
56+
final String onboardingString = request.getBody().get();
57+
try {
58+
onboarding = readOnboardingValue(objectMapper, onboardingString);
59+
context.getLogger().info(String.format(FORMAT_LOGGER_ONBOARDING_STRING, SEND_ONBOARDING_NOTIFICATION, onboardingString));
60+
} catch (Exception ex) {
61+
context.getLogger().warning(() -> "Error during sendNotifications execution, msg: " + ex.getMessage());
62+
return request.createResponseBuilder(HttpStatus.BAD_REQUEST)
63+
.body("Malformed object onboarding in input.")
64+
.build();
65+
}
5566

56-
final Onboarding onboarding;
57-
final String onboardingString = request.getBody().get();
58-
try {
59-
onboarding = readOnboardingValue(objectMapper, onboardingString);
60-
context.getLogger().info(String.format(FORMAT_LOGGER_ONBOARDING_STRING, SEND_ONBOARDING_NOTIFICATION, onboardingString));
61-
} catch (Exception ex) {
62-
context.getLogger().warning("Error during sendNotifications execution, msg: " + ex.getMessage());
63-
return request.createResponseBuilder(HttpStatus.BAD_REQUEST)
64-
.body("Malformed object onboarding in input.")
65-
.build();
67+
notificationEventService.send(context, onboarding, queueEvent);
68+
return request.createResponseBuilder(HttpStatus.OK).build();
6669
}
67-
notificationEventService.send(context, onboarding, queueEvent);
68-
return request.createResponseBuilder(HttpStatus.OK).build();
6970
}
7071

7172
/**

apps/onboarding-functions/src/main/java/it/pagopa/selfcare/onboarding/service/NotificationEventServiceDefault.java

+15-6
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import it.pagopa.selfcare.onboarding.utils.NotificationBuilder;
1616
import it.pagopa.selfcare.onboarding.repository.TokenRepository;
1717
import it.pagopa.selfcare.onboarding.utils.NotificationBuilderFactory;
18+
import it.pagopa.selfcare.onboarding.utils.QueueEventExaminer;
1819
import it.pagopa.selfcare.product.entity.Product;
1920
import it.pagopa.selfcare.product.service.ProductService;
2021
import jakarta.enterprise.context.ApplicationScoped;
@@ -23,6 +24,7 @@
2324
import org.openapi.quarkus.core_json.api.InstitutionApi;
2425
import org.openapi.quarkus.core_json.model.InstitutionResponse;
2526

27+
import java.util.Objects;
2628
import java.util.Optional;
2729
import java.util.UUID;
2830

@@ -42,27 +44,34 @@ public class NotificationEventServiceDefault implements NotificationEventService
4244
private final NotificationBuilderFactory notificationBuilderFactory;
4345
private final TokenRepository tokenRepository;
4446
private final ObjectMapper mapper;
47+
private final QueueEventExaminer queueEventExaminer;
4548

4649
public NotificationEventServiceDefault(ProductService productService,
4750
NotificationConfig notificationConfig,
4851
NotificationBuilderFactory notificationBuilderFactory,
49-
TokenRepository tokenRepository) {
52+
TokenRepository tokenRepository,
53+
QueueEventExaminer queueEventExaminer) {
5054
this.productService = productService;
5155
this.notificationConfig = notificationConfig;
5256
this.notificationBuilderFactory = notificationBuilderFactory;
5357
this.tokenRepository = tokenRepository;
58+
this.queueEventExaminer = queueEventExaminer;
5459
mapper = new ObjectMapper();
5560
mapper.registerModule(new JavaTimeModule());
5661
}
5762

5863
public void send(ExecutionContext context, Onboarding onboarding, QueueEvent queueEvent) {
5964
Product product = productService.getProduct(onboarding.getProductId());
6065
if (product.getConsumers() == null || product.getConsumers().isEmpty()) {
61-
context.getLogger().warning(String.format("Node consumers is null or empty for product with ID %s", onboarding.getProductId()));
66+
context.getLogger().warning(() -> String.format("Node consumers is null or empty for product with ID %s", onboarding.getProductId()));
6267
return;
6368
}
6469

6570
try {
71+
if(Objects.isNull(queueEvent)) {
72+
queueEvent = queueEventExaminer.determineEventType(onboarding);
73+
}
74+
6675
Optional<Token> token = tokenRepository.findByOnboardingId(onboarding.getId());
6776
InstitutionResponse institution = institutionApi.retrieveInstitutionByIdUsingGET(onboarding.getInstitution().getId());
6877

@@ -71,7 +80,7 @@ public void send(ExecutionContext context, Onboarding onboarding, QueueEvent que
7180
prepareAndSendNotification(context, product, consumerConfig, onboarding, token.orElse(null), institution, queueEvent);
7281
}
7382
} catch (Exception e) {
74-
context.getLogger().warning(String.format("Error during send notification for onboarding with ID %s. Error: %s", onboarding.getId(), e.getMessage()));
83+
context.getLogger().warning(() -> String.format("Error during send notification for onboarding with ID %s. Error: %s", onboarding.getId(), e.getMessage()));
7584
throw new NotificationException(String.format("Impossible to send notification for onboarding %s", onboarding));
7685
}
7786
}
@@ -84,20 +93,20 @@ private void prepareAndSendNotification(ExecutionContext context, Product produc
8493
sendNotification(context, consumer.topic(), notificationToSend);
8594
sendTestEnvProductsNotification(context, product, consumer.topic(), notificationToSend);
8695
} else {
87-
context.getLogger().info(String.format("Notification not sent for onboarding %s on topic %s", onboarding.getId(), consumer.topic()));
96+
context.getLogger().info(() -> String.format("Notification not sent for onboarding %s on topic %s", onboarding.getId(), consumer.topic()));
8897
}
8998
}
9099

91100
private void sendNotification(ExecutionContext context, String topic, NotificationToSend notificationToSend) throws JsonProcessingException {
92101
String message = mapper.writeValueAsString(notificationToSend);
93-
context.getLogger().info(String.format("Sending notification on topic: %s with message: %s", topic, message));
102+
context.getLogger().info(() -> String.format("Sending notification on topic: %s with message: %s", topic, message));
94103
eventHubRestClient.sendMessage(topic, message);
95104
}
96105

97106
private void sendTestEnvProductsNotification(ExecutionContext context, Product product, String topic, NotificationToSend notificationToSend) throws JsonProcessingException {
98107
if (product.getTestEnvProductIds() != null) {
99108
for (String testEnvProductId : product.getTestEnvProductIds()) {
100-
context.getLogger().info(String.format("Notification for onboarding with id: %s should be sent on topic: %s for envProduct : %s", notificationToSend.getOnboardingTokenId(), topic, testEnvProductId));
109+
context.getLogger().info(() -> String.format("Notification for onboarding with id: %s should be sent on topic: %s for envProduct : %s", notificationToSend.getOnboardingTokenId(), topic, testEnvProductId));
101110
notificationToSend.setId(UUID.randomUUID().toString());
102111
notificationToSend.setProduct(testEnvProductId);
103112
sendNotification(context, topic, notificationToSend);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package it.pagopa.selfcare.onboarding.utils;
2+
3+
import it.pagopa.selfcare.onboarding.config.NotificationConfig;
4+
import it.pagopa.selfcare.onboarding.dto.QueueEvent;
5+
import it.pagopa.selfcare.onboarding.entity.Onboarding;
6+
import jakarta.enterprise.context.ApplicationScoped;
7+
8+
import java.time.LocalDateTime;
9+
import java.util.Objects;
10+
11+
@ApplicationScoped
12+
public class QueueEventExaminer {
13+
private final int minutesThresholdForUpdateNotification;
14+
15+
public QueueEventExaminer(NotificationConfig notificationConfig) {
16+
this.minutesThresholdForUpdateNotification = notificationConfig.minutesThresholdForUpdateNotification();
17+
}
18+
public QueueEvent determineEventType(Onboarding onboarding) {
19+
return switch (onboarding.getStatus()) {
20+
case COMPLETED -> (isOverUpdateThreshold(onboarding.getUpdatedAt(), onboarding.getActivatedAt())) ? QueueEvent.UPDATE : QueueEvent.ADD;
21+
case DELETED -> QueueEvent.UPDATE;
22+
default -> throw new IllegalArgumentException("Onboarding status not supported");
23+
};
24+
}
25+
26+
private boolean isOverUpdateThreshold(LocalDateTime updatedAt, LocalDateTime activatedAt) {
27+
return Objects.nonNull(updatedAt)
28+
&& Objects.nonNull(activatedAt)
29+
&& updatedAt.isAfter(activatedAt.plusMinutes(minutesThresholdForUpdateNotification));
30+
}
31+
}

apps/onboarding-functions/src/main/resources/application.properties

+1
Original file line numberDiff line numberDiff line change
@@ -163,3 +163,4 @@ notification.consumers.fd.name=${FD_SHARED_ACCESS_KEY_NAME:test}
163163
notification.consumers.fd.key=${EVENTHUB_SC_CONTRACTS_FD_SELFCARE_WO_KEY_LC:test}
164164
notification.consumers.fd.allowed-institution-types=null
165165
notification.consumers.fd.allowed-origins=null
166+
notification.minutes-threshold-for-update-notification=${MINUTES_THRESHOLD_FOR_UPDATE_NOTIFICATION:5}

apps/onboarding-functions/src/test/java/it/pagopa/selfcare/onboarding/service/NotificationEventServiceDefaultTest.java

+21
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import it.pagopa.selfcare.onboarding.utils.BaseNotificationBuilder;
1515
import it.pagopa.selfcare.onboarding.repository.TokenRepository;
1616
import it.pagopa.selfcare.onboarding.utils.NotificationBuilderFactory;
17+
import it.pagopa.selfcare.onboarding.utils.QueueEventExaminer;
1718
import it.pagopa.selfcare.product.entity.Product;
1819
import it.pagopa.selfcare.product.service.ProductService;
1920
import jakarta.inject.Inject;
@@ -52,6 +53,9 @@ public class NotificationEventServiceDefaultTest {
5253
@InjectMock
5354
InstitutionApi institutionApi;
5455

56+
@InjectMock
57+
QueueEventExaminer queueEventExaminer;
58+
5559

5660
@Test
5761
void sendMessage() {
@@ -69,6 +73,23 @@ void sendMessage() {
6973
.sendMessage(anyString(), anyString());
7074
}
7175

76+
@Test
77+
void sendMessageWithoutQueueEvent() {
78+
final Onboarding onboarding = createOnboarding();
79+
final Product product = createProduct();
80+
when(productService.getProduct(any())).thenReturn(product);
81+
mockNotificationMapper(true);
82+
when(tokenRepository.findByOnboardingId(any())).thenReturn(Optional.of(new Token()));
83+
when(institutionApi.retrieveInstitutionByIdUsingGET(any())).thenReturn(new InstitutionResponse());
84+
ExecutionContext context = mock(ExecutionContext.class);
85+
doReturn(Logger.getGlobal()).when(context).getLogger();
86+
doNothing().when(eventHubRestClient).sendMessage(anyString(), anyString());
87+
when(queueEventExaminer.determineEventType(any())).thenReturn(QueueEvent.ADD);
88+
messageServiceDefault.send(context, onboarding, null);
89+
verify(eventHubRestClient, times(3))
90+
.sendMessage(anyString(), anyString());
91+
}
92+
7293
private void mockNotificationMapper(boolean shouldSendNotification) {
7394
BaseNotificationBuilder notificationMapper = mock(BaseNotificationBuilder.class);
7495
when(notificationBuilderFactory.create(any())).thenReturn(notificationMapper);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package it.pagopa.selfcare.onboarding.utils;
2+
3+
import io.quarkus.test.InjectMock;
4+
import io.quarkus.test.junit.QuarkusTest;
5+
import it.pagopa.selfcare.onboarding.common.OnboardingStatus;
6+
import it.pagopa.selfcare.onboarding.config.NotificationConfig;
7+
import it.pagopa.selfcare.onboarding.dto.QueueEvent;
8+
import it.pagopa.selfcare.onboarding.entity.Onboarding;
9+
import jakarta.inject.Inject;
10+
import org.junit.jupiter.api.BeforeEach;
11+
import org.junit.jupiter.api.DisplayName;
12+
import org.junit.jupiter.api.Test;
13+
14+
import java.time.LocalDateTime;
15+
16+
import static org.junit.jupiter.api.Assertions.assertEquals;
17+
import static org.junit.jupiter.api.Assertions.assertThrows;
18+
import static org.mockito.Mockito.mock;
19+
import static org.mockito.Mockito.when;
20+
21+
@QuarkusTest
22+
class QueueEventExaminerTest {
23+
@Inject
24+
private QueueEventExaminer queueEventExaminer;
25+
26+
private NotificationConfig notificationConfig;
27+
28+
@BeforeEach
29+
public void setup() {
30+
notificationConfig = mock(NotificationConfig.class);
31+
when(notificationConfig.minutesThresholdForUpdateNotification()).thenReturn(5);
32+
}
33+
34+
@Test
35+
@DisplayName("Should return ADD event for COMPLETED status and update within threshold")
36+
public void shouldReturnAddEventForCompletedStatusAndUpdateWithinThreshold() {
37+
Onboarding onboarding = new Onboarding();
38+
onboarding.setStatus(OnboardingStatus.COMPLETED);
39+
onboarding.setUpdatedAt(LocalDateTime.now());
40+
onboarding.setActivatedAt(LocalDateTime.now().minusMinutes(1));
41+
42+
QueueEvent result = queueEventExaminer.determineEventType(onboarding);
43+
44+
assertEquals(QueueEvent.ADD, result);
45+
}
46+
47+
@Test
48+
@DisplayName("Should return UPDATE event for COMPLETED status and update over threshold")
49+
public void shouldReturnUpdateEventForCompletedStatusAndUpdateOverThreshold() {
50+
Onboarding onboarding = new Onboarding();
51+
onboarding.setStatus(OnboardingStatus.COMPLETED);
52+
onboarding.setUpdatedAt(LocalDateTime.now());
53+
onboarding.setActivatedAt(LocalDateTime.now().minusMinutes(10));
54+
55+
QueueEvent result = queueEventExaminer.determineEventType(onboarding);
56+
57+
assertEquals(QueueEvent.UPDATE, result);
58+
}
59+
60+
@Test
61+
@DisplayName("Should return UPDATE event for DELETED status")
62+
public void shouldReturnUpdateEventForDeletedStatus() {
63+
Onboarding onboarding = new Onboarding();
64+
onboarding.setStatus(OnboardingStatus.DELETED);
65+
66+
QueueEvent result = queueEventExaminer.determineEventType(onboarding);
67+
68+
assertEquals(QueueEvent.UPDATE, result);
69+
}
70+
71+
@Test
72+
@DisplayName("Should throw IllegalArgumentException for unsupported status")
73+
public void shouldThrowIllegalArgumentExceptionForUnsupportedStatus() {
74+
Onboarding onboarding = new Onboarding();
75+
onboarding.setStatus(OnboardingStatus.PENDING);
76+
77+
assertThrows(IllegalArgumentException.class, () -> queueEventExaminer.determineEventType(onboarding));
78+
}
79+
}

infra/functions/onboarding-functions/env/dev-pnpg/terraform.tfvars

+1
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,5 @@ app_settings = {
8787
"FD_TOPIC_NAME" = "Selfcare-FD"
8888
"SAP_ALLOWED_INSTITUTION_TYPE" = "PA,GSP,SA,AS,SCP"
8989
"SAP_ALLOWED_ORIGINS" = "IPA,SELC"
90+
"MINUTES_THRESHOLD_FOR_UPDATE_NOTIFICATION" = "5"
9091
}

infra/functions/onboarding-functions/env/dev/terraform.tfvars

+1
Original file line numberDiff line numberDiff line change
@@ -84,4 +84,5 @@ app_settings = {
8484
"FD_TOPIC_NAME" = "Selfcare-FD",
8585
"SAP_ALLOWED_INSTITUTION_TYPE" = "PA,GSP,SA,AS,SCP",
8686
"SAP_ALLOWED_ORIGINS" = "IPA,SELC"
87+
"MINUTES_THRESHOLD_FOR_UPDATE_NOTIFICATION" = "5"
8788
}

infra/functions/onboarding-functions/env/prod-pnpg/terraform.tfvars

+1
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ app_settings = {
7878
"FD_TOPIC_NAME" = "Selfcare-FD"
7979
"SAP_ALLOWED_INSTITUTION_TYPE" = "PA,GSP,SA,AS,SCP"
8080
"SAP_ALLOWED_ORIGINS" = "IPA,SELC"
81+
"MINUTES_THRESHOLD_FOR_UPDATE_NOTIFICATION" = "5"
8182

8283
## IGNORE VALUES
8384

infra/functions/onboarding-functions/env/prod/terraform.tfvars

+1
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ app_settings = {
8585
"FD_TOPIC_NAME" = "Selfcare-FD"
8686
"SAP_ALLOWED_INSTITUTION_TYPE" = "PA,GSP,SA,AS,SCP"
8787
"SAP_ALLOWED_ORIGINS" = "IPA,SELC"
88+
"MINUTES_THRESHOLD_FOR_UPDATE_NOTIFICATION" = "5"
8889

8990
##ARUBA SIGNATURE
9091
"PAGOPA_SIGNATURE_SOURCE" = "aruba",

infra/functions/onboarding-functions/env/uat-pnpg/terraform.tfvars

+1
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ app_settings = {
7777
"FD_TOPIC_NAME" = "Selfcare-FD"
7878
"SAP_ALLOWED_INSTITUTION_TYPE" = "PA,GSP,SA,AS,SCP"
7979
"SAP_ALLOWED_ORIGINS" = "IPA,SELC"
80+
"MINUTES_THRESHOLD_FOR_UPDATE_NOTIFICATION" = "5"
8081

8182
## IGNORE VALUES
8283

infra/functions/onboarding-functions/env/uat/terraform.tfvars

+1
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ app_settings = {
8484
"FD_TOPIC_NAME" = "Selfcare-FD"
8585
"SAP_ALLOWED_INSTITUTION_TYPE" = "PA,GSP,SA,AS,SCP"
8686
"SAP_ALLOWED_ORIGINS" = "IPA,SELC"
87+
"MINUTES_THRESHOLD_FOR_UPDATE_NOTIFICATION" = "5"
8788

8889
##ARUBA SIGNATURE
8990
"PAGOPA_SIGNATURE_SOURCE" = "disabled",

0 commit comments

Comments
 (0)