Skip to content

Commit 7f14b11

Browse files
Fixed recursive invoked loop
1 parent 316f2a7 commit 7f14b11

File tree

1 file changed

+83
-51
lines changed

1 file changed

+83
-51
lines changed

loggroup-lambda-connector/src/loggroup-lambda-connector.js

Lines changed: 83 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@ const { LambdaClient, InvokeCommand } = require("@aws-sdk/client-lambda");
44
const cwl = new CloudWatchLogsClient();
55
const lambda = new LambdaClient({ apiVersion: '2015-03-31' }); // Update to the appropriate Lambda API version you require
66
const maxRetryCounter = 3;
7+
const timeoutThreshold = 12000;
78

8-
async function createSubscriptionFilter(lambdaLogGroupName, destinationArn, roleArn) {
9-
var params={};
9+
async function createSubscriptionFilter(lambdaLogGroupName, destinationArn, roleArn, additionalArgs) {
10+
var params={};
1011
if (destinationArn.startsWith("arn:aws:lambda")) {
1112
params = {
12-
destinationArn: destinationArn,
13+
destinationArn: destinationArn,
1314
filterName: 'SumoLGLBDFilter',
1415
filterPattern: '',
1516
logGroupName: lambdaLogGroupName
@@ -28,6 +29,7 @@ async function createSubscriptionFilter(lambdaLogGroupName, destinationArn, role
2829
try {
2930
const cmd = new PutSubscriptionFilterCommand(params);
3031
await cwl.send(cmd);
32+
additionalArgs.subscribeCount += 1
3133
console.log("Successfully subscribed logGroup: ", lambdaLogGroupName);
3234
} catch (err) {
3335
console.log("Error in subscribing", lambdaLogGroupName, err);
@@ -58,7 +60,7 @@ function filterLogGroups(event, logGroupRegex) {
5860
return false;
5961
}
6062

61-
async function subscribeExistingLogGroups(logGroups, retryCounter) {
63+
async function subscribeExistingLogGroups(logGroups, retryCounter, additionalArgs) {
6264
var logGroupRegex = new RegExp(process.env.LOG_GROUP_PATTERN, "i");
6365
var destinationArn = process.env.DESTINATION_ARN;
6466
var roleArn = process.env.ROLE_ARN;
@@ -70,8 +72,8 @@ async function subscribeExistingLogGroups(logGroups, retryCounter) {
7072
console.log("Unmatched logGroup: ", logGroupName);
7173
return Promise.resolve();
7274
} else {
73-
return createSubscriptionFilter(logGroupName, destinationArn, roleArn).catch(function (err) {
74-
if (err && err.code == "ThrottlingException") {
75+
return createSubscriptionFilter(logGroupName, destinationArn, roleArn, additionalArgs).catch(function (err) {
76+
if (err && err.message === "Rate exceeded") {
7577
failedLogGroupNames.push({ logGroupName: logGroupName });
7678
}
7779
});
@@ -80,80 +82,110 @@ async function subscribeExistingLogGroups(logGroups, retryCounter) {
8082

8183
if (retryCounter <= maxRetryCounter && failedLogGroupNames.length > 0) {
8284
console.log("Retrying Subscription for Failed Log Groups due to throttling with counter number as " + retryCounter);
83-
await subscribeExistingLogGroups(failedLogGroupNames, retryCounter + 1);
85+
await subscribeExistingLogGroups(failedLogGroupNames, retryCounter + 1, additionalArgs);
8486
}
8587
}
8688

87-
async function processExistingLogGroups(token, context, errorHandler) {
89+
async function processExistingLogGroups(context, token, additionalArgs, errorHandler) {
8890
var params = { limit: 50 };
8991
if (token) {
9092
params = {
9193
limit: 50,
9294
nextToken: token
9395
};
9496
}
95-
97+
9698
try {
99+
console.log("Previous record count " + additionalArgs.recordCount);
97100
const data = await cwl.send(new DescribeLogGroupsCommand(params));
98-
console.log(
99-
"fetched logGroups: " + data.logGroups.length + " nextToken: " + data.nextToken
100-
);
101-
await subscribeExistingLogGroups(data.logGroups, 1);
102-
101+
additionalArgs.recordCount += data.logGroups.length;
102+
console.log("Updated record count " + additionalArgs.recordCount);
103+
await subscribeExistingLogGroups(data.logGroups, 1, additionalArgs);
104+
console.log("Updated subscribeCount " + additionalArgs.subscribeCount);
103105
if (data.nextToken) {
104-
console.log(
105-
"Log Groups remaining...Calling the lambda again with token " + data.nextToken
106-
);
107-
await invoke_lambda(context, data.nextToken, errorHandler);
108-
console.log("Lambda invoke complete with token " + data.nextToken);
106+
const remainingTime = context.getRemainingTimeInMillis(); // 60000
107+
const diffTime = remainingTime - timeoutThreshold // 14552-12000=2792
108+
if (diffTime < timeoutThreshold) {
109+
additionalArgs.invokeCount += 1
110+
console.log("Lambda invoke complete with token "+ data.nextToken);
111+
console.log("InvokeCount " + additionalArgs.invokeCount);
112+
await invoke_lambda(context, data.nextToken, additionalArgs, errorHandler);
113+
return
114+
}
115+
console.log("Remaining time " + remainingTime);
116+
console.log("Log Groups remaining...Calling the lambda again with token " + data.nextToken);
117+
await processExistingLogGroups(context, data.nextToken, additionalArgs, errorHandler)
109118
} else {
110-
console.log("All Log Groups are subscribed to Destination Type " + process.env.DESTINATION_ARN);
119+
console.log("Total " + additionalArgs.subscribeCount + " out of " + additionalArgs.recordCount
120+
+ " Log Groups are subscribed to Destination Type "
121+
+ process.env.DESTINATION_ARN);
122+
console.log("Last invokeCount " + additionalArgs.invokeCount);
111123
errorHandler(null, "Success");
112124
}
113125
} catch (err) {
114126
errorHandler(err, "Error in fetching logGroups");
115127
}
116128
}
117-
118-
async function invoke_lambda(context, token, errorHandler) {
119-
var payload = { "existingLogs": "true", "token": token };
120-
try {
121-
await lambda.send(new InvokeCommand({
122-
InvocationType: 'Event',
123-
FunctionName: context.functionName,
124-
Payload: JSON.stringify(payload)
125-
}));
126-
} catch (err) {
129+
130+
async function invoke_lambda(context, token, additionalArgs, errorHandler) {
131+
var payload = { "existingLogs": "true", "token": token, "additionalArgs": additionalArgs};
132+
try {
133+
await lambda.send(new InvokeCommand({
134+
InvocationType: 'Event',
135+
FunctionName: context.functionName,
136+
Payload: JSON.stringify(payload)
137+
}));
138+
} catch (err) {
127139
errorHandler(err, "Error invoking Lambda");
128-
}
129140
}
130-
131-
async function processEvents(env, event, errorHandler) {
132-
var logGroupName = event.detail.requestParameters.logGroupName;
133-
if (filterLogGroups(event, env.LOG_GROUP_PATTERN)) {
134-
console.log("Subscribing: ", logGroupName, env.DESTINATION_ARN);
135-
await createSubscriptionFilter(logGroupName, env.DESTINATION_ARN, env.ROLE_ARN)
136-
.catch(function (err) {
137-
errorHandler(err, "Error in Subscribing.");
138-
});
139-
} else {
141+
}
142+
143+
async function delay(ms) {
144+
return new Promise(resolve => setTimeout(resolve, ms));
145+
}
146+
147+
async function processEvents(env, event, additionalArgs, errorHandler, retryCounter=0) {
148+
var logGroupName = event.detail.requestParameters.logGroupName;
149+
if (filterLogGroups(event, env.LOG_GROUP_PATTERN)) {
150+
console.log("Subscribing: ", logGroupName, env.DESTINATION_ARN);
151+
try {
152+
await createSubscriptionFilter(logGroupName, env.DESTINATION_ARN, env.ROLE_ARN, additionalArgs);
153+
} catch (err) {
154+
errorHandler(err, "Error in Subscribing.");
155+
if (err && err.message === "Rate exceeded" && retryCounter <= maxRetryCounter) {
156+
retryCounter += 1
157+
const delayTime = Math.pow(2, retryCounter) * 1000; // Exponential backoff
158+
console.log(`ThrottlingException encountered. Retrying in ${delayTime}ms...Attempt ${retryCounter}/${maxRetryCounter}`);
159+
await delay(delayTime);
160+
await processEvents(env, event, additionalArgs, errorHandler, retryCounter);
161+
}
162+
};
163+
} else {
140164
console.log("Unmatched: ", logGroupName, env.DESTINATION_ARN);
141-
}
142165
}
143-
144-
exports.handler = async function (event, context, callback) {
145-
console.log("Invoking Log Group connector function");
146-
function errorHandler(err, msg) {
147-
if (err) {
148-
console.log(err, msg);
166+
}
167+
168+
exports.handler = async function (event, context, callback) {
169+
let additionalArgs = {
170+
recordCount: 0,
171+
subscribeCount: 0,
172+
invokeCount: 0
173+
};
174+
if (event.additionalArgs) {
175+
additionalArgs = event.additionalArgs
176+
}
177+
console.log("Invoking Log Group connector function");
178+
function errorHandler(err, msg) {
179+
if (err) {
180+
console.log(err, msg);
149181
callback(err);
150182
} else {
151183
callback(null, "Success");
152184
}
153185
}
154186
if (event.existingLogs == "true") {
155-
await processExistingLogGroups(event.token, context, errorHandler);
187+
await processExistingLogGroups(context, event.token, additionalArgs, errorHandler);
156188
} else {
157-
await processEvents(process.env, event, errorHandler);
189+
await processEvents(process.env, event, additionalArgs, errorHandler);
158190
}
159-
};
191+
};

0 commit comments

Comments
 (0)