Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SELC-4897] feat: Added function to resend notification #276

Merged
merged 10 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ protected void consumerOnboardingEvent(ChangeStreamDocument<Onboarding> document
log.info("Starting consumerOnboardingEvent ... ");
log.info("Sending Onboarding notification having id {}", document.getFullDocument().getId());

notificationsApi.apiNotificationsPost(onboardingMapper.toEntity(document.getFullDocument()))
notificationsApi.apiNotificationPost(onboardingMapper.toEntity(document.getFullDocument()))
.onFailure().retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofHours(retryMaxBackOff)).atMost(maxRetry)
.subscribe().with(
result -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
]
}
},
"/api/Notifications": {
"/api/Notification": {
"post": {
"tags": [
"Notifications"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ onboarding-cdc.retry=${ONBOARDING-CDC-RETRY:3}
quarkus.openapi-generator.codegen.spec.onboarding_functions_json.mutiny=true
quarkus.openapi-generator.codegen.spec.onboarding_functions_json.additional-model-type-annotations=@lombok.Builder; @lombok.NoArgsConstructor; @lombok.AllArgsConstructor
quarkus.openapi-generator.onboarding_functions_json.auth.api_key.api-key=${ONBOARDING-FUNCTIONS-API-KEY:example-api-key}
quarkus.rest-client."org.openapi.quarkus.onboarding_functions_json.api.OrchestrationApi".url=${ONBOARDING_FUNCTIONS_URL:http://localhost:8080}
quarkus.rest-client."org.openapi.quarkus.onboarding_functions_json.api.OrchestrationApi".read-timeout=60000
quarkus.rest-client."org.openapi.quarkus.onboarding_functions_json.api.NotificationsApi".url=${ONBOARDING_FUNCTIONS_URL:http://localhost:8080}
quarkus.rest-client."org.openapi.quarkus.onboarding_functions_json.api.NotificationsApi".read-timeout=60000
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.Objects;

public class EventhubSasTokenAuthorization implements ClientRequestFilter {

Expand All @@ -32,12 +32,11 @@ public EventhubSasTokenAuthorization(@Context @ConfigProperty(name = "rest-clien
public void filter(ClientRequestContext clientRequestContext) {
final String[] paths = clientRequestContext.getUri().getPath().split("/");
final String topic = paths[1];
NotificationConfig.Consumer consumerConfiguration = notificationConfig.consumers().entrySet().stream()
.filter(consumer -> consumer.getValue().topic().equals(topic))
.findFirst()
.map(Map.Entry::getValue)
.get();
clientRequestContext.getHeaders().add("Authorization", getSASToken(resourceUri.toString(), consumerConfiguration.name(), consumerConfiguration.key()));
NotificationConfig.Consumer consumerConfiguration = notificationConfig.consumers().values().stream()
.filter(consumer -> consumer.topic().equals(topic))
.findFirst().orElse(null);
if(Objects.nonNull(consumerConfiguration))
clientRequestContext.getHeaders().add("Authorization", getSASToken(resourceUri.toString(), consumerConfiguration.name(), consumerConfiguration.key()));
}

private static String getSASToken(String resourceUri, String keyName, String key) {
Expand All @@ -55,15 +54,15 @@ private static String getSASToken(String resourceUri, String keyName, String key
}

private static String getHMAC256(String key, String input) {
Mac sha256_HMAC;
Mac sha256HMAC;
String hash = null;
try {
sha256_HMAC = Mac.getInstance("HmacSHA256");
sha256HMAC = Mac.getInstance("HmacSHA256");
SecretKeySpec secretKey = new SecretKeySpec(key.getBytes(), "HmacSHA256");
sha256_HMAC.init(secretKey);
sha256HMAC.init(secretKey);
Base64.Encoder encoder = Base64.getEncoder();

hash = new String(encoder.encode(sha256_HMAC.doFinal(input.getBytes(StandardCharsets.UTF_8))));
hash = new String(encoder.encode(sha256HMAC.doFinal(input.getBytes(StandardCharsets.UTF_8))));

} catch (Exception e) {
log.warn("Impossible to sign token for event hub rest client. Error: {}", e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package it.pagopa.selfcare.onboarding.entity;

public enum Topic {
SC_CONTRACTS_FD("SC-Contracts-FD"),
SC_CONTRACTS_FD("Selfcare-FD"),
SC_CONTRACTS_SAP("SC-Contracts-SAP"),
SC_CONTRACTS("SC-Contracts");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import it.pagopa.selfcare.onboarding.entity.Onboarding;
import it.pagopa.selfcare.onboarding.dto.QueueEvent;
import it.pagopa.selfcare.onboarding.service.NotificationEventService;
import it.pagopa.selfcare.onboarding.service.OnboardingService;

import java.util.Objects;
import java.util.Optional;
Expand All @@ -20,21 +21,24 @@
public class NotificationFunctions {

private final NotificationEventService notificationEventService;
private final OnboardingService onboardingService;
private final ObjectMapper objectMapper;

public NotificationFunctions(ObjectMapper objectMapper,
NotificationEventService notificationEventService) {
NotificationEventService notificationEventService,
OnboardingService onboardingService) {
this.objectMapper = objectMapper;
this.notificationEventService = notificationEventService;
this.onboardingService = onboardingService;
}

/**
* This HTTP-triggered function sends messages through event hub.
* It gets invoked by module onboarding-cdc when status is COMPLETED or DELETED
*/
@FunctionName("Notifications")
@FunctionName("Notification")
@FixedDelayRetry(maxRetryCount = 3, delayInterval = "00:00:30")
public HttpResponseMessage sendNotifications(
public HttpResponseMessage sendNotification(
@HttpTrigger(name = "req", methods = {HttpMethod.POST}, authLevel = AuthorizationLevel.FUNCTION) HttpRequestMessage<Optional<String>> request,
final ExecutionContext context) {
context.getLogger().info("sendNotifications trigger processed a request");
Expand All @@ -60,7 +64,34 @@ public HttpResponseMessage sendNotifications(
.body("Malformed object onboarding in input.")
.build();
}
notificationEventService.send(onboarding, queueEvent);
notificationEventService.send(context, onboarding, queueEvent);
return request.createResponseBuilder(HttpStatus.OK).build();
}

/**
* This HTTP-triggered function retrieves onboarding given its identifier
* After that, It sends a message on topics through the event bus
*/
@FunctionName("ResendNotification")
public HttpResponseMessage resendNotification(
@HttpTrigger(name = "req", methods = {HttpMethod.POST}, authLevel = AuthorizationLevel.FUNCTION) HttpRequestMessage<Optional<String>> request,
final ExecutionContext context) {
context.getLogger().info("sendNotifications trigger processed a request");

final String onboardingId = request.getQueryParameters().get("onboardingId");
if (Objects.isNull(onboardingId)) {
return request.createResponseBuilder(HttpStatus.BAD_REQUEST)
.body("onboardingId cannot be empty.")
.build();
}

final Optional<Onboarding> onboarding = onboardingService.getOnboarding(onboardingId);
if(onboarding.isEmpty()) {
return request.createResponseBuilder(HttpStatus.NOT_FOUND)
.body("Onboarding with ID: " + onboardingId + " not found")
.build();
}
notificationEventService.send(context, onboarding.get(), QueueEvent.UPDATE);
return request.createResponseBuilder(HttpStatus.OK).build();
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package it.pagopa.selfcare.onboarding.service;

import com.microsoft.azure.functions.ExecutionContext;
import it.pagopa.selfcare.onboarding.entity.Onboarding;
import it.pagopa.selfcare.onboarding.dto.QueueEvent;

public interface NotificationEventService {
void send(Onboarding onboarding, QueueEvent queueEvent);

void send(ExecutionContext context, Onboarding onboarding, QueueEvent queueEvent);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.microsoft.azure.functions.ExecutionContext;
import it.pagopa.selfcare.onboarding.client.eventhub.EventHubRestClient;
import it.pagopa.selfcare.onboarding.config.NotificationConfig;
import it.pagopa.selfcare.onboarding.dto.QueueEvent;
Expand All @@ -19,8 +20,6 @@
import org.eclipse.microprofile.rest.client.inject.RestClient;
import org.openapi.quarkus.core_json.api.InstitutionApi;
import org.openapi.quarkus.core_json.model.InstitutionResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Objects;
Expand All @@ -41,7 +40,6 @@ public class NotificationEventServiceDefault implements NotificationEventService
private final NotificationConfig notificationConfig;
private final NotificationMapperFactory notificationMapperFactory;
private final TokenRepository tokenRepository;
private static final Logger log = LoggerFactory.getLogger(NotificationEventServiceDefault.class);
private final ObjectMapper mapper;

public NotificationEventServiceDefault(ProductService productService,
Expand All @@ -57,32 +55,30 @@ public NotificationEventServiceDefault(ProductService productService,
}

@Override
public void send(Onboarding onboarding, QueueEvent queueEvent) {
public void send(ExecutionContext context, Onboarding onboarding, QueueEvent queueEvent) {
final Product product = productService.getProduct(onboarding.getProductId());
final Map<String, NotificationConfig.Consumer> config = notificationConfig.consumers();
if (Objects.isNull(product.getConsumers())) {
log.warn("Node consumers is null for product with ID {}", onboarding.getProductId());
context.getLogger().warning("Node consumers is null for product with ID " + onboarding.getProductId());
return;
}

try {
Optional<Token> token = tokenRepository.findByOnboardingId(onboarding.getId());
if (token.isEmpty()) {
log.warn("Token not found for onboarding {}", onboarding.getId());
context.getLogger().warning("Token not found for onboarding " + onboarding.getId());
return;
}
InstitutionResponse institution = institutionApi.retrieveInstitutionByIdUsingGET(onboarding.getInstitution().getId());

for (String consumer : product.getConsumers()) {
final String topic = config.get(consumer.toLowerCase()).topic();
NotificationMapper notificationMapper = notificationMapperFactory.create(topic);
final String message = mapper.writeValueAsString(notificationMapper.toNotificationToSend(onboarding, token.get(), institution, queueEvent));
eventHubRestClient.sendMessage(topic, message);
log.info("Sent notification on topic: {}", topic);
context.getLogger().info("Sent notification on topic: " + topic);
}
} catch (Exception e) {
log.warn("Error during send notification for object {}: {} ", onboarding, e.getMessage(), e);
throw new NotificationException("Impossible to send notification for object " + onboarding);
context.getLogger().warning("Error during send notification for onboarding with ID " + onboarding.getId() + ". Error: " + e.getMessage());
throw new NotificationException("Impossible to send notification for onboarding " + onboarding);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,10 @@ quarkus.rest-client."it.pagopa.selfcare.onboarding.client.eventhub.EventHubRestC
rest-client.event-hub.uri=${EVENT_HUB_BASE_PATH:null}
notification.consumers.standard.topic=${STANDARD_TOPIC_NAME:SC-Contracts}
notification.consumers.standard.name=${STANDARD_SHARED_ACCESS_KEY_NAME:test}
notification.consumers.standard.key=${EVENTHUB-SC-CONTRACTS-SELFCARE-WO-KEY-LC:test}
notification.consumers.standard.key=${EVENTHUB_SC_CONTRACTS_SELFCARE_WO_KEY_LC:test}
notification.consumers.sap.topic=${SAP_TOPIC_NAME:SC-Contracts-SAP}
notification.consumers.sap.name=${SAP_SHARED_ACCESS_KEY_NAME:test}
notification.consumers.sap.key=${EVENTHUB-SC-CONTRACTS_SAP-SELFCARE-WO-KEY-LC:test}
notification.consumers.fd.topic=${FD_TOPIC_NAME:SC-Contracts-FD}
notification.consumers.sap.key=${EVENTHUB_SC_CONTRACTS_SAP_SELFCARE_WO_KEY-LC:test}
notification.consumers.fd.topic=${FD_TOPIC_NAME:Selfcare-FD}
notification.consumers.fd.name=${FD_SHARED_ACCESS_KEY_NAME:test}
notification.consumers.fd.key=${EVENTHUB-SC-CONTRACTS_FD-SELFCARE-WO-KEY-LC:test}
notification.consumers.fd.key=${EVENTHUB_SC_CONTRACTS_FD_SELFCARE_WO_KEY_LC:test}
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@
import io.quarkus.test.InjectMock;
import io.quarkus.test.junit.QuarkusTest;
import it.pagopa.selfcare.onboarding.HttpResponseMessageMock;
import it.pagopa.selfcare.onboarding.entity.Onboarding;
import it.pagopa.selfcare.onboarding.service.NotificationEventService;
import it.pagopa.selfcare.onboarding.service.OnboardingService;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Logger;

Expand All @@ -30,6 +34,9 @@ public class NotificationFunctionsTest {
@Inject
NotificationFunctions function;

@InjectMock
OnboardingService onboardingService;

@InjectMock
NotificationEventService notificationEventService;

Expand Down Expand Up @@ -60,18 +67,98 @@ public void sendNotificationTrigger() {
doReturn(Logger.getGlobal()).when(context).getLogger();

// Invoke
HttpResponseMessage responseMessage = function.sendNotifications(req, context);
HttpResponseMessage responseMessage = function.sendNotification(req, context);

// Verify
Mockito.verify(notificationEventService, times(1))
.send(any(), any());
.send(any(), any(), any());
assertEquals(HttpStatus.OK.value(), responseMessage.getStatusCode());

}

@Test
public void resendNotificationTrigger() {
// Setup
@SuppressWarnings("unchecked")
final HttpRequestMessage<Optional<String>> req = mock(HttpRequestMessage.class);

final Map<String, String> queryParams = new HashMap<>();
final String onboardingId = "onboardingId";
queryParams.put("onboardingId", onboardingId);
doReturn(queryParams).when(req).getQueryParameters();

doAnswer((Answer<HttpResponseMessage.Builder>) invocation -> {
HttpStatus status = (HttpStatus) invocation.getArguments()[0];
return new HttpResponseMessageMock.HttpResponseMessageBuilderMock().status(status);
}).when(req).createResponseBuilder(any(HttpStatus.class));

final ExecutionContext context = mock(ExecutionContext.class);
doReturn(Logger.getGlobal()).when(context).getLogger();
when(onboardingService.getOnboarding(onboardingId)).thenReturn(Optional.of(new Onboarding()));

// Invoke
HttpResponseMessage responseMessage = function.resendNotification(req, context);

// Verify
Mockito.verify(notificationEventService, times(1))
.send(any(), any(), any());
assertEquals(HttpStatus.OK.value(), responseMessage.getStatusCode());

}

@Test
public void resendNotificationNullOnboardingId() {
// Setup
@SuppressWarnings("unchecked")
final HttpRequestMessage<Optional<String>> req = mock(HttpRequestMessage.class);

final Map<String, String> queryParams = new HashMap<>();
doReturn(queryParams).when(req).getQueryParameters();

doAnswer((Answer<HttpResponseMessage.Builder>) invocation -> {
HttpStatus status = (HttpStatus) invocation.getArguments()[0];
return new HttpResponseMessageMock.HttpResponseMessageBuilderMock().status(status);
}).when(req).createResponseBuilder(any(HttpStatus.class));

final ExecutionContext context = mock(ExecutionContext.class);
doReturn(Logger.getGlobal()).when(context).getLogger();

// Invoke
HttpResponseMessage responseMessage = function.resendNotification(req, context);
assertEquals(HttpStatus.BAD_REQUEST.value(), responseMessage.getStatusCode());

}

@Test
public void resendNotificationOnboardingNotFound() {
// Setup
@SuppressWarnings("unchecked")
final HttpRequestMessage<Optional<String>> req = mock(HttpRequestMessage.class);

final Map<String, String> queryParams = new HashMap<>();
final String onboardingId = "onboardingId";
queryParams.put("onboardingId", onboardingId);
doReturn(queryParams).when(req).getQueryParameters();

doAnswer((Answer<HttpResponseMessage.Builder>) invocation -> {
HttpStatus status = (HttpStatus) invocation.getArguments()[0];
return new HttpResponseMessageMock.HttpResponseMessageBuilderMock().status(status);
}).when(req).createResponseBuilder(any(HttpStatus.class));

final ExecutionContext context = mock(ExecutionContext.class);
doReturn(Logger.getGlobal()).when(context).getLogger();
when(onboardingService.getOnboarding(onboardingId)).thenReturn(Optional.empty());

// Invoke
HttpResponseMessage responseMessage = function.resendNotification(req, context);
assertEquals(HttpStatus.NOT_FOUND.value(), responseMessage.getStatusCode());

}

@Test
public void sendNotificationTriggerError() {
// Setup
@SuppressWarnings("unchecked")
final HttpRequestMessage<Optional<String>> req = mock(HttpRequestMessage.class);
final String malformedOnboarding = "{\"onboardingId\":\"onboardingId\"";
final Optional<String> queryBody = Optional.of(malformedOnboarding);
Expand All @@ -83,7 +170,7 @@ public void sendNotificationTriggerError() {
return new HttpResponseMessageMock.HttpResponseMessageBuilderMock().status(status);
}).when(req).createResponseBuilder(any(HttpStatus.class));
// Invoke
HttpResponseMessage responseMessage = function.sendNotifications(req, context);
HttpResponseMessage responseMessage = function.sendNotification(req, context);
assertEquals(HttpStatus.BAD_REQUEST.value(), responseMessage.getStatusCode());

}
Expand Down
Loading
Loading