From 8bfd6df4c1882866a35c54d4662421b8c3100163 Mon Sep 17 00:00:00 2001 From: Andre Masella Date: Sat, 3 Feb 2024 17:20:12 -0500 Subject: [PATCH] Add tracing to consumable resources Provide an API for consumable resources to export numeric values to observation. --- changes/add_cr_tracing.md | 1 + plugin-guide.md | 6 + .../vidarr/ConsumableResourceResponse.java | 21 ++- .../vidarr/api/WorkflowRunStatusResponse.java | 9 ++ .../server/ConsumableResourceChecker.java | 35 +++-- .../ca/on/oicr/gsi/vidarr/server/Main.java | 1 + .../gsi/vidarr/server/api-docs/vidarr.json | 6 + .../resources/db/migration/V0017__tracing.sql | 1 + .../vidarr/server/PriorityByWorkflowTest.java | 141 ++++++++++-------- 9 files changed, 148 insertions(+), 73 deletions(-) create mode 100644 changes/add_cr_tracing.md create mode 100644 vidarr-server/src/main/resources/db/migration/V0017__tracing.sql diff --git a/changes/add_cr_tracing.md b/changes/add_cr_tracing.md new file mode 100644 index 00000000..93451f19 --- /dev/null +++ b/changes/add_cr_tracing.md @@ -0,0 +1 @@ +* Add tracing API for consumable resources diff --git a/plugin-guide.md b/plugin-guide.md index 0fa31af4..6179b1a2 100644 --- a/plugin-guide.md +++ b/plugin-guide.md @@ -100,6 +100,12 @@ encoded as JSON, if the submitter provided it. The JSON data has been type-checked by Vidarr, so it should be safe to convert to the expected type using Jackson. +Sometimes, consumable resources are doing scoring that would be helpful to know +for debugging purposes. In that case, the resource can return a custom +`ConsumableResourceResponse` that uses the `Visitor.set` method to export +numeric statistics that will be available in the `"tracing"` property. Víðarr +will prefix these variables with the consumable resource's name. + # Input Provisioners Input provisioners implement `ca.on.oicr.gsi.vidarr.InputProvisionerProvider` and `ca.on.oicr.gsi.vidarr.InputProvisioner`. These plugins are responsible for diff --git a/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/ConsumableResourceResponse.java b/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/ConsumableResourceResponse.java index d3cfbf0b..d482add5 100644 --- a/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/ConsumableResourceResponse.java +++ b/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/ConsumableResourceResponse.java @@ -17,6 +17,14 @@ public interface Visitor { */ T available(); + /** + * Clear removes a tracing number from this resource + * + * @param name the identifier for the number + * @see #set(String, long) + */ + void clear(String name); + /** * Convert an unsuccessful response * @@ -25,6 +33,17 @@ public interface Visitor { */ T error(String message); + /** + * Sets a tracing number on this consumable resource. + * + *

