Skip to content

Commit

Permalink
Add tracing to consumable resources
Browse files Browse the repository at this point in the history
Provide an API for consumable resources to export numeric values to
observation.
  • Loading branch information
apmasell authored and avarsava committed Feb 12, 2024
1 parent 7a0e47d commit 8bfd6df
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 73 deletions.
1 change: 1 addition & 0 deletions changes/add_cr_tracing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* Add tracing API for consumable resources
6 changes: 6 additions & 0 deletions plugin-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ encoded as JSON, if the submitter provided it. The JSON data has been
type-checked by Vidarr, so it should be safe to convert to the expected type
using Jackson.

Sometimes, consumable resources are doing scoring that would be helpful to know
for debugging purposes. In that case, the resource can return a custom
`ConsumableResourceResponse` that uses the `Visitor.set` method to export
numeric statistics that will be available in the `"tracing"` property. Víðarr
will prefix these variables with the consumable resource's name.

# Input Provisioners
Input provisioners implement `ca.on.oicr.gsi.vidarr.InputProvisionerProvider`
and `ca.on.oicr.gsi.vidarr.InputProvisioner`. These plugins are responsible for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ public interface Visitor<T> {
*/
T available();

/**
* Clear removes a tracing number from this resource
*
* @param name the identifier for the number
* @see #set(String, long)
*/
void clear(String name);

/**
* Convert an unsuccessful response
*
Expand All @@ -25,6 +33,17 @@ public interface Visitor<T> {
*/
T error(String message);

/**
* Sets a tracing number on this consumable resource.
*
* <p>Consumable resources are constantly measuring things for decision-making and this allows
* that information to be observed.
*
* @param name the identifier of the number
* @param value the current value
*/
void set(String name, long value);

/**
* Convert an unsuccessful but empty response
*
Expand Down Expand Up @@ -70,7 +89,7 @@ public <T> T apply(Visitor<T> visitor) {
};
}

private ConsumableResourceResponse() {}
public ConsumableResourceResponse() {}

/**
* Convert this response into another value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class WorkflowRunStatusResponse {
protected boolean running;
private ZonedDateTime started;
private String target;
private Map<String, Long> tracing;
private String workflowRunUrl;

public ObjectNode getArguments() {
Expand Down Expand Up @@ -93,6 +94,10 @@ public String getTarget() {
return target;
}

public Map<String, Long> getTracing() {
return tracing;
}

public String getWorkflowRunUrl() {
return workflowRunUrl;
}
Expand Down Expand Up @@ -169,6 +174,10 @@ public void setTarget(String target) {
this.target = target;
}

public void setTracing(Map<String, Long> tracing) {
this.tracing = tracing;
}

public void setWorkflowRunUrl(String workflowRunUrl) {
this.workflowRunUrl = workflowRunUrl;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@
import ca.on.oicr.gsi.vidarr.ConsumableResourceResponse.Visitor;
import ca.on.oicr.gsi.vidarr.core.Target;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.zaxxer.hikari.HikariDataSource;
import io.prometheus.client.Histogram;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;

Expand All @@ -37,6 +38,7 @@ final class ConsumableResourceChecker implements Runnable {
private final Runnable next;
private final Instant startTime = Instant.now();
private final Target target;
private final ObjectNode tracing = Main.MAPPER.createObjectNode();
private final String vidarrId;
private final String workflow;
private final String workflowVersion;
Expand Down Expand Up @@ -70,13 +72,9 @@ public void run() {
return;
}
var i = 0;
final var resourceBrokers =
target
.consumableResources()
.sorted(Comparator.comparing(cr -> cr.second().priority()))
.toList();
final var resourceBrokers = target.consumableResources().collect(Collectors.toList());
for (i = 0; i < resourceBrokers.size(); i++) {
final var name = resourceBrokers.get(i).first();
final var resourceName = resourceBrokers.get(i).first();
final var broker = resourceBrokers.get(i).second();
final var error =
broker
Expand All @@ -87,19 +85,35 @@ public void run() {
broker.inputFromSubmitter().map(def -> consumableResources.get(def.first())))
.apply(
new Visitor<Optional<String>>() {

@Override
public Optional<String> available() {
return Optional.empty();
}

@Override
public void clear(String name) {
tracing.remove(nameForVariable(name));
}

@Override
public Optional<String> error(String message) {
return Optional.of(message);
}

private String nameForVariable(String name) {
return String.format("vidarr-resource-%s-%s", resourceName, name);
}

@Override
public void set(String name, long value) {
tracing.put(nameForVariable(name), value);
}

@Override
public Optional<String> unavailable() {
return Optional.of(String.format("Resource %s is not available", name));
return Optional.of(
String.format("Resource %s is not available", resourceName));
}
});
if (error.isPresent()) {
Expand All @@ -119,8 +133,10 @@ public Optional<String> unavailable() {
return;
}
}
final var waiting = Duration.between(startTime, Instant.now()).toSeconds();
tracing.put("vidarr-waiting", waiting);
updateBlockedResource(null);
waitTime.labels(workflow).observe(Duration.between(startTime, Instant.now()).toSeconds());
waitTime.labels(workflow).observe(waiting);
next.run();
}

Expand All @@ -132,6 +148,7 @@ private void updateBlockedResource(String error) {
DSL.using(configuration)
.update(ACTIVE_WORKFLOW_RUN)
.set(ACTIVE_WORKFLOW_RUN.WAITING_RESOURCE, error)
.set(ACTIVE_WORKFLOW_RUN.TRACING, tracing)
.where(ACTIVE_WORKFLOW_RUN.ID.eq(dbId))
.execute());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ private interface UnloadProcessor<T> {

STATUS_FIELDS.add(literalJsonEntry("preflightOk", ACTIVE_WORKFLOW_RUN.PREFLIGHT_OKAY));
STATUS_FIELDS.add(literalJsonEntry("target", ACTIVE_WORKFLOW_RUN.TARGET));
STATUS_FIELDS.add(literalJsonEntry("tracing", ACTIVE_WORKFLOW_RUN.TRACING));
STATUS_FIELDS.add(literalJsonEntry("workflowRunUrl", ACTIVE_WORKFLOW_RUN.WORKFLOW_RUN_URL));
STATUS_FIELDS.add(
literalJsonEntry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1056,6 +1056,12 @@
"target": {
"type": "string"
},
"tracing": {
"additionalProperties": {
"type": "integer"
},
"type": "object"
},
"workflowRunUrl": {
"type": "string"
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE active_workflow_run ADD COLUMN tracing jsonb NOT NULL DEFAULT '{}'::jsonb;
Loading

0 comments on commit 8bfd6df

Please sign in to comment.