Skip to content

Commit

Permalink
MINIFICPP-2518 PutKinesisStream
Browse files Browse the repository at this point in the history
  • Loading branch information
martinzink committed Feb 19, 2025
1 parent c43307d commit be3b5b6
Show file tree
Hide file tree
Showing 32 changed files with 1,158 additions and 300 deletions.
12 changes: 10 additions & 2 deletions cmake/BundledAwsSdkCpp.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ function(use_bundled_libaws SOURCE_DIR BINARY_DIR)
"${LIBDIR}/${PREFIX}aws-c-compression.${SUFFIX}"
"${LIBDIR}/${PREFIX}aws-c-sdkutils.${SUFFIX}"
"${LIBDIR}/${PREFIX}aws-cpp-sdk-core.${SUFFIX}"
"${LIBDIR}/${PREFIX}aws-cpp-sdk-s3.${SUFFIX}")
"${LIBDIR}/${PREFIX}aws-cpp-sdk-s3.${SUFFIX}"
"${LIBDIR}/${PREFIX}aws-cpp-sdk-kinesis.${SUFFIX}"
)

FOREACH(BYPRODUCT ${BYPRODUCTS})
LIST(APPEND AWSSDK_LIBRARIES_LIST "${BINARY_DIR}/thirdparty/libaws-install/${BYPRODUCT}")
Expand All @@ -67,7 +69,7 @@ function(use_bundled_libaws SOURCE_DIR BINARY_DIR)
set(AWS_SDK_CPP_CMAKE_ARGS ${PASSTHROUGH_CMAKE_ARGS}
-DCMAKE_PREFIX_PATH=${BINARY_DIR}/thirdparty/libaws-install
-DCMAKE_INSTALL_PREFIX=${BINARY_DIR}/thirdparty/libaws-install
-DBUILD_ONLY=s3
-DBUILD_ONLY=kinesis%s3
-DENABLE_TESTING=OFF
-DBUILD_SHARED_LIBS=OFF
-DENABLE_UNITY_BUILD=${AWS_ENABLE_UNITY_BUILD})
Expand Down Expand Up @@ -205,4 +207,10 @@ function(use_bundled_libaws SOURCE_DIR BINARY_DIR)
add_dependencies(AWS::aws-cpp-sdk-s3 aws-sdk-cpp-external)
target_include_directories(AWS::aws-cpp-sdk-s3 INTERFACE ${LIBAWS_INCLUDE_DIR})
target_link_libraries(AWS::aws-cpp-sdk-s3 INTERFACE AWS::aws-cpp-sdk-core)

add_library(AWS::aws-cpp-sdk-kinesis STATIC IMPORTED)
set_target_properties(AWS::aws-cpp-sdk-kinesis PROPERTIES IMPORTED_LOCATION "${BINARY_DIR}/thirdparty/libaws-install/${LIBDIR}/${PREFIX}aws-cpp-sdk-kinesis.${SUFFIX}")
add_dependencies(AWS::aws-cpp-sdk-kinesis aws-sdk-cpp-external)
target_include_directories(AWS::aws-cpp-sdk-kinesis INTERFACE ${LIBAWS_INCLUDE_DIR})
target_link_libraries(AWS::aws-cpp-sdk-kinesis INTERFACE AWS::aws-cpp-sdk-core)
endfunction(use_bundled_libaws)
9 changes: 9 additions & 0 deletions docker/test/integration/cluster/ContainerStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from .containers.NifiContainer import NiFiOptions
from .containers.ZookeeperContainer import ZookeeperContainer
from .containers.KafkaBrokerContainer import KafkaBrokerContainer
from .containers.KinesisServerContainer import KinesisServerContainer
from .containers.S3ServerContainer import S3ServerContainer
from .containers.AzureStorageServerContainer import AzureStorageServerContainer
from .containers.FakeGcsServerContainer import FakeGcsServerContainer
Expand Down Expand Up @@ -153,6 +154,14 @@ def acquire_container(self, context, container_name: str, engine='minifi-cpp', c
network=self.network,
image_store=self.image_store,
command=command))
elif engine == 'kinesis-server':
return self.containers.setdefault(container_name,
KinesisServerContainer(feature_context=feature_context,
name=container_name,
vols=self.vols,
network=self.network,
image_store=self.image_store,
command=command))
elif engine == 'azure-storage-server':
return self.containers.setdefault(container_name,
AzureStorageServerContainer(feature_context=feature_context,
Expand Down
4 changes: 4 additions & 0 deletions docker/test/integration/cluster/DockerTestCluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ def check_http_proxy_access(self, container_name, url):
and output.count("TCP_MISS") >= output.count("TCP_DENIED"))
or output.count("TCP_DENIED") == 0 and "TCP_MISS" in output)