Consumable resources are constantly measuring things for decision-making and this allows + * that information to be observed. + * + * @param name the identifier of the number + * @param value the current value + */ + void set(String name, long value); + /** * Convert an unsuccessful but empty response * @@ -70,7 +89,7 @@ public T apply(Visitor visitor) { }; } - private ConsumableResourceResponse() {} + public ConsumableResourceResponse() {} /** * Convert this response into another value diff --git a/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/api/WorkflowRunStatusResponse.java b/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/api/WorkflowRunStatusResponse.java index 4ed31508..34666cea 100644 --- a/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/api/WorkflowRunStatusResponse.java +++ b/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/api/WorkflowRunStatusResponse.java @@ -27,6 +27,7 @@ public class WorkflowRunStatusResponse { protected boolean running; private ZonedDateTime started; private String target; + private Map tracing; private String workflowRunUrl; public ObjectNode getArguments() { @@ -93,6 +94,10 @@ public String getTarget() { return target; } + public Map getTracing() { + return tracing; + } + public String getWorkflowRunUrl() { return workflowRunUrl; } @@ -169,6 +174,10 @@ public void setTarget(String target) { this.target = target; } + public void setTracing(Map tracing) { + this.tracing = tracing; + } + public void setWorkflowRunUrl(String workflowRunUrl) { this.workflowRunUrl = workflowRunUrl; } diff --git a/vidarr-server/src/main/java/ca/on/oicr/gsi/vidarr/server/ConsumableResourceChecker.java b/vidarr-server/src/main/java/ca/on/oicr/gsi/vidarr/server/ConsumableResourceChecker.java index 3935cbfa..6e9c19a4 100644 --- a/vidarr-server/src/main/java/ca/on/oicr/gsi/vidarr/server/ConsumableResourceChecker.java +++ b/vidarr-server/src/main/java/ca/on/oicr/gsi/vidarr/server/ConsumableResourceChecker.java @@ -5,17 +5,18 @@ import ca.on.oicr.gsi.vidarr.ConsumableResourceResponse.Visitor; import ca.on.oicr.gsi.vidarr.core.Target; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.zaxxer.hikari.HikariDataSource; import io.prometheus.client.Histogram; import java.sql.SQLException; import java.time.Duration; import java.time.Instant; -import java.util.Comparator; import java.util.Map; import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import org.jooq.SQLDialect; import org.jooq.impl.DSL; @@ -37,6 +38,7 @@ final class ConsumableResourceChecker implements Runnable { private final Runnable next; private final Instant startTime = Instant.now(); private final Target target; + private final ObjectNode tracing = Main.MAPPER.createObjectNode(); private final String vidarrId; private final String workflow; private final String workflowVersion; @@ -70,13 +72,9 @@ public void run() { return; } var i = 0; - final var resourceBrokers = - target - .consumableResources() - .sorted(Comparator.comparing(cr -> cr.second().priority())) - .toList(); + final var resourceBrokers = target.consumableResources().collect(Collectors.toList()); for (i = 0; i < resourceBrokers.size(); i++) { - final var name = resourceBrokers.get(i).first(); + final var resourceName = resourceBrokers.get(i).first(); final var broker = resourceBrokers.get(i).second(); final var error = broker @@ -87,19 +85,35 @@ public void run() { broker.inputFromSubmitter().map(def -> consumableResources.get(def.first()))) .apply( new Visitor>() { + @Override public Optional available() { return Optional.empty(); } + @Override + public void clear(String name) { + tracing.remove(nameForVariable(name)); + } + @Override public Optional error(String message) { return Optional.of(message); } + private String nameForVariable(String name) { + return String.format("vidarr-resource-%s-%s", resourceName, name); + } + + @Override + public void set(String name, long value) { + tracing.put(nameForVariable(name), value); + } + @Override public Optional unavailable() { - return Optional.of(String.format("Resource %s is not available", name)); + return Optional.of( + String.format("Resource %s is not available", resourceName)); } }); if (error.isPresent()) { @@ -119,8 +133,10 @@ public Optional unavailable() { return; } } + final var waiting = Duration.between(startTime, Instant.now()).toSeconds(); + tracing.put("vidarr-waiting", waiting); updateBlockedResource(null); - waitTime.labels(workflow).observe(Duration.between(startTime, Instant.now()).toSeconds()); + waitTime.labels(workflow).observe(waiting); next.run(); } @@ -132,6 +148,7 @@ private void updateBlockedResource(String error) { DSL.using(configuration) .update(ACTIVE_WORKFLOW_RUN) .set(ACTIVE_WORKFLOW_RUN.WAITING_RESOURCE, error) + .set(ACTIVE_WORKFLOW_RUN.TRACING, tracing) .where(ACTIVE_WORKFLOW_RUN.ID.eq(dbId)) .execute()); diff --git a/vidarr-server/src/main/java/ca/on/oicr/gsi/vidarr/server/Main.java b/vidarr-server/src/main/java/ca/on/oicr/gsi/vidarr/server/Main.java index 82cad14a..3d656bb6 100644 --- a/vidarr-server/src/main/java/ca/on/oicr/gsi/vidarr/server/Main.java +++ b/vidarr-server/src/main/java/ca/on/oicr/gsi/vidarr/server/Main.java @@ -242,6 +242,7 @@ private interface UnloadProcessor { STATUS_FIELDS.add(literalJsonEntry("preflightOk", ACTIVE_WORKFLOW_RUN.PREFLIGHT_OKAY)); STATUS_FIELDS.add(literalJsonEntry("target", ACTIVE_WORKFLOW_RUN.TARGET)); + STATUS_FIELDS.add(literalJsonEntry("tracing", ACTIVE_WORKFLOW_RUN.TRACING)); STATUS_FIELDS.add(literalJsonEntry("workflowRunUrl", ACTIVE_WORKFLOW_RUN.WORKFLOW_RUN_URL)); STATUS_FIELDS.add( literalJsonEntry( diff --git a/vidarr-server/src/main/resources/ca/on/oicr/gsi/vidarr/server/api-docs/vidarr.json b/vidarr-server/src/main/resources/ca/on/oicr/gsi/vidarr/server/api-docs/vidarr.json index eeec9bb7..f4509043 100644 --- a/vidarr-server/src/main/resources/ca/on/oicr/gsi/vidarr/server/api-docs/vidarr.json +++ b/vidarr-server/src/main/resources/ca/on/oicr/gsi/vidarr/server/api-docs/vidarr.json @@ -1056,6 +1056,12 @@ "target": { "type": "string" }, + "tracing": { + "additionalProperties": { + "type": "integer" + }, + "type": "object" + }, "workflowRunUrl": { "type": "string" } diff --git a/vidarr-server/src/main/resources/db/migration/V0017__tracing.sql b/vidarr-server/src/main/resources/db/migration/V0017__tracing.sql new file mode 100644 index 00000000..f83939b2 --- /dev/null +++ b/vidarr-server/src/main/resources/db/migration/V0017__tracing.sql @@ -0,0 +1 @@ +ALTER TABLE active_workflow_run ADD COLUMN tracing jsonb NOT NULL DEFAULT '{}'::jsonb; diff --git a/vidarr-server/src/test/java/ca/on/oicr/gsi/vidarr/server/PriorityByWorkflowTest.java b/vidarr-server/src/test/java/ca/on/oicr/gsi/vidarr/server/PriorityByWorkflowTest.java index c13887c5..c2f20bd4 100644 --- a/vidarr-server/src/test/java/ca/on/oicr/gsi/vidarr/server/PriorityByWorkflowTest.java +++ b/vidarr-server/src/test/java/ca/on/oicr/gsi/vidarr/server/PriorityByWorkflowTest.java @@ -18,22 +18,29 @@ public class PriorityByWorkflowTest { String workflow = "test"; String version = "1.0"; - ConsumableResourceResponse.Visitor> consumableResourceCheckerVisitor = new Visitor>() { - @Override - public Optional available() { - return Optional.empty(); - } - - @Override - public Optional error(String message) { - return Optional.of(message); - } - - @Override - public Optional unavailable() { - return Optional.of(String.format("Resource is not available")); - } - }; + ConsumableResourceResponse.Visitor> consumableResourceCheckerVisitor = + new Visitor>() { + @Override + public Optional available() { + return Optional.empty(); + } + + @Override + public void clear(String name) {} + + @Override + public Optional error(String message) { + return Optional.of(message); + } + + @Override + public void set(String name, long value) {} + + @Override + public Optional unavailable() { + return Optional.of(String.format("Resource is not available")); + } + }; @Before public void instantiate() { @@ -44,17 +51,21 @@ public void instantiate() { public void testRequest_invalidPriorityReturnsError() { JsonNode invalidJson = mapper.valueToTree(5); - Optional requestError = sut.request(workflow, version, "abcdef", - Optional.of(invalidJson)).apply(consumableResourceCheckerVisitor); + Optional requestError = + sut.request(workflow, version, "abcdef", Optional.of(invalidJson)) + .apply(consumableResourceCheckerVisitor); assertTrue(requestError.isPresent()); - assertEquals(requestError.get(), "Vidarr error: The workflow 'test' run's priority (5) is " - + "invalid. Priority values should be one of the following: 1, 2, 3, 4"); + assertEquals( + requestError.get(), + "Vidarr error: The workflow 'test' run's priority (5) is " + + "invalid. Priority values should be one of the following: 1, 2, 3, 4"); } @Test public void testRequest_emptyInputAndEmptyWaitingNoError() { - Optional requestError = sut.request(workflow, version, "abcdef", - Optional.empty()).apply(consumableResourceCheckerVisitor); + Optional requestError = + sut.request(workflow, version, "abcdef", Optional.empty()) + .apply(consumableResourceCheckerVisitor); assertTrue(requestError.isEmpty()); } @@ -65,14 +76,14 @@ public void testRequest_emptyInputAndEmptyWaitingIsAvailable() { var response = sut.request(workflow, version, "abcdef", Optional.empty()); assertEquals(response, ConsumableResourceResponse.AVAILABLE); - } @Test public void testRequest_validInputAndEmptyWaitingIsOk() { JsonNode validJson = mapper.valueToTree(2); - Optional requestError = sut.request(workflow, version, "abcdef", - Optional.of(validJson)).apply(consumableResourceCheckerVisitor); + Optional requestError = + sut.request(workflow, version, "abcdef", Optional.of(validJson)) + .apply(consumableResourceCheckerVisitor); assertTrue(requestError.isEmpty()); } @@ -84,12 +95,14 @@ public void testIfWorkflowRunWithHigherPriorityExists_thenWorkflowRunDoesNotLaun sut.set(workflow, "qwerty", Optional.of(higherPriority)); - Optional requestError = sut.request(workflow, version, "abcdef", - Optional.of(lowerPriority)).apply(consumableResourceCheckerVisitor); + Optional requestError = + sut.request(workflow, version, "abcdef", Optional.of(lowerPriority)) + .apply(consumableResourceCheckerVisitor); assertTrue(requestError.isPresent()); - assertEquals(requestError.get(), "There are test workflows currently queued up with higher " - + "priority."); + assertEquals( + requestError.get(), + "There are test workflows currently queued up with higher " + "priority."); } @Test @@ -99,8 +112,9 @@ public void testIfWorkflowRunWithLowerPriorityExists_thenWorkflowRunDoesNotError sut.set(workflow, "qwerty", Optional.of(lowerPriority)); - Optional requestError = sut.request(workflow, version, "abcdef", - Optional.of(higherPriority)).apply(consumableResourceCheckerVisitor); + Optional requestError = + sut.request(workflow, version, "abcdef", Optional.of(higherPriority)) + .apply(consumableResourceCheckerVisitor); assertTrue(requestError.isEmpty()); } @@ -124,8 +138,9 @@ public void testIfWorkflowRunWithSamePriorityExists_thenWorkflowDoesNotError() { sut.set(workflow, "qwerty", Optional.of(twoPriority)); - Optional requestError = sut.request(workflow, version, "abcdef", - Optional.of(twoPriorityAlso)).apply(consumableResourceCheckerVisitor); + Optional requestError = + sut.request(workflow, version, "abcdef", Optional.of(twoPriorityAlso)) + .apply(consumableResourceCheckerVisitor); assertTrue(requestError.isEmpty()); } @@ -140,22 +155,22 @@ public void testIfWorkflowRunWithSamePriorityExists_thenWorkflowRunLaunches() { var response = sut.request(workflow, version, "abcdef", Optional.of(twoPriorityAlso)); assertEquals(response, ConsumableResourceResponse.AVAILABLE); - } @Test public void testIfWorkflowRunWIthEmptyPriorityIsSet_thenWorkflowRunIsAddedWithLowestPriority() { JsonNode lowestPriority = mapper.valueToTree(1); - Optional requestErrorForEmpty = sut.request(workflow, version, "abcdef", - Optional.empty()).apply(consumableResourceCheckerVisitor); + Optional requestErrorForEmpty = + sut.request(workflow, version, "abcdef", Optional.empty()) + .apply(consumableResourceCheckerVisitor); - Optional requestErrorForLowest = sut.request(workflow, version, "abcdef", - Optional.of(lowestPriority)).apply(consumableResourceCheckerVisitor); + Optional requestErrorForLowest = + sut.request(workflow, version, "abcdef", Optional.of(lowestPriority)) + .apply(consumableResourceCheckerVisitor); assertTrue(requestErrorForEmpty.isEmpty()); assertTrue(requestErrorForLowest.isEmpty()); - } @Test @@ -164,19 +179,21 @@ public void testIfResourceIsReleasedButNotLaunched_thenLowerPriorityCannotLaunch JsonNode higherPriority = mapper.valueToTree(4); JsonNode lowerPriority = mapper.valueToTree(1); - Optional requestErrorHigher = sut.request(workflow, version, "qwerty", - Optional.of(higherPriority)).apply(consumableResourceCheckerVisitor); + Optional requestErrorHigher = + sut.request(workflow, version, "qwerty", Optional.of(higherPriority)) + .apply(consumableResourceCheckerVisitor); sut.release(workflow, version, "qwerty", Optional.of(higherPriority)); - Optional requestErrorLower = sut.request(workflow, version, "abcdef", - Optional.of(lowerPriority)).apply(consumableResourceCheckerVisitor); + Optional requestErrorLower = + sut.request(workflow, version, "abcdef", Optional.of(lowerPriority)) + .apply(consumableResourceCheckerVisitor); assertTrue(requestErrorHigher.isEmpty()); assertTrue(requestErrorLower.isPresent()); - assertEquals(requestErrorLower.get(), "There are test workflows currently queued up with higher " - + "priority."); - + assertEquals( + requestErrorLower.get(), + "There are test workflows currently queued up with higher " + "priority."); } @Test @@ -185,17 +202,18 @@ public void testIfResourceIsReleasedAndLaunched_thenLowerPriorityCanLaunch() { JsonNode higherPriority = mapper.valueToTree(4); JsonNode lowerPriority = mapper.valueToTree(1); - Optional requestErrorHigher = sut.request(workflow, version, "qwerty", - Optional.of(higherPriority)).apply(consumableResourceCheckerVisitor); + Optional requestErrorHigher = + sut.request(workflow, version, "qwerty", Optional.of(higherPriority)) + .apply(consumableResourceCheckerVisitor); sut.release(workflow, version, "qwerty", Optional.empty()); - Optional requestErrorLower = sut.request(workflow, version, "abcdef", - Optional.of(lowerPriority)).apply(consumableResourceCheckerVisitor); + Optional requestErrorLower = + sut.request(workflow, version, "abcdef", Optional.of(lowerPriority)) + .apply(consumableResourceCheckerVisitor); assertTrue(requestErrorHigher.isEmpty()); assertTrue(requestErrorLower.isEmpty()); - } @Test @@ -203,14 +221,14 @@ public void testIfResourcePassesRequest_thenSamePriorityCanLaunch() { JsonNode validJson = mapper.valueToTree(2); - sut.request(workflow, version, "qwerty", - Optional.of(validJson)).apply(consumableResourceCheckerVisitor); + sut.request(workflow, version, "qwerty", Optional.of(validJson)) + .apply(consumableResourceCheckerVisitor); - Optional requestError = sut.request(workflow, version, "abcdef", - Optional.of(validJson)).apply(consumableResourceCheckerVisitor); + Optional requestError = + sut.request(workflow, version, "abcdef", Optional.of(validJson)) + .apply(consumableResourceCheckerVisitor); assertTrue(requestError.isEmpty()); - } @Test @@ -221,13 +239,10 @@ public void testIfWorkflowLowersPriority_WorkflowIsNotBlockedBySelf() { sut.set(workflow, "qwerty", Optional.of(higherPriority)); - - Optional requestErrorLower = sut.request(workflow, version, "qwerty", - Optional.of(lowerPriority)).apply(consumableResourceCheckerVisitor); + Optional requestErrorLower = + sut.request(workflow, version, "qwerty", Optional.of(lowerPriority)) + .apply(consumableResourceCheckerVisitor); assertTrue(requestErrorLower.isEmpty()); - } - - -} \ No newline at end of file +}