> knownIds =
+ ServiceLoader.load(PriorityInputProvider.class).stream()
+ .map(Provider::get)
+ .flatMap(PriorityInputProvider::types)
+ .collect(Collectors.toMap(Pair::first, Pair::second));
+
+ @Override
+ public Id getMechanism() {
+ return Id.CUSTOM;
+ }
+
+ @Override
+ public String idFromValue(Object o) {
+ return knownIds.entrySet().stream()
+ .filter(known -> known.getValue().isInstance(o))
+ .map(Entry::getKey)
+ .findFirst()
+ .orElseThrow();
+ }
+
+ @Override
+ public String idFromValueAndType(Object o, Class> aClass) {
+ return idFromValue(o);
+ }
+
+ @Override
+ public JavaType typeFromId(DatabindContext context, String id) throws IOException {
+ final var clazz = knownIds.get(id);
+ return clazz == null ? null : context.constructType(clazz);
+ }
+ }
+
+ /**
+ * Compute the integer priority for the submission request
+ *
+ * @param workflowName the name of the workflow
+ * @param workflowVersion the version of the workflow
+ * @param created the time the workflow was first submitted
+ * @param input the data included as part of the submission
+ * @return the score
+ */
+ int compute(String workflowName, String workflowVersion, Instant created, JsonNode input);
+
+ /**
+ * Priority inputs may provide an optional HTTP API to extend their functionality that will be
+ * accessible through their parent consumable resource.
+ *
+ * Paths will be automatically prefixed with the instance's name, so the HTTP handler can
+ * assume it is at the root URL (i.e., /
).
+ *
+ * @return the HTTP handler to use
+ */
+ Optional httpHandler();
+
+ /**
+ * The type of data that must be provided as part of the submission request
+ *
+ * @return the type of data required
+ */
+ BasicType inputFromSubmitter();
+
+ /**
+ * Perform any initialization required by this input
+ *
+ * @param resourceName the name of the consumable resource that ultimately owns this input
+ * @param inputName a unique identifier; depending on the configuration, this name may not be a
+ * valid Shesmu identifier. It should only be used for logging/caching purposes.
+ */
+ void startup(String resourceName, String inputName);
+}
diff --git a/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityInputProvider.java b/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityInputProvider.java
new file mode 100644
index 00000000..a0e65fe6
--- /dev/null
+++ b/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityInputProvider.java
@@ -0,0 +1,10 @@
+package ca.on.oicr.gsi.vidarr;
+
+import ca.on.oicr.gsi.Pair;
+import java.util.stream.Stream;
+
+/** Reads JSON configuration and instantiates priority inputs appropriately */
+public interface PriorityInputProvider {
+ /** Provides the type names and classes this plugin provides */
+ Stream>> types();
+}
diff --git a/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityScorer.java b/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityScorer.java
new file mode 100644
index 00000000..0351d3dc
--- /dev/null
+++ b/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityScorer.java
@@ -0,0 +1,111 @@
+package ca.on.oicr.gsi.vidarr;
+
+import ca.on.oicr.gsi.Pair;
+import ca.on.oicr.gsi.vidarr.PriorityScorer.PriorityScorerIdResolver;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
+import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
+import com.fasterxml.jackson.databind.DatabindContext;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver;
+import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase;
+import io.undertow.server.HttpHandler;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.ServiceLoader;
+import java.util.ServiceLoader.Provider;
+import java.util.stream.Collectors;
+
+/** Examine a score to determine if a workflow run should be allowed to proceed or wait */
+@JsonTypeIdResolver(PriorityScorerIdResolver.class)
+@JsonTypeInfo(use = Id.CUSTOM, include = As.PROPERTY, property = "type")
+public interface PriorityScorer {
+
+ final class PriorityScorerIdResolver extends TypeIdResolverBase {
+
+ private final Map> knownIds =
+ ServiceLoader.load(PriorityScorerProvider.class).stream()
+ .map(Provider::get)
+ .flatMap(PriorityScorerProvider::types)
+ .collect(Collectors.toMap(Pair::first, Pair::second));
+
+ @Override
+ public Id getMechanism() {
+ return Id.CUSTOM;
+ }
+
+ @Override
+ public String idFromValue(Object o) {
+ return knownIds.entrySet().stream()
+ .filter(known -> known.getValue().isInstance(o))
+ .map(Entry::getKey)
+ .findFirst()
+ .orElseThrow();
+ }
+
+ @Override
+ public String idFromValueAndType(Object o, Class> aClass) {
+ return idFromValue(o);
+ }
+
+ @Override
+ public JavaType typeFromId(DatabindContext context, String id) throws IOException {
+ final var clazz = knownIds.get(id);
+ return clazz == null ? null : context.constructType(clazz);
+ }
+ }
+
+ /**
+ * Determine if the workflow run can begin now
+ *
+ * @param workflowName the name of the workflow
+ * @param workflowVersion the version of the workflow
+ * @param vidarrId the workflow run identifier
+ * @param created the time when the workflow run was first submitted
+ * @param workflowMaxInFlight the max-in-flight value for this workflow, if available
+ * @param score the computed score for this workflow run
+ * @return true if the workflow run may proceed; false if it should wait
+ */
+ boolean compute(
+ String workflowName,
+ String workflowVersion,
+ String vidarrId,
+ Instant created,
+ OptionalInt workflowMaxInFlight,
+ int score);
+ /**
+ * Priority scorers may provide an optional HTTP API to extend their functionality that will be
+ * accessible through their parent consumable resource.
+ *
+ * Paths will be automatically prefixed with the instance's name, so the HTTP handler can
+ * assume it is at the root URL (i.e., /
).
+ *
+ * @return the HTTP handler to use
+ */
+ Optional httpHandler();
+
+ /**
+ * Indicate that Vidarr has restarted and this workflow run will be started even if this scorer
+ * would make it wait.
+ *
+ * @param workflowName the name of the workflow
+ * @param workflowVersion the version of the workflow
+ * @param vidarrId the identifier of the workflow run
+ */
+ void recover(String workflowName, String workflowVersion, String vidarrId);
+
+ /**
+ * Indicate that the workflow run has completed and the score should purge any state about it
+ *
+ * @param workflowName the name of the workflow
+ * @param workflowVersion the version of the workflow
+ * @param vidarrId the identifier of the workflow run
+ */
+ void release(String workflowName, String workflowVersion, String vidarrId);
+ /** Perform any initialization required by this input */
+ void startup();
+}
diff --git a/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityScorerProvider.java b/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityScorerProvider.java
new file mode 100644
index 00000000..a08e23d7
--- /dev/null
+++ b/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityScorerProvider.java
@@ -0,0 +1,10 @@
+package ca.on.oicr.gsi.vidarr;
+
+import ca.on.oicr.gsi.Pair;
+import java.util.stream.Stream;
+
+/** Reads JSON configuration and instantiates priority scorer appropriately */
+public interface PriorityScorerProvider {
+ /** Provides the type names and classes this plugin provides */
+ Stream>> types();
+}
diff --git a/vidarr-pluginapi/src/main/java/module-info.java b/vidarr-pluginapi/src/main/java/module-info.java
index 34709020..95575f80 100644
--- a/vidarr-pluginapi/src/main/java/module-info.java
+++ b/vidarr-pluginapi/src/main/java/module-info.java
@@ -1,6 +1,9 @@
import ca.on.oicr.gsi.vidarr.ConsumableResourceProvider;
import ca.on.oicr.gsi.vidarr.InputProvisionerProvider;
import ca.on.oicr.gsi.vidarr.OutputProvisionerProvider;
+import ca.on.oicr.gsi.vidarr.PriorityFormulaProvider;
+import ca.on.oicr.gsi.vidarr.PriorityInputProvider;
+import ca.on.oicr.gsi.vidarr.PriorityScorerProvider;
import ca.on.oicr.gsi.vidarr.RuntimeProvisionerProvider;
import ca.on.oicr.gsi.vidarr.UnloadFilterProvider;
import ca.on.oicr.gsi.vidarr.WorkflowEngineProvider;
@@ -12,12 +15,15 @@
* plugins are expected to provide as well as accessory data required or provided.
*/
module ca.on.oicr.gsi.vidarr.pluginapi {
- uses UnloadFilterProvider;
- uses WorkflowEngineProvider;
uses ConsumableResourceProvider;
uses InputProvisionerProvider;
uses OutputProvisionerProvider;
+ uses PriorityFormulaProvider;
+ uses PriorityInputProvider;
+ uses PriorityScorerProvider;
uses RuntimeProvisionerProvider;
+ uses UnloadFilterProvider;
+ uses WorkflowEngineProvider;
exports ca.on.oicr.gsi.vidarr;
exports ca.on.oicr.gsi.vidarr.api;
diff --git a/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/PrometheusPriorityInput.java b/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/PrometheusPriorityInput.java
new file mode 100644
index 00000000..c7e8f618
--- /dev/null
+++ b/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/PrometheusPriorityInput.java
@@ -0,0 +1,129 @@
+package ca.on.oicr.gsi.vidarr.prometheus;
+
+import ca.on.oicr.gsi.Pair;
+import ca.on.oicr.gsi.vidarr.BasicType;
+import ca.on.oicr.gsi.vidarr.PriorityInput;
+import ca.on.oicr.gsi.vidarr.PriorityInputProvider;
+import com.fasterxml.jackson.databind.JsonNode;
+import io.undertow.server.HttpHandler;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+public final class PrometheusPriorityInput implements PriorityInput {
+ private static boolean checkLabel(Map metric, String label, String value) {
+ if (label == null) {
+ return true;
+ }
+ final var entry = metric.get(label);
+ return entry != null && entry.equals(value);
+ }
+
+ public static PriorityInputProvider provider() {
+ return () -> Stream.of(new Pair<>("prometheus", PrometheusPriorityInput.class));
+ }
+
+ private VectorCache cache;
+ private Integer cacheRequestTimeout;
+ private Integer cacheTtl;
+ private int defaultPriority;
+ private List labels;
+ private String query;
+ private String url;
+ private String workflowNameLabel;
+ private String workflowVersionLabel;
+
+ @Override
+ public int compute(String workflowName, String workflowVersion, Instant created, JsonNode input) {
+ return cache
+ .get()
+ .filter(
+ result ->
+ labels.stream()
+ .allMatch(
+ label -> {
+ final var entry = result.getMetric().get(label);
+ return entry != null && entry.equals(input.get(label).asText());
+ })
+ && checkLabel(result.getMetric(), workflowNameLabel, workflowName)
+ && checkLabel(result.getMetric(), workflowVersionLabel, workflowVersion))
+ .flatMap(result -> result.getValue().stream())
+ .mapToInt(Float::intValue)
+ .findFirst()
+ .orElse(defaultPriority);
+ }
+
+ public int getDefaultPriority() {
+ return defaultPriority;
+ }
+
+ public List getLabels() {
+ return labels;
+ }
+
+ public String getQuery() {
+ return query;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public String getWorkflowNameLabel() {
+ return workflowNameLabel;
+ }
+
+ public String getWorkflowVersionLabel() {
+ return workflowVersionLabel;
+ }
+
+ @Override
+ public Optional httpHandler() {
+ return Optional.empty();
+ }
+
+ @Override
+ public BasicType inputFromSubmitter() {
+ return BasicType.object(labels.stream().map(label -> new Pair<>(label, BasicType.STRING)));
+ }
+
+ public void setCacheRequestTimeout(Integer cacheRequestTimeout) {
+ this.cacheRequestTimeout = cacheRequestTimeout;
+ }
+
+ public void setCacheTtl(Integer cacheTtl) {
+ this.cacheTtl = cacheTtl;
+ }
+
+ public void setDefaultPriority(int defaultPriority) {
+ this.defaultPriority = defaultPriority;
+ }
+
+ public void setLabels(List labels) {
+ this.labels = labels;
+ }
+
+ public void setQuery(String query) {
+ this.query = query;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public void setWorkflowNameLabel(String workflowNameLabel) {
+ this.workflowNameLabel = workflowNameLabel;
+ }
+
+ public void setWorkflowVersionLabel(String workflowVersionLabel) {
+ this.workflowVersionLabel = workflowVersionLabel;
+ }
+
+ @Override
+ public void startup(String resourceName, String inputName) {
+ cache =
+ new VectorCache(resourceName + " " + inputName, url, query, cacheRequestTimeout, cacheTtl);
+ }
+}
diff --git a/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/QueryResponseDto.java b/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/QueryResponseDto.java
new file mode 100644
index 00000000..ee800dcf
--- /dev/null
+++ b/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/QueryResponseDto.java
@@ -0,0 +1,23 @@
+package ca.on.oicr.gsi.vidarr.prometheus;
+
+public final class QueryResponseDto {
+
+ private VectorDataDto data;
+ private String status;
+
+ public VectorDataDto getData() {
+ return data;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setData(VectorDataDto data) {
+ this.data = data;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+}
diff --git a/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/VectorCache.java b/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/VectorCache.java
new file mode 100644
index 00000000..1dfc992a
--- /dev/null
+++ b/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/VectorCache.java
@@ -0,0 +1,51 @@
+package ca.on.oicr.gsi.vidarr.prometheus;
+
+import static ca.on.oicr.gsi.vidarr.prometheus.AlertmanagerAutoInhibitConsumableResource.MAPPER;
+
+import ca.on.oicr.gsi.cache.ReplacingRecord;
+import ca.on.oicr.gsi.cache.ValueCache;
+import ca.on.oicr.gsi.vidarr.JsonBodyHandler;
+import java.net.URI;
+import java.net.URLEncoder;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpRequest.BodyPublishers;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.stream.Stream;
+
+public final class VectorCache extends ValueCache> {
+ private static final HttpClient HTTP_CLIENT = HttpClient.newHttpClient();
+ private final String prometheusUrl;
+ private final String query;
+ private final Integer requestTimeout;
+
+ public VectorCache(
+ String name, String prometheusUrl, String query, Integer requestTimeout, Integer ttl) {
+ super("prometheus " + name, ttl, ReplacingRecord::new);
+ this.prometheusUrl = prometheusUrl;
+ this.query = query;
+ this.requestTimeout = requestTimeout;
+ }
+
+ protected Stream fetch(Instant lastUpdated) throws Exception {
+ if (prometheusUrl == null) {
+ return Stream.empty();
+ }
+ var response =
+ HTTP_CLIENT.send(
+ HttpRequest.newBuilder(URI.create(String.format("%s/api/v1/query", prometheusUrl)))
+ .POST(
+ BodyPublishers.ofString(
+ "query=" + URLEncoder.encode(query, StandardCharsets.UTF_8)))
+ .timeout(Duration.ofMinutes(requestTimeout))
+ .build(),
+ new JsonBodyHandler<>(MAPPER, QueryResponseDto.class));
+ final var result = response.body().get();
+ if (result == null || result.getData() == null) {
+ return Stream.empty();
+ }
+ return result.getData().getResult().stream();
+ }
+}
diff --git a/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/VectorDataDto.java b/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/VectorDataDto.java
new file mode 100644
index 00000000..daacf7c0
--- /dev/null
+++ b/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/VectorDataDto.java
@@ -0,0 +1,25 @@
+package ca.on.oicr.gsi.vidarr.prometheus;
+
+import java.util.List;
+
+public final class VectorDataDto {
+
+ private List result;
+ private String resultType;
+
+ public List getResult() {
+ return result;
+ }
+
+ public String getResultType() {
+ return resultType;
+ }
+
+ public void setResult(List result) {
+ this.result = result;
+ }
+
+ public void setResultType(String resultType) {
+ this.resultType = resultType;
+ }
+}
diff --git a/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/VectorResultDto.java b/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/VectorResultDto.java
new file mode 100644
index 00000000..7f75edaf
--- /dev/null
+++ b/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/VectorResultDto.java
@@ -0,0 +1,26 @@
+package ca.on.oicr.gsi.vidarr.prometheus;
+
+import java.util.List;
+import java.util.Map;
+
+public final class VectorResultDto {
+
+ private Map metric;
+ private List value;
+
+ public Map getMetric() {
+ return metric;
+ }
+
+ public List getValue() {
+ return value;
+ }
+
+ public void setMetric(Map metric) {
+ this.metric = metric;
+ }
+
+ public void setValue(List value) {
+ this.value = value;
+ }
+}
diff --git a/vidarr-prometheus/src/main/java/module-info.java b/vidarr-prometheus/src/main/java/module-info.java
index f66bb1f6..1ff53d4e 100644
--- a/vidarr-prometheus/src/main/java/module-info.java
+++ b/vidarr-prometheus/src/main/java/module-info.java
@@ -1,5 +1,7 @@
import ca.on.oicr.gsi.vidarr.ConsumableResourceProvider;
+import ca.on.oicr.gsi.vidarr.PriorityInputProvider;
import ca.on.oicr.gsi.vidarr.prometheus.AlertmanagerAutoInhibitConsumableResource;
+import ca.on.oicr.gsi.vidarr.prometheus.PrometheusPriorityInput;
module ca.on.oicr.gsi.vidarr.prometheus {
requires ca.on.oicr.gsi.vidarr.pluginapi;
@@ -13,4 +15,6 @@
provides ConsumableResourceProvider with
AlertmanagerAutoInhibitConsumableResource;
+ provides PriorityInputProvider with
+ PrometheusPriorityInput;
}
diff --git a/vidarr-server/src/main/java/ca/on/oicr/gsi/vidarr/server/Main.java b/vidarr-server/src/main/java/ca/on/oicr/gsi/vidarr/server/Main.java
index b2cbe7b2..401a87ab 100644
--- a/vidarr-server/src/main/java/ca/on/oicr/gsi/vidarr/server/Main.java
+++ b/vidarr-server/src/main/java/ca/on/oicr/gsi/vidarr/server/Main.java
@@ -446,7 +446,6 @@ private static HttpHandler monitor(HttpHandler handler) {
private final Map inputProvisioners;
private final Semaphore loadCounter = new Semaphore(3);
private final MaxInFlightByWorkflow maxInFlightPerWorkflow = new MaxInFlightByWorkflow();
- private final PriorityByWorkflow priorityPerWorkflow = new PriorityByWorkflow();
private final Map otherServers;
private final Map outputProvisioners;
private final int port;
@@ -559,9 +558,7 @@ public void emit(SectionRenderer sectionRenderer)
new Pair<>(name, consumableResources.get(name))),
Stream.of(
new Pair(
- "", maxInFlightPerWorkflow),
- new Pair(
- "priority", priorityPerWorkflow)))
+ "", maxInFlightPerWorkflow)))
.collect(Collectors.toList());
private final WorkflowEngine engine =
workflowEngines.get(e.getValue().getWorkflowEngine());
@@ -719,41 +716,6 @@ protected Optional targetByName(String name) {
.select(WORKFLOW.NAME, WORKFLOW.MAX_IN_FLIGHT)
.from(WORKFLOW)
.forEach(record -> maxInFlightPerWorkflow.set(record.value1(), record.value2()));
-
- DSL.using(connection)
- .select(
- WORKFLOW.NAME,
- WORKFLOW_RUN.HASH_ID,
- ACTIVE_WORKFLOW_RUN.CONSUMABLE_RESOURCES.cast(String.class))
- .from(WORKFLOW)
- .join(WORKFLOW_VERSION)
- .on(WORKFLOW.NAME.eq(WORKFLOW_VERSION.NAME))
- .join(WORKFLOW_RUN)
- .on(WORKFLOW_RUN.WORKFLOW_VERSION_ID.eq(WORKFLOW_VERSION.ID))
- .join(ACTIVE_WORKFLOW_RUN)
- .on(WORKFLOW_RUN.ID.eq(ACTIVE_WORKFLOW_RUN.ID))
- .where(ACTIVE_WORKFLOW_RUN.ENGINE_PHASE.eq(Phase.WAITING_FOR_RESOURCES))
- .forEach(
- record -> {
- try {
- priorityPerWorkflow.set(
- record.value1(),
- record.value2(),
- Optional.ofNullable(
- MAPPER.readTree(
- (record.value3() == null || record.value3() == null)
- ? "{}"
- : record.value3())));
- } catch (JsonProcessingException e) {
- // not a disaster; we might just get some things running out of priority
- // until max-in-flight gets saturated
- System.out.println(
- "Failed to serialize the consumable resources field"
- + " on active workflow run for priority by workflow on startup:"
- + " some actions may temporarily be run out of priority.");
- System.out.println(e.getMessage());
- }
- });
}
unloadDirectory = Path.of(configuration.getUnloadDirectory());
diff --git a/vidarr-server/src/main/java/ca/on/oicr/gsi/vidarr/server/PriorityByWorkflow.java b/vidarr-server/src/main/java/ca/on/oicr/gsi/vidarr/server/PriorityByWorkflow.java
deleted file mode 100644
index 38333e73..00000000
--- a/vidarr-server/src/main/java/ca/on/oicr/gsi/vidarr/server/PriorityByWorkflow.java
+++ /dev/null
@@ -1,181 +0,0 @@
-package ca.on.oicr.gsi.vidarr.server;
-
-import static java.util.Map.Entry.comparingByValue;
-
-import ca.on.oicr.gsi.Pair;
-import ca.on.oicr.gsi.vidarr.BasicType;
-import ca.on.oicr.gsi.vidarr.ConsumableResource;
-import ca.on.oicr.gsi.vidarr.ConsumableResourceProvider;
-import ca.on.oicr.gsi.vidarr.ConsumableResourceResponse;
-import com.fasterxml.jackson.databind.JsonNode;
-import io.prometheus.client.Gauge;
-import java.time.Instant;
-import java.util.AbstractMap.SimpleEntry;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.OptionalInt;
-import java.util.SortedSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-public final class PriorityByWorkflow implements ConsumableResource {
-
- // 1 is lowest priority
- // 4 is highest priority (will be given access to resources first)
- private final List acceptedPriorities = Arrays.asList(1, 2, 3, 4);
-
- public static ConsumableResourceProvider provider() {
- return () -> Stream.of(new Pair<>("priority", PriorityByWorkflow.class));
- }
-
- private static final class WaitingState {
-
- private final SortedSet> waiting =
- new ConcurrentSkipListSet>(comparingByValue()) {
-
- // Replace if key matches, even if entire pair doesn't
- @Override
- public boolean add(SimpleEntry simpleEntry) {
- SimpleEntry entry = this.getByKey(simpleEntry.getKey());
- if (entry != null) {
- super.remove(entry);
- }
- return super.add(simpleEntry);
- }
-
- public SimpleEntry getByKey(String simpleEntryKey) {
- for (SimpleEntry entry : this) {
- if (entry.getKey().equals(simpleEntryKey)) {
- return (entry);
- }
- }
- return null;
- }
-
- // Override to remove by key instead of pair
- @Override
- public boolean remove(Object o) {
- try {
- String oEntry = (String) o;
- for (SimpleEntry entry : this) {
- if (entry.getKey().equals(oEntry)) {
- super.remove(entry);
- }
- }
- } catch (Exception e) {
- return false;
- }
- return true;
- }
- };
- }
-
- // not currently monitored by anything - potentially remove?
- private static final Gauge currentInPriorityWaitingCount =
- Gauge.build(
- "vidarr_in_priority_waiting_per_workflow_current",
- "The current number of workflows that are on priority waiting to run.")
- .labelNames("workflow")
- .register();
-
- private final Map workflows = new ConcurrentHashMap<>();
-
- @Override
- public Optional> inputFromSubmitter() {
- return Optional.of(new Pair<>("priority", BasicType.INTEGER.asOptional()));
- }
-
- @Override
- public void recover(
- String workflowName,
- String workflowVersion,
- String vidarrId,
- Optional resourceJson) {
- // Do nothing, as once the workflow run launches, it is no longer tracked in PriorityByWorkflow
- }
-
- @Override
- public void release(
- String workflowName, String workflowVersion, String vidarrId, Optional input) {
- // If workflow run did not launch, re-add to waiting list
- if (input.isPresent()) {
- set(workflowName, vidarrId, input);
- }
- // Otherwise do nothing
- }
-
- @Override
- public synchronized ConsumableResourceResponse request(
- String workflowName,
- String workflowVersion,
- String vidarrId,
- Instant created,
- OptionalInt workflowMaxInFlight,
- Optional input) {
-
- int workflowPriority = Collections.min(acceptedPriorities);
- if (input.isPresent()) {
- workflowPriority = input.get().asInt();
- }
-
- if (!acceptedPriorities.contains(workflowPriority)) {
- return ConsumableResourceResponse.error(
- String.format(
- "Vidarr error: The workflow '%s' run's priority (%d) is invalid. Priority "
- + "values should be one of the following: %s",
- workflowName,
- workflowPriority,
- acceptedPriorities.stream().map(String::valueOf).collect(Collectors.joining(", "))));
- }
-
- final var state = workflows.get(workflowName);
- if (state == null || state.waiting.isEmpty()) {
- return ConsumableResourceResponse.AVAILABLE;
- }
-
- // If this workflow has already been seen
- // Add the current run to the waitlist
- // ensuring it replaces previous runs with the same ID, accounting for if the priority has
- // changed
- set(workflowName, vidarrId, input);
-
- if (workflowPriority >= state.waiting.last().getValue()) {
- state.waiting.remove(vidarrId);
- currentInPriorityWaitingCount.labels(workflowName).set(state.waiting.size());
- return ConsumableResourceResponse.AVAILABLE;
- } else {
- currentInPriorityWaitingCount.labels(workflowName).set(state.waiting.size());
- return ConsumableResourceResponse.error(
- String.format(
- "There are %s workflows currently queued up with higher priority.", workflowName));
- }
- }
-
- public void set(String workflowName, String vidarrId, Optional input) {
-
- int workflowPriority = Collections.min(acceptedPriorities);
- if (input.isPresent()) {
- workflowPriority = input.get().asInt();
- }
-
- final var stateWaiting =
- workflows.computeIfAbsent(workflowName, k -> new WaitingState()).waiting;
- stateWaiting.add(new SimpleEntry(vidarrId, workflowPriority));
- currentInPriorityWaitingCount.labels(workflowName).set(stateWaiting.size());
- }
-
- @Override
- public void startup(String name) {
- // Always ok.
- }
-
- @Override
- public boolean isInputFromSubmitterRequired() {
- return false;
- }
-}
diff --git a/vidarr-server/src/main/java/module-info.java b/vidarr-server/src/main/java/module-info.java
index 56b8f6b6..d063b4a2 100644
--- a/vidarr-server/src/main/java/module-info.java
+++ b/vidarr-server/src/main/java/module-info.java
@@ -4,7 +4,6 @@
import ca.on.oicr.gsi.vidarr.RuntimeProvisionerProvider;
import ca.on.oicr.gsi.vidarr.UnloadFilterProvider;
import ca.on.oicr.gsi.vidarr.WorkflowEngineProvider;
-import ca.on.oicr.gsi.vidarr.server.PriorityByWorkflow;
module ca.on.oicr.gsi.vidarr.server {
exports ca.on.oicr.gsi.vidarr.server;
@@ -42,9 +41,6 @@
com.fasterxml.jackson.databind;
opens db.migration;
- provides ConsumableResourceProvider with
- PriorityByWorkflow;
-
uses ConsumableResourceProvider;
uses InputProvisionerProvider;
uses OutputProvisionerProvider;
diff --git a/vidarr-server/src/test/java/ca/on/oicr/gsi/vidarr/server/PriorityByWorkflowTest.java b/vidarr-server/src/test/java/ca/on/oicr/gsi/vidarr/server/PriorityByWorkflowTest.java
deleted file mode 100644
index a68ebecc..00000000
--- a/vidarr-server/src/test/java/ca/on/oicr/gsi/vidarr/server/PriorityByWorkflowTest.java
+++ /dev/null
@@ -1,341 +0,0 @@
-package ca.on.oicr.gsi.vidarr.server;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import ca.on.oicr.gsi.vidarr.ConsumableResourceResponse;
-import ca.on.oicr.gsi.vidarr.ConsumableResourceResponse.Visitor;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.time.Instant;
-import java.util.Optional;
-import java.util.OptionalInt;
-import org.junit.Before;
-import org.junit.Test;
-
-public class PriorityByWorkflowTest {
-
- private PriorityByWorkflow sut;
- private ObjectMapper mapper = new ObjectMapper();
-
- String workflow = "test";
- String version = "1.0";
- ConsumableResourceResponse.Visitor> consumableResourceCheckerVisitor =
- new Visitor>() {
- @Override
- public Optional available() {
- return Optional.empty();
- }
-
- @Override
- public void clear(String name) {}
-
- @Override
- public Optional error(String message) {
- return Optional.of(message);
- }
-
- @Override
- public void set(String name, long value) {}
-
- @Override
- public Optional unavailable() {
- return Optional.of(String.format("Resource is not available"));
- }
- };
-
- @Before
- public void instantiate() {
- sut = new PriorityByWorkflow();
- }
-
- @Test
- public void testRequest_invalidPriorityReturnsError() {
- JsonNode invalidJson = mapper.valueToTree(5);
-
- Optional requestError =
- sut.request(
- workflow,
- version,
- "abcdef",
- Instant.EPOCH,
- OptionalInt.empty(),
- Optional.of(invalidJson))
- .apply(consumableResourceCheckerVisitor);
- assertTrue(requestError.isPresent());
- assertEquals(
- requestError.get(),
- "Vidarr error: The workflow 'test' run's priority (5) is "
- + "invalid. Priority values should be one of the following: 1, 2, 3, 4");
- }
-
- @Test
- public void testRequest_emptyInputAndEmptyWaitingNoError() {
- Optional requestError =
- sut.request(
- workflow, version, "abcdef", Instant.EPOCH, OptionalInt.empty(), Optional.empty())
- .apply(consumableResourceCheckerVisitor);
-
- assertTrue(requestError.isEmpty());
- }
-
- @Test
- public void testRequest_emptyInputAndEmptyWaitingIsAvailable() {
-
- var response =
- sut.request(
- workflow, version, "abcdef", Instant.EPOCH, OptionalInt.empty(), Optional.empty());
-
- assertEquals(response, ConsumableResourceResponse.AVAILABLE);
- }
-
- @Test
- public void testRequest_validInputAndEmptyWaitingIsOk() {
- JsonNode validJson = mapper.valueToTree(2);
- Optional requestError =
- sut.request(
- workflow,
- version,
- "abcdef",
- Instant.EPOCH,
- OptionalInt.empty(),
- Optional.of(validJson))
- .apply(consumableResourceCheckerVisitor);
-
- assertTrue(requestError.isEmpty());
- }
-
- @Test
- public void testIfWorkflowRunWithHigherPriorityExists_thenWorkflowRunDoesNotLaunch() {
- JsonNode higherPriority = mapper.valueToTree(4);
- JsonNode lowerPriority = mapper.valueToTree(2);
-
- sut.set(workflow, "qwerty", Optional.of(higherPriority));
-
- Optional requestError =
- sut.request(
- workflow,
- version,
- "abcdef",
- Instant.EPOCH,
- OptionalInt.empty(),
- Optional.of(lowerPriority))
- .apply(consumableResourceCheckerVisitor);
-
- assertTrue(requestError.isPresent());
- assertEquals(
- requestError.get(),
- "There are test workflows currently queued up with higher " + "priority.");
- }
-
- @Test
- public void testIfWorkflowRunWithLowerPriorityExists_thenWorkflowRunDoesNotError() {
- JsonNode higherPriority = mapper.valueToTree(4);
- JsonNode lowerPriority = mapper.valueToTree(2);
-
- sut.set(workflow, "qwerty", Optional.of(lowerPriority));
-
- Optional requestError =
- sut.request(
- workflow,
- version,
- "abcdef",
- Instant.EPOCH,
- OptionalInt.empty(),
- Optional.of(higherPriority))
- .apply(consumableResourceCheckerVisitor);
-
- assertTrue(requestError.isEmpty());
- }
-
- @Test
- public void testIfWorkflowRunWithLowerPriorityExists_thenWorkflowRunLaunches() {
- JsonNode higherPriority = mapper.valueToTree(4);
- JsonNode lowerPriority = mapper.valueToTree(2);
-
- sut.set(workflow, "qwerty", Optional.of(lowerPriority));
-
- var response =
- sut.request(
- workflow,
- version,
- "abcdef",
- Instant.EPOCH,
- OptionalInt.empty(),
- Optional.of(higherPriority));
-
- assertEquals(response, ConsumableResourceResponse.AVAILABLE);
- }
-
- @Test
- public void testIfWorkflowRunWithSamePriorityExists_thenWorkflowDoesNotError() {
- JsonNode twoPriority = mapper.valueToTree(2);
- JsonNode twoPriorityAlso = mapper.valueToTree(2);
-
- sut.set(workflow, "qwerty", Optional.of(twoPriority));
-
- Optional requestError =
- sut.request(
- workflow,
- version,
- "abcdef",
- Instant.EPOCH,
- OptionalInt.empty(),
- Optional.of(twoPriorityAlso))
- .apply(consumableResourceCheckerVisitor);
-
- assertTrue(requestError.isEmpty());
- }
-
- @Test
- public void testIfWorkflowRunWithSamePriorityExists_thenWorkflowRunLaunches() {
- JsonNode twoPriority = mapper.valueToTree(2);
- JsonNode twoPriorityAlso = mapper.valueToTree(2);
-
- sut.set(workflow, "qwerty", Optional.of(twoPriority));
-
- var response =
- sut.request(
- workflow,
- version,
- "abcdef",
- Instant.EPOCH,
- OptionalInt.empty(),
- Optional.of(twoPriorityAlso));
-
- assertEquals(response, ConsumableResourceResponse.AVAILABLE);
- }
-
- @Test
- public void testIfWorkflowRunWIthEmptyPriorityIsSet_thenWorkflowRunIsAddedWithLowestPriority() {
- JsonNode lowestPriority = mapper.valueToTree(1);
-
- Optional requestErrorForEmpty =
- sut.request(
- workflow, version, "abcdef", Instant.EPOCH, OptionalInt.empty(), Optional.empty())
- .apply(consumableResourceCheckerVisitor);
-
- Optional requestErrorForLowest =
- sut.request(
- workflow,
- version,
- "abcdef",
- Instant.EPOCH,
- OptionalInt.empty(),
- Optional.of(lowestPriority))
- .apply(consumableResourceCheckerVisitor);
-
- assertTrue(requestErrorForEmpty.isEmpty());
- assertTrue(requestErrorForLowest.isEmpty());
- }
-
- @Test
- public void testIfResourceIsReleasedButNotLaunched_thenLowerPriorityCannotLaunch() {
-
- JsonNode higherPriority = mapper.valueToTree(4);
- JsonNode lowerPriority = mapper.valueToTree(1);
-
- Optional requestErrorHigher =
- sut.request(
- workflow,
- version,
- "qwerty",
- Instant.EPOCH,
- OptionalInt.empty(),
- Optional.of(higherPriority))
- .apply(consumableResourceCheckerVisitor);
-
- sut.release(workflow, version, "qwerty", Optional.of(higherPriority));
-
- Optional requestErrorLower =
- sut.request(
- workflow,
- version,
- "abcdef",
- Instant.EPOCH,
- OptionalInt.empty(),
- Optional.of(lowerPriority))
- .apply(consumableResourceCheckerVisitor);
-
- assertTrue(requestErrorHigher.isEmpty());
- assertTrue(requestErrorLower.isPresent());
- assertEquals(
- requestErrorLower.get(),
- "There are test workflows currently queued up with higher " + "priority.");
- }
-
- @Test
- public void testIfResourceIsReleasedAndLaunched_thenLowerPriorityCanLaunch() {
-
- JsonNode higherPriority = mapper.valueToTree(4);
- JsonNode lowerPriority = mapper.valueToTree(1);
-
- Optional requestErrorHigher =
- sut.request(
- workflow,
- version,
- "qwerty",
- Instant.EPOCH,
- OptionalInt.empty(),
- Optional.of(higherPriority))
- .apply(consumableResourceCheckerVisitor);
-
- sut.release(workflow, version, "qwerty", Optional.empty());
-
- Optional requestErrorLower =
- sut.request(
- workflow,
- version,
- "abcdef",
- Instant.EPOCH,
- OptionalInt.empty(),
- Optional.of(lowerPriority))
- .apply(consumableResourceCheckerVisitor);
-
- assertTrue(requestErrorHigher.isEmpty());
- assertTrue(requestErrorLower.isEmpty());
- }
-
- @Test
- public void testIfResourcePassesRequest_thenSamePriorityCanLaunch() {
-
- JsonNode validJson = mapper.valueToTree(2);
-
- sut.request(
- workflow, version, "qwerty", Instant.EPOCH, OptionalInt.empty(), Optional.of(validJson))
- .apply(consumableResourceCheckerVisitor);
-
- Optional requestError =
- sut.request(
- workflow,
- version,
- "abcdef",
- Instant.EPOCH,
- OptionalInt.empty(),
- Optional.of(validJson))
- .apply(consumableResourceCheckerVisitor);
-
- assertTrue(requestError.isEmpty());
- }
-
- @Test
- public void testIfWorkflowLowersPriority_WorkflowIsNotBlockedBySelf() {
-
- JsonNode higherPriority = mapper.valueToTree(4);
- JsonNode lowerPriority = mapper.valueToTree(2);
-
- sut.set(workflow, "qwerty", Optional.of(higherPriority));
-
- Optional requestErrorLower =
- sut.request(
- workflow,
- version,
- "qwerty",
- Instant.EPOCH,
- OptionalInt.empty(),
- Optional.of(lowerPriority))
- .apply(consumableResourceCheckerVisitor);
-
- assertTrue(requestErrorLower.isEmpty());
- }
-}