def check_kinesis_server_record_data(self, container_name, record_data):
container_name = self.container_store.get_container_name_with_postfix(container_name)
return self.aws_checker.check_kinesis_server_record_data(container_name, record_data)

def check_s3_server_object_data(self, container_name, test_data):
container_name = self.container_store.get_container_name_with_postfix(container_name)
return self.aws_checker.check_s3_server_object_data(container_name, test_data)
Expand Down
5 changes: 5 additions & 0 deletions docker/test/integration/cluster/ImageStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ def get_image(self, container_engine):
image = self.__build_mqtt_broker_image()
elif container_engine == "splunk":
image = self.__build_splunk_image()
elif container_engine == "kinesis-server":
image = self.__build_kinesis_image()
elif container_engine == "reverse-proxy":
image = self.__build_reverse_proxy_image()
elif container_engine == "diag-slave-tcp":
Expand Down Expand Up @@ -249,6 +251,9 @@ def __build_mqtt_broker_image(self):

return self.__build_image(dockerfile)

def __build_kinesis_image(self):
return self.__build_image_by_path(self.test_dir + "/resources/kinesis-mock", 'kinesis-server')

def __build_splunk_image(self):
return self.__build_image_by_path(self.test_dir + "/resources/splunk-hec", 'minifi-splunk')

Expand Down
5 changes: 5 additions & 0 deletions docker/test/integration/cluster/checkers/AwsChecker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ class AwsChecker:
def __init__(self, container_communicator):
self.container_communicator = container_communicator

@retry_check()
def check_kinesis_server_record_data(self, container_name, record_data):
(code, output) = self.container_communicator.execute_command(container_name, ["node", "/app/consumer/consumer.js", record_data])
return code == 0

@retry_check()
def check_s3_server_object_data(self, container_name, test_data):
(code, output) = self.container_communicator.execute_command(container_name, ["find", "/s3mockroot/test_bucket", "-mindepth", "1", "-maxdepth", "1", "-type", "d"])
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import logging
from .Container import Container


class KinesisServerContainer(Container):
    """Integration-test container that runs a mock AWS Kinesis server
    (engine name: 'kinesis-server')."""

    def __init__(self, feature_context, name, vols, network, image_store, command):
        super().__init__(feature_context, name, 'kinesis-server', vols, network, image_store, command)

    def get_startup_finished_log_entry(self):
        # The mock emits this line once it is ready to accept connections.
        return "Starting Kinesis Plain Mock Service on port 4568"

    def deploy(self):
        # Only the first call deploys; subsequent calls are no-ops.
        if not self.set_deployed():
            return

        logging.info('Creating and running kinesis server docker container...')
        environment = [
            "INITIALIZE_STREAMS=test_stream:3",
            "LOG_LEVEL=DEBUG",
        ]
        self.client.containers.run(
            self.image_store.get_image(self.get_engine()),
            detach=True,
            name=self.name,
            network=self.network.name,
            environment=environment,
            entrypoint=self.command)
        logging.info("Added container '%s'", self.name)
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ def __validate(self, validator):
assert not self.cluster.segfault_happened() or self.cluster.log_app_output()
assert validator.validate() or self.cluster.log_app_output()

def check_kinesis_server_record_data(self, kinesis_container_name, record_data):
assert self.cluster.check_kinesis_server_record_data(kinesis_container_name, record_data) or self.cluster.log_app_output()

def check_s3_server_object_data(self, s3_container_name, object_data):
assert self.cluster.check_s3_server_object_data(s3_container_name, object_data) or self.cluster.log_app_output()

Expand Down
38 changes: 38 additions & 0 deletions docker/test/integration/features/kinesis.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

@ENABLE_AWS
Feature: Sending data from MiNiFi-C++ to an AWS Kinesis server
  In order to transfer data to an AWS Kinesis server
As a user of MiNiFi
I need to have PutKinesisStream processor

Background:
Given the content of "/tmp/output" is monitored

Scenario: A MiNiFi instance can send data to AWS Kinesis
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a file with the content "Schnappi, das kleine Krokodil" is present in "/tmp/input"
And a PutKinesisStream processor set up to communicate with the kinesis server
And a PutFile processor with the "Directory" property set to "/tmp/output"
And the "success" relationship of the GetFile processor is connected to the PutKinesisStream
And the "success" relationship of the PutKinesisStream processor is connected to the PutFile

And a kinesis server is set up in correspondence with the PutKinesisStream

