Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SELC-5034] feature: added logic to discriminate event type in cdc module #288

Merged
merged 3 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package it.pagopa.selfcare.onboarding.event;

import io.smallrye.mutiny.Uni;
import it.pagopa.selfcare.onboarding.event.entity.Onboarding;
import it.pagopa.selfcare.onboarding.event.entity.util.QueueEvent;
import it.pagopa.selfcare.onboarding.event.mapper.OnboardingMapper;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.rest.client.inject.RestClient;
import org.openapi.quarkus.onboarding_functions_json.api.NotificationsApi;
import org.openapi.quarkus.onboarding_functions_json.model.OrchestrationResponse;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Objects;

@ApplicationScoped
public class NotificationService {
@Inject
@RestClient
NotificationsApi notificationsApi;
private final OnboardingMapper onboardingMapper;
private final Integer retryMinBackOff;
private final Integer retryMaxBackOff;
private final Integer maxRetry;
private final Integer minutesThresholdForUpdateNotification;

public NotificationService(OnboardingMapper onboardingMapper,
@ConfigProperty(name = "onboarding-cdc.retry.min-backoff") Integer retryMinBackOff,
@ConfigProperty(name = "onboarding-cdc.retry.max-backoff") Integer retryMaxBackOff,
@ConfigProperty(name = "onboarding-cdc.retry") Integer maxRetry,
@ConfigProperty(name = "onboarding-cdc.minutes-threshold-for-update-notification") Integer minutesThresholdForUpdateNotification) {
this.onboardingMapper = onboardingMapper;
this.retryMinBackOff = retryMinBackOff;
this.retryMaxBackOff = retryMaxBackOff;
this.maxRetry = maxRetry;
this.minutesThresholdForUpdateNotification = minutesThresholdForUpdateNotification;
}

public Uni<OrchestrationResponse> invokeNotificationApi(Onboarding onboarding) {
assert onboarding != null;
QueueEvent queueEvent = determineEventType(onboarding);
return notificationsApi.apiNotificationPost(queueEvent.name(), onboardingMapper.toEntity(onboarding))
.onFailure().retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofHours(retryMaxBackOff)).atMost(maxRetry);
}

private QueueEvent determineEventType(Onboarding onboarding) {
return switch (onboarding.getStatus()) {
case COMPLETED -> (isOverUpdateThreshold(onboarding.getUpdatedAt(), onboarding.getActivatedAt())) ? QueueEvent.UPDATE : QueueEvent.ADD;
case DELETED -> QueueEvent.UPDATE;
default -> throw new IllegalArgumentException("Onboarding status not supported");
};
}

private boolean isOverUpdateThreshold(LocalDateTime updatedAt, LocalDateTime activatedAt) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it might be better to add some comments about this function...what means "isOverUpdateThreshold" ?

return Objects.nonNull(updatedAt)
&& Objects.nonNull(activatedAt)
&& updatedAt.isAfter(activatedAt.plusMinutes(minutesThresholdForUpdateNotification));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,24 @@
import io.smallrye.mutiny.Multi;
import it.pagopa.selfcare.onboarding.event.constant.CdcStartAtConstant;
import it.pagopa.selfcare.onboarding.event.entity.Onboarding;
import it.pagopa.selfcare.onboarding.event.mapper.OnboardingMapper;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import org.bson.BsonDocument;
import org.bson.conversions.Bson;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.rest.client.inject.RestClient;
import org.openapi.quarkus.onboarding_functions_json.api.NotificationsApi;

import java.time.Duration;
import java.util.*;

import static com.mongodb.client.model.Projections.fields;
import static com.mongodb.client.model.Projections.include;
import static it.pagopa.selfcare.onboarding.common.OnboardingStatus.COMPLETED;
import static it.pagopa.selfcare.onboarding.common.OnboardingStatus.DELETED;
import static java.util.Arrays.asList;

