Skip to content

Commit 6dfbae9

Browse files
authored
ensure Workflow.interrupt triggers compensation (#4958)
1 parent 700162d commit 6dfbae9

File tree

3 files changed

+80
-62
lines changed

3 files changed

+80
-62
lines changed

.changeset/rare-drinks-joke.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect/cluster": patch
3+
---
4+
5+
ensure Workflow.interrupt triggers compensation

packages/cluster/src/ClusterWorkflowEngine.ts

Lines changed: 57 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
* @since 1.0.0
33
*/
44
import * as Rpc from "@effect/rpc/Rpc"
5+
import { DurableDeferred } from "@effect/workflow"
56
import * as Activity from "@effect/workflow/Activity"
67
import * as DurableClock from "@effect/workflow/DurableClock"
78
import * as Workflow from "@effect/workflow/Workflow"
@@ -23,9 +24,7 @@ import * as Entity from "./Entity.js"
2324
import { EntityAddress } from "./EntityAddress.js"
2425
import { EntityId } from "./EntityId.js"
2526
import { EntityType } from "./EntityType.js"
26-
import * as Message from "./Message.js"
2727
import { MessageStorage } from "./MessageStorage.js"
28-
import * as Reply from "./Reply.js"
2928
import type { WithExitEncoded } from "./Reply.js"
3029
import * as Sharding from "./Sharding.js"
3130
import * as Snowflake from "./Snowflake.js"
@@ -37,7 +36,6 @@ import * as Snowflake from "./Snowflake.js"
3736
export const make = Effect.gen(function*() {
3837
const sharding = yield* Sharding.Sharding
3938
const storage = yield* MessageStorage
40-
const snowflakeGen = yield* Snowflake.Generator
4139

4240
const entities = new Map<
4341
string,
@@ -129,9 +127,24 @@ export const make = Effect.gen(function*() {
129127
Effect.orDie
130128
)
131129

130+
const clearClock = Effect.fnUntraced(function*(options: {
131+
readonly executionId: string
132+
}) {
133+
const entityId = EntityId.make(options.executionId)
134+
const shardId = sharding.getShardId(entityId)
135+
const clockAddress = new EntityAddress({
136+
entityType: ClockEntity.type,
137+
entityId,
138+
shardId
139+
})
140+
yield* storage.clearAddress(clockAddress)
141+
})
142+
132143
return WorkflowEngine.of({
133-
register: (workflow, execute) =>
134-
Effect.suspend(() => {
144+
register(workflow, execute) {
145+
// eslint-disable-next-line @typescript-eslint/no-this-alias
146+
const engine = this
147+
return Effect.suspend(() => {
135148
if (entities.has(workflow.name)) {
136149
return Effect.dieMessage(`Workflow ${workflow.name} already registered`)
137150
}
@@ -143,19 +156,36 @@ export const make = Effect.gen(function*() {
143156
const address = yield* Entity.CurrentAddress
144157
const executionId = address.entityId
145158
return {
146-
run: (request: Entity.Request<any>) =>
147-
execute(request.payload, executionId).pipe(
159+
run: (request: Entity.Request<any>) => {
160+
const instance = WorkflowInstance.of({
161+
workflow,
162+
executionId,
163+
suspended: false
164+
})
165+
return execute(request.payload, executionId).pipe(
166+
Effect.onExit(() => {
167+
if (!instance.suspended) {
168+
return Effect.void
169+
}
170+
return engine.deferredResult(InterruptSignal).pipe(
171+
Effect.flatMap((maybeResult) => {
172+
if (Option.isNone(maybeResult)) {
173+
return Effect.void
174+
}
175+
instance.suspended = false
176+
return Effect.zipRight(
177+
Effect.ignore(clearClock({ executionId })),
178+
Effect.interrupt
179+
)
180+
}),
181+
Effect.orDie
182+
)
183+
}),
148184
Effect.scoped,
149185
Workflow.intoResult,
150-
Effect.provideService(
151-
WorkflowInstance,
152-
WorkflowInstance.of({
153-
workflow,
154-
executionId,
155-
suspended: false
156-
})
157-
)
158-
) as any,
186+
Effect.provideService(WorkflowInstance, instance)
187+
) as any
188+
},
159189
activity: Effect.fnUntraced(function*(request: Entity.Request<any>) {
160190
const activityId = `${executionId}/${request.payload.name}`
161191
let entry = activities.get(activityId)
@@ -186,7 +216,8 @@ export const make = Effect.gen(function*() {
186216
}
187217
})
188218
) as Effect.Effect<void>
189-
}),
219+
})
220+
},
190221

191222
execute: ({ discard, executionId, payload, workflow }) =>
192223
RcMap.get(clients, workflow.name).pipe(
@@ -196,7 +227,7 @@ export const make = Effect.gen(function*() {
196227
),
197228

198229
interrupt: Effect.fnUntraced(
199-
function*(workflow, executionId) {
230+
function*(this: WorkflowEngine["Type"], workflow, executionId) {
200231
const requestId = yield* requestIdFor({
201232
entityType: `Workflow/${workflow.name}`,
202233
executionId,
@@ -214,41 +245,12 @@ export const make = Effect.gen(function*() {
214245
return
215246
}
216247

217-
const entityId = EntityId.make(executionId)
218-
const shardId = sharding.getShardId(entityId)
219-
const workflowAddress = new EntityAddress({
220-
entityType: EntityType.make(`Workflow/${workflow.name}`),
221-
entityId,
222-
shardId
223-
})
224-
const deferredAddress = new EntityAddress({
225-
entityType: DeferredEntity.type,
226-
entityId,
227-
shardId
228-
})
229-
const clockAddress = new EntityAddress({
230-
entityType: ClockEntity.type,
231-
entityId,
232-
shardId
248+
yield* this.deferredDone({
249+
workflowName: workflow.name,
250+
executionId,
251+
deferred: InterruptSignal,
252+
exit: { _tag: "Success", value: void 0 }
233253
})
234-
if (Option.isNone(reply)) {
235-
yield* sharding.sendOutgoing(
236-
Message.OutgoingEnvelope.interrupt({
237-
address: workflowAddress,
238-
id: snowflakeGen.unsafeNext(),
239-
requestId: requestId.value
240-
}),
241-
true
242-
)
243-
} else {
244-
yield* sharding.reset(requestId.value)
245-
}
246-
yield* storage.saveReply(Reply.ReplyWithContext.interrupt({
247-
id: snowflakeGen.unsafeNext(),
248-
requestId: requestId.value
249-
}))
250-
yield* storage.clearAddress(deferredAddress)
251-
yield* storage.clearAddress(clockAddress)
252254
},
253255
Effect.retry({
254256
while: (e) => e._tag === "PersistenceError",
@@ -461,6 +463,8 @@ const ClockEntityLayer = ClockEntity.toLayer(Effect.gen(function*() {
461463
}
462464
}))
463465

466+
const InterruptSignal = DurableDeferred.make("Workflow/InterruptSignal")
467+
464468
/**
465469
* @since 1.0.0
466470
* @category Layers
@@ -471,6 +475,5 @@ export const layer: Layer.Layer<
471475
Sharding.Sharding | MessageStorage
472476
> = DeferredEntityLayer.pipe(
473477
Layer.merge(ClockEntityLayer),
474-
Layer.provideMerge(Layer.scoped(WorkflowEngine, make)),
475-
Layer.provide(Snowflake.layerGenerator)
478+
Layer.provideMerge(Layer.scoped(WorkflowEngine, make))
476479
)

packages/cluster/test/ClusterWorkflowEngine.test.ts

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import {
99
} from "@effect/cluster"
1010
import { assert, describe, expect, it } from "@effect/vitest"
1111
import { Activity, DurableClock, DurableDeferred, Workflow, WorkflowEngine } from "@effect/workflow"
12-
import { Cause, DateTime, Effect, Exit, Fiber, Layer, Option, Schema, TestClock } from "effect"
12+
import { DateTime, Effect, Exit, Fiber, Layer, Schema, TestClock } from "effect"
1313

1414
describe.concurrent("ClusterWorkflowEngine", () => {
1515
it.effect("should run a workflow", () =>
@@ -84,19 +84,31 @@ describe.concurrent("ClusterWorkflowEngine", () => {
8484
// - 1 initial request
8585
// - 5 attempts to send email
8686
// - 1 sleep activity
87-
expect(driver.requests.size).toEqual(7)
87+
// - 1 durable clock run
88+
// - 1 durable clock deferred set
89+
// - 1 durable clock deferred resume
90+
// - 1 interrupt signal set
91+
// - 1 interrupt signal resume
92+
expect(driver.requests.size).toEqual(12)
93+
yield* TestClock.adjust(5000)
94+
95+
// clock cleared
96+
expect(driver.requests.size).toEqual(11)
8897

8998
const result = driver.requests.get(envelope.requestId)!
9099
const reply = result.replies[0]!
91100
assert(
92101
reply._tag === "WithExit" &&
93-
reply.exit._tag === "Failure" &&
94-
reply.exit.cause._tag === "Interrupt"
102+
reply.exit._tag === "Success"
95103
)
96-
yield* TestClock.adjust(5000)
104+
const value = reply.exit.value as Workflow.ResultEncoded<any, any>
105+
assert(value._tag === "Complete" && value.exit._tag === "Failure")
97106

98107
const exit = yield* Fiber.await(fiber)
99108
assert(Exit.isInterrupted(exit))
109+
110+
const flags = yield* Flags
111+
assert.isTrue(flags.get("compensation"))
100112
}).pipe(
101113
Effect.provide(TestWorkflowLayer)
102114
))
@@ -177,10 +189,8 @@ const EmailWorkflowLayer = EmailWorkflow.toLayer(Effect.fn(function*(payload) {
177189
}
178190
})
179191
}).pipe(
180-
EmailWorkflow.withCompensation(Effect.fnUntraced(function*(_, cause) {
192+
EmailWorkflow.withCompensation(Effect.fnUntraced(function*(_) {
181193
flags.set("compensation", true)
182-
const error = Cause.failureOption(cause)
183-
assert.strictEqual(Option.getOrThrow(error).message, "Compensation triggered")
184194
})),
185195
Activity.retry({ times: 5 })
186196
)

0 commit comments

Comments
 (0)