When both instances start up

Then a flowfile with the content "Schnappi, das kleine Krokodil" is placed in the monitored directory in less than 60 seconds
And there is a record on the kinesis server with "Schnappi, das kleine Krokodil"
11 changes: 11 additions & 0 deletions docker/test/integration/features/steps/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ def step_impl(context, processor_type, minifi_container_name):
@given("a {processor_type} processor set up to communicate with a kafka broker instance")
@given("a {processor_type} processor set up to communicate with an MQTT broker instance")
@given("a {processor_type} processor set up to communicate with the Splunk HEC instance")
@given("a {processor_type} processor set up to communicate with the kinesis server")
def step_impl(context, processor_type):
__create_processor(context, processor_type, processor_type, None, None, "minifi-cpp-flow")

Expand Down Expand Up @@ -463,6 +464,11 @@ def step_impl(context):
context.test.acquire_container(context=context, name="s3-server", engine="s3-server")


@given("a kinesis server is set up in correspondence with the PutKinesisStream")
def step_impl(context):
    # Spin up (or reuse) the kinesis-server test container for this scenario.
    kinesis_engine = "kinesis-server"
    context.test.acquire_container(context=context, name=kinesis_engine, engine=kinesis_engine)

# azure storage setup
@given("an Azure storage server is set up")
def step_impl(context):
Expand Down Expand Up @@ -900,6 +906,11 @@ def step_impl(context, url):
context.test.check_http_proxy_access('http-proxy', url)


@then("there is a record on the kinesis server with \"{record_data}\"")
def step_impl(context, record_data):
    # Verify the record arrived at the kinesis-server container.
    kinesis_container_name = "kinesis-server"
    context.test.check_kinesis_server_record_data(kinesis_container_name, record_data)


@then("the object on the s3 server is \"{object_data}\"")
def step_impl(context, object_data):
context.test.check_s3_server_object_data("s3-server", object_data)
Expand Down
42 changes: 42 additions & 0 deletions docker/test/integration/minifi/processors/PutKinesisStream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from ..core.Processor import Processor


class PutKinesisStream(Processor):
    """Test wrapper for the PutKinesisStream processor, preconfigured to talk
    to the scenario's kinesis-server mock container on port 4568."""

    def __init__(
            self,
            context,
            proxy_host='',
            proxy_port='',
            proxy_username='',
            proxy_password=''):
        # Credentials are placeholders; the kinesis mock accepts any values.
        properties = {
            'Amazon Kinesis Stream Name': 'test_stream',
            'Access Key': 'test_access_key',
            'Secret Key': 'test_secret',
            'Endpoint Override URL': f"http://kinesis-server-{context.feature_id}:4568",
            'Proxy Host': proxy_host,
            'Proxy Port': proxy_port,
            'Proxy Username': proxy_username,
            'Proxy Password': proxy_password,
            'Region': 'us-east-1',
        }
        super().__init__(
            context=context,
            clazz='PutKinesisStream',
            properties=properties,
            auto_terminate=["success", "failure"])
8 changes: 8 additions & 0 deletions docker/test/integration/resources/kinesis-mock/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
FROM node:lts-alpine
WORKDIR /app
# Install the Kinesis mock plus the SDK packages used by the test consumer
# script in a single layer to keep the image small.
RUN npm i kinesis-local @aws-sdk/client-kinesis @aws-sdk/node-http-handler
COPY server.json ./
COPY consumer ./consumer

