Skip to content
This repository was archived by the owner on Aug 1, 2022. It is now read-only.

Commit 65a8e23

Browse files
Merge pull request #41 from sam-n-johnston/feat/map-state
Feat/map state
2 parents 37c1926 + eae336e commit 65a8e23

15 files changed

+155
-75
lines changed

src/Context/Context.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ export class Context {
3636
return this._taskContext;
3737
}
3838

39+
public clone(): Context {
40+
return new Context(this._executionContext, this._stateMachineContext, this._stateContext, this._taskContext);
41+
}
42+
3943
transitionTo(state: StateContext, task: TaskContext = TaskContext.create()): void {
4044
this._stateContext = state;
4145
this._taskContext = task;

src/StateMachineExecutor.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ export class StateMachineExecutor {
3737
try {
3838
stateExecutorOutput = await typeExecutor.execute(this.context, stateDefinition, inputJson);
3939

40+
this.logger.debug(`StateMachineExecutor - execute1 - ${stateExecutorOutput}`);
41+
4042
if (stateExecutorOutput.End) {
4143
this.logger.log(`[${this.context.State.Name}] State Machine Ended`);
4244
return stateExecutorOutput.json;

src/StateProcessor.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,13 @@ import { PayloadTemplateType } from '../src/types/State';
33
import { Context } from './Context/Context';
44
import { LambdaWaitFotTokenPayloadTemplate } from './PayloadTemplates/LambdaWaitFotTokenPayloadTemplate';
55
import { ParameterPayloadTemplate } from './PayloadTemplates/ParameterPayloadTemplate';
6+
import { Logger } from './utils/Logger';
67

78
export class StateProcessor {
9+
protected static logger: Logger = Logger.getInstance();
10+
811
public static processInputPath(dataJson: string | undefined | null, inputPath: string | null | undefined): string {
12+
this.logger.debug(`StateProcessor - processInputPath - ${dataJson}`);
913
if (inputPath === null) {
1014
return '{}';
1115
}

src/main.ts

Lines changed: 63 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ import type { ServerlessOfflineHooks } from './types/ServerlessOfflineHooks';
77
import { StepFunctionSimulatorServer } from './StepFunctionSimulatorServer';
88
import { StateInfoHandler } from './StateInfoHandler';
99
import { Logger } from './utils/Logger';
10-
import { TaskStateDefinition } from './types/State';
10+
import { MapStateDefinition, StateDefinition, TaskStateDefinition } from './types/State';
11+
import { StateType } from './stateTasks/StateType';
1112

1213
class ServerlessOfflineStepFunctionsPlugin {
1314
public hooks?: ServerlessOfflineHooks;
@@ -17,10 +18,12 @@ class ServerlessOfflineStepFunctionsPlugin {
1718
private cliOptions: CLIOptions;
1819
private options?: ServerlessOfflineStepFunctionsOptions;
1920
private stepFunctionSimulatorServer?: StepFunctionSimulatorServer;
21+
private logger: Logger;
2022

2123
constructor(serverless: Record<any, any>, cliOptions: CLIOptions) {
2224
this.serverless = serverless;
2325
this.cliOptions = cliOptions;
26+
this.logger = Logger.getInstance();
2427

2528
this.commands = {
2629
'@fernthedev/serverless-offline-step-functions': {
@@ -41,7 +44,7 @@ class ServerlessOfflineStepFunctionsPlugin {
4144

4245
if (this.options?.enabled === false) {
4346
// Simulator Will not be executed
44-
Logger.getInstance().warning('Simulator will not execute.');
47+
this.logger.warning('Simulator will not execute.');
4548
return;
4649
}
4750

@@ -84,54 +87,72 @@ class ServerlessOfflineStepFunctionsPlugin {
8487
};
8588
}
8689

87-
private resolveHandlers(definedStateMachines: any) {
90+
private getFunctionName(stateOptions: StateDefinition): string {
91+
let functionName: string | undefined;
92+
const resource: string | Record<string, string[]> = (stateOptions as any).Resource;
93+
94+
if (typeof resource === 'string') {
95+
if (resource.endsWith('.waitForTaskToken')) {
96+
functionName = (stateOptions as TaskStateDefinition).Parameters?.FunctionName?.['Fn::GetAtt'][0];
97+
} else {
98+
functionName = resource.split('-').slice(-1)[0];
99+
}
100+
} else {
101+
// probably an object
102+
for (const [key, value] of Object.entries(resource)) {
103+
if (key === 'Fn::GetAtt') {
104+
functionName = value[0];
105+
}
106+
}
107+
}
108+
109+
if (!functionName) {
110+
throw Error(`Could not find funciton name for resource ${resource}`);
111+
}
112+
113+
return functionName;
114+
}
115+
116+
private setStateInfo(states: [string, StateDefinition][], stateMachineName: string) {
88117
const definedFunctions = this.serverless.service.initialServerlessConfig.functions;
89118
const statesInfoHandler = StateInfoHandler.getInstance();
90-
const definedStateMachinesArr = Object.entries(definedStateMachines);
119+
this.logger.debug(`ServerlessOfflineStepFunctionsPlugin - setStateInfo - ${states}`);
120+
121+
for (const [stateName, stateOptions] of states) {
122+
// TODO: Instead of checking the types here, we should create objects that have meaning
123+
if (stateOptions.Type === StateType.Map) {
124+
const stateDefinition: [string, StateDefinition][] = Object.entries(
125+
(stateOptions as MapStateDefinition).Iterator.States,
126+
);
127+
this.setStateInfo(stateDefinition, stateMachineName);
128+
continue;
129+
}
91130

92-
// Per StateMachine
93-
for (const [stateMachineName, stateMachineOptions] of definedStateMachinesArr) {
94-
const states = Object.entries((stateMachineOptions as any).definition.States);
131+
if (stateOptions.Type !== StateType.Task) {
132+
continue;
133+
}
95134

96-
// Per State in the StateMachine
97-
for (const [stateName, stateOptions] of states) {
98-
if (!(stateOptions as any)?.Resource) {
99-
// The State Machine in here could be a Pass, Failed or Wait
100-
break;
101-
}
135+
const functionName = this.getFunctionName(stateOptions as StateDefinition);
102136

103-
let functionName: string | undefined;
104-
const resource: string | Record<string, string[]> = (stateOptions as any).Resource;
105-
106-
// TODO: To extract this to a function/class
107-
if (typeof resource === 'string') {
108-
if (resource.endsWith('.waitForTaskToken')) {
109-
functionName = (stateOptions as TaskStateDefinition).Parameters?.FunctionName?.['Fn::GetAtt'][0];
110-
} else {
111-
functionName = resource.split('-').slice(-1)[0];
112-
}
113-
} else {
114-
// probably an object
115-
for (const [key, value] of Object.entries(resource)) {
116-
if (key === 'Fn::GetAtt') {
117-
functionName = value[0];
118-
}
119-
}
120-
}
137+
const { handler } = definedFunctions[functionName];
138+
const indexOfHandlerNameSeparator = handler.lastIndexOf('.');
139+
const handlerPath = handler.substring(0, indexOfHandlerNameSeparator);
140+
const handlerName = handler.substring(indexOfHandlerNameSeparator + 1);
141+
const environment: Record<string, string> | undefined = this.serverless.service.initialServerlessConfig
142+
?.functions[functionName]?.environment;
121143

122-
if (!functionName) {
123-
throw Error(`Could not find funciton name for resource ${resource}`);
124-
}
144+
statesInfoHandler.setStateInfo(stateMachineName, stateName, handlerPath, handlerName, environment);
145+
}
146+
}
125147

126-
const { handler } = definedFunctions[functionName];
127-
const indexOfHandlerNameSeparator = handler.lastIndexOf('.');
128-
const handlerPath = handler.substring(0, indexOfHandlerNameSeparator);
129-
const handlerName = handler.substring(indexOfHandlerNameSeparator + 1);
130-
const environment: Record<string, string> | undefined = this.serverless.service.initialServerlessConfig
131-
?.functions[functionName]?.environment;
148+
private resolveHandlers(definedStateMachines: any) {
149+
const definedStateMachinesArr = Object.entries(definedStateMachines);
132150

133-
statesInfoHandler.setStateInfo(stateMachineName, stateName, handlerPath, handlerName, environment);
134-
}
151+
// Per StateMachine
152+
for (const [stateMachineName, stateMachineOptions] of definedStateMachinesArr) {
153+
const states: [string, StateDefinition][] = Object.entries((stateMachineOptions as any).definition.States);
154+
155+
this.setStateInfo(states, stateMachineName);
135156
}
136157
}
137158

src/stateTasks/StateType.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,5 @@ export enum StateType {
1616
// Adds branching logic to the state machine
1717
Choice = 'Choice',
1818
Parallel = 'Parallel',
19+
Map = 'Map',
1920
}

src/stateTasks/StateTypeExecutor.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,15 @@
11
import { Context } from '../Context/Context';
22
import type { StateDefinition } from '../types/State';
33
import type { StateExecutorOutput } from '../types/StateExecutorOutput';
4+
import { Logger } from '../utils/Logger';
45

56
export abstract class StateTypeExecutor {
7+
protected logger: Logger;
8+
9+
constructor() {
10+
this.logger = Logger.getInstance();
11+
}
12+
613
abstract execute(
714
context: Context,
815
definition: StateDefinition,

src/stateTasks/StateTypeExecutorFactory.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { WaitExecutor } from './executors/WaitExecutor';
66
import { ChoiceExecutor } from './executors/ChoiceExecutor';
77
import { FailExecutor } from './executors/FailExecutor';
88
import { SucceedExecutor } from './executors/SucceedExecutor';
9+
import { MapExecutor } from './executors/MapExecutor';
910

1011
export class StateTypeExecutorFactory {
1112
private static STATE_TYPE_MAP = new Map<StateType, StateTypeExecutor>([
@@ -15,6 +16,7 @@ export class StateTypeExecutorFactory {
1516
[StateType.Choice, new ChoiceExecutor()],
1617
[StateType.Fail, new FailExecutor()],
1718
[StateType.Succeed, new SucceedExecutor()],
19+
[StateType.Map, new MapExecutor()],
1820
]);
1921

2022
public static getExecutor(type: StateType): StateTypeExecutor {

src/stateTasks/executors/FailExecutor.ts

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,10 @@
11
import { Context } from '../../Context/Context';
22
import type { FailStateDefinition } from '../../types/State';
33
import type { StateExecutorOutput } from '../../types/StateExecutorOutput';
4-
import { Logger } from '../../utils/Logger';
54
import { FailExecutorException } from '../exceptions/FailExecutorException';
65
import { StateTypeExecutor } from '../StateTypeExecutor';
76

87
export class FailExecutor extends StateTypeExecutor {
9-
private readonly logger: Logger;
10-
11-
constructor() {
12-
super();
13-
this.logger = Logger.getInstance();
14-
}
15-
168
public execute(context: Context, definition: FailStateDefinition): Promise<StateExecutorOutput> {
179
this.logger.error(`StateMachine "${context.StateMachine.Name}" Failed on "${context.State.Name}"`);
1810

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import { StateTypeExecutor } from '../StateTypeExecutor';
2+
import type { StateExecutorOutput } from '../../types/StateExecutorOutput';
3+
import type { MapStateDefinition } from '../../types/State';
4+
5+
import { Context } from '../../Context/Context';
6+
import { ExecuteType, StateMachineExecutor } from '../../StateMachineExecutor';
7+
import { StateMachine } from '../../types/StateMachine';
8+
import { StateContext } from '../../Context/StateContext';
9+
10+
export class MapExecutor extends StateTypeExecutor {
11+
private pendingStateMachineExecutions: { [key: string]: ExecuteType } = {};
12+
13+
public async execute(
14+
context: Context,
15+
stateDefinition: MapStateDefinition,
16+
inputJson: string | undefined,
17+
): Promise<StateExecutorOutput> {
18+
const stateMachine: StateMachine = {
19+
name: stateDefinition.Comment || '',
20+
definition: stateDefinition.Iterator,
21+
};
22+
// TODO: Extract common logic from StepFunctionSimulatorServer
23+
// TODO: Make nested Maps work
24+
// TODO: Make the waitForTaskToken work in Map & nested maps
25+
26+
if (!inputJson) {
27+
throw new Error(`Undefined inputJson for state `);
28+
}
29+
30+
const iterable = JSON.parse(inputJson);
31+
type MyType = ExecuteType | string | void;
32+
const output: MyType[] = [];
33+
await Promise.all(
34+
iterable.map(async (value: unknown) => {
35+
const tempContext = context.clone();
36+
const stateName = stateDefinition.Iterator.StartAt;
37+
const stateContext = StateContext.create(stateName);
38+
tempContext.transitionTo(stateContext);
39+
40+
const sme = new StateMachineExecutor(stateMachine, tempContext);
41+
const startAtState = stateDefinition.Iterator.States[stateDefinition.Iterator.StartAt];
42+
43+
// TODO: Add the index so that everything we log can be followed more easily
44+
output.push(await sme.execute(startAtState, JSON.stringify(value)));
45+
}),
46+
);
47+
48+
return {
49+
Next: stateDefinition.Next,
50+
End: stateDefinition.End,
51+
json: JSON.stringify(output),
52+
};
53+
}
54+
}

src/stateTasks/executors/PassExecutor.ts

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,9 @@ import type { StateExecutorOutput } from '../../types/StateExecutorOutput';
22
import type { PassStateDefinition } from '../../types/State';
33
import { StateProcessor } from '../../StateProcessor';
44
import { StateTypeExecutor } from '../StateTypeExecutor';
5-
import { Logger } from '../../utils/Logger';
65
import { Context } from '../../Context/Context';
76

87
export class PassExecutor extends StateTypeExecutor {
9-
private readonly logger: Logger;
10-
11-
constructor() {
12-
super();
13-
this.logger = Logger.getInstance();
14-
}
15-
168
public execute(
179
context: Context,
1810
definition: PassStateDefinition,

src/stateTasks/executors/SucceedExecutor.ts

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,9 @@
11
import { Context } from '../../Context/Context';
22
import { SucceedStateDefinition } from '../../types/State';
33
import { StateExecutorOutput } from '../../types/StateExecutorOutput';
4-
import { Logger } from '../../utils/Logger';
54
import { StateTypeExecutor } from '../StateTypeExecutor';
65

76
export class SucceedExecutor extends StateTypeExecutor {
8-
private readonly logger: Logger;
9-
10-
constructor() {
11-
super();
12-
this.logger = Logger.getInstance();
13-
}
14-
157
public execute(
168
context: Context,
179
_definition: SucceedStateDefinition,

src/stateTasks/executors/TaskExecutor.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,9 @@ export class TaskExecutor extends StateTypeExecutor {
9191
stateDefinition: TaskStateDefinition,
9292
context: Context,
9393
): StateExecutorOutput {
94+
this.logger.debug(`TaskExecutor - processInput1 - ${json}`);
9495
const proccessedInputJson = StateProcessor.processInputPath(json, stateDefinition.InputPath);
96+
this.logger.debug(`TaskExecutor - processInput2 - ${proccessedInputJson}`);
9597

9698
let output = proccessedInputJson;
9799

@@ -101,18 +103,25 @@ export class TaskExecutor extends StateTypeExecutor {
101103
output = StateProcessor.processParameters(proccessedInputJson, stateDefinition.Parameters);
102104
}
103105

104-
return JSON.parse(output);
106+
try {
107+
return JSON.parse(output);
108+
} catch (error) {
109+
this.logger.error(`processInput: Could not parse JSON for state ${context.State.Name}: "${output}"`);
110+
throw error;
111+
}
105112
}
106113

107114
private processOutput(
108115
input: Record<string, unknown>,
109116
output: Record<string, unknown>,
110117
stateDefinition: TaskStateDefinition,
111118
): string {
119+
this.logger.debug(`TaskExecutor - processOutput1 - ${output}`);
112120
let outputJson = output ? JSON.stringify(output) : '{}';
113121

114122
// TODO: Do Result Selector
115123
outputJson = StateProcessor.processResultPath(input, output, stateDefinition.ResultPath);
124+
this.logger.debug(`TaskExecutor - processOutput2 - ${outputJson}`);
116125
outputJson = StateProcessor.processOutputPath(outputJson, stateDefinition.OutputPath);
117126

118127
return outputJson;

src/stateTasks/executors/WaitExecutor.ts

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,10 @@ import { StateProcessor } from '../../StateProcessor';
55

66
import type { WaitStateDefinition } from '../../types/State';
77
import type { StateExecutorOutput } from '../../types/StateExecutorOutput';
8-
import { Logger } from '../../utils/Logger';
98
import { validateTimestamp } from '../../utils/validateTimestamp';
109
import { StateTypeExecutor } from '../StateTypeExecutor';
1110

1211
export class WaitExecutor extends StateTypeExecutor {
13-
private readonly logger: Logger;
14-
15-
constructor() {
16-
super();
17-
this.logger = Logger.getInstance();
18-
}
19-
2012
public async execute(
2113
context: Context,
2214
definition: WaitStateDefinition,

src/types/State.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ export type ParallelStateDefinition = CommonStateDefinition & {
145145

146146
export type StateDefinition =
147147
| PassStateDefinition
148+
| MapStateDefinition
148149
| TaskStateDefinition
149150
| FailStateDefinition
150151
| SucceedStateDefinition

0 commit comments

Comments
 (0)