@Startup
@Slf4j
@ApplicationScoped
public class OnboardingCdcService {

@Inject
@RestClient
NotificationsApi notificationsApi;
private final OnboardingMapper onboardingMapper;
private static final String COLLECTION_NAME = "onboardings";
private static final String OPERATION_NAME = "ONBOARDING-CDC-OnboardingsUpdate";
private static final String EVENT_NAME = "ONBOARDING-CDC";
Expand All @@ -52,25 +44,18 @@ public class OnboardingCdcService {
private final TableClient tableClient;
private final String mongodbDatabase;
private final ReactiveMongoClient mongoClient;
private final Integer retryMinBackOff;
private final Integer retryMaxBackOff;
private final Integer maxRetry;
private final NotificationService notificationService;

public OnboardingCdcService(OnboardingMapper onboardingMapper, ReactiveMongoClient mongoClient,
public OnboardingCdcService(ReactiveMongoClient mongoClient,
@ConfigProperty(name = "quarkus.mongodb.database") String mongodbDatabase,
@ConfigProperty(name = "onboarding-cdc.retry.min-backoff") Integer retryMinBackOff,
@ConfigProperty(name = "onboarding-cdc.retry.max-backoff") Integer retryMaxBackOff,
@ConfigProperty(name = "onboarding-cdc.retry") Integer maxRetry,
TelemetryClient telemetryClient,
TableClient tableClient) {
this.onboardingMapper = onboardingMapper;
TableClient tableClient,
NotificationService notificationService) {
this.mongoClient = mongoClient;
this.mongodbDatabase = mongodbDatabase;
this.maxRetry = maxRetry;
this.retryMaxBackOff = retryMaxBackOff;
this.retryMinBackOff = retryMinBackOff;
this.telemetryClient = telemetryClient;
this.tableClient = tableClient;
this.notificationService = notificationService;
telemetryClient.getContext().getOperation().setName(OPERATION_NAME);
initOrderStream();
}
Expand All @@ -87,7 +72,7 @@ private void initOrderStream() {
if (Objects.nonNull(cdcStartAtEntity))
resumeToken = (String) cdcStartAtEntity.getProperty(CdcStartAtConstant.CDC_START_AT_PROPERTY);
} catch (TableServiceException e) {
log.warn("Table StarAt not found, it is starting from now ...");
log.warn("Table StartAt not found, it is starting from now ...");
}
}

Expand All @@ -100,7 +85,7 @@ private void initOrderStream() {

Bson match = Aggregates.match(Filters.and(
Filters.in("operationType", asList("update", "replace", "insert")),
Filters.in("fullDocument.status", Arrays.asList("COMPLETED", "DELETED"))));
Filters.in("fullDocument.status", Arrays.asList(COMPLETED.name(), DELETED.name()))));
Bson project = Aggregates.project(fields(include("_id", "ns", "documentKey", "fullDocument")));
List<Bson> pipeline = Arrays.asList(match, project);

Expand All @@ -123,16 +108,13 @@ private ReactiveMongoCollection<Onboarding> getCollection() {
}

protected void consumerOnboardingEvent(ChangeStreamDocument<Onboarding> document) {


assert document.getFullDocument() != null;
assert document.getDocumentKey() != null;

log.info("Starting consumerOnboardingEvent ... ");
log.info("Sending Onboarding notification having id {}", document.getFullDocument().getId());

notificationsApi.apiNotificationPost(onboardingMapper.toEntity(document.getFullDocument()))
.onFailure().retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofHours(retryMaxBackOff)).atMost(maxRetry)
notificationService.invokeNotificationApi(document.getFullDocument())
.subscribe().with(
result -> {
log.info("Onboarding notification having id: {} successfully sent", document.getDocumentKey().toJson());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package it.pagopa.selfcare.onboarding.event.entity.util;

public enum QueueEvent {
ADD,
UPDATE
}
11 changes: 11 additions & 0 deletions apps/onboarding-cdc/src/main/openapi/onboarding_functions.json
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,17 @@
],
"summary": "",
"description": "",
"parameters": [
{
"name": "queueEvent",
"in": "query",
"description": "Query Event type",
"required": false,
"schema": {
"type": "string"
}
}
],
"requestBody": {
"content": {
"application/json": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ onboarding-cdc.storage.connection-string=${STORAGE_CONNECTION_STRING:UseDevelopm
onboarding-cdc.retry.min-backoff=${ONBOARDING-CDC-RETRY-MIN-BACKOFF:1}
onboarding-cdc.retry.max-backoff=${ONBOARDING-CDC-RETRY-MAX-BACKOFF:2}
onboarding-cdc.retry=${ONBOARDING-CDC-RETRY:3}
onboarding-cdc.minutes-threshold-for-update-notification=${ONBOARDING-CDC-MINUTES-THRESHOLD-FOR-UPDATE-NOTIFICATION:5}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto


quarkus.openapi-generator.codegen.spec.onboarding_functions_json.mutiny=true
quarkus.openapi-generator.codegen.spec.onboarding_functions_json.type-mappings.DateTime=java.time.LocalDateTime
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package it.pagopa.selfcare.onboarding.event;

import io.quarkus.test.InjectMock;
import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.test.UniAssertSubscriber;
import it.pagopa.selfcare.onboarding.common.OnboardingStatus;
import it.pagopa.selfcare.onboarding.event.entity.Onboarding;
import it.pagopa.selfcare.onboarding.event.entity.util.QueueEvent;
import it.pagopa.selfcare.onboarding.event.mapper.OnboardingMapper;
import jakarta.inject.Inject;
import org.eclipse.microprofile.rest.client.inject.RestClient;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.openapi.quarkus.onboarding_functions_json.api.NotificationsApi;
import org.openapi.quarkus.onboarding_functions_json.model.OrchestrationResponse;

import java.time.LocalDateTime;

import static org.mockito.Mockito.*;

@QuarkusTest
public class NotificationServiceTest {
@Mock
private OnboardingMapper onboardingMapper;
@InjectMock
@RestClient
private NotificationsApi notificationsApi;
@Inject
private NotificationService notificationService;

@Test
@DisplayName("Should handle Invoke Notification API Success passing event ADD")
public void shouldHandleInvokeNotificationApiSuccessForQueueEventAdd() {
Onboarding onboarding = new Onboarding();
onboarding.setStatus(OnboardingStatus.COMPLETED);
onboarding.setUpdatedAt(LocalDateTime.now());
onboarding.setActivatedAt(LocalDateTime.now());

when(notificationsApi.apiNotificationPost(any(), any()))
.thenReturn(Uni.createFrom().item(new OrchestrationResponse()));

UniAssertSubscriber<OrchestrationResponse> subscriber = notificationService
.invokeNotificationApi(onboarding)
.subscribe().withSubscriber(UniAssertSubscriber.create());

subscriber.assertCompleted().awaitItem();

verify(notificationsApi, times(1)).apiNotificationPost(eq(QueueEvent.ADD.name()), any());
}

@Test
@DisplayName("Should handle Invoke Notification API Success passing event UPDATE")
public void shouldHandleInvokeNotificationApiSuccessForQueueEventUpdate() {
Onboarding onboarding = new Onboarding();
onboarding.setStatus(OnboardingStatus.COMPLETED);
onboarding.setUpdatedAt(LocalDateTime.now().plusMinutes(10)); // 5 minutes should be the threshold
onboarding.setActivatedAt(LocalDateTime.now());

when(notificationsApi.apiNotificationPost(any(), any()))
.thenReturn(Uni.createFrom().item(new OrchestrationResponse()));

UniAssertSubscriber<OrchestrationResponse> subscriber = notificationService
.invokeNotificationApi(onboarding)
.subscribe().withSubscriber(UniAssertSubscriber.create());

subscriber.assertCompleted().awaitItem();

verify(notificationsApi, times(1)).apiNotificationPost(eq(QueueEvent.UPDATE.name()), any());
}

@Test
@DisplayName("Should handle Invoke Notification API Success passing event UPDATE with status DELETED")
public void shouldHandleInvokeNotificationApiSuccessForQueueEventUpdateWithStatusDeleted() {
Onboarding onboarding = new Onboarding();
onboarding.setStatus(OnboardingStatus.DELETED);
onboarding.setUpdatedAt(LocalDateTime.now()); // 5 minutes should be the threshold
onboarding.setActivatedAt(LocalDateTime.now());

when(notificationsApi.apiNotificationPost(any(), any()))
.thenReturn(Uni.createFrom().item(new OrchestrationResponse()));

UniAssertSubscriber<OrchestrationResponse> subscriber = notificationService
.invokeNotificationApi(onboarding)
.subscribe().withSubscriber(UniAssertSubscriber.create());

subscriber.assertCompleted().awaitItem();

verify(notificationsApi, times(1)).apiNotificationPost(eq(QueueEvent.UPDATE.name()), any());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,17 @@ public HttpResponseMessage resendNotification(
.build();
}

final String queueEventString = request.getQueryParameters().get("queueEvent");
final QueueEvent queueEvent = Objects.isNull(queueEventString) ? QueueEvent.UPDATE : QueueEvent.valueOf(queueEventString);


final Optional<Onboarding> onboarding = onboardingService.getOnboarding(onboardingId);
if(onboarding.isEmpty()) {
return request.createResponseBuilder(HttpStatus.NOT_FOUND)
.body("Onboarding with ID: " + onboardingId + " not found")
.build();
}
notificationEventService.send(context, onboarding.get(), QueueEvent.UPDATE);
notificationEventService.send(context, onboarding.get(), queueEvent);
return request.createResponseBuilder(HttpStatus.OK).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ app_settings = [
{
name = "ONBOARDING-CDC-MONGODB-WATCH-ENABLED"
value = "true"
},
{
name = "ONBOARDING-CDC-MINUTES-THRESHOLD-FOR-UPDATE-NOTIFICATION"
value = "5"
}
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ app_settings = [
{
name = "ONBOARDING_FUNCTIONS_URL"
value = "https://selc-d-onboarding-fn.azurewebsites.net"
},
{
name = "ONBOARDING-CDC-MINUTES-THRESHOLD-FOR-UPDATE-NOTIFICATION"
value = "5"
}
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ app_settings = [
{
name = "ONBOARDING_FUNCTIONS_URL"
value = "https://selc-d-onboarding-fn.azurewebsites.net"
},
{
name = "ONBOARDING-CDC-MINUTES-THRESHOLD-FOR-UPDATE-NOTIFICATION"
value = "5"
}
]

Expand Down
Loading