# Exec form so the server process receives signals (e.g. SIGTERM on
# container stop) directly instead of running under a shell.
ENTRYPOINT ["npx", "kinesis-local"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// One-shot Kinesis consumer used by the integration tests: scans every shard
// of "test_stream" for a record whose payload equals the search string given
// as the single CLI argument. Exits 0 when found, non-zero otherwise.
import { KinesisClient, DescribeStreamCommand, GetShardIteratorCommand, GetRecordsCommand } from "@aws-sdk/client-kinesis";
import { NodeHttpHandler } from "@aws-sdk/node-http-handler";

const args = process.argv.slice(2);
if (args.length === 0) {
  console.error("Usage: node consumer.js <search-string>");
  process.exit(1);
}

const searchString = args[0];
const streamName = "test_stream";

// Client pointed at the local kinesis-mock endpoint; the credentials are
// placeholders since the mock does not validate them.
const kinesis = new KinesisClient({
  endpoint: "http://localhost:4568",
  region: "us-east-1",
  credentials: { accessKeyId: "fake", secretAccessKey: "fake" },
  requestHandler: new NodeHttpHandler({ http2: false }),
});

// Obtain an iterator positioned at the oldest record (TRIM_HORIZON) of the
// given shard of the test stream.
async function getShardIterator(shardId) {
  const response = await kinesis.send(new GetShardIteratorCommand({
    StreamName: streamName,
    ShardId: shardId,
    ShardIteratorType: "TRIM_HORIZON",
  }));
  return response.ShardIterator;
}

// Read the records currently stored in a shard, starting from TRIM_HORIZON.
// Follows NextShardIterator so records spread across multiple GetRecords
// batches are not missed (the original single call could return a partial
// batch and silently drop the rest).
async function readShardRecords(shardId) {
  const records = [];
  let shardIterator = await getShardIterator(shardId);
  while (shardIterator) {
    const data = await kinesis.send(new GetRecordsCommand({ ShardIterator: shardIterator }));
    records.push(...data.Records.map(r => Buffer.from(r.Data).toString().trim()));
    if (data.Records.length === 0) {
      break; // reached the current end of the shard
    }
    shardIterator = data.NextShardIterator;
  }
  return records;
}

// Scan every shard once. Exit codes: 0 = record found, 1 = not found,
// 2 = error talking to the stream. The original used process.exit(-1),
// which wraps to 255; explicit small positive codes are clearer and remain
// non-zero for callers that only test success.
async function readOnce() {
  try {
    console.log(`Checking stream '${streamName}' for: "${searchString}"`);

    const describeCommand = new DescribeStreamCommand({ StreamName: streamName });
    const { StreamDescription } = await kinesis.send(describeCommand);

    for (const shard of StreamDescription.Shards) {
      console.log(`Reading from shard: ${shard.ShardId}`);

      const records = await readShardRecords(shard.ShardId);

      if (records.includes(searchString)) {
        console.log(`Found "${searchString}" in records.`);
        process.exit(0);
      }
    }

    console.log(`"${searchString}" not found in any shard.`);
    process.exit(1);
  } catch (error) {
    console.error("Error reading stream:", error);
    process.exit(2);
  }
}

readOnce();
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"type": "module"
}
6 changes: 6 additions & 0 deletions docker/test/integration/resources/kinesis-mock/server.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"server": {
"key": "-----BEGIN PRIVATE KEY-----\nMIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDeEXM0RFWMOxkB\neBP3YcJhYhiouRvtHBnMj0M79Umst1OlfB2qqZ9P2nlVWN9E+vzbbbvtJBir/bkJ\nWMnh3sj86nY5EYUlaAwi2oQMVkivHFfweK/Usspy8bAyKBwM5BNJhnAX7GdR6cXu\nv7S2fUjGbp6bkATCypROrBC4HmOSM+GabZRQA6/EiKsYa53NxmBVgrm7twO3eoeS\ntVzH15/orWJm/8ukxq//E+WI4qb0LiRT79FjWoRK2czpAzP6+JqdQTSg1msCbsod\n/+k7nc5CVMStnkKMBOk1jniAfVoqDAtGJRlIMpWXtr6vQFiZE4z6t3bl2ZY/h6p6\nqmTkL1gXAgMBAAECggEBAMJ6v8zvZ4hXHVAnDD1jlStaEMR60NU3/fQjJzu0VqB3\nMT9FUmnrAUWazRYMrgQoVxgIo0NMkHrXypw/8RXp2VV+NKlICbY3yCEiA/EWA7Ov\n++fymfKJ3jkKJ0fVzrMPb0C+Bx88f0PCmwC7TZVgZUK7EBam6zR426eGk2Hb41He\nkcUG0Q68modQO172m3bPqNsa32bsjvAZ813KLSs8xFHhJajTjX5kXG+PmN1fenTx\n8cvzOAET62O4XEXsYqD+iENyvqkRkysfqa9uzWyhLiip6NZXJHPuNKi9usVL+ZNj\nglJXFMP0K/TFYqgMBnoJF3Y60NMBlTpTyDu1kP0/nCkCgYEA+g/cxpz+1yPEIs+y\ncX4qEp2VJpj4HuJdZKingtApg1aSAlAcNf9UZl7EsGz3vf/x/hX6YtTLF9cbyuIR\nP+CYEevWJwiLjTjMbd+Z86oeYr/sLqz4+98W3RGTAcRfLyEeM/fc7MEwTSeyIhpZ\nGi+HbxGJMvbj+8tUh6KFd9lDu8MCgYEA41dpfo2MnyA9f2OrEC2Joll/9Kgz+3l9\njZIRQ4YID3Rawi9xLi9Z6aiIudAGFT9hcFU11akIVz6yXvhRGmE6gQ6bim5ebaJe\n69EHchuBl9Pszi7/UUS4cxksVnNwJjrAHXnoESU0DTWRh3AUkBEsKOd0DkSGOma+\nMEI+JDe2cR0CgYEA9Jgfc4aNHxM0/nf6K1kk/iB1i9OEn3D7uUHe1+2VLYq4Ntr1\nPTwK6jc4XPm5Onfn1Ija6WELZr5ZyRFnnfupw53TU0rgdbpg+/gDNnvoTN89vkoj\nIPsN+h7+lHPoRsk2Kc8AofQ1ssJpU0JCdYKYDuQwN1GXnus8O4+Uza4OutECgYAr\nIeCABDcT0bgZPT2tWhZs2PIv5uHF6mzpuTbRStKoq/i0MvAURSOX80PNjSw6R8Yi\n2+fU27cbZmfNIOuyR5Qj/DOCdiIwRsgfkY8KFTHnLmwVSlFih9k+7R2+YTR77FWa\nwhBHgHl5sBomSht8oeVw9UjNlC6rUebvnQHROUjB+QKBgQDj1g/t+B5lZ03Pc00h\nNQxMqTR6wAp3hPX5Wen44+o0ktGkArRP3Iw3x5764EgUudn2p1NibCrjCnj2avCi\nGkkbl0wWcFZiOsvmBL/0ngxdPA5bfwiKKeIavltWufXXHl1fs0C4UIu9fHWpV6z7\nNRVO87Wp3BK13bCGUD0DvAdtpA==\n-----END PRIVATE KEY-----\n",
"cert": "-----BEGIN CERTIFICATE-----\nMIIDfTCCAmWgAwIBAgIEW6AYvzANBgkqhkiG9w0BAQsFADBuMRAwDgYDVQQGEwdV\nbmtub3duMRAwDgYDVQQIEwdVbmtub3duMRAwDgYDVQQHEwdVbmtub3duMRAwDgYD\nVQQKEwdVbmtub3duMRIwEAYDVQQLEwlGUzIgVGVzdHMxEDAOBgNVBAMTB1Vua25v\nd24wIBcNMTkxMjAyMTMyOTM3WhgPMjExODA2MjYxMzI5MzdaMG4xEDAOBgNVBAYT\nB1Vua25vd24xEDAOBgNVBAgTB1Vua25vd24xEDAOBgNVBAcTB1Vua25vd24xEDAO\nBgNVBAoTB1Vua25vd24xEjAQBgNVBAsTCUZTMiBUZXN0czEQMA4GA1UEAxMHVW5r\nbm93bjCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAN4RczREVYw7GQF4\nE/dhwmFiGKi5G+0cGcyPQzv1Say3U6V8Haqpn0/aeVVY30T6/Nttu+0kGKv9uQlY\nyeHeyPzqdjkRhSVoDCLahAxWSK8cV/B4r9SyynLxsDIoHAzkE0mGcBfsZ1Hpxe6/\ntLZ9SMZunpuQBMLKlE6sELgeY5Iz4ZptlFADr8SIqxhrnc3GYFWCubu3A7d6h5K1\nXMfXn+itYmb/y6TGr/8T5YjipvQuJFPv0WNahErZzOkDM/r4mp1BNKDWawJuyh3/\n6TudzkJUxK2eQowE6TWOeIB9WioMC0YlGUgylZe2vq9AWJkTjPq3duXZlj+Hqnqq\nZOQvWBcCAwEAAaMhMB8wHQYDVR0OBBYEFMEuQ/pix1rrPmvAh/KqSfxFLKfhMA0G\nCSqGSIb3DQEBCwUAA4IBAQDX/Nr2FuVhXPkZRrFFVrzqIbNO3ROnkURXXJSXus5v\nnvmQOXWHtE5Uy1f6z1iKCYUs6l+M5+YLmxooTZvaoAC6tjPDskIyPGQGVG4MB03E\nahLSBaRGJEZXIHLU9s6lgK0YoZCnhHdD+TXalxS4Jv6ieZJCKbWpkQomYYPWyTC6\nlR5XSEBPhPNRzo0wjla9a6th+Zc3KnTzTsQHg65IU0DIQeAIuxG0v3xh4NOmGi2Z\nLX5q0qhf9uk88HUwocILO9TLshlTPAF4puf0vS4MICw46g4YonDz7k5VQmzy0ZAV\nc6ew1WF0PfBk/3o4F0plkj5nbem57iOU0znKfI0ZYXoR\n-----END CERTIFICATE-----\n"
}
}
Loading

0 comments on commit be3b5b6

Please sign in to comment.