Redesign workflow engine and provisioner API
This replaces the call-back based `WorkMonitor` with the fluent monad
`OperationAction` API. This change will prevent recovery of any inflight
operations.
apmasell committed Feb 14, 2024
1 parent 7c45511 commit 1f9e831
Showing 100 changed files with 4,109 additions and 2,708 deletions.
1 change: 1 addition & 0 deletions changes/change_operation_api.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* Redesign `WorkflowEngine` and `Provisioner` APIs to be fluent rather than callbacks
107 changes: 80 additions & 27 deletions plugin-guide.md
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ using the `provides` keyword. All plugins need to depend only on the
infrastructure.

There are several services that a plugin can provide and a plugin is free to
provide multiple. Plugins are loaded from JSON data in the Vidarr configuration
provide multiple. Plugins are loaded from JSON data in the Víðarr configuration
file or, in the case of an unload filter, user requests, using Jackson. Each
plugin can load whatever Jackson-compatible data from JSON it requires. Each
plugin has a small "provider" class which provides type information for
@@ -18,7 +18,7 @@ appropriate class instance. The provider class lists what values for `"type"`
correspond to what Java objects that Jackson should load. Since objects are
instantiated by Jackson, most have a `startup` method that is called after
loading is complete where the plugin can do any initialisation required. If it
throws exceptions, the Vidarr server will fail to start, which is probably the
throws exceptions, the Víðarr server will fail to start, which is probably the
correct behaviour for a badly misconfigured plugin.

As an example of a configuration file:
@@ -53,7 +53,7 @@ expected to journal their current state to the database. The `WorkMonitor`
provides methods to journal state to the database for crash recovery and to
provide status information to users.

Most plugins have a `recover` method. If Vidarr is restarted, the plugin will
Most plugins have a `recover` method. If Víðarr is restarted, the plugin will
be asked to recover its state from the last state information journaled to
the database using the `WorkMonitor`. Plugins are expected to be able to pick
up where they left off based only on this information.
@@ -75,29 +75,29 @@ resources must be available at the start of its run and it holds the resource
until the workflow completes (successfully or not), at which point the resource
may be reused by another workflow run. Within quota-type, some require
information (_e.g._, the amount of RAM), while others are based purely on the
existence of the workflow run (_e.g._, max-in-flight). The priority consumable
existence of the workflow run (_e.g._, max-in-flight). The priority consumable
resource operates within the restrictions imposed by quota resources and allows
users to manually set the order in which workflow runs will launch. Other
resources are more "throttling"-type. These include maintenance schedules and
Prometheus alerts which block workflow runs from starting but don't track
anything once the workflow run is underway.
users to manually set the order in which workflow runs will launch. Other
resources are more "throttling"-type. These include maintenance schedules and
Prometheus alerts which block workflow runs from starting but don't track
anything once the workflow run is underway.

Consumable resources are long-running. Whenever Vidarr attempts to run a
Consumable resources are long-running. Whenever Víðarr attempts to run a
workflow, it will consult the consumable resources to see if there is capacity
to run the workflow (the `request` method). At that point the consumable
resource must make a decision as to whether the workflow can proceed. Once the
workflow has finished running (successfully or not), Vidarr will `release` the
resource so that it can be used again. When Vidarr restarts, any running
workflow has finished running (successfully or not), Víðarr will `release` the
resource so that it can be used again. When Víðarr restarts, any running
workflows will be called with `recover` to indicate that the resource is being
used and the resource cannot stop the workflow even if the resource is
over-capacity.
over-capacity.

Consumable resources can request data from the user, if desired. The
`inputFromSubmitter` can return an empty optional to indicate that no
information is required or can indicate the name and type of information that
is required. The `request` and `release` methods will contain a copy of this information,
encoded as JSON, if the submitter provided it. The JSON data has been
type-checked by Vidarr, so it should be safe to convert to the expected type
type-checked by Víðarr, so it should be safe to convert to the expected type
using Jackson.

Sometimes, consumable resources are doing scoring that would be helpful to know
@@ -122,9 +122,7 @@ shared directory instead of, say, `/tmp` and ensure the right permissions are
set up.
These are not the responsibility of the plugin author.

The class `BaseJsonInputProvisioner` is a partial implementation that can store
crash recovery information in a JSON object of the implementor's choosing,
making recovery easier.
This plugin type uses the [operations API](#operations-api).

# Output Provisioners
Output provisioners implement `ca.on.oicr.gsi.vidarr.OutputProvisionerProvider`
@@ -143,9 +141,7 @@ the plugin to validate any configuration metadata provided by the submitter
provision out step will be run with the metadata provided by the submitter and
the output provided by the workflow.

The class `BaseJsonOutputProvisioner` is a partial implementation that can
store crash recovery information in a JSON object of the implementer's
choosing, making recovery easier.
This plugin type uses the [operations API](#operations-api).

# Runtime Provisioners
Runtime provisioners implement `ca.on.oicr.gsi.vidarr.RuntimeProvisionerProvider`
@@ -159,9 +155,7 @@ This plugin and the workflow plugins must have a mutual understanding of what a
workflow engine's identifier means. That is somewhat the responsibility of the
system administrator.

The class `BaseJsonRuntimeProvisioner` is a partial implementation that can
store crash recovery information in a JSON object of the implementer's
choosing, making recovery easier.
This plugin type uses the [operations API](#operations-api).

# Workflow Engine
Workflow engines implement `ca.on.oicr.gsi.vidarr.WorkflowEngineProvider`
@@ -174,17 +168,15 @@ which ones are allowed via the `supports` method.
The workflow engine will be given the complete input to the workflow (with real
paths provided by the input provisioners) and the workflow itself. Once the
workflow has completed, it must provide a JSON structure that references the
output of the workflow. Vidarr will identified the output files generated by
the workflow engine and they will be passed to the output provisioners.
output of the workflow. Víðarr will identify the output files generated by the
workflow engine and they will be passed to the output provisioners.

After the output provisioners have completed, the workflow engine will be
called again to clean up any output, if this is appropriate. If the workflow
engine does not support cleanup, it should gracefully succeed during the
clean-up (and clean-up recovery) methods.

The class `BaseJsonWorkflowEngine` is a partial implementation that can store
crash recovery information in a JSON object of the implementer's choosing,
making recovery easier.
This plugin type uses the [operations API](#operations-api).

# Unload Filters
Unload filters implement `ca.on.oicr.gsi.vidarr.UnloadFilterProvider` and
@@ -211,6 +203,67 @@ to filter on runs. A filter could query Pinery and get all the external
identifiers associated with that run and then construct a query based on those
to match workflow runs that use any of those identifiers.

<a id="operations-api"></a>
# The Operations API
Multiple plugins use an operations API rather than direct method calls. The API
is designed to simplify two messy tasks: asynchronous operations and creating a
recoverable state. The operations API consists of a few classes in
`ca.on.oicr.gsi.vidarr`:

- `OperationAction`: the core class that describes an operations process
- `OperationStep`: a class that describes an asynchronous operation
- `OperationStatefulStep`: a class that describes an asynchronous operation
which reads or modifies the on-going state

The process starts with the plugin generating an original state object. This is
a plugin-defined record. State objects should _not_ be mutated and using a
record helps to encourage this. The plugin then defines an `OperationAction`
that, starting from the original state object, outlines the steps needed to
transform that state into the final output expected by the plugin.
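
As a minimal sketch of what an immutable state record could look like: the `CopyState` record, its fields, and the `withCopied` helper below are hypothetical and are not part of Víðarr. The idea is only that a step which needs a different state builds a new record rather than changing the existing one.

```java
import java.util.ArrayList;
import java.util.List;

final class StateRecordSketch {
  /** Hypothetical plugin state; the field names are illustrative only. */
  record CopyState(String source, String destination, List<String> copied) {
    CopyState {
      // Defensive copy so callers cannot mutate the list behind the record's back.
      copied = List.copyOf(copied);
    }

    /** Returns a new state with one more copied file; the receiver is unchanged. */
    CopyState withCopied(String file) {
      List<String> next = new ArrayList<>(copied);
      next.add(file);
      return new CopyState(source, destination, next);
    }
  }

  public static void main(String[] args) {
    CopyState original = new CopyState("/data/in", "/data/out", List.of());
    CopyState updated = original.withCopied("reads.fastq.gz");
    // The original record still reports no copied files; only the new one does.
    System.out.println(original.copied().size() + " -> " + updated.copied().size());
  }
}
```

Records compare by value, so two states with the same contents are equal, which is convenient when checking whether a recovered state matches an expected one.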

As Víðarr runs the operations, it keeps track of two values: the state and the
_current value_. The current value is the output of the previous step and the
input to the next step. In effect, given steps _A_, _B_, and _C_, the
operations API will allow writing this as `load(...).then(A).then(B).then(C)`,
but it will be executing it as `C(B(A(...)))`, where the return value of the
previous step is the only parameter to the next one. This design is preferable
to direct calls because Víðarr can stop executing the task when it needs to
wait and restart it later, removing the burden of asynchronous scheduling from
the plugin author.

To start, a call to `OperationAction.load` or `OperationAction.value` is
required. This primes the sequence of events by computing a value from the
state information alone which will be the input for the first step. After this,
`then` may be called to manipulate this value. Additionally, there are
convenience methods `map`, to modify the current value, and `reload`, to discard
the current value and load a new one from the state.
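
The chaining described above can be illustrated with a toy class. `MiniAction` below is a hypothetical stand-in written for this guide, not the real `OperationAction`: its method names mirror `load`, `then`, `map`, and `reload`, but the signatures are assumptions, every step is a plain synchronous function, and nothing is suspended or journaled. It only shows how a fluent chain ends up being evaluated as nested calls.

```java
import java.util.function.Function;

/** Toy fluent chain; NOT the Vidarr API. All signatures here are assumptions. */
final class MiniAction<S, V> {
  private final Function<S, V> run;

  private MiniAction(Function<S, V> run) {
    this.run = run;
  }

  /** Primes the chain by computing the first current value from the state alone. */
  static <S, V> MiniAction<S, V> load(Function<S, V> loader) {
    return new MiniAction<S, V>(loader);
  }

  /** Appends a step whose only input is the output of the previous step. */
  <R> MiniAction<S, R> then(Function<? super V, ? extends R> step) {
    return new MiniAction<S, R>(state -> step.apply(run.apply(state)));
  }

  /** In this toy, map is the same as then: it transforms the current value. */
  <R> MiniAction<S, R> map(Function<? super V, ? extends R> transform) {
    return then(transform);
  }

  /** Runs the earlier steps, discards their value, and loads a new one from the state. */
  <R> MiniAction<S, R> reload(Function<S, R> loader) {
    return new MiniAction<S, R>(state -> {
      run.apply(state);
      return loader.apply(state);
    });
  }

  /** Evaluates the whole chain against a state. */
  V execute(S state) {
    return run.apply(state);
  }
}

final class FluentChainDemo {
  /** Hypothetical plugin state. */
  record Job(String sample, int lanes) {}

  static String outputPath() {
    return MiniAction.<Job, String>load(Job::sample)
        .then(s -> s.toUpperCase()) // step A
        .then(s -> s + ".bam") // step B
        .then(s -> "/srv/out/" + s) // step C, evaluated as C(B(A(load(state))))
        .execute(new Job("sample1", 2));
  }

  static int doubledLanes() {
    return MiniAction.<Job, String>load(Job::sample)
        .reload(Job::lanes) // drop the sample name, read the lane count instead
        .map(n -> n * 2)
        .execute(new Job("sample1", 2));
  }

  public static void main(String[] args) {
    System.out.println(outputPath());
    System.out.println(doubledLanes());
  }
}
```

Because each call returns a new chain object instead of running anything, the chain is only a description until `execute` is called. That separation is what lets a real engine decide when, and how often, to run each step.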

`OperationStep` is technically a subset of `OperationStatefulStep`, but it is
implemented separately because the restricted design of `OperationStep` has
simpler type constraints, producing better errors during development.

How state is managed along this chain of events is intentionally hidden to
simplify the process. An `OperationStatefulStep` can wrap the state in
additional information. For instance, `repeatUntilSuccess` needs to track the
number of times it attempted an operation, so it wraps the state in
`RepeatCounter`. When the chain is executed, Víðarr takes the original state
and wraps it in classes like `RepeatCounter` to build up a state that tracks
all of the paths required by the chain. It can then write this wrapped state to
the database and, if Víðarr restarts, it can recover the operations
automatically using this state information.
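
To make the wrapping idea concrete, here is a small sketch under stated assumptions. `AttemptCounter` is a hypothetical wrapper loosely inspired by the description of `RepeatCounter`, the `repeatUntilSuccess` method below is a toy with an invented signature, and a list of strings stands in for the database. Víðarr's real classes and persistence are not shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class StateWrappingSketch {
  /** Hypothetical plugin-defined original state. */
  record Upload(String file) {}

  /** Hypothetical wrapper that adds a retry count around any inner state. */
  record AttemptCounter<S>(S inner, int attempts) {}

  /**
   * Toy retry loop: before each attempt it builds a new wrapped state and
   * records it in the journal, which stands in for writing to the database.
   */
  static <S> AttemptCounter<S> repeatUntilSuccess(
      S state, Predicate<S> attempt, int maxAttempts, List<String> journal) {
    AttemptCounter<S> wrapped = new AttemptCounter<>(state, 0);
    while (wrapped.attempts() < maxAttempts) {
      // State is never mutated; each attempt produces a fresh wrapper.
      wrapped = new AttemptCounter<>(wrapped.inner(), wrapped.attempts() + 1);
      journal.add(wrapped.toString());
      if (attempt.test(wrapped.inner())) {
        break;
      }
    }
    return wrapped;
  }

  public static void main(String[] args) {
    List<String> journal = new ArrayList<>();
    int[] calls = {0};
    // The simulated operation fails twice and succeeds on the third call.
    AttemptCounter<Upload> result =
        repeatUntilSuccess(new Upload("reads.bam"), u -> ++calls[0] == 3, 5, journal);
    System.out.println(result.attempts() + " attempts, " + journal.size() + " journal entries");
    journal.forEach(System.out::println);
  }
}
```

In this sketch the journaled value is the wrapper, not the bare `Upload`, so the shape of what is persisted depends on which steps the chain contains.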

This means that the steps automatically wrap and unwrap state along the way so
that the correct information is stored in the database. One caveat is that if
the structure of the operations changes, then so does the state stored in the
database, so redesigning the operations may mean that Víðarr cannot recover
operations journaled by the previous design. Having the plugin programmer
manually manage state would give them better control over this scenario, but at
the cost of a lot of overhead in the plugin implementation. The interfaces
Víðarr uses intentionally hide the state type behind a wildcard (`?`) generic
to simplify writing plugins, but any changes to this type will cause recovery
issues.

# Provided Implementations
This core implementation provides several plugins independent of external
systems.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package ca.on.oicr.gsi.vidarr.cli;

import ca.on.oicr.gsi.vidarr.core.ActiveOperation;
import ca.on.oicr.gsi.vidarr.core.OperationStatus;
import ca.on.oicr.gsi.vidarr.ActiveOperation;
import ca.on.oicr.gsi.vidarr.OperationStatus;
import com.fasterxml.jackson.databind.JsonNode;
import java.lang.System.Logger.Level;

Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@
import ca.on.oicr.gsi.vidarr.core.Target;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -14,51 +14,52 @@

/** Configuration file format for workflow testing */
public final class TargetConfiguration {
private WorkflowEngine engine;
private List<InputProvisioner> inputs;
private List<OutputProvisioner> outputs;
private List<RuntimeProvisioner> runtimes;
private WorkflowEngine<?, ?> engine;
private List<InputProvisioner<?>> inputs;
private List<OutputProvisioner<?, ?>> outputs;
private List<RuntimeProvisioner<?>> runtimes;

@JsonCreator
public TargetConfiguration(@JsonProperty("engine") WorkflowEngine engine,
@JsonProperty("inputs") List<InputProvisioner> inputs,
@JsonProperty("outputs") List<OutputProvisioner> outputs,
@JsonProperty("runtimes") List<RuntimeProvisioner> runtimes) {
public TargetConfiguration(
@JsonProperty("engine") WorkflowEngine<?, ?> engine,
@JsonProperty("inputs") List<InputProvisioner<?>> inputs,
@JsonProperty("outputs") List<OutputProvisioner<?, ?>> outputs,
@JsonProperty("runtimes") List<RuntimeProvisioner<?>> runtimes) {
this.engine = Objects.requireNonNull(engine, "Engine object missing from config");
this.inputs = Objects.requireNonNull(inputs, "Inputs object missing from config");
this.outputs = Objects.requireNonNull(outputs, "Outputs object missing from config");
this.runtimes = Objects.requireNonNull(runtimes, "Runtimes object missing from config");
}

public WorkflowEngine getEngine() {
public WorkflowEngine<?, ?> getEngine() {
return engine;
}

public List<InputProvisioner> getInputs() {
public List<InputProvisioner<?>> getInputs() {
return inputs;
}

public List<OutputProvisioner> getOutputs() {
public List<OutputProvisioner<?, ?>> getOutputs() {
return outputs;
}

public List<RuntimeProvisioner> getRuntimes() {
public List<RuntimeProvisioner<?>> getRuntimes() {
return runtimes;
}

public void setEngine(WorkflowEngine engine) {
public void setEngine(WorkflowEngine<?, ?> engine) {
this.engine = engine;
}

public void setInputs(List<InputProvisioner> inputs) {
public void setInputs(List<InputProvisioner<?>> inputs) {
this.inputs = inputs;
}

public void setOutputs(List<OutputProvisioner> outputs) {
public void setOutputs(List<OutputProvisioner<?, ?>> outputs) {
this.outputs = outputs;
}

public void setRuntimes(List<RuntimeProvisioner> runtimes) {
public void setRuntimes(List<RuntimeProvisioner<?>> runtimes) {
this.runtimes = runtimes;
}

@@ -74,7 +75,7 @@ public Target toTarget() {
}
engine.startup();
return new Target() {
final Map<InputProvisionFormat, InputProvisioner> inputs =
final Map<InputProvisionFormat, InputProvisioner<?>> inputs =
TargetConfiguration.this.inputs.stream()
.flatMap(
p ->
@@ -83,7 +84,7 @@ public Target toTarget() {
.map(f -> new Pair<>(f, p)))
.collect(Collectors.toMap(Pair::first, Pair::second));

final Map<OutputProvisionFormat, OutputProvisioner> outputs =
final Map<OutputProvisionFormat, OutputProvisioner<?, ?>> outputs =
TargetConfiguration.this.outputs.stream()
.flatMap(
p ->
@@ -92,31 +93,31 @@ public Target toTarget() {
.map(f -> new Pair<>(f, p)))
.collect(Collectors.toMap(Pair::first, Pair::second));

final List<RuntimeProvisioner> runtimes =
TargetConfiguration.this.runtimes.stream().collect(Collectors.toList());
final List<RuntimeProvisioner<?>> runtimes =
new ArrayList<>(TargetConfiguration.this.runtimes);

@Override
public Stream<Pair<String, ConsumableResource>> consumableResources() {
return Stream.empty();
}

@Override
public WorkflowEngine engine() {
public WorkflowEngine<?, ?> engine() {
return engine;
}

@Override
public InputProvisioner provisionerFor(InputProvisionFormat type) {
public InputProvisioner<?> provisionerFor(InputProvisionFormat type) {
return inputs.get(type);
}

@Override
public OutputProvisioner provisionerFor(OutputProvisionFormat type) {
public OutputProvisioner<?, ?> provisionerFor(OutputProvisionFormat type) {
return outputs.get(type);
}

@Override
public Stream<RuntimeProvisioner> runtimeProvisioners() {
public Stream<RuntimeProvisioner<?>> runtimeProvisioners() {
return runtimes.stream();
}
};
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ca.on.oicr.gsi.vidarr.core;

import ca.on.oicr.gsi.Pair;
import ca.on.oicr.gsi.vidarr.ActiveOperation;
import ca.on.oicr.gsi.vidarr.api.ExternalId;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;