diff --git a/changes/change_operation_api.md b/changes/change_operation_api.md
new file mode 100644
index 00000000..5e272017
--- /dev/null
+++ b/changes/change_operation_api.md
@@ -0,0 +1 @@
+* Redesign `WorkflowEngine` and `Provisioner` APIs to be fluent rather than callback-based
diff --git a/plugin-guide.md b/plugin-guide.md
index 6179b1a2..33bceaed 100644
--- a/plugin-guide.md
+++ b/plugin-guide.md
@@ -9,7 +9,7 @@ using the `provides` keyword. All plugins need to depend only on the
infrastructure.
There are several services that a plugin can provide and a plugin is free to
-provide multiple. Plugins are loaded from JSON data in the Vidarr configuration
+provide multiple. Plugins are loaded from JSON data in the Víðarr configuration
file or, in the case of an unload filter, user requests, using Jackson. Each
plugin can load whatever Jackson-compatible data from JSON it requires. Each
plugin has a small "provider" class which provides type information for
@@ -18,7 +18,7 @@ appropriate class instance. The provider class lists what values for `"type"`
correspond to what Java objects that Jackson should load. Since objects are
instantiated by Jackson, most have a `startup` method that is called after
loading is complete where the plugin can do any initialisation required. If it
-throws exceptions, the Vidarr server will fail to start, which is probably the
+throws exceptions, the Víðarr server will fail to start, which is probably the
correct behaviour for a badly misconfigured plugin.
As an example of a configuration file:
@@ -53,7 +53,7 @@ expected to journal their current state to the database. The `WorkMonitor`
provides methods to journal state to the database for crash recovery and to
provide status information to users.
-Most plugins have a `recover` method. If Vidarr is restarted, the plugin will
+Most plugins have a `recover` method. If Víðarr is restarted, the plugin will
be asked to recover its state from the last state information in journaled to
the database using the `WorkMonitor`. Plugins are expected to be able to pick
up where they left off based only on this information.
@@ -75,29 +75,29 @@ resources must be available at the start of its run and it holds the resource
until the workflow completes (successfully or not), at which point the resource
may be reused by another workflow run. Within quota-type, some require
information (_e.g._, the amount of RAM), while others are based purely on the
-existence of the workflow run (_e.g._, max-in-flight). The priority consumable
+existence of the workflow run (_e.g._, max-in-flight). The priority consumable
resources operates within the restrictions imposed from quota resources and allows
-users to manually set the order in which workflow runs will launch. Other
-resource are more "throttling"-type. These include maintenance schedules and
-Prometheus alerts which block workflow runs from starting but don't track
-anything once the workflow run is underway.
+users to manually set the order in which workflow runs will launch. Other
+resources are more "throttling"-type. These include maintenance schedules and
+Prometheus alerts which block workflow runs from starting but don't track
+anything once the workflow run is underway.
-Consumable resources are long-running. Whenever Vidarr attempts to run a
+Consumable resources are long-running. Whenever Víðarr attempts to run a
workflow, it will consult the consumable resources to see if there is capacity
to run the workflow (the `request` method). At that point the consumable
resource must make a decision as to whether the workflow can proceed. Once the
-workflow has finished running (successfully or not), Vidarr will `release` the
-resource so that it can be used again. When Vidarr restarts, any running
+workflow has finished running (successfully or not), Víðarr will `release` the
+resource so that it can be used again. When Víðarr restarts, any running
workflows will be called with `recover` to indicate that the resource is being
used and the resource cannot stop the workflow even if the resource is
-over-capacity.
+over-capacity.
Consumable resources can request data from the user, if desired. The
`inputFromSubmitter` can return an empty optional to indicate that no
information is required or can indicate the name and type of information that
is required. The `request` and `release` methods will contain a copy of this information,
encoded as JSON, if the submitter provided it. The JSON data has been
-type-checked by Vidarr, so it should be safe to convert to the expected type
+type-checked by Víðarr, so it should be safe to convert to the expected type
using Jackson.
Sometimes, consumable resources are doing scoring that would be helpful to know
@@ -122,9 +122,7 @@ shared directory instead of, say, `/tmp` and ensure the right permissions are
set up.
These are not the responsibility of the plugin author.
-The class `BaseJsonInputProvisioner` is a partial implementation that can store
-crash recovery information in a JSON object of the implementor's choosing,
-making recovery easier.
+This plugin type uses the [operations API](#operations-api).
# Output Provisioners
Output provisioners implement `ca.on.oicr.gsi.vidarr.OutputProvisionerProvider`
@@ -143,9 +141,7 @@ the plugin to validate any configuration metadata provided by the submitter
provision out step will be run with the metadata provided by the submitter and
the output provided by the workflow.
-The class `BaseJsonOutputProvisioner` is a partial implementation that can
-store crash recovery information in a JSON object of the implementer's
-choosing, making recovery easier.
+This plugin type uses the [operations API](#operations-api).
# Runtime Provisioners
Runtime provisioners implement `ca.on.oicr.gsi.vidarr.RuntimeProvisionerProvider`
@@ -159,9 +155,7 @@ This plugin and the workflow plugins must have a mutual understanding of what a
workflow engine's identifier means. That is somewhat the responsibility of the
system administrator.
-The class `BaseJsonRuntimeProvisioner` is a partial implementation that can
-store crash recovery information in a JSON object of the implementer's
-choosing, making recovery easier.
+This plugin type uses the [operations API](#operations-api).
# Workflow Engine
Workflow engines implement `ca.on.oicr.gsi.vidarr.WorkflowEngineProvider`
@@ -174,17 +168,15 @@ which ones are allowed via the `supports` method.
The workflow engine will be given the complete input to the workflow (with real
paths provided by the input provisioners) and the workflow itself. Once the
workflow has completed, it must provide a JSON structure that references the
-output of the workflow. Vidarr will identified the output files generated by
-the workflow engine and they will be passed to the output provisioners.
+output of the workflow. Víðarr will identify the output files generated by the
+workflow engine and they will be passed to the output provisioners.
After the output provisioners have completed, the workflow engine will be
called again to cleanup any output, if this is appropriate. If the workflow
engine does not support cleanup, it should gracefully succeed during the
clean-up (and clean-up recovery) methods.
-The class `BaseJsonWorkflowEngine` is a partial implementation that can store
-crash recovery information in a JSON object of the implementer's choosing,
-making recovery easier.
+This plugin type uses the [operations API](#operations-api).
# Unload Filters
Unload filters implement `ca.on.oicr.gsi.vidarr.UnloadFilterProvider` and
@@ -211,6 +203,67 @@ to filter on runs. A filter could query Pinery and get all the external
identifiers associated with that run and then construct a query based on those
to match workflow runs that use any of those identifiers.
+
+# Operations API
+Multiple plugins use an operations API rather than direct method calls. The API
+is designed to simplify two messy tasks: asynchronous operations and creating a
+recoverable state. The operations API consists of a few classes in
+`ca.on.oicr.gsi.vidarr`:
+
+- `OperationAction`: the core class that describes an operations process
+- `OperationStep`: a class that describes an asynchronous operation
+- `OperationStatefulStep`: a class that describes an asynchronous operation
+ which reads or modifies the on-going state
+
+The process starts with the plugin generating an original state object. This is
+a plugin-defined record. State objects should _not_ be mutated and using a
+record helps to encourage this. The plugin then defines an `OperationAction`
+that, starting from the original state object, outlines the steps needed to
+transform that state into the final output expected by the plugin.
+
+As Víðarr runs the operations, it keeps track of two values: the state and the
+_current value_. The current value is the output of the previous step and the
+input to the next step. In effect, given steps _A_, _B_, and _C_, the
+operations API will allow writing this as `load(...).then(A).then(B).then(C)`,
+but it will be executing it as `C(B(A(...)))`, where the return value of the
+previous step is the only parameter to the next one. This design is preferable
+to direct calls because Víðarr can stop executing the task when it needs to
+wait and restart it later, removing the burden of asynchronous scheduling from
+the plugin author.
+
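The chaining idea above can be illustrated with a small, self-contained toy. This is only a sketch of the concept: `Chain`, `State`, and the step names are hypothetical and are not the real `OperationAction` API, and the toy runs synchronously, so it does not show Víðarr pausing and resuming a chain.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Toy model of a fluent operation chain; NOT the real Víðarr API. */
final class OperationChainSketch {

  /** Hypothetical immutable plugin state, modelled as a record. */
  record State(String sampleName) {}

  /** A chain that turns a State into a current value of type T. */
  static final class Chain<T> {
    private final Function<State, T> run;
    private final List<String> stepNames;

    private Chain(Function<State, T> run, List<String> stepNames) {
      this.run = run;
      this.stepNames = stepNames;
    }

    /** Primes the chain by computing the first value from the state alone. */
    static <T> Chain<T> load(Function<State, T> loader) {
      return new Chain<>(loader, List.of("load"));
    }

    /** Appends a step whose only input is the previous step's output. */
    <R> Chain<R> then(String name, Function<T, R> step) {
      List<String> names = new ArrayList<>(stepNames);
      names.add(name);
      return new Chain<>(run.andThen(step), List.copyOf(names));
    }

    /** Runs the whole chain, equivalent to C(B(A(load(state)))). */
    T execute(State state) {
      return run.apply(state);
    }

    List<String> steps() {
      return stepNames;
    }
  }

  static Chain<String> buildChain() {
    return Chain.load(State::sampleName)
        .then("A", String::toUpperCase)
        .then("B", s -> s + ".bam")
        .then("C", s -> "/srv/output/" + s);
  }

  static String demo() {
    return buildChain().execute(new State("sample1"));
  }

  public static void main(String[] args) {
    System.out.println(buildChain().steps());
    System.out.println(demo());
  }
}
```

Because the chain is data describing the steps rather than a direct call stack, a scheduler is free to decide when each step runs; the toy simply runs them all at once.
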
+To start, a call to `OperationAction.load` or `OperationAction.value` is
+required. This primes the sequence of events by computing a value from the
+state information alone which will be the input for the first step. After this,
+`then` may be called to manipulate this value. Additionally, there are
+convenience methods `map`, to modify the current value, and `reload`, to discard
+the current value and load a new one from the state.
+
+`OperationStep` is technically a subset of `OperationStatefulStep`, but it is
+implemented separately because the restricted design of `OperationStep` has
+simpler type constraints, producing better errors during development.
+
+How state is managed along this chain of events is intentionally hidden to
+simplify the process. An `OperationStatefulStep` can wrap the state in
+additional information. For instance, `repeatUntilSuccess` needs to track the
+number of times it attempted an operation, so it wraps the state in
+`RepeatCounter`. When the chain is executed, Víðarr takes the original state
+and wraps it in classes like `RepeatCounter` to build up a state that tracks
+all of the paths required by the chain. It can then write this wrapped state to
+the database and, if Víðarr restarts, it can recover the operations
+automatically using this state information.
+
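As a rough illustration of state wrapping, the sketch below wraps a plugin state in a counter and produces a new wrapped value after every failed attempt. The names `OriginalState`, `Outcome`, and the shape of `repeatUntilSuccess` are assumptions made for this example; only the idea that a `RepeatCounter` wraps the state comes from the text above, and the real classes may look quite different.

```java
import java.util.Optional;
import java.util.function.BiFunction;

/** Toy model of wrapping state for a retry step; NOT the real Víðarr classes. */
final class StateWrappingSketch {

  /** Hypothetical plugin-defined state. */
  record OriginalState(String jobId) {}

  /** Wraps any state with the number of failed attempts so far. */
  record RepeatCounter<S>(S inner, int attempts) {
    RepeatCounter<S> next() {
      // Records are immutable, so a retry produces a new wrapped state.
      return new RepeatCounter<>(inner, attempts + 1);
    }
  }

  /** The final wrapped state together with the value the step produced. */
  record Outcome<S, T>(RepeatCounter<S> state, T value) {}

  /** Retries the attempt until it yields a value, tracking attempts in the wrapper. */
  static <S, T> Outcome<S, T> repeatUntilSuccess(
      S original, BiFunction<S, Integer, Optional<T>> attempt) {
    RepeatCounter<S> wrapped = new RepeatCounter<>(original, 0);
    while (true) {
      Optional<T> result = attempt.apply(wrapped.inner(), wrapped.attempts());
      if (result.isPresent()) {
        return new Outcome<>(wrapped, result.get());
      }
      // A real system would journal the new wrapped state at this point.
      wrapped = wrapped.next();
    }
  }

  /** Simulates an operation that fails twice before succeeding. */
  static Outcome<OriginalState, String> demo() {
    return repeatUntilSuccess(
        new OriginalState("job-42"),
        (state, attempts) ->
            attempts < 2 ? Optional.empty() : Optional.of(state.jobId() + " done"));
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

The wrapped record, not the original state, is what would need to be persisted for recovery, which is why changing the shape of the chain changes the shape of the stored state.
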
+This means that the steps automatically wrap and unwrap state along the way so
+that the correct information is stored in the database. One caveat is that if
+the structure of the operations changes, then so does the structure of the
+state stored in the database, so redesigning the operations may mean that
+Víðarr cannot recover state written by an earlier version. Having the plugin
+programmer manually manage state would give them better control over this
+scenario, but at the cost of considerable overhead in the plugin
+implementation. The interfaces Víðarr uses intentionally hide the state type
+behind a wildcard (`?`) generic to simplify writing plugins, but any changes to
+this type will cause recovery issues.
+
# Provided Implementations
This core implementation provides several plugins independent of external
systems.
diff --git a/vidarr-cli/src/main/java/ca/on/oicr/gsi/vidarr/cli/SingleShotOperation.java b/vidarr-cli/src/main/java/ca/on/oicr/gsi/vidarr/cli/SingleShotOperation.java
index c4690fd3..eda7781d 100644
--- a/vidarr-cli/src/main/java/ca/on/oicr/gsi/vidarr/cli/SingleShotOperation.java
+++ b/vidarr-cli/src/main/java/ca/on/oicr/gsi/vidarr/cli/SingleShotOperation.java
@@ -1,7 +1,7 @@
package ca.on.oicr.gsi.vidarr.cli;
-import ca.on.oicr.gsi.vidarr.core.ActiveOperation;
-import ca.on.oicr.gsi.vidarr.core.OperationStatus;
+import ca.on.oicr.gsi.vidarr.ActiveOperation;
+import ca.on.oicr.gsi.vidarr.OperationStatus;
import com.fasterxml.jackson.databind.JsonNode;
import java.lang.System.Logger.Level;
diff --git a/vidarr-cli/src/main/java/ca/on/oicr/gsi/vidarr/cli/TargetConfiguration.java b/vidarr-cli/src/main/java/ca/on/oicr/gsi/vidarr/cli/TargetConfiguration.java
index bc88c375..e720dfec 100644
--- a/vidarr-cli/src/main/java/ca/on/oicr/gsi/vidarr/cli/TargetConfiguration.java
+++ b/vidarr-cli/src/main/java/ca/on/oicr/gsi/vidarr/cli/TargetConfiguration.java
@@ -5,7 +5,7 @@
import ca.on.oicr.gsi.vidarr.core.Target;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -14,51 +14,52 @@
/** Configuration file format for workflow testing */
public final class TargetConfiguration {
- private WorkflowEngine engine;
- private List inputs;
- private List outputs;
- private List runtimes;
+ private WorkflowEngine, ?> engine;
+ private List> inputs;
+ private List> outputs;
+ private List> runtimes;
@JsonCreator
- public TargetConfiguration(@JsonProperty("engine") WorkflowEngine engine,
- @JsonProperty("inputs") List inputs,
- @JsonProperty("outputs") List outputs,
- @JsonProperty("runtimes") List runtimes) {
+ public TargetConfiguration(
+ @JsonProperty("engine") WorkflowEngine, ?> engine,
+ @JsonProperty("inputs") List> inputs,
+ @JsonProperty("outputs") List> outputs,
+ @JsonProperty("runtimes") List> runtimes) {
this.engine = Objects.requireNonNull(engine, "Engine object missing from config");
this.inputs = Objects.requireNonNull(inputs, "Inputs object missing from config");
this.outputs = Objects.requireNonNull(outputs, "Outputs object missing from config");
this.runtimes = Objects.requireNonNull(runtimes, "Runtimes object missing from config");
}
- public WorkflowEngine getEngine() {
+ public WorkflowEngine, ?> getEngine() {
return engine;
}
- public List getInputs() {
+ public List> getInputs() {
return inputs;
}
- public List getOutputs() {
+ public List> getOutputs() {
return outputs;
}
- public List getRuntimes() {
+ public List> getRuntimes() {
return runtimes;
}
- public void setEngine(WorkflowEngine engine) {
+ public void setEngine(WorkflowEngine, ?> engine) {
this.engine = engine;
}
- public void setInputs(List inputs) {
+ public void setInputs(List> inputs) {
this.inputs = inputs;
}
- public void setOutputs(List outputs) {
+ public void setOutputs(List> outputs) {
this.outputs = outputs;
}
- public void setRuntimes(List runtimes) {
+ public void setRuntimes(List> runtimes) {
this.runtimes = runtimes;
}
@@ -74,7 +75,7 @@ public Target toTarget() {
}
engine.startup();
return new Target() {
- final Map inputs =
+ final Map> inputs =
TargetConfiguration.this.inputs.stream()
.flatMap(
p ->
@@ -83,7 +84,7 @@ public Target toTarget() {
.map(f -> new Pair<>(f, p)))
.collect(Collectors.toMap(Pair::first, Pair::second));
- final Map outputs =
+ final Map> outputs =
TargetConfiguration.this.outputs.stream()
.flatMap(
p ->
@@ -92,8 +93,8 @@ public Target toTarget() {
.map(f -> new Pair<>(f, p)))
.collect(Collectors.toMap(Pair::first, Pair::second));
- final List runtimes =
- TargetConfiguration.this.runtimes.stream().collect(Collectors.toList());
+ final List> runtimes =
+ new ArrayList<>(TargetConfiguration.this.runtimes);
@Override
public Stream> consumableResources() {
@@ -101,22 +102,22 @@ public Stream> consumableResources() {
}
@Override
- public WorkflowEngine engine() {
+ public WorkflowEngine, ?> engine() {
return engine;
}
@Override
- public InputProvisioner provisionerFor(InputProvisionFormat type) {
+ public InputProvisioner> provisionerFor(InputProvisionFormat type) {
return inputs.get(type);
}
@Override
- public OutputProvisioner provisionerFor(OutputProvisionFormat type) {
+ public OutputProvisioner, ?> provisionerFor(OutputProvisionFormat type) {
return outputs.get(type);
}
@Override
- public Stream runtimeProvisioners() {
+ public Stream> runtimeProvisioners() {
return runtimes.stream();
}
};
diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/ActiveWorkflow.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/ActiveWorkflow.java
index 857d00b9..0af45367 100644
--- a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/ActiveWorkflow.java
+++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/ActiveWorkflow.java
@@ -1,6 +1,7 @@
package ca.on.oicr.gsi.vidarr.core;
import ca.on.oicr.gsi.Pair;
+import ca.on.oicr.gsi.vidarr.ActiveOperation;
import ca.on.oicr.gsi.vidarr.api.ExternalId;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/BaseProcessor.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/BaseProcessor.java
index f67caeba..925adb9f 100644
--- a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/BaseProcessor.java
+++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/BaseProcessor.java
@@ -1,19 +1,27 @@
package ca.on.oicr.gsi.vidarr.core;
-import ca.on.oicr.gsi.Pair;
-import ca.on.oicr.gsi.vidarr.*;
+import ca.on.oicr.gsi.vidarr.ActiveOperation;
+import ca.on.oicr.gsi.vidarr.ActiveOperation.TransactionManager;
+import ca.on.oicr.gsi.vidarr.InputProvisionFormat;
+import ca.on.oicr.gsi.vidarr.OperationControlFlow;
+import ca.on.oicr.gsi.vidarr.OperationStatus;
+import ca.on.oicr.gsi.vidarr.OutputProvisionFormat;
import ca.on.oicr.gsi.vidarr.OutputProvisioner.ResultVisitor;
+import ca.on.oicr.gsi.vidarr.WorkflowDefinition;
+import ca.on.oicr.gsi.vidarr.WorkflowEngine;
import ca.on.oicr.gsi.vidarr.WorkflowEngine.Result;
import ca.on.oicr.gsi.vidarr.api.ExternalId;
-import ca.on.oicr.gsi.vidarr.core.PrepareOutputProvisioning.ProvisioningOutWorkMonitor;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.lang.System.Logger.Level;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -27,10 +35,29 @@
public abstract class BaseProcessor<
W extends ActiveWorkflow, PO extends ActiveOperation, TX>
- implements FileResolver {
+ implements TransactionManager, FileResolver {
+
+ OperationControlFlow createNext(
+ PO operation, TerminalHandler