Skip to content

Feat(TS): experimental middleware #1544

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions sdks/typescript/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@hatchet-dev/typescript-sdk",
"version": "1.1.7",
"version": "1.2.0-alpha.1",
"description": "Background task orchestration & visibility for developers",
"types": "dist/index.d.ts",
"files": [
Expand Down Expand Up @@ -88,5 +88,6 @@
"qs": "^6.14.0",
"yaml": "^2.7.1",
"zod": "^3.24.2"
}
},
"packageManager": "pnpm@9.15.4+sha512.b2dc20e2fc72b3e18848459b37359a32064663e5627a51e4c74b2c29dd8e8e0491483c3abb40789cfd578bf362fb6ba8261b05f0387d76792ed6e23ea3b1b6a0"
}
2 changes: 2 additions & 0 deletions sdks/typescript/src/v1/client/client.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { MetricsClient } from './features/metrics';
import { RunsClient } from './features/runs';
import { WorkersClient } from './features/workers';
import { WorkflowsClient } from './features/workflows';
import { Middleware } from '../next/middleware/middleware';

export interface IHatchetClient {
_v0: InternalHatchetClient;
Expand All @@ -11,4 +12,5 @@ export interface IHatchetClient {
runs: RunsClient;
workflows: WorkflowsClient;
workers: WorkersClient;
middleware?: Middleware[];
}
66 changes: 53 additions & 13 deletions sdks/typescript/src/v1/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,38 @@ import { MetricsClient } from './features/metrics';
import { WorkersClient } from './features/workers';
import { WorkflowsClient } from './features/workflows';
import { RunsClient } from './features/runs';
import { InputType, OutputType, UnknownInputType, StrictWorkflowOutputType } from '../types';
import {
InputType,
OutputType,
UnknownInputType,
StrictWorkflowOutputType,
JsonObject,
} from '../types';
import { RatelimitsClient } from './features';
import { Middleware, serializeInput, deserializeOutput } from '../next/middleware/middleware';

export interface RuntimeOpts {
middleware?: Middleware[];
}

type Config = Partial<ClientConfig> & RuntimeOpts;

/**
* HatchetV1 implements the main client interface for interacting with the Hatchet workflow engine.
* It provides methods for creating and executing workflows, as well as managing workers.
*/

export class HatchetClient implements IHatchetClient {
/** The underlying v0 client instance */
_v0: InternalHatchetClient;
_api: Api;

private _middleware?: Middleware[];

get middleware() {
return this._middleware;
}

/**
* @deprecated v0 client will be removed in a future release, please upgrade to v1
*/
Expand All @@ -66,11 +86,7 @@ export class HatchetClient implements IHatchetClient {
* @param options - Optional client options
* @param axiosConfig - Optional Axios configuration for HTTP requests
*/
constructor(
config?: Partial<ClientConfig>,
options?: HatchetClientOptions,
axiosConfig?: AxiosRequestConfig
) {
constructor(config?: Config, options?: HatchetClientOptions, axiosConfig?: AxiosRequestConfig) {
try {
const loaded = ConfigLoader.loadClientConfig(config, {
path: options?.config_path,
Expand All @@ -92,6 +108,10 @@ export class HatchetClient implements IHatchetClient {
this.tenantId = clientConfig.tenant_id;
this._api = api(clientConfig.api_url, clientConfig.token, axiosConfig);
this._v0 = new InternalHatchetClient(clientConfig, options, axiosConfig, this.runs);

if (config?.middleware) {
this._middleware = config.middleware;
}
} catch (e) {
if (e instanceof z.ZodError) {
throw new Error(`Invalid client config: ${e.message}`);
Expand All @@ -108,7 +128,7 @@ export class HatchetClient implements IHatchetClient {
* @returns A new Hatchet client instance
*/
static init(
config?: Partial<ClientConfig>,
config?: Config,
options?: HatchetClientOptions,
axiosConfig?: AxiosRequestConfig
): HatchetClient {
Expand Down Expand Up @@ -220,11 +240,11 @@ export class HatchetClient implements IHatchetClient {
* @param options - Configuration options for the workflow run
* @returns A WorkflowRunRef containing the run ID and methods to interact with the run
*/
runNoWait<I extends InputType = UnknownInputType, O extends OutputType = void>(
async runNoWait<I extends InputType = UnknownInputType, O extends OutputType = void>(
workflow: BaseWorkflowDeclaration<I, O> | string | V0Workflow,
input: I,
options: RunOpts
): WorkflowRunRef<O> {
options: RunOpts = {}
): Promise<WorkflowRunRef<O>> {
let name: string;
if (typeof workflow === 'string') {
name = workflow;
Expand All @@ -234,7 +254,21 @@ export class HatchetClient implements IHatchetClient {
throw new Error('unable to identify workflow');
}

return this._v0.admin.runWorkflow<I, O>(name, input, options);
const serializedInput = await serializeInput(input as unknown as JsonObject, this.middleware);
const runRef = this._v0.admin.runWorkflow<I, O>(name, serializedInput as unknown as I, options);

// Wrap the runRef to apply output deserialization
const originalResult = runRef.result;
runRef.result = async () => {
const output = await originalResult.call(runRef);
const deserializedOutput = await deserializeOutput(
output as unknown as JsonObject,
this.middleware
);
return deserializedOutput as unknown as O;
};

return runRef;
}

/**
Expand Down Expand Up @@ -269,8 +303,14 @@ export class HatchetClient implements IHatchetClient {
input: I,
options: RunOpts = {}
): Promise<O> {
const run = this.runNoWait<I, O>(workflow, input, options);
return run.output as Promise<O>;
const serializedInput = await serializeInput(input as unknown as JsonObject, this.middleware);
const runRef = await this.runNoWait<I, O>(workflow, serializedInput as unknown as I, options);
const output = await runRef.result();
const deserializedOutput = await deserializeOutput(
output as unknown as JsonObject,
this.middleware
);
return deserializedOutput as unknown as O;
}

/**
Expand Down
7 changes: 5 additions & 2 deletions sdks/typescript/src/v1/client/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { Workflow as V0Workflow } from '@hatchet/workflow';
import { WebhookWorkerCreateRequest } from '@hatchet/clients/rest/generated/data-contracts';
import { BaseWorkflowDeclaration } from '../declaration';
import { HatchetClient } from '..';
import { bindMiddleware } from '../next/middleware/middleware';

const DEFAULT_DURABLE_SLOTS = 1_000;

Expand Down Expand Up @@ -90,8 +91,10 @@ export class Worker {
return Promise.all(
workflows?.map(async (wf) => {
if (wf instanceof BaseWorkflowDeclaration) {
const withMiddleware = await bindMiddleware(wf, this._v1);

// TODO check if tenant is V1
const register = this.nonDurable.registerWorkflowV1(wf);
const register = this.nonDurable.registerWorkflowV1(withMiddleware);

if (wf.definition._durableTasks.length > 0) {
if (!this.durable) {
Expand All @@ -100,7 +103,7 @@ export class Worker {
maxRuns: this.config.durableSlots || DEFAULT_DURABLE_SLOTS,
});
}
this.durable.registerDurableActionsV1(wf.definition);
this.durable.registerDurableActionsV1(withMiddleware.definition);
}

return register;
Expand Down
109 changes: 70 additions & 39 deletions sdks/typescript/src/v1/declaration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
import { Duration } from './client/duration';
import { MetricsClient } from './client/features/metrics';
import { InputType, OutputType, UnknownInputType, JsonObject } from './types';
import { serializeInput, deserializeOutput } from './next/middleware/middleware';

const UNBOUND_ERR = new Error('workflow unbound to hatchet client, hint: use client.run instead');

Expand Down Expand Up @@ -239,17 +240,40 @@ export class BaseWorkflowDeclaration<
* @returns A WorkflowRunRef containing the run ID and methods to get results and interact with the run.
* @throws Error if the workflow is not bound to a Hatchet client.
*/
runNoWait(input: I, options?: RunOpts, _standaloneTaskName?: string): WorkflowRunRef<O> {
async runNoWait(
input: I,
options?: RunOpts,
_standaloneTaskName?: string
): Promise<WorkflowRunRef<O>> {
if (!this.client) {
throw UNBOUND_ERR;
}

const res = this.client._v0.admin.runWorkflow<I, O>(this.name, input, options);
const serializedInput = await serializeInput(
input as unknown as JsonObject,
this.client.middleware
);
const res = this.client._v0.admin.runWorkflow<I, O>(
this.name,
serializedInput as unknown as I,
options
);

if (_standaloneTaskName) {
res._standalone_task_name = _standaloneTaskName;
}

// Wrap the result method to apply output deserialization
const originalResult = res.result;
res.result = async () => {
const output = await originalResult.call(res);
const deserializedOutput = await deserializeOutput(
output as unknown as JsonObject,
this.client?.middleware
);
return deserializedOutput as unknown as O;
};

return res;
}

Expand Down Expand Up @@ -279,6 +303,7 @@ export class BaseWorkflowDeclaration<

return this.run(input, options, _standaloneTaskName);
}

/**
* Executes the workflow with the given input and awaits the results.
* @param input The input data for the workflow.
Expand All @@ -294,38 +319,29 @@ export class BaseWorkflowDeclaration<
}

if (Array.isArray(input)) {
let resp: WorkflowRunRef<O>[] = [];
for (let i = 0; i < input.length; i += 500) {
const batch = input.slice(i, i + 500);
const batchResp = await this.client._v0.admin.runWorkflows<I, O>(
batch.map((inp) => ({
workflowName: this.definition.name,
input: inp,
options,
}))
);
resp = resp.concat(batchResp);
}

const res: Promise<O>[] = [];
resp.forEach((ref, index) => {
const wf = input[index].workflow;
if (wf instanceof TaskWorkflowDeclaration) {
// eslint-disable-next-line no-param-reassign
ref._standalone_task_name = wf._standalone_task_name;
}
res.push(ref.result());
});
return Promise.all(res);
return Promise.all(input.map((i) => this.run(i, options, _standaloneTaskName)));
}

const res = this.client._v0.admin.runWorkflow<I, O>(this.definition.name, input, options);
const serializedInput = await serializeInput(
input as unknown as JsonObject,
this.client.middleware
);
const res = this.client._v0.admin.runWorkflow<I, O>(
this.name,
serializedInput as unknown as I,
options
);

if (_standaloneTaskName) {
res._standalone_task_name = _standaloneTaskName;
}

return res.result() as Promise<O>;
const output = await res.result();
const deserializedOutput = await deserializeOutput(
output as unknown as JsonObject,
this.client.middleware
);
return deserializedOutput as unknown as O;
}

/**
Expand Down Expand Up @@ -648,25 +664,40 @@ export class TaskWorkflowDeclaration<
});
}

async runNoWait(input: I, options?: RunOpts): Promise<WorkflowRunRef<O>> {
if (!this.client) {
throw UNBOUND_ERR;
}

const res = await super.runNoWait(input, options, this._standalone_task_name);

// Wrap the result method to apply output deserialization
const originalResult = res.result;
res.result = async () => {
const output = await originalResult.call(res);
const deserializedOutput = await deserializeOutput(
output as unknown as JsonObject,
this.client?.middleware
);
return deserializedOutput as unknown as O;
};

return res;
}

async run(input: I, options?: RunOpts): Promise<O>;
async run(input: I[], options?: RunOpts): Promise<O[]>;
async run(input: I | I[], options?: RunOpts): Promise<O | O[]> {
return (await super.run(input as I, options, this._standalone_task_name)) as O | O[];
}

/**
* Triggers a workflow run without waiting for completion.
* @param input The input data for the workflow.
* @param options Optional configuration for this workflow run.
* @returns A WorkflowRunRef containing the run ID and methods to get results and interact with the run.
* @throws Error if the workflow is not bound to a Hatchet client.
*/
runNoWait(input: I, options?: RunOpts): WorkflowRunRef<O> {
if (!this.client) {
throw UNBOUND_ERR;
}

return super.runNoWait(input, options, this._standalone_task_name);
if (Array.isArray(input)) {
return Promise.all(input.map((i) => this.run(i, options)));
}

const runRef = await this.runNoWait(input, options);
return runRef.result();
}

get taskDef() {
Expand Down
8 changes: 4 additions & 4 deletions sdks/typescript/src/v1/examples/cancellations/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import { cancellation } from './workflow';
import { hatchet } from '../hatchet-client';
// ...
async function main() {
const run = cancellation.runNoWait({});
const run1 = cancellation.runNoWait({});
const run = await cancellation.runNoWait({});
const run1 = await cancellation.runNoWait({});

await sleep(1000);

Expand All @@ -26,8 +26,8 @@ async function main() {

console.log(resReplay);

const run2 = cancellation.runNoWait({}, { additionalMetadata: { test: 'abc' } });
const run4 = cancellation.runNoWait({}, { additionalMetadata: { test: 'test' } });
const run2 = await cancellation.runNoWait({}, { additionalMetadata: { test: 'abc' } });
const run4 = await cancellation.runNoWait({}, { additionalMetadata: { test: 'test' } });

await sleep(1000);

Expand Down
Loading
Loading