Skip to content

Commit 6f3b819

Browse files
Merge pull request #208 from SumoLogic/sumo_246483
Added support to subscribe existing log groups by User defined Tags
2 parents e86cb57 + 320d7c2 commit 6f3b819

File tree

7 files changed

+168
-54
lines changed

7 files changed

+168
-54
lines changed

loggroup-lambda-connector/Readme.md

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# SumoLogic LogGroup Connector
22
This is used to automatically subscribe newly created and existing Cloudwatch LogGroups to a Lambda function.
33

4-
**Note:**
4+
> **Note:**
55
For existing CloudWatch LogGroups, a Lambda function can subscribe to up to 65,000 LogGroups.
66
If the number of LogGroups exceeds 65,000, you can request to disable Lambda recursive loop detection by [contact AWS Support](https://repost.aws/knowledge-center/aws-phone-support).
77

@@ -25,13 +25,30 @@ Made with ❤️ by Sumo Logic. Available on the [AWS Serverless Application Rep
2525

2626

2727
### Configuring Lambda
28-
It has two environment variables
29-
30-
**LOG_GROUP_PATTERN**: This is a javascript regex to filter out loggroups. Only loggroups which match this pattern will be subscribed to the lambda function.Do not use '/' while writing the pattern and it is case insensitive.
31-
32-
```
33-
Test - will match testlogroup, logtestgroup and LogGroupTest
28+
#### Environment variables
29+
30+
**LOG_GROUP_PATTERN**: This JavaScript regex is used to filter log groups. Only log groups that match this pattern will be subscribed to the Lambda function. The default value is `Test`, which will match log groups like `testlogroup`, `logtestgroup`, and `LogGroupTest`.
31+
32+
##### Use Cases and it's Regex Pattern Example
33+
34+
| Case Description | Regex Pattern Example |
35+
|----------------------------------------------------------------------|-------------------------------------|
36+
| To subscribe all loggroup | `/*` or (leave empty) |
37+
| To subscribe all loggroup paths only | `/` |
38+
| To subscribe all loggroup of aws services | `/aws/*` |
39+
| To subscribe to loggroups for only one service, such as Lambda | `/aws/lambda/*` |
40+
| To subscribe loggroup multiple services like lambda, rds, apigateway | `/aws/(lambda\|rds\|apigateway)` |
41+
| To subscribe loggroup by key word like `Test` or `Prod` | `Test` or `Prod` [Case insensitive] |
42+
| Don't subscribe if `LOG_GROUP_PATTERN` | `^$` |
43+
44+
**LOG_GROUP_TAGS**: This is used to filter log groups based on tags. Only log groups that match any of the specified key-value pairs will be subscribed to the Lambda function. It is case-sensitive.
45+
#### For example
46+
```bash
47+
LOG_GROUP_TAGS="Environment=Production,Application=MyApp"
3448
```
49+
> 💡 **Tip**: To filter log groups based on tags only, set `LOG_GROUP_PATTERN=^$`.
50+
51+
> **Note**: `LOG_GROUP_PATTERN` and `LOG_GROUP_TAGS` can be used together to subscribe to log groups or can be used separately.
3552
3653
**DESTINATION_ARN**: This specifies ARN of the Destination to Subscribe the log group.
3754

@@ -54,8 +71,6 @@ Lambda Destination ARN :- This specifies ARN of the Lambda function. Also you ha
5471

5572
Kinesis Destination ARN :- This specifies the ARN of the kinesis Stream.
5673

57-
**LOG_GROUP_TAGS**: This is used for filtering out loggroups based on tags.Only loggroups which match any one of the key value pairs will be subscribed to the lambda function. This works only for new loggroups not existing loggroups.
58-
5974
**ROLE_ARN** : This is used when subscription destination ARN is kinesis firehose stream.
6075

6176
### For Developers

loggroup-lambda-connector/sam/packaged.yaml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ Metadata:
2121
- serverless
2222
- loggroups
2323
- cloudwatch
24-
LicenseUrl: s3://appdevstore/LoggroupConnector/v1.0.12/6092dd6c323e33634657102f570628e0
24+
LicenseUrl: s3://appdevstore/LoggroupConnector/v1.0.14/6092dd6c323e33634657102f570628e0
2525
Name: sumologic-loggroup-connector
26-
ReadmeUrl: s3://appdevstore/LoggroupConnector/v1.0.12/999bf8292d709fcf681946996eb9071d
27-
SemanticVersion: 1.0.12
26+
ReadmeUrl: s3://appdevstore/LoggroupConnector/v1.0.14/60b531a8a7a836857dd096ea058dc2c6
27+
SemanticVersion: 1.0.14
2828
SourceCodeUrl: https://github.com/SumoLogic/sumologic-aws-lambda/tree/main/loggroup-lambda-connector
2929
SpdxLicenseId: Apache-2.0
3030
Parameters:
@@ -107,7 +107,7 @@ Resources:
107107
SumoLogGroupLambdaConnector:
108108
Type: AWS::Serverless::Function
109109
Properties:
110-
CodeUri: s3://appdevstore/LoggroupConnector/v1.0.12/6b0658f101eead0c32f0fd36c4cbedd1
110+
CodeUri: s3://appdevstore/LoggroupConnector/v1.0.14/5a44aebff6ae18483b1b5d082d112e85
111111
Handler: loggroup-lambda-connector.handler
112112
Runtime: nodejs20.x
113113
Environment:
@@ -130,6 +130,7 @@ Resources:
130130
- logs:DescribeLogGroups
131131
- logs:DescribeLogStreams
132132
- logs:PutSubscriptionFilter
133+
- logs:ListTagsLogGroup
133134
Resource:
134135
- Fn::Sub: arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*
135136
- Sid: InvokePolicy

loggroup-lambda-connector/sam/sam_package.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ else
1010
AWS_REGION="us-east-2"
1111
fi
1212

13-
version="1.0.11"
13+
version="1.0.14"
1414

1515
sam package --template-file template.yaml --s3-bucket $SAM_S3_BUCKET --output-template-file packaged.yaml --s3-prefix "LoggroupConnector/v$version" --region $AWS_REGION
1616

loggroup-lambda-connector/sam/template.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ Metadata:
2424
LicenseUrl: ../LICENSE
2525
Name: sumologic-loggroup-connector
2626
ReadmeUrl: ../Readme.md
27-
SemanticVersion: 1.0.12
27+
SemanticVersion: 1.0.14
2828
SourceCodeUrl: https://github.com/SumoLogic/sumologic-aws-lambda/tree/main/loggroup-lambda-connector
2929
SpdxLicenseId: Apache-2.0
3030

@@ -114,6 +114,7 @@ Resources:
114114
- logs:DescribeLogGroups
115115
- logs:DescribeLogStreams
116116
- logs:PutSubscriptionFilter
117+
- logs:ListTagsLogGroup
117118
Resource:
118119
- !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*'
119120
- Sid: InvokePolicy

loggroup-lambda-connector/src/loggroup-lambda-connector.js

Lines changed: 86 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,82 @@
1-
const { CloudWatchLogsClient, PutSubscriptionFilterCommand, DescribeLogGroupsCommand } = require("@aws-sdk/client-cloudwatch-logs");
1+
const { CloudWatchLogsClient, PutSubscriptionFilterCommand, DescribeLogGroupsCommand, ListTagsLogGroupCommand } = require("@aws-sdk/client-cloudwatch-logs");
22
const { LambdaClient, InvokeCommand } = require("@aws-sdk/client-lambda");
33

44
const cwl = new CloudWatchLogsClient();
55
const lambda = new LambdaClient({ apiVersion: '2015-03-31' }); // Update to the appropriate Lambda API version you require
66
const maxRetryCounter = 3;
77
const timeoutThreshold = 12000;
88

9+
10+
function validateRegex(pattern) {
11+
try {
12+
// Attempt to create a RegExp object with the provided pattern
13+
return new RegExp(pattern, "i");
14+
} catch (e) {
15+
// Throw an error with a descriptive message if the pattern is invalid
16+
throw new Error(`Invalid regular expression pattern: ${pattern}. Error: ${e.message}`);
17+
}
18+
}
19+
20+
async function getTagsByLogGroupName(logGroupName, retryCounter=0) {
21+
var tags = {};
22+
const input = {
23+
logGroupName: logGroupName, // required
24+
};
25+
try {
26+
// ListTagsLogGroupRequest
27+
let response = await cwl.send(new ListTagsLogGroupCommand(input));
28+
tags = response.tags
29+
} catch (err) {
30+
if (err && err.message === "Rate exceeded" && retryCounter <= maxRetryCounter) {
31+
retryCounter += 1
32+
const delayTime = Math.pow(2, retryCounter) * 2000; // Exponential backoff
33+
console.log(`ThrottlingException encountered for ${logGroupName}. Retrying in ${delayTime}ms...Attempt ${retryCounter}/${maxRetryCounter}`);
34+
await delay(delayTime);
35+
await getTagsByLogGroupName(logGroupName, retryCounter);
36+
} else {
37+
console.error(`Failed to get tags for ${logGroupName} due to ${err}`)
38+
}
39+
}
40+
return tags
41+
}
42+
43+
function IsTagMatchToLogGroup(tagMatcherForLogGroup, logGroupTags) {
44+
if (tagMatcherForLogGroup && logGroupTags) {
45+
let tagMatcherList = tagMatcherForLogGroup.split(",");
46+
console.log("logGroupTags: ", logGroupTags);
47+
let tag, key, value;
48+
for (let i = 0; i < tagMatcherList.length; i++) {
49+
tag = tagMatcherList[i].split("=");
50+
key = tag[0].trim();
51+
value = tag[1].trim();
52+
if (logGroupTags[key] && logGroupTags[key] == value) {
53+
return true;
54+
}
55+
}
56+
}
57+
return false;
58+
}
59+
60+
async function filterExistingLogGroups(logGroupName, logGroupRegex) {
61+
if (logGroupName.match(logGroupRegex)) {
62+
return true;
63+
}
64+
var logGroupTags = await getTagsByLogGroupName(logGroupName)
65+
var tagMatcherForLogGroup = process.env.LOG_GROUP_TAGS
66+
console.log("Filtering log group:", logGroupName, "with tags:", logGroupTags);
67+
return IsTagMatchToLogGroup(tagMatcherForLogGroup, logGroupTags)
68+
}
69+
70+
function filterNewLogGroups(event, logGroupRegex) {
71+
var logGroupName = event.detail.requestParameters.logGroupName;
72+
if (logGroupName.match(logGroupRegex) && event.detail.eventName === "CreateLogGroup") {
73+
return true;
74+
}
75+
var logGroupTags = event.detail.requestParameters.tags;
76+
var tagMatcherForLogGroup = process.env.LOG_GROUP_TAGS
77+
return IsTagMatchToLogGroup(tagMatcherForLogGroup, logGroupTags)
78+
}
79+
980
async function createSubscriptionFilter(lambdaLogGroupName, destinationArn, roleArn, additionalArgs) {
1081
var params={};
1182
if (destinationArn.startsWith("arn:aws:lambda")) {
@@ -37,46 +108,25 @@ async function createSubscriptionFilter(lambdaLogGroupName, destinationArn, role
37108
}
38109
}
39110

40-
function filterLogGroups(event, logGroupRegex) {
41-
logGroupRegex = new RegExp(logGroupRegex, "i");
42-
let logGroupName = event.detail.requestParameters.logGroupName;
43-
if (logGroupName.match(logGroupRegex) && event.detail.eventName === "CreateLogGroup") {
44-
return true;
45-
}
46-
let lg_tags = event.detail.requestParameters.tags;
47-
if (process.env.LOG_GROUP_TAGS && lg_tags) {
48-
console.log("tags in loggroup: ", lg_tags);
49-
var tags_array = process.env.LOG_GROUP_TAGS.split(",");
50-
let tag, key, value;
51-
for (let i = 0; i < tags_array.length; i++) {
52-
tag = tags_array[i].split("=");
53-
key = tag[0].trim();
54-
value = tag[1].trim();
55-
if (lg_tags[key] && lg_tags[key] == value) {
56-
return true;
57-
}
58-
}
59-
}
60-
return false;
61-
}
62-
63111
async function subscribeExistingLogGroups(logGroups, retryCounter, additionalArgs) {
64-
var logGroupRegex = new RegExp(process.env.LOG_GROUP_PATTERN, "i");
112+
var logGroupRegex = validateRegex(process.env.LOG_GROUP_PATTERN);
113+
console.log("logGroupRegexPattern: ", logGroupRegex);
65114
var destinationArn = process.env.DESTINATION_ARN;
66115
var roleArn = process.env.ROLE_ARN;
67116
const failedLogGroupNames = [];
68117
await logGroups.reduce(async (previousPromise, nextLogGroup) => {
69118
await previousPromise;
70119
const { logGroupName } = nextLogGroup;
71-
if (!logGroupName.match(logGroupRegex)) {
72-
console.log("Unmatched logGroup: ", logGroupName);
73-
return Promise.resolve();
74-
} else {
120+
let filterStatus = await filterExistingLogGroups(logGroupName, logGroupRegex);
121+
if (filterStatus) {
75122
return createSubscriptionFilter(logGroupName, destinationArn, roleArn, additionalArgs).catch(function (err) {
76123
if (err && err.message === "Rate exceeded") {
77124
failedLogGroupNames.push({ logGroupName: logGroupName });
78125
}
79126
});
127+
} else {
128+
console.log("Unmatched logGroup: ", logGroupName);
129+
return Promise.resolve();
80130
}
81131
}, Promise.resolve());
82132

@@ -128,7 +178,7 @@ async function processExistingLogGroups(context, token, additionalArgs, errorHan
128178
}
129179

130180
async function invoke_lambda(context, token, additionalArgs, errorHandler) {
131-
var payload = { "existingLogs": "true", "token": token, "additionalArgs": additionalArgs};
181+
var payload = {"existingLogs": "true", "token": token, "additionalArgs": additionalArgs};
132182
try {
133183
await lambda.send(new InvokeCommand({
134184
InvocationType: 'Event',
@@ -146,7 +196,9 @@ async function delay(ms) {
146196

147197
async function processEvents(env, event, additionalArgs, errorHandler, retryCounter=0) {
148198
var logGroupName = event.detail.requestParameters.logGroupName;
149-
if (filterLogGroups(event, env.LOG_GROUP_PATTERN)) {
199+
var logGroupRegex = validateRegex(env.LOG_GROUP_PATTERN);
200+
console.log("logGroupRegex: ", logGroupRegex);
201+
if (filterNewLogGroups(event, logGroupRegex)) {
150202
console.log("Subscribing: ", logGroupName, env.DESTINATION_ARN);
151203
try {
152204
await createSubscriptionFilter(logGroupName, env.DESTINATION_ARN, env.ROLE_ARN, additionalArgs);
@@ -183,6 +235,9 @@ exports.handler = async function (event, context, callback) {
183235
callback(null, "Success");
184236
}
185237
}
238+
if (!process.env.LOG_GROUP_PATTERN || process.env.LOG_GROUP_PATTERN.trim().length === 0) {
239+
console.warn("LOG_GROUP_PATTERN is empty, it will subscribe to all loggroups");
240+
}
186241
if (event.existingLogs == "true") {
187242
await processExistingLogGroups(context, event.token, additionalArgs, errorHandler);
188243
} else {

loggroup-lambda-connector/test/test-template.yaml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,13 @@ Parameters:
2323
AllowedValues: [ "true", "false" ]
2424
Description: "Select true for subscribing existing logs"
2525

26+
LogGroupTags:
27+
Type: String
28+
Default: ""
29+
Description: Enter comma separated keyvalue pairs for filtering logGroups using
30+
tags. Ex KeyName1=string,KeyName2=string. This is optional leave it blank if
31+
tag based filtering is not needed.
32+
2633
BucketName:
2734
Type: String
2835
Default: ""
@@ -72,7 +79,7 @@ Resources:
7279
print("success")
7380
Handler: index.lambda_handler
7481
MemorySize: 128
75-
Runtime: python3.7
82+
Runtime: python3.12
7683
Timeout: 60
7784
Role: !GetAtt LambdaRole.Arn
7885

@@ -214,6 +221,7 @@ Resources:
214221
DestinationArnValue: !If [ create_invoke_permission, !GetAtt DummyLambda.Arn, !GetAtt KinesisLogsDeliveryStream.Arn ]
215222
LogGroupPattern: !Ref LogGroupPattern
216223
UseExistingLogs: !Ref UseExistingLogs
224+
LogGroupTags: !Ref LogGroupTags
217225
RoleArn: !If [ create_invoke_permission, "", !GetAtt KinesisLogsRole.Arn ]
218226

219227
Outputs:

loggroup-lambda-connector/test/test_loggroup_lambda_connector.py

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,15 +72,38 @@ def test_4_existing_kinesis(self):
7272
#self.invoke_lambda()
7373
self.assert_subscription_filter("SumoLGLBDFilter")
7474

75-
def create_stack_parameters(self, destination, existing, pattern='test'):
75+
def test_5_matching_existing_loggroup_with_pattern_and_tag(self):
76+
self.create_log_group_with_tag()
77+
self.create_stack(self.stack_name, self.template_data,
78+
self.create_stack_parameters("Kinesis","true", loggroup_tag='env=prod'))
79+
print("Testing Stack Creation")
80+
self.assertTrue(self.stack_exists(self.stack_name))
81+
#self.invoke_lambda()
82+
self.assert_subscription_filter("SumoLGLBDFilter")
83+
84+
def test_6_matching_existing_loggroup_by_tag_only(self):
85+
self.create_log_group_with_tag()
86+
self.create_stack(self.stack_name, self.template_data,
87+
self.create_stack_parameters("Kinesis","true", loggroup_pattern='^$',
88+
loggroup_tag='username=akhil'))
89+
print("Testing Stack Creation")
90+
self.assertTrue(self.stack_exists(self.stack_name))
91+
#self.invoke_lambda()
92+
self.assert_subscription_filter("SumoLGLBDFilter")
93+
94+
def create_stack_parameters(self, destination, existing, loggroup_pattern='test', loggroup_tag=''):
7695
return [
7796
{
7897
'ParameterKey': 'DestinationType',
7998
'ParameterValue': destination
8099
},
81100
{
82101
'ParameterKey': 'LogGroupPattern',
83-
'ParameterValue': pattern
102+
'ParameterValue': loggroup_pattern
103+
},
104+
{
105+
'ParameterKey': 'LogGroupTags',
106+
'ParameterValue': loggroup_tag
84107
},
85108
{
86109
'ParameterKey': 'UseExistingLogs',
@@ -137,6 +160,16 @@ def create_log_group(self):
137160
response = self.log_group_client.create_log_group(logGroupName=self.log_group_name)
138161
print("creating log group", response)
139162

163+
def create_log_group_with_tag(self):
164+
tags = {
165+
'team': 'apps',
166+
'env': 'prod'
167+
}
168+
self.log_group_name = 'mytag-%s' % (datetime.datetime.now().strftime("%d-%m-%y-%H-%M-%S"))
169+
print("Loggroup Name", self.log_group_name)
170+
response = self.log_group_client.create_log_group(logGroupName=self.log_group_name, tags=tags)
171+
print("creating log group", response)
172+
140173
def assert_subscription_filter(self, filter_name):
141174
sleep(60)
142175
response = self.log_group_client.describe_subscription_filters(
@@ -205,7 +238,8 @@ def create_sam_package_and_upload():
205238

206239
def _run(command, input=None, check=False, **kwargs):
207240
if sys.version_info >= (3, 5):
208-
return subprocess.run(command, capture_output=True)
241+
result = subprocess.run(command, capture_output=True)
242+
return result.returncode, result.stdout, result.stderr
209243
if input is not None:
210244
if 'stdin' in kwargs:
211245
raise ValueError('stdin and input arguments may not both be used.')
@@ -226,11 +260,11 @@ def _run(command, input=None, check=False, **kwargs):
226260

227261

228262
def run_command(cmdargs):
229-
resp = _run(cmdargs)
230-
if len(resp.stderr.decode()) > 0:
263+
retcode, stdout, stderr = _run(cmdargs)
264+
if retcode != 0:
231265
# traceback.print_exc()
232-
raise Exception("Error in run command %s cmd: %s" % (resp, cmdargs))
233-
return resp.stdout
266+
raise Exception("Error in run command %s cmd: %s" % (stderr, cmdargs))
267+
return retcode, stdout, stderr
234268

235269

236270
if __name__ == '__main__':

0 commit comments

Comments
 (0)