Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closes #372 - Add Distribution for tasks #373

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.kadai.spi.task.internal.BeforeRequestReviewManager;
import io.kadai.spi.task.internal.CreateTaskPreprocessorManager;
import io.kadai.spi.task.internal.ReviewRequiredManager;
import io.kadai.spi.task.internal.TaskDistributionManager;
import io.kadai.spi.task.internal.TaskEndstatePreprocessorManager;
import java.util.function.Supplier;
import org.apache.ibatis.session.SqlSession;
Expand Down Expand Up @@ -115,6 +116,13 @@ default void executeInDatabaseConnection(Runnable runnable) {
*/
TaskRoutingManager getTaskRoutingManager();

/**
* Retrieve TaskDistributionManager.
*
* @return the TaskDistributionManager instance.
*/
TaskDistributionManager getTaskDistributionManager();

/**
* Retrieve CreateTaskPreprocessorManager.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import io.kadai.spi.task.internal.BeforeRequestReviewManager;
import io.kadai.spi.task.internal.CreateTaskPreprocessorManager;
import io.kadai.spi.task.internal.ReviewRequiredManager;
import io.kadai.spi.task.internal.TaskDistributionManager;
import io.kadai.spi.task.internal.TaskEndstatePreprocessorManager;
import io.kadai.task.api.TaskService;
import io.kadai.task.internal.AttachmentMapper;
Expand Down Expand Up @@ -109,6 +110,7 @@ public class KadaiEngineImpl implements KadaiEngine {
private static final SessionStack SESSION_STACK = new SessionStack();
protected final KadaiConfiguration kadaiConfiguration;
private final TaskRoutingManager taskRoutingManager;
private final TaskDistributionManager taskDistributionManager;
private final CreateTaskPreprocessorManager createTaskPreprocessorManager;
private final PriorityServiceManager priorityServiceManager;
private final ReviewRequiredManager reviewRequiredManager;
Expand Down Expand Up @@ -198,6 +200,7 @@ protected KadaiEngineImpl(
priorityServiceManager = new PriorityServiceManager(this);
historyEventManager = new HistoryEventManager(this);
taskRoutingManager = new TaskRoutingManager(this);
taskDistributionManager = new TaskDistributionManager(this);
reviewRequiredManager = new ReviewRequiredManager(this);
beforeRequestReviewManager = new BeforeRequestReviewManager(this);
afterRequestReviewManager = new AfterRequestReviewManager(this);
Expand Down Expand Up @@ -609,6 +612,11 @@ public TaskRoutingManager getTaskRoutingManager() {
return taskRoutingManager;
}

@Override
public TaskDistributionManager getTaskDistributionManager() {
return taskDistributionManager;
}

@Override
public CreateTaskPreprocessorManager getCreateTaskPreprocessorManager() {
return createTaskPreprocessorManager;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package io.kadai.spi.task.api;

import io.kadai.common.api.KadaiEngine;
import io.kadai.task.api.models.Task;
import io.kadai.workbasket.api.models.Workbasket;
import java.util.List;
import java.util.Map;

/**
* The {@code TaskDistributionProvider} interface defines a Service Provider Interface for
* implementing custom {@linkplain Task} distribution strategies within the {@linkplain
* KadaiEngine}.
*
* <p>This interface allows the system to distribute {@linkplain Task}s from a source to one or more
* destination {@linkplain Workbasket}s based on a given strategy and additional context-specific
* information.
*
* <p>The implementation of this interface must be registered as a service provider and will be
* called by KADAI during task distribution operations.
*/
public interface TaskDistributionProvider {

/**
* Initializes the {@linkplain KadaiEngine} for the current KADAI installation.
*
* <p>This method is called during KADAI startup and allows the service provider to access and
* store the active {@linkplain KadaiEngine} instance for later use during task distribution.
*
* @param kadaiEngine the active {@linkplain KadaiEngine} instance initialized for this
* installation
*/
void initialize(KadaiEngine kadaiEngine);

/**
* Determines the distribution of tasks to one or more destination workbaskets based on the
* provided parameters.
*
* <p>This method is invoked by KADAI to calculate the assignment of a set of task IDs to specific
* destination workbaskets using a custom distribution strategy. The method does not directly
* perform the distribution but returns the intended mapping of tasks to workbaskets.
*
* <p><b>Input Parameters:</b>
*
* <ul>
* <li>{@code taskIds (List<String>)}: A list of task IDs to be analyzed for distribution.
* <li>{@code destinationWorkbasketIds (List<String>)}: A list of destination workbasket IDs
* where the tasks are intended to be assigned.
* <li>{@code additionalInformation (Map<String, Object>)}: Additional context-specific details
* that can influence the distribution logic.
* </ul>
*
* <p><b>Output:</b>
*
* <ul>
* <li>The method returns a {@code Map<String, List<String>>}, where each key represents a
* destination workbasket ID, and the corresponding value is a list of task IDs that should
* be assigned to that workbasket.
* <li>The returned mapping provides the intended distribution but does not execute any changes
* in the system.
* </ul>
*
* <p><b>Contract:</b>
*
* <ul>
* <li>The {@code taskIds} and {@code destinationWorkbasketIds} must not be null.
* </ul>
*
* @param taskIds a list of task IDs to be analyzed for distribution
* @param destinationWorkbasketIds a list of destination workbasket IDs where tasks are intended
* to be distributed
* @param additionalInformation a map of additional details for customizing the distribution logic
* @return a {@code Map<String, List<String>>} containing the destination workbasket IDs as keys
* and the corresponding task IDs to be assigned as values
*/
Map<String, List<String>> distributeTasks(
List<String> taskIds,
List<String> destinationWorkbasketIds,
Map<String, Object> additionalInformation);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.kadai.spi.task.internal;

import io.kadai.common.api.KadaiEngine;
import io.kadai.spi.task.api.TaskDistributionProvider;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class DefaultTaskDistributionProvider implements TaskDistributionProvider {

@Override
public void initialize(KadaiEngine kadaiEngine) {
// NOOP
}

@Override
public Map<String, List<String>> distributeTasks(
List<String> taskIds, List<String> workbasketIds, Map<String, Object> additionalInformation) {

if (taskIds == null || taskIds.isEmpty()) {
throw new IllegalArgumentException("Task Ids list cannot be null or empty.");
}
if (workbasketIds == null || workbasketIds.isEmpty()) {
throw new IllegalArgumentException("Ids of destinationWorkbaskets cannot be null or empty.");
}

Map<String, List<String>> distributedTaskIds =
workbasketIds.stream().collect(Collectors.toMap(id -> id, id -> new ArrayList<>()));

int workbasketCount = workbasketIds.size();
IntStream.range(0, taskIds.size())
.forEach(
i ->
distributedTaskIds.get(workbasketIds.get(i % workbasketCount)).add(taskIds.get(i)));

return distributedTaskIds;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.kadai.spi.task.internal;

import io.kadai.common.api.KadaiEngine;
import io.kadai.common.api.exceptions.InvalidArgumentException;
import io.kadai.common.internal.util.LogSanitizer;
import io.kadai.common.internal.util.SpiLoader;
import io.kadai.spi.task.api.TaskDistributionProvider;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class TaskDistributionManager {

private static final Logger LOGGER = LoggerFactory.getLogger(TaskDistributionManager.class);
private final List<TaskDistributionProvider> taskDistributionProviderList;
private final TaskDistributionProvider defaultProvider = new DefaultTaskDistributionProvider();

public TaskDistributionManager(KadaiEngine kadaiEngine) {
List<TaskDistributionProvider> loadedProviders = SpiLoader.load(TaskDistributionProvider.class);

this.taskDistributionProviderList = new ArrayList<>(loadedProviders);
this.taskDistributionProviderList.add(defaultProvider);

for (TaskDistributionProvider provider : taskDistributionProviderList) {
provider.initialize(kadaiEngine);
LOGGER.info("Registered TaskDistribution provider: {}", provider.getClass().getName());
}

if (loadedProviders.isEmpty()) {
LOGGER.info(
"No Custom TaskDistribution Provider found. Using only DefaultTaskDistributionProvider.");
}
}

public TaskDistributionProvider getProviderByName(String name) {
if (name == null) {
return defaultProvider;
}

return taskDistributionProviderList.stream()
.filter(provider -> provider.getClass().getSimpleName().equals(name))
.findFirst()
.orElseThrow(
() ->
new InvalidArgumentException(
String.format("The distribution strategy '%s' does not exist.", name)));
}

public Map<String, List<String>> distributeTasks(
List<String> taskIds,
List<String> destinationWorkbasketIds,
Map<String, Object> additionalInformation,
String distributionStrategyName) {

TaskDistributionProvider provider = getProviderByName(distributionStrategyName);

String sanitizedDistributionStrategyName =
LogSanitizer.stripLineBreakingChars(
distributionStrategyName != null
? distributionStrategyName
: "DefaultTaskDistributionProvider");
LOGGER.info("Using TaskDistributionProvider: {}", sanitizedDistributionStrategyName);

Map<String, List<String>> newTaskDistribution =
provider.distributeTasks(taskIds, destinationWorkbasketIds, additionalInformation);

if (newTaskDistribution == null || newTaskDistribution.isEmpty()) {
throw new InvalidArgumentException(
"The distribution strategy resulted in no task assignments. Please verify the input.");
}

return newTaskDistribution;
}
}
Loading