Refactor to more explicit worker execution
Spenhouet committed Mar 10, 2024
1 parent 43db8b2 commit 31009c6
Showing 5 changed files with 75 additions and 42 deletions.
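
The diff replaces the implicit compile() helper in src/api/job.ts (which did its own fs.readdir per call) with id lists read once via getAssetIds()/getEntityIds()/getSystemIds() and then passed explicitly to executeInWorker and executeInWorkerPool. Those two helpers come from a worker utility module that is not part of this diff; the following is only a minimal sketch of what they might look like, assuming node:worker_threads, a one-message-per-task protocol, and a loader that can run the TypeScript worker scripts (e.g. tsx). The runTask helper and the poolSize default are inventions of this sketch, not the repository's code.

// Sketch only: assumed shape of the worker helpers called from job.ts.
import os from "node:os";
import { Worker } from "node:worker_threads";

function runTask<Param, Result>(worker: Worker, param: Param): Promise<Result | null> {
    return new Promise((resolve, reject) => {
        const onMessage = (result: Result | null) => {
            worker.off("error", onError);
            resolve(result);
        };
        const onError = (error: Error) => {
            worker.off("message", onMessage);
            reject(error);
        };
        worker.once("message", onMessage);
        worker.once("error", onError);
        worker.postMessage(param); // received by the worker's parentPort "message" handler
    });
}

// Run a single worker once, forwarding one parameter (e.g. the full assetIds array).
export async function executeInWorker<Param, Result>(
    workerScriptPath: string,
    param?: Param
): Promise<Result | null> {
    const worker = new Worker(workerScriptPath);
    try {
        return await runTask<Param | undefined, Result>(worker, param);
    } finally {
        await worker.terminate();
    }
}

// Spread a parameter list over a fixed-size pool, one message per parameter
// (e.g. one asset id per worker invocation).
export async function executeInWorkerPool<Param, Result>(
    workerScriptPath: string,
    params: Param[],
    poolSize: number = os.cpus().length
): Promise<(Result | null)[]> {
    const results: (Result | null)[] = new Array(params.length).fill(null);
    let nextIndex = 0;

    async function runWorker(): Promise<void> {
        const worker = new Worker(workerScriptPath);
        try {
            while (nextIndex < params.length) {
                const index = nextIndex++;
                results[index] = await runTask<Param, Result>(worker, params[index]!);
            }
        } finally {
            await worker.terminate();
        }
    }

    await Promise.all(
        Array.from({ length: Math.min(poolSize, params.length) }, () => runWorker())
    );
    return results;
}
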
76 changes: 47 additions & 29 deletions src/api/job.ts
@@ -1,5 +1,4 @@
import _ from "lodash";
import fs from "node:fs/promises";
import path from "node:path";
import { FILES, PATHS } from "../constants";
import { writeJson } from "../utils/files";
@@ -10,25 +9,10 @@ import { ASSET_RESOURCES } from "./resources/assets";
import { ENTITY_RESOURCES } from "./resources/entities";
import { GRAPH_RESOURCES } from "./resources/graphs";
import { SYSTEM_RESOURCES } from "./resources/systems";
import { JsonResources } from "./utils/openapi";
import { getAssetIds, getEntityIds, getSystemIds } from "./utils/ids";

const WORKERS_PATH = "src/api/workers";

async function compile<Result>(
dir: string,
workerScript: string,
resources: JsonResources | undefined = undefined
): Promise<(Result | null)[]> {
const ids = await fs.readdir(dir);

if (resources) {
await resources.index(ids);
}

const workerScriptPath = path.resolve(WORKERS_PATH, workerScript);
return executeInWorkerPool<string, Result>(workerScriptPath, ids);
}

async function generateOasSchema() {
INDEX_RESOURCES.extend(ENTITY_RESOURCES, SYSTEM_RESOURCES, ASSET_RESOURCES, GRAPH_RESOURCES);
const oasSchema = _.pick(INDEX_RESOURCES.spec, [
@@ -55,34 +39,68 @@ async function generate404() {
}

job("API Job", async () => {
const [assetIds, entityIds, systemIds] = await Promise.all([
getAssetIds(),
getEntityIds(),
getSystemIds(),
]);

// TODO this could already be done during data collection, not requiring a post-processing step
// TODO define with depends on definitions as a DAG and then automatically group into consecutive execution steps

// Two types of workers:
// 1. parameter forwarded to worker
// 2. parameter spread to worker pool

// Unify everything and implement DAG based worker pool execution -> execute everything in a single pool?
// Per parameter dependency e.g. depending on supply of an asset being precompiled
// Could be done with local mapping done here and dynamic dependsOn definitions

await Promise.all([
executeInWorker(path.resolve(WORKERS_PATH, "precompile_relations.ts")),
compile(PATHS.assets, "precompile_supply.ts"),
executeInWorker(path.resolve(WORKERS_PATH, "precompile_relations.ts"), assetIds),
executeInWorkerPool(path.resolve(WORKERS_PATH, "precompile_supply.ts"), assetIds),
executeInWorkerPool(
path.resolve(WORKERS_PATH, "precompile_underlying_assets.ts"),
assetIds
),
]);

await Promise.all([
compile(PATHS.assets, "precompile_market_cap.ts"), // Depends on "precompile_supply.ts"
compile(PATHS.assets, "precompile_underlying_assets.ts"),
executeInWorkerPool(path.resolve(WORKERS_PATH, "precompile_market_cap.ts"), assetIds), // Depends on "precompile_supply.ts"
]);

await Promise.all([
compile(PATHS.systems, "precompile_system_total_value_locked.ts", SYSTEM_RESOURCES), // Depends on "precompile_relations.ts" and "precompile_market_cap.ts"
compile(PATHS.entities, "precompile_entities_total_value_locked.ts", ENTITY_RESOURCES), // Depends on "precompile_relations.ts" and "precompile_market_cap.ts"
compile(PATHS.assets, "precompile_collateralization_ratio.ts"), // Depends on "precompile_underlying_assets.ts"
executeInWorkerPool(
path.resolve(WORKERS_PATH, "precompile_system_total_value_locked.ts"),
systemIds
), // Depends on "precompile_relations.ts" and "precompile_market_cap.ts"
executeInWorkerPool(
path.resolve(WORKERS_PATH, "precompile_entities_total_value_locked.ts"),
entityIds
), // Depends on "precompile_relations.ts" and "precompile_market_cap.ts"
executeInWorkerPool(
path.resolve(WORKERS_PATH, "precompile_collateralization_ratio.ts"),
assetIds
), // Depends on "precompile_underlying_assets.ts" and "precompile_market_cap.ts"
]);

await Promise.all([
executeInWorker(path.resolve(WORKERS_PATH, "precompile_collateralization_graph.ts")), // Depends on "precompile_collateralization_ratio.ts"
executeInWorker(
path.resolve(WORKERS_PATH, "precompile_collateralization_graph.ts"),
assetIds
), // Depends on "precompile_collateralization_ratio.ts"
]);

await Promise.all([
INDEX_RESOURCES.index(),
compile(PATHS.entities, "compile_entity.ts", ENTITY_RESOURCES),
compile(PATHS.systems, "compile_system.ts", SYSTEM_RESOURCES),
compile(PATHS.assets, "compile_asset.ts", ASSET_RESOURCES),
executeInWorker(path.resolve(WORKERS_PATH, "compile_graph.ts")),
ASSET_RESOURCES.index(assetIds),
executeInWorkerPool(path.resolve(WORKERS_PATH, "compile_asset.ts"), assetIds), // Depends on
ENTITY_RESOURCES.index(entityIds),
executeInWorkerPool(path.resolve(WORKERS_PATH, "compile_entity.ts"), entityIds), // Depends on
SYSTEM_RESOURCES.index(systemIds),
executeInWorkerPool(path.resolve(WORKERS_PATH, "compile_system.ts"), systemIds), // Depends on
GRAPH_RESOURCES.index(),
executeInWorker(path.resolve(WORKERS_PATH, "compile_graph.ts")), // Depends on "precompile_collateralization_graph.ts"
generateOasSchema(),
generate404(),
]);
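
The TODO comments in job.ts above sketch a DAG-based scheduler: each precompile step would declare what it depends on, and steps would be grouped automatically into consecutive execution rounds instead of the hand-ordered Promise.all blocks. A hypothetical illustration of that grouping (the WorkerStep shape and groupIntoRounds name are invented for this sketch, not part of the commit):

// Hypothetical DAG grouping for the idea in the TODO comments of job.ts:
// each step declares its dependencies; steps whose dependencies are already
// satisfied are batched into the next execution round.
interface WorkerStep {
    script: string; // worker script under src/api/workers
    dependsOn: string[]; // names of other steps
}

function groupIntoRounds(steps: Record<string, WorkerStep>): string[][] {
    const done = new Set<string>();
    const rounds: string[][] = [];
    let remaining = Object.keys(steps);

    while (remaining.length > 0) {
        const ready = remaining.filter((name) =>
            steps[name].dependsOn.every((dep) => done.has(dep))
        );
        if (ready.length === 0) {
            throw new Error("Dependency cycle or missing step definition");
        }
        rounds.push(ready);
        for (const name of ready) done.add(name);
        remaining = remaining.filter((name) => !done.has(name));
    }
    return rounds;
}

// Example: groupIntoRounds({
//     precompile_supply: { script: "precompile_supply.ts", dependsOn: [] },
//     precompile_market_cap: { script: "precompile_market_cap.ts", dependsOn: ["precompile_supply"] },
// }) yields [["precompile_supply"], ["precompile_market_cap"]]; each round could
// then be executed with Promise.all over executeInWorkerPool calls.
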
17 changes: 17 additions & 0 deletions src/api/utils/ids.ts
@@ -0,0 +1,17 @@
import fs from "node:fs/promises";
import { PATHS } from "../../constants";

export async function getAssetIds(): Promise<bcked.asset.Id[]> {
const assetIds = await fs.readdir(PATHS.assets);
return assetIds as bcked.asset.Id[];
}

export async function getEntityIds(): Promise<bcked.entity.Id[]> {
const entityIds = await fs.readdir(PATHS.entities);
return entityIds as bcked.entity.Id[];
}

export async function getSystemIds(): Promise<bcked.system.Id[]> {
const systemIds = await fs.readdir(PATHS.systems);
return systemIds as bcked.system.Id[];
}
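
The bcked.asset.Id, bcked.entity.Id, and bcked.system.Id types used above come from ambient declarations elsewhere in the repository and are not shown in this commit. As an assumption, they might be declared roughly like this (plain string aliases; the real declarations may be stricter):

// Assumed shape of the ambient bcked.* id types (not part of this commit).
declare namespace bcked {
    namespace asset {
        type Id = string;
    }
    namespace entity {
        type Id = string;
    }
    namespace system {
        type Id = string;
    }
}
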
1 change: 0 additions & 1 deletion src/api/workers/compile_graph.ts
@@ -108,7 +108,6 @@ parentPort?.on("message", async () => {

try {
await Promise.all([
GRAPH_RESOURCES.index(),
compileHistory<bcked.asset.Graph, "stats.leaveCollateralization">(
FILES.csv.collateralizationGraph,
"stats.leaveCollateralization",
12 changes: 6 additions & 6 deletions src/api/workers/precompile_collateralization_graph.ts
@@ -4,7 +4,6 @@ import { sendErrorReport } from "../../watcher/bot";

import { hoursToMilliseconds } from "date-fns";
import { existsSync } from "fs";
import { readdir } from "fs/promises";
import createGraph, { type Graph } from "ngraph.graph";

import path from "path";
@@ -122,9 +121,10 @@ function computeStats(graph: Graph<graph.NodeData, graph.LinkData>): graph.Stats
};
}

async function* createGraphs(window: number = hoursToMilliseconds(12)): AsyncIterableIterator<any> {
const assetIds = (await readdir(PATHS.assets)) as bcked.asset.Id[];

async function* createGraphs(
assetIds: bcked.asset.Id[],
window: number = hoursToMilliseconds(12)
): AsyncIterableIterator<any> {
const collateralizationLookups = initializeCollateralizationLookups(assetIds);

// TODO get latest entry from global graph and continue from that time
@@ -148,7 +148,7 @@ async function* createGraphs(window: number = hoursToMilliseconds(12)): AsyncIte
}
}

parentPort?.on("message", async () => {
parentPort?.on("message", async (assetIds: bcked.asset.Id[]) => {
const step = `Precompiling Collateralization Graph`;
console.log(step);
const filePath = path.join(PATHS.graphs, PATHS.records, FILES.csv.collateralizationGraph);
@@ -158,7 +158,7 @@ parentPort?.on("message", async () => {
// TODO Later change this to start at the current date and only append changes
await remove(filePath);

const entries = createGraphs();
const entries = createGraphs(assetIds);
await writeToCsv(filePath, entries, "timestamp");

parentPort?.postMessage(null);
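
On the worker side, the refactor changes the parentPort message handler to receive the id list from the parent instead of calling readdir itself. A simplified skeleton of the protocol these workers follow (the real workers do their precompilation here, write their outputs, and report failures via sendErrorReport):

// Simplified worker skeleton for the message protocol used in this commit.
import { parentPort } from "worker_threads";

parentPort?.on("message", async (assetIds: string[]) => {
    try {
        // ... precompile data for the received assetIds ...
        console.log(`Received ${assetIds.length} ids`);
    } catch (error) {
        console.error(error);
        // the actual workers additionally call sendErrorReport(...) from ../../watcher/bot
    }
    // Signal completion so executeInWorker / executeInWorkerPool can resolve.
    parentPort?.postMessage(null);
});
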
11 changes: 5 additions & 6 deletions src/api/workers/precompile_relations.ts
@@ -2,16 +2,15 @@ import { parentPort } from "worker_threads";
import { FILES, PATHS } from "../../constants";
import { sendErrorReport } from "../../watcher/bot";

import { readdir } from "fs/promises";
import _, { type PropertyPath } from "lodash";
import { join } from "path";
import { fromAsync } from "../../utils/array";
import { readJson, writeJson } from "../../utils/files";
import { toId } from "../../utils/helper";

async function* loadAssetDetails(): AsyncIterableIterator<bcked.asset.Details | null> {
const assetIds = (await readdir(PATHS.assets)) as bcked.asset.Id[];

async function* loadAssetDetails(
assetIds: bcked.asset.Id[]
): AsyncIterableIterator<bcked.asset.Details | null> {
for (const assetId of assetIds) {
const detailsJson = join(PATHS.assets, assetId, PATHS.records, FILES.json.details);
yield await readJson(detailsJson);
@@ -37,12 +36,12 @@ async function storeGrouping(
}
}

parentPort?.on("message", async () => {
parentPort?.on("message", async (assetIds: bcked.asset.Id[]) => {
const step = `Precompiling Relations`;
console.log(step);

try {
const assetDetails = _.compact(await fromAsync(loadAssetDetails()));
const assetDetails = _.compact(await fromAsync(loadAssetDetails(assetIds)));

await storeGrouping(assetDetails, "identifier.system", PATHS.systems);
await storeGrouping(assetDetails, "linkedEntities.issuer", PATHS.entities);
