Skip to content

Commit 23b771a

Browse files
authored
Merge pull request #68 from SumoLogic/hpal_SUMO-94401_kinesisfirehose
SUMO-94401: added CF templates/function/readme
2 parents 0887197 + d7aa54c commit 23b771a

File tree

5 files changed

+313
-0
lines changed

5 files changed

+313
-0
lines changed

kinesisfirehose-processor/Readme.md

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# Kinesis Firehose Processor
2+
This function is used for transforming streaming data from Kinesis Firehose before it is sent to the destination.
3+
Other use cases might include normalizing data produced by different producers, adding metadata to the record, or converting incoming data to a format suitable for the destination. From Sumo Logic's perspective, it solves the problem of adding delimiters between consecutive records so that they can be easily processed by Sumo Logic's Hosted Collector configured with an [S3 source](https://help.sumologic.com/Send-Data/Sources/02Sources-for-Hosted-Collectors/Amazon_Web_Services/AWS_S3_Source).
4+
5+
# How it works
6+
When you enable Firehose data transformation, Firehose buffers incoming data and invokes the specified Lambda function with each buffered batch asynchronously. The transformed data is sent from Lambda to Firehose for buffering and then delivered to the destination.
7+
8+
### Creating Stack in AWS Cloudformation
9+
You can create the stack by using [aws-cli](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-cli-creating-stack.html) or directly from the AWS console using a web browser and uploading kinesisfirehose-lambda-sam.yaml. For more details, check out its [documentation](https://help.sumologic.com/?cid=39393).
10+
Sumo Logic provides a CloudFormation [template](https://s3.amazonaws.com/appdev-cloudformation-templates/kinesisfirehose-lambda-cft.json) for creating the Lambda function; download it and use it to create the stack.
11+
12+
### Setting up the Lambda Function
13+
The instructions below assume that the delivery stream already exists. One can also configure the Lambda function at the time of delivery stream creation. Refer to [Setting up Delivery Stream](https://docs.aws.amazon.com/firehose/latest/dev/basic-create.html).
14+
* Go to https://console.aws.amazon.com/firehose/home
15+
* Click on your delivery stream
16+
* In Details Tab, click on edit
17+
* In the edit window, Under Transform source records with AWS Lambda section enable the Source record transformation option. Now a bunch of options will be visible.
18+
* In Lambda function select the function(starting with SumoKFLambdaProcessor) created by Cloudformation template.
19+
* (Optional) you can set buffer size(lambda is invoked with this buffered batch) and buffer interval.
20+
* Now scroll up and click on create new or update button beside IAM Role.
21+
* In the new window click allow to give lambda invoke permission to Amazon Kinesis Firehose.
22+
* Now click on Save
23+
24+
### Testing your Lambda Function
25+
* Go to https://console.aws.amazon.com/firehose/home
26+
* Click on your delivery stream
27+
* Expand the Test with demo data section.
28+
* Click on Start sending demo data. After a few minutes you can see the transformed data in your configured S3 bucket destination.
29+
* You can view logs of lambda function in AWS Cloudwatch (LogGroup name beginning with /aws/lambda/SumoKFLambdaProcessor)
30+
31+
### For Developers
32+
33+
Installing Dependencies
34+
```
35+
npm install
36+
```
37+
38+
Building zip file
39+
```
40+
npm run build
41+
```
42+
Upload the generated kinesisfirehose-processor.zip to an S3 bucket (don't forget to change the bucket name and key in the CloudFormation template).
43+
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
{
2+
"AWSTemplateFormatVersion": "2010-09-09",
3+
"Resources": {
4+
"SumoKFLambdaExecutionRole": {
5+
"Type": "AWS::IAM::Role",
6+
"Properties": {
7+
"AssumeRolePolicyDocument": {
8+
"Version": "2012-10-17",
9+
"Statement": [{
10+
"Effect": "Allow",
11+
"Principal": {"Service": ["lambda.amazonaws.com"] },
12+
"Action": ["sts:AssumeRole"]
13+
} ]
14+
},
15+
"Path": "/",
16+
"Policies": [
17+
{
18+
"PolicyName": { "Fn::Join": [ "-", [ "CloudWatchCreateLogsRolePolicy", { "Fn::Select" : [ "2", {"Fn::Split" : [ "/" , { "Ref": "AWS::StackId" } ]}] } ] ] },
19+
"PolicyDocument": {
20+
"Version": "2012-10-17",
21+
"Statement": [{
22+
"Effect": "Allow",
23+
"Action": [
24+
"logs:CreateLogGroup",
25+
"logs:CreateLogStream",
26+
"logs:PutLogEvents",
27+
"logs:DescribeLogGroups",
28+
"logs:DescribeLogStreams",
29+
"logs:PutSubscriptionFilter",
30+
"logs:DescribeSubscriptionFilters"
31+
],
32+
"Resource": [
33+
{ "Fn::Join": [ ":", ["arn", "aws", "logs", { "Ref" : "AWS::Region" }, { "Ref" : "AWS::AccountId" },"log-group","*" ] ] }
34+
]
35+
}]
36+
}
37+
}
38+
]
39+
}
40+
},
41+
"SumoKFLambdaProcessor": {
42+
"Type": "AWS::Lambda::Function",
43+
"DependsOn": [
44+
"SumoKFLambdaExecutionRole"
45+
],
46+
"Properties": {
47+
"Code": {
48+
"S3Bucket": {"Fn::Join": ["", ["appdevzipfiles-", { "Ref" : "AWS::Region" }] ] },
49+
"S3Key": "kinesisfirehose-processor.zip"
50+
},
51+
"Role": {
52+
"Fn::GetAtt": [
53+
"SumoKFLambdaExecutionRole",
54+
"Arn"
55+
]
56+
},
57+
"FunctionName": { "Fn::Join": [ "-", [ "SumoKFLambdaProcessor", { "Fn::Select" : [ "2", {"Fn::Split" : [ "/" , { "Ref": "AWS::StackId" } ]}] } ] ] },
58+
"Timeout": 300,
59+
"Handler": "kinesisfirehose-processor.handler",
60+
"Runtime": "nodejs8.10",
61+
"MemorySize": 128
62+
}
63+
}
64+
},
65+
"Outputs": {
66+
"SumoKFLambdaProcessorArn": {
67+
"Value": { "Fn::GetAtt": ["SumoKFLambdaProcessor", "Arn"] }
68+
}
69+
}
70+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/**
 * Encodes a string as base64.
 *
 * @param {string} data - Text to encode; it is serialized as UTF-8 bytes.
 * @returns {string} Base64 representation of the UTF-8 bytes of `data`.
 * @throws {TypeError} If `data` is a number (the deprecated `new Buffer(n)`
 *     form would instead have allocated an n-byte buffer and encoded it).
 */
function encodebase64(data) {
    // Buffer.from replaces the deprecated `new Buffer(...)` constructor.
    return Buffer.from(data, 'utf8').toString('base64');
}
4+
5+
/**
 * Decodes a base64 string into text.
 *
 * @param {string} data - Base64-encoded payload.
 * @returns {string} The decoded bytes interpreted as UTF-8.
 * @throws {TypeError} If `data` is a number (the deprecated `new Buffer(n)`
 *     form would instead have allocated an n-byte buffer and decoded it).
 */
function decodebase64(data) {
    // Buffer.from replaces the deprecated `new Buffer(...)` constructor.
    return Buffer.from(data, 'base64').toString('utf8');
}
8+
9+
/**
 * Appends a delimiter to a base64-encoded record payload.
 *
 * The delimiter is appended at the byte level, so payloads that are not valid
 * UTF-8 are passed through unchanged instead of being altered by a
 * decode-to-string / re-encode round trip. For valid UTF-8 payloads the result
 * is the same as decoding, concatenating and re-encoding.
 *
 * @param {string} data - Base64-encoded record payload.
 * @param {string} [delimiter='\n'] - Text appended after the payload; used
 *     only when not `undefined`.
 * @returns {string} Base64 encoding of the payload bytes followed by the
 *     UTF-8 bytes of the delimiter.
 */
function addDelimitertoJSON(data, delimiter) {
    const suffix = typeof delimiter === 'undefined' ? '\n' : String(delimiter);
    const payload = Buffer.from(data, 'base64');
    return Buffer.concat([payload, Buffer.from(suffix, 'utf8')]).toString('base64');
}
16+
17+
/**
 * Converts a base64-encoded JSON object into a single base64-encoded line of
 * space-separated `key=value` pairs ({k1:v1,k2:v2} becomes "k1=v1 k2=v2"),
 * terminated by a newline.
 *
 * Values are stringified with ordinary string concatenation, so nested objects
 * are not expanded.
 *
 * @param {string} data - Base64-encoded JSON text.
 * @returns {string} Base64 encoding of the resulting line.
 * @throws {SyntaxError} If the decoded payload is not valid JSON.
 */
function convertToLine(data) {
    const entryObj = JSON.parse(decodebase64(data));
    let line = "";
    for (const key in entryObj) {
        // Call through Object.prototype so that a record containing its own
        // "hasOwnProperty" field cannot shadow the method and crash the loop.
        if (Object.prototype.hasOwnProperty.call(entryObj, key)) {
            line += key + "=" + entryObj[key] + " ";
        }
    }
    return encodebase64(line.trim() + "\n");
}
30+
exports.handler = (event, context, callback) => {
31+
console.log("invoking transformation lambda");
32+
let success = 0;
33+
let failure = 0;
34+
35+
const output = event.records.map( function (record) {
36+
try {
37+
// let resultdata = convertToLine(record.data);
38+
let resultdata = addDelimitertoJSON(record.data);
39+
success++;
40+
return {
41+
recordId: record.recordId,
42+
result: 'Ok',
43+
data: resultdata
44+
};
45+
} catch(error) {
46+
console.log("Error in record transformation", error);
47+
failure++;
48+
return {
49+
recordId: record.recordId,
50+
result: 'ProcessingFailed',
51+
data: record.data,
52+
};
53+
}
54+
});
55+
console.log(`Processing completed.Total records ${output.length}. Success ${success} Failed ${failure}`);
56+
callback(null, { records: output });
57+
};
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
{
2+
"name": "kinesisfirehose-processor",
3+
"version": "1.0.0",
4+
"description": "Lambda Function for transforming incoming data from kinesis firehose",
5+
"main": "kinesisfirehose-processor.js",
6+
"dependencies": {},
7+
"devDependencies": {},
8+
"scripts": {
9+
"test": "echo \"Error: no test specified\" && exit 1",
10+
"build": "rm -f kinesisfirehose-processor.zip && zip -r kinesisfirehose-processor.zip kinesisfirehose-processor.js package.json"
11+
},
12+
"keywords": [
13+
"AWS",
14+
"Kinesis Firehose"
15+
],
16+
"author": "Himanshu Pal",
17+
"license": "Apache-2.0"
18+
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
{
2+
"AWSTemplateFormatVersion": "2010-09-09",
3+
"Resources": {
4+
"SumoKFDeliveryStream": {
5+
"Type" : "AWS::KinesisFirehose::DeliveryStream",
6+
"Properties" : {
7+
"DeliveryStreamName": { "Fn::Join": [ "-", [ "SumoKFDeliveryStream", { "Fn::Select" : [ "2", {"Fn::Split" : [ "/" , { "Ref": "AWS::StackId" } ]}] } ] ] },
8+
"DeliveryStreamType" : "DirectPut",
9+
"ExtendedS3DestinationConfiguration": {
10+
"BucketARN": {"Fn::Join": ["", ["arn:aws:s3:::", {"Ref":"SumoKFS3bucket"}]]},
11+
"BufferingHints": {
12+
"IntervalInSeconds": "300",
13+
"SizeInMBs": "5"
14+
},
15+
"CompressionFormat": "UNCOMPRESSED",
16+
"Prefix": "firehose/",
17+
"RoleARN": {"Fn::GetAtt" : ["SumoKFDeliveryRole", "Arn"] },
18+
"ProcessingConfiguration" : {
19+
"Enabled": "true",
20+
"Processors": [{
21+
"Parameters": [
22+
{
23+
"ParameterName": "LambdaArn",
24+
"ParameterValue": { "Fn::GetAtt": [ "SumoKFLambdaResources", "Outputs.SumoKFLambdaProcessorArn" ]}
25+
},
26+
{
27+
"ParameterName": "NumberOfRetries",
28+
"ParameterValue": "3"
29+
},
30+
{
31+
"ParameterName": "BufferSizeInMBs",
32+
"ParameterValue": "3"
33+
},
34+
{
35+
"ParameterName": "BufferIntervalInSeconds",
36+
"ParameterValue": "60"
37+
}
38+
],
39+
"Type": "Lambda"
40+
}]
41+
},
42+
"CloudWatchLoggingOptions": {
43+
"Enabled" : false
44+
},
45+
"EncryptionConfiguration": {
46+
"NoEncryptionConfig": "NoEncryption"
47+
},
48+
"S3BackupMode": "Disabled"
49+
}
50+
}
51+
},
52+
"SumoKFS3bucket": {
53+
"Type": "AWS::S3::Bucket",
54+
"Properties": {
55+
"VersioningConfiguration": {
56+
"Status": "Enabled"
57+
}
58+
}
59+
},
60+
"SumoKFDeliveryRole": {
61+
"Type": "AWS::IAM::Role",
62+
"Properties": {
63+
"AssumeRolePolicyDocument": {
64+
"Version": "2012-10-17",
65+
"Statement": [{
66+
"Sid": "",
67+
"Effect": "Allow",
68+
"Principal": {
69+
"Service": "firehose.amazonaws.com"
70+
},
71+
"Action": "sts:AssumeRole",
72+
"Condition": {
73+
"StringEquals": {
74+
"sts:ExternalId": {"Ref":"AWS::AccountId"}
75+
}
76+
}
77+
}]
78+
}
79+
}
80+
},
81+
"SumoKFDeliveryPolicy": {
82+
"Type": "AWS::IAM::Policy",
83+
"Properties": {
84+
"PolicyName": { "Fn::Join": [ "-", [ "firehose_delivery_policy", { "Fn::Select" : [ "2", {"Fn::Split" : [ "/" , { "Ref": "AWS::StackId" } ]}] } ] ] },
85+
"PolicyDocument": {
86+
"Version": "2012-10-17",
87+
"Statement": [
88+
{
89+
"Effect": "Allow",
90+
"Action": [
91+
"s3:AbortMultipartUpload",
92+
"s3:GetBucketLocation",
93+
"s3:GetObject",
94+
"s3:ListBucket",
95+
"s3:ListBucketMultipartUploads",
96+
"s3:PutObject"
97+
],
98+
"Resource": [
99+
{"Fn::Join": ["", ["arn:aws:s3:::", {"Ref":"SumoKFS3bucket"}]]},
100+
{"Fn::Join": ["", ["arn:aws:s3:::", {"Ref":"SumoKFS3bucket"}, "/*"]]}
101+
]
102+
},
103+
{
104+
"Sid": "",
105+
"Effect": "Allow",
106+
"Action": [
107+
"lambda:InvokeFunction",
108+
"lambda:GetFunctionConfiguration"
109+
],
110+
"Resource": { "Fn::GetAtt": [ "SumoKFLambdaResources", "Outputs.SumoKFLambdaProcessorArn" ]}
111+
}
112+
]
113+
},
114+
"Roles": [{"Ref": "SumoKFDeliveryRole"}]
115+
}
116+
},
117+
"SumoKFLambdaResources" : {
118+
"Type" : "AWS::CloudFormation::Stack",
119+
"Properties" : {
120+
"TemplateURL" : "https://s3.amazonaws.com/appdev-cloudformation-templates/kinesisfirehose-lambda-cft.json",
121+
"TimeoutInMinutes" : "10"
122+
}
123+
}
124+
}
125+
}

0 commit comments

Comments
 (0)