Skip to content

Commit

Permalink
[SELC-5483] Added new WorkflowExecutorIncrementRegistrationAggregator (
Browse files Browse the repository at this point in the history
…#499)

Co-authored-by: gunzip <gunzip@users.noreply.github.com>
  • Loading branch information
flaminiaScarciofolo and gunzip authored Sep 27, 2024
1 parent b24e58c commit eaf2783
Show file tree
Hide file tree
Showing 9 changed files with 224 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,11 @@ public void onboardingsAggregateOrchestrator(
String onboardingId = null;
try {
String onboardingAggregate = ctx.getInput(String.class);
onboardingId = ctx.callActivity(CREATE_AGGREGATE_ONBOARDING_REQUEST_ACTIVITY, onboardingAggregate, optionsRetry, String.class).await();
ctx.callSubOrchestrator("Onboardings", onboardingId, String.class).await();
boolean existsDelegation = Boolean.parseBoolean(ctx.callActivity(EXISTS_DELEGATION_ACTIVITY, onboardingAggregate, optionsRetry, String.class).await());
if(!existsDelegation) {
onboardingId = ctx.callActivity(CREATE_AGGREGATE_ONBOARDING_REQUEST_ACTIVITY, onboardingAggregate, optionsRetry, String.class).await();
ctx.callSubOrchestrator("Onboardings", onboardingId, String.class).await();
}
} catch (TaskFailedException ex) {
functionContext.getLogger().warning("Error during workflowExecutor execute, msg: " + ex.getMessage());
service.updateOnboardingStatusAndInstanceId(onboardingId, OnboardingStatus.FAILED, ctx.getInstanceId());
Expand Down Expand Up @@ -149,6 +152,7 @@ public void onboardingsOrchestrator(
case CONFIRMATION_AGGREGATE -> workflowExecutor = new WorkflowExecutorConfirmAggregate(objectMapper, optionsRetry);
case IMPORT -> workflowExecutor = new WorkflowExecutorImport(objectMapper, optionsRetry);
case USERS -> workflowExecutor = new WorkflowExecutorForUsers(objectMapper, optionsRetry);
case INCREMENT_REGISTRATION_AGGREGATOR -> workflowExecutor = new WorkflowExecutorIncrementRegistrationAggregator(objectMapper, optionsRetry, onboardingMapper);
default -> throw new IllegalArgumentException("Workflow options not found!");
}

Expand Down Expand Up @@ -269,6 +273,12 @@ public String createDelegationForAggregation(@DurableActivityTrigger(name = "onb
return completionService.createDelegation(readOnboardingValue(objectMapper, onboardingString));
}

@FunctionName(EXISTS_DELEGATION_ACTIVITY)
public String existsDelegation(@DurableActivityTrigger(name = "onboardingString") String onboardingString, final ExecutionContext context) {
context.getLogger().info(String.format(FORMAT_LOGGER_ONBOARDING_STRING, EXISTS_DELEGATION_ACTIVITY, onboardingString));
return completionService.existsDelegation(readOnboardingAggregateOrchestratorInputValue(objectMapper, onboardingString));
}

/**
* This HTTP-triggered function retrieves onboarding given its identifier
* After that, It sends a message on topics through the event bus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class ActivityName {
public static final String CREATE_DELEGATION_ACTIVITY = "CreateDelegation";
public static final String ONBOARDINGS_AGGREGATE_ORCHESTRATOR = "OnboardingsAggregate";
public static final String RESEND_NOTIFICATIONS_ACTIVITY = "ResendNotificationsActivity";
public static final String EXISTS_DELEGATION_ACTIVITY = "ExistsDelegationActivity";

private ActivityName() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ public interface CompletionService {
String createAggregateOnboardingRequest(OnboardingAggregateOrchestratorInput onboardingAggregateOrchestratorInput);

void sendTestEmail(ExecutionContext context);

String existsDelegation(OnboardingAggregateOrchestratorInput onboardingAggregateOrchestratorInput);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import jakarta.inject.Inject;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.core.Response;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.rest.client.inject.RestClient;
Expand All @@ -43,6 +44,7 @@
import static it.pagopa.selfcare.onboarding.common.WorkflowType.CONFIRMATION_AGGREGATE;
import static it.pagopa.selfcare.onboarding.service.OnboardingService.USERS_FIELD_LIST;
import static jakarta.ws.rs.core.Response.Status.Family.SUCCESSFUL;
import static org.openapi.quarkus.core_json.model.DelegationResponse.StatusEnum.ACTIVE;

@ApplicationScoped
public class CompletionServiceDefault implements CompletionService {
Expand Down Expand Up @@ -236,6 +238,24 @@ public void sendTestEmail(ExecutionContext context) {
notificationService.sendTestEmail(context);
}

@Override
public String existsDelegation(OnboardingAggregateOrchestratorInput input) {
boolean existsDelegation = false;

if (Objects.nonNull(input) && Objects.nonNull(input.getInstitution()) && Objects.nonNull(input.getAggregate())) {
try {
DelegationWithPaginationResponse delegationWithPaginationResponse = delegationApi.getDelegationsUsingGET1(null, input.getInstitution().getId(), null, null,
input.getAggregate().getTaxCode(), null, null, null);
if (Objects.nonNull(delegationWithPaginationResponse) && !CollectionUtils.isEmpty(delegationWithPaginationResponse.getDelegations())) {
existsDelegation = delegationWithPaginationResponse.getDelegations().stream().anyMatch(delegation -> ACTIVE.equals(delegation.getStatus()));
}
}catch (WebApplicationException e) {
throw new GenericOnboardingException(String.format("Error during retrieve delegation %s", e.getMessage()));
}
}
return existsDelegation ? "true" : "false";
}

private static DelegationRequest getDelegationRequest(Onboarding onboarding) {
DelegationRequest delegationRequest = new DelegationRequest();
delegationRequest.setProductId(onboarding.getProductId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
import com.microsoft.durabletask.TaskOptions;
import com.microsoft.durabletask.TaskOrchestrationContext;
import it.pagopa.selfcare.onboarding.common.OnboardingStatus;
import it.pagopa.selfcare.onboarding.dto.OnboardingAggregateOrchestratorInput;
import it.pagopa.selfcare.onboarding.entity.AggregateInstitution;
import it.pagopa.selfcare.onboarding.entity.Onboarding;
import it.pagopa.selfcare.onboarding.entity.OnboardingWorkflow;
import it.pagopa.selfcare.onboarding.mapper.OnboardingMapper;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -86,6 +89,18 @@ default Optional<OnboardingStatus> onboardingCompletionUsersActivity(TaskOrchest
return Optional.of(OnboardingStatus.COMPLETED);
}

default void createInstitutionAndOnboardingAggregate(TaskOrchestrationContext ctx, Onboarding onboarding, OnboardingMapper onboardingMapper){
List<Task<String>> parallelTasks = new ArrayList<>();

for (AggregateInstitution aggregate : onboarding.getAggregates()) {
OnboardingAggregateOrchestratorInput onboardingAggregate = onboardingMapper.mapToOnboardingAggregateOrchestratorInput(onboarding, aggregate);
final String onboardingAggregateString = getOnboardingAggregateString(objectMapper(), onboardingAggregate);
parallelTasks.add(ctx.callSubOrchestrator(ONBOARDINGS_AGGREGATE_ORCHESTRATOR, onboardingAggregateString, String.class));
}

ctx.allOf(parallelTasks).await();
}

default Optional<OnboardingStatus> onboardingCompletionActivityWithoutMail(TaskOrchestrationContext ctx, Onboarding onboarding) {
createInstitutionAndOnboarding(ctx, onboarding);
return Optional.of(OnboardingStatus.COMPLETED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,7 @@ public Optional<OnboardingStatus> executePendingState(TaskOrchestrationContext c
String onboardingWithInstitutionIdString = createInstitutionAndOnboarding(ctx, onboardingWorkflow.getOnboarding());
Onboarding onboarding = readOnboardingValue(objectMapper(), onboardingWithInstitutionIdString);

List<Task<String>> parallelTasks = new ArrayList<>();

for (AggregateInstitution aggregate : onboarding.getAggregates()) {
OnboardingAggregateOrchestratorInput onboardingAggregate = onboardingMapper.mapToOnboardingAggregateOrchestratorInput(onboarding, aggregate);
final String onboardingAggregateString = getOnboardingAggregateString(objectMapper(), onboardingAggregate);
parallelTasks.add(ctx.callSubOrchestrator(ONBOARDINGS_AGGREGATE_ORCHESTRATOR, onboardingAggregateString, String.class));
}

ctx.allOf(parallelTasks).await();
createInstitutionAndOnboardingAggregate(ctx, onboarding, onboardingMapper);

ctx.callActivity(SEND_MAIL_COMPLETION_ACTIVITY, getOnboardingWorkflowString(objectMapper(), onboardingWorkflow), optionsRetry, String.class).await();
return Optional.of(OnboardingStatus.COMPLETED);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package it.pagopa.selfcare.onboarding.workflow;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.microsoft.durabletask.TaskOptions;
import com.microsoft.durabletask.TaskOrchestrationContext;
import it.pagopa.selfcare.onboarding.common.OnboardingStatus;
import it.pagopa.selfcare.onboarding.entity.Onboarding;
import it.pagopa.selfcare.onboarding.entity.OnboardingWorkflow;
import it.pagopa.selfcare.onboarding.entity.OnboardingWorkflowAggregator;
import it.pagopa.selfcare.onboarding.mapper.OnboardingMapper;

import java.util.Optional;

import static it.pagopa.selfcare.onboarding.entity.OnboardingWorkflowType.AGGREGATOR;

public record WorkflowExecutorIncrementRegistrationAggregator(ObjectMapper objectMapper, TaskOptions optionsRetry,
OnboardingMapper onboardingMapper) implements WorkflowExecutor {

@Override
public Optional<OnboardingStatus> executeRequestState(TaskOrchestrationContext ctx, OnboardingWorkflow onboardingWorkflow) {
return Optional.empty();
}

@Override
public Optional<OnboardingStatus> executeToBeValidatedState(TaskOrchestrationContext ctx, OnboardingWorkflow onboardingWorkflow) {
return Optional.empty();
}

@Override
public Optional<OnboardingStatus> executePendingState(TaskOrchestrationContext ctx, OnboardingWorkflow onboardingWorkflow) {
createInstitutionAndOnboardingAggregate(ctx, onboardingWorkflow.getOnboarding(), onboardingMapper);
return Optional.of(OnboardingStatus.COMPLETED);
}

@Override
public OnboardingWorkflow createOnboardingWorkflow(Onboarding onboarding) {
return new OnboardingWorkflowAggregator(onboarding, AGGREGATOR.name());
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.logging.Logger;

import static it.pagopa.selfcare.onboarding.functions.utils.ActivityName.*;
Expand Down Expand Up @@ -201,6 +198,57 @@ void onboardingOrchestratorContractRegistrationAggregator_Pending(){
function.onboardingsOrchestrator(orchestrationContext, executionContext);
}

@Test
void onboardingOrchestratorIncrementRegistrationAggregator_Pending_delgationAlreadyExists(){
Onboarding onboarding = new Onboarding();
onboarding.setId("onboardingId");
onboarding.setStatus(OnboardingStatus.PENDING);
AggregateInstitution aggregate1 = new AggregateInstitution();
aggregate1.setTaxCode("code1");
AggregateInstitution aggregate2 = new AggregateInstitution();
aggregate2.setTaxCode("code2");
onboarding.setAggregates(List.of(aggregate1, aggregate2));
Institution institution = new Institution();
institution.setId("id");
onboarding.setInstitution(institution);
onboarding.setWorkflowType(WorkflowType.INCREMENT_REGISTRATION_AGGREGATOR);

TaskOrchestrationContext orchestrationContext = mockTaskOrchestrationContextForIncrementAggregator(onboarding, "true");
function.onboardingsOrchestrator(orchestrationContext, executionContext);

Mockito.verify(service, times(1))
.updateOnboardingStatus(onboarding.getId(), OnboardingStatus.COMPLETED);
}

@Test
void onboardingOrchestratorIncrementRegistrationAggregator_Pending_delgationNotExists(){
Onboarding onboarding = new Onboarding();
onboarding.setId("onboardingId");
onboarding.setStatus(OnboardingStatus.PENDING);
AggregateInstitution aggregate1 = new AggregateInstitution();
aggregate1.setTaxCode("code1");
AggregateInstitution aggregate2 = new AggregateInstitution();
aggregate2.setTaxCode("code2");
onboarding.setAggregates(List.of(aggregate1, aggregate2));
Institution institution = new Institution();
institution.setId("id");
onboarding.setInstitution(institution);
onboarding.setWorkflowType(WorkflowType.INCREMENT_REGISTRATION_AGGREGATOR);
TaskOrchestrationContext orchestrationContext = mockTaskOrchestrationContextForIncrementAggregator(onboarding, "false");

function.onboardingsOrchestrator(orchestrationContext, executionContext);

Mockito.verify(orchestrationContext, times(2))
.callSubOrchestrator(eq(ONBOARDINGS_AGGREGATE_ORCHESTRATOR), any(), any());

Mockito.verify(service, times(1))
.updateOnboardingStatus(onboarding.getId(), OnboardingStatus.COMPLETED);
}





@Test
void onboardingsOrchestratorNewAdminRequest() {
Onboarding onboarding = new Onboarding();
Expand Down Expand Up @@ -368,12 +416,12 @@ void onboardingsAggregateOrchestrator(){
function.onboardingsAggregateOrchestrator(orchestrationContext, executionContext);

ArgumentCaptor<String> captorActivity = ArgumentCaptor.forClass(String.class);
verify(orchestrationContext, times(1))
verify(orchestrationContext, times(2))
.callActivity(captorActivity.capture(), any(), any(),any());
verify(orchestrationContext, times(1))
.callSubOrchestrator(eq("Onboardings"), any(), any());

assertEquals(CREATE_AGGREGATE_ONBOARDING_REQUEST_ACTIVITY, captorActivity.getAllValues().get(0));
assertEquals(EXISTS_DELEGATION_ACTIVITY, captorActivity.getAllValues().get(0));
assertEquals(CREATE_AGGREGATE_ONBOARDING_REQUEST_ACTIVITY, captorActivity.getAllValues().get(1));

}
@Test
Expand All @@ -397,10 +445,10 @@ void onboardingsAggregateOrchestrator_resourceNotFound(){
verify(orchestrationContext, times(1))
.callActivity(captorActivity.capture(), any(), any(),any());

assertEquals(CREATE_AGGREGATE_ONBOARDING_REQUEST_ACTIVITY, captorActivity.getAllValues().get(0));
assertEquals(EXISTS_DELEGATION_ACTIVITY, captorActivity.getAllValues().get(0));
verify(service, times(1)).updateOnboardingStatusAndInstanceId(null, OnboardingStatus.FAILED, orchestrationContext.getInstanceId());

}

@Test
void onboardingsAggregateOrchestrator_taskFailed(){
Onboarding onboarding = new Onboarding();
Expand All @@ -422,7 +470,7 @@ void onboardingsAggregateOrchestrator_taskFailed(){
verify(orchestrationContext, times(1))
.callActivity(captorActivity.capture(), any(), any(),any());

assertEquals(CREATE_AGGREGATE_ONBOARDING_REQUEST_ACTIVITY, captorActivity.getAllValues().get(0));
assertEquals(EXISTS_DELEGATION_ACTIVITY, captorActivity.getAllValues().get(0));
verify(service, times(1)).updateOnboardingStatusAndInstanceId(null, OnboardingStatus.FAILED, orchestrationContext.getInstanceId());


Expand Down Expand Up @@ -522,14 +570,29 @@ void onboardingsOrchestratorForApprovePtWhenToBeValidated() {
}

TaskOrchestrationContext mockTaskOrchestrationContext(Onboarding onboarding) {
TaskOrchestrationContext orchestrationContext = mock(TaskOrchestrationContext.class);
when(orchestrationContext.getInput(String.class)).thenReturn(onboarding.getId());
when(service.getOnboarding(anyString())).thenReturn(Optional.of(onboarding));
when(completionService.existsDelegation(any())).thenReturn("false");

Task task = mock(Task.class);
when(orchestrationContext.callActivity(any(),any(),any(),any())).thenReturn(task);
when(orchestrationContext.callSubOrchestrator(any(),any())).thenReturn(task);
when(task.await()).thenReturn("false");
when(orchestrationContext.allOf(anyList())).thenReturn(task);
return orchestrationContext;
}

TaskOrchestrationContext mockTaskOrchestrationContextForIncrementAggregator(Onboarding onboarding, String returnValue) {
TaskOrchestrationContext orchestrationContext = mock(TaskOrchestrationContext.class);
when(orchestrationContext.getInput(String.class)).thenReturn(onboarding.getId());
when(service.getOnboarding(onboarding.getId())).thenReturn(Optional.of(onboarding));
when(completionService.existsDelegation(any())).thenReturn("true");

Task task = mock(Task.class);
when(orchestrationContext.callActivity(any(),any(),any(),any())).thenReturn(task);
when(orchestrationContext.callSubOrchestrator(any(),any())).thenReturn(task);
when(task.await()).thenReturn("example");
when(task.await()).thenReturn(returnValue);
when(orchestrationContext.allOf(anyList())).thenReturn(task);
return orchestrationContext;
}
Expand Down Expand Up @@ -767,6 +830,20 @@ void createDelegationForAggregation() {
.createDelegation(any());
}

@Test
void createDelegationForAggregationIncrement() {
final String onboardingString = "{\"onboardingId\":\"onboardingId\"}";

when(executionContext.getLogger()).thenReturn(Logger.getGlobal());
when(completionService.existsDelegation(any())).thenReturn("true");

String exists = function.existsDelegation(onboardingString, executionContext);

Assertions.assertEquals("true", exists);
verify(completionService, times(1))
.existsDelegation(any());
}

@Test
void sendTestEmail() {
@SuppressWarnings("unchecked") final HttpRequestMessage<Optional<String>> req = mock(HttpRequestMessage.class);
Expand Down
Loading

0 comments on commit eaf2783

Please sign in to comment.