diff --git a/cmake/BundledAwsSdkCpp.cmake b/cmake/BundledAwsSdkCpp.cmake index 222a89940f..5f100e1939 100644 --- a/cmake/BundledAwsSdkCpp.cmake +++ b/cmake/BundledAwsSdkCpp.cmake @@ -58,7 +58,9 @@ function(use_bundled_libaws SOURCE_DIR BINARY_DIR) "${LIBDIR}/${PREFIX}aws-c-compression.${SUFFIX}" "${LIBDIR}/${PREFIX}aws-c-sdkutils.${SUFFIX}" "${LIBDIR}/${PREFIX}aws-cpp-sdk-core.${SUFFIX}" - "${LIBDIR}/${PREFIX}aws-cpp-sdk-s3.${SUFFIX}") + "${LIBDIR}/${PREFIX}aws-cpp-sdk-s3.${SUFFIX}" + "${LIBDIR}/${PREFIX}aws-cpp-sdk-kinesis.${SUFFIX}" + ) FOREACH(BYPRODUCT ${BYPRODUCTS}) LIST(APPEND AWSSDK_LIBRARIES_LIST "${BINARY_DIR}/thirdparty/libaws-install/${BYPRODUCT}") @@ -67,7 +69,7 @@ function(use_bundled_libaws SOURCE_DIR BINARY_DIR) set(AWS_SDK_CPP_CMAKE_ARGS ${PASSTHROUGH_CMAKE_ARGS} -DCMAKE_PREFIX_PATH=${BINARY_DIR}/thirdparty/libaws-install -DCMAKE_INSTALL_PREFIX=${BINARY_DIR}/thirdparty/libaws-install - -DBUILD_ONLY=s3 + -DBUILD_ONLY=kinesis%s3 -DENABLE_TESTING=OFF -DBUILD_SHARED_LIBS=OFF -DENABLE_UNITY_BUILD=${AWS_ENABLE_UNITY_BUILD}) @@ -205,4 +207,10 @@ function(use_bundled_libaws SOURCE_DIR BINARY_DIR) add_dependencies(AWS::aws-cpp-sdk-s3 aws-sdk-cpp-external) target_include_directories(AWS::aws-cpp-sdk-s3 INTERFACE ${LIBAWS_INCLUDE_DIR}) target_link_libraries(AWS::aws-cpp-sdk-s3 INTERFACE AWS::aws-cpp-sdk-core) + + add_library(AWS::aws-cpp-sdk-kinesis STATIC IMPORTED) + set_target_properties(AWS::aws-cpp-sdk-kinesis PROPERTIES IMPORTED_LOCATION "${BINARY_DIR}/thirdparty/libaws-install/${LIBDIR}/${PREFIX}aws-cpp-sdk-kinesis.${SUFFIX}") + add_dependencies(AWS::aws-cpp-sdk-kinesis aws-sdk-cpp-external) + target_include_directories(AWS::aws-cpp-sdk-kinesis INTERFACE ${LIBAWS_INCLUDE_DIR}) + target_link_libraries(AWS::aws-cpp-sdk-kinesis INTERFACE AWS::aws-cpp-sdk-core) endfunction(use_bundled_libaws) diff --git a/docker/test/integration/cluster/ContainerStore.py b/docker/test/integration/cluster/ContainerStore.py index 9c8527458a..680f36b051 100644 --- a/docker/test/integration/cluster/ContainerStore.py +++ b/docker/test/integration/cluster/ContainerStore.py @@ -20,6 +20,7 @@ from .containers.NifiContainer import NiFiOptions from .containers.ZookeeperContainer import ZookeeperContainer from .containers.KafkaBrokerContainer import KafkaBrokerContainer +from .containers.KinesisServerContainer import KinesisServerContainer from .containers.S3ServerContainer import S3ServerContainer from .containers.AzureStorageServerContainer import AzureStorageServerContainer from .containers.FakeGcsServerContainer import FakeGcsServerContainer @@ -153,6 +154,14 @@ def acquire_container(self, context, container_name: str, engine='minifi-cpp', c network=self.network, image_store=self.image_store, command=command)) + elif engine == 'kinesis-server': + return self.containers.setdefault(container_name, + KinesisServerContainer(feature_context=feature_context, + name=container_name, + vols=self.vols, + network=self.network, + image_store=self.image_store, + command=command)) elif engine == 'azure-storage-server': return self.containers.setdefault(container_name, AzureStorageServerContainer(feature_context=feature_context, diff --git a/docker/test/integration/cluster/DockerTestCluster.py b/docker/test/integration/cluster/DockerTestCluster.py index e161d5da47..a35a389f95 100644 --- a/docker/test/integration/cluster/DockerTestCluster.py +++ b/docker/test/integration/cluster/DockerTestCluster.py @@ -190,6 +190,10 @@ def check_http_proxy_access(self, container_name, url): and output.count("TCP_MISS") >= 
output.count("TCP_DENIED")) or output.count("TCP_DENIED") == 0 and "TCP_MISS" in output) + def check_kinesis_server_record_data(self, container_name, record_data): + container_name = self.container_store.get_container_name_with_postfix(container_name) + return self.aws_checker.check_kinesis_server_record_data(container_name, record_data) + def check_s3_server_object_data(self, container_name, test_data): container_name = self.container_store.get_container_name_with_postfix(container_name) return self.aws_checker.check_s3_server_object_data(container_name, test_data) diff --git a/docker/test/integration/cluster/ImageStore.py b/docker/test/integration/cluster/ImageStore.py index 7ec5cb27d1..8f15da97ae 100644 --- a/docker/test/integration/cluster/ImageStore.py +++ b/docker/test/integration/cluster/ImageStore.py @@ -63,6 +63,8 @@ def get_image(self, container_engine): image = self.__build_mqtt_broker_image() elif container_engine == "splunk": image = self.__build_splunk_image() + elif container_engine == "kinesis-server": + image = self.__build_kinesis_image() elif container_engine == "reverse-proxy": image = self.__build_reverse_proxy_image() elif container_engine == "diag-slave-tcp": @@ -249,6 +251,9 @@ def __build_mqtt_broker_image(self): return self.__build_image(dockerfile) + def __build_kinesis_image(self): + return self.__build_image_by_path(self.test_dir + "/resources/kinesis-mock", 'kinesis-server') + def __build_splunk_image(self): return self.__build_image_by_path(self.test_dir + "/resources/splunk-hec", 'minifi-splunk') diff --git a/docker/test/integration/cluster/checkers/AwsChecker.py b/docker/test/integration/cluster/checkers/AwsChecker.py index 48dcb6996d..8a3193f7b5 100644 --- a/docker/test/integration/cluster/checkers/AwsChecker.py +++ b/docker/test/integration/cluster/checkers/AwsChecker.py @@ -20,6 +20,11 @@ class AwsChecker: def __init__(self, container_communicator): self.container_communicator = container_communicator + @retry_check() + def check_kinesis_server_record_data(self, container_name, record_data): + (code, output) = self.container_communicator.execute_command(container_name, ["node", "/app/consumer/consumer.js", record_data]) + return code == 0 + @retry_check() def check_s3_server_object_data(self, container_name, test_data): (code, output) = self.container_communicator.execute_command(container_name, ["find", "/s3mockroot/test_bucket", "-mindepth", "1", "-maxdepth", "1", "-type", "d"]) diff --git a/docker/test/integration/cluster/containers/KinesisServerContainer.py b/docker/test/integration/cluster/containers/KinesisServerContainer.py new file mode 100644 index 0000000000..0bee46cef7 --- /dev/null +++ b/docker/test/integration/cluster/containers/KinesisServerContainer.py @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + + +import logging +from .Container import Container + + +class KinesisServerContainer(Container): + def __init__(self, feature_context, name, vols, network, image_store, command): + super().__init__(feature_context, name, 'kinesis-server', vols, network, image_store, command) + + def get_startup_finished_log_entry(self): + return "Starting Kinesis Plain Mock Service on port 4568" + + def deploy(self): + if not self.set_deployed(): + return + + logging.info('Creating and running kinesis server docker container...') + self.client.containers.run( + self.image_store.get_image(self.get_engine()), + detach=True, + name=self.name, + network=self.network.name, + environment=["INITIALIZE_STREAMS=test_stream:3", + "LOG_LEVEL=DEBUG"], + entrypoint=self.command) + logging.info('Added container \'%s\'', self.name) diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py b/docker/test/integration/features/MiNiFi_integration_test_driver.py index 57526342db..12ca48e29c 100644 --- a/docker/test/integration/features/MiNiFi_integration_test_driver.py +++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py @@ -251,6 +251,9 @@ def __validate(self, validator): assert not self.cluster.segfault_happened() or self.cluster.log_app_output() assert validator.validate() or self.cluster.log_app_output() + def check_kinesis_server_record_data(self, kinesis_container_name, record_data): + assert self.cluster.check_kinesis_server_record_data(kinesis_container_name, record_data) or self.cluster.log_app_output() + def check_s3_server_object_data(self, s3_container_name, object_data): assert self.cluster.check_s3_server_object_data(s3_container_name, object_data) or self.cluster.log_app_output() diff --git a/docker/test/integration/features/kinesis.feature b/docker/test/integration/features/kinesis.feature new file mode 100644 index 0000000000..2f68fd77c1 --- /dev/null +++ b/docker/test/integration/features/kinesis.feature @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +@ENABLE_AWS +Feature: Sending data from MiNiFi-C++ to an AWS Kinesis server + In order to transfer data to an AWS Kinesis server + As a user of MiNiFi + I need to have a PutKinesisStream processor + + Background: + Given the content of "/tmp/output" is monitored + + Scenario: A MiNiFi instance can send data to AWS Kinesis + Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a file with the content "Schnappi, das kleine Krokodil" is present in "/tmp/input" + And a PutKinesisStream processor set up to communicate with the kinesis server + And a PutFile processor with the "Directory" property set to "/tmp/output" + And the "success" relationship of the GetFile processor is connected to the PutKinesisStream + And the "success" relationship of the PutKinesisStream processor is connected to the PutFile + + And a kinesis server is set up in correspondence with the PutKinesisStream + + When both instances start up + + Then a flowfile with the content "Schnappi, das kleine Krokodil" is placed in the monitored directory in less than 60 seconds + And there is a record on the kinesis server with "Schnappi, das kleine Krokodil" diff --git a/docker/test/integration/features/steps/steps.py b/docker/test/integration/features/steps/steps.py index e65ca2f29f..b57b64f50f 100644 --- a/docker/test/integration/features/steps/steps.py +++ b/docker/test/integration/features/steps/steps.py @@ -128,6 +128,7 @@ def step_impl(context, processor_type, minifi_container_name): @given("a {processor_type} processor set up to communicate with a kafka broker instance") @given("a {processor_type} processor set up to communicate with an MQTT broker instance") @given("a {processor_type} processor set up to communicate with the Splunk HEC instance") +@given("a {processor_type} processor set up to communicate with the kinesis server") def step_impl(context, processor_type): __create_processor(context, processor_type, processor_type, None, None, "minifi-cpp-flow") @@ -463,6 +464,11 @@ def step_impl(context): context.test.acquire_container(context=context, name="s3-server", engine="s3-server") +@given("a kinesis server is set up in correspondence with the PutKinesisStream") +def step_impl(context): + context.test.acquire_container(context=context, name="kinesis-server", engine="kinesis-server") + + # azure storage setup @given("an Azure storage server is set up") def step_impl(context): @@ -900,6 +906,11 @@ def step_impl(context, url): context.test.check_http_proxy_access('http-proxy', url) +@then("there is a record on the kinesis server with \"{record_data}\"") +def step_impl(context, record_data): + context.test.check_kinesis_server_record_data("kinesis-server", record_data) + + @then("the object on the s3 server is \"{object_data}\"") def step_impl(context, object_data): context.test.check_s3_server_object_data("s3-server", object_data) diff --git a/docker/test/integration/minifi/processors/PutKinesisStream.py b/docker/test/integration/minifi/processors/PutKinesisStream.py new file mode 100644 index 0000000000..40e55e1c2d --- /dev/null +++ b/docker/test/integration/minifi/processors/PutKinesisStream.py @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ..core.Processor import Processor + + +class PutKinesisStream(Processor): + def __init__( + self, + context, + proxy_host='', + proxy_port='', + proxy_username='', + proxy_password=''): + super(PutKinesisStream, self).__init__( + context=context, + clazz='PutKinesisStream', + properties={ + 'Amazon Kinesis Stream Name': 'test_stream', + 'Access Key': 'test_access_key', + 'Secret Key': 'test_secret', + 'Endpoint Override URL': f"http://kinesis-server-{context.feature_id}:4568", + 'Proxy Host': proxy_host, + 'Proxy Port': proxy_port, + 'Proxy Username': proxy_username, + 'Proxy Password': proxy_password, + 'Region': 'us-east-1' + }, + auto_terminate=["success", "failure"]) diff --git a/docker/test/integration/resources/kinesis-mock/Dockerfile b/docker/test/integration/resources/kinesis-mock/Dockerfile new file mode 100644 index 0000000000..567acfb238 --- /dev/null +++ b/docker/test/integration/resources/kinesis-mock/Dockerfile @@ -0,0 +1,8 @@ +FROM node:lts-alpine +WORKDIR /app +RUN npm i kinesis-local +RUN npm i @aws-sdk/client-kinesis @aws-sdk/node-http-handler +COPY server.json ./ +COPY consumer ./consumer + +ENTRYPOINT npx kinesis-local diff --git a/docker/test/integration/resources/kinesis-mock/consumer/consumer.js b/docker/test/integration/resources/kinesis-mock/consumer/consumer.js new file mode 100644 index 0000000000..07f48cb27c --- /dev/null +++ b/docker/test/integration/resources/kinesis-mock/consumer/consumer.js @@ -0,0 +1,64 @@ +import { KinesisClient, DescribeStreamCommand, GetShardIteratorCommand, GetRecordsCommand } from "@aws-sdk/client-kinesis"; +import { NodeHttpHandler } from "@aws-sdk/node-http-handler"; + +const args = process.argv.slice(2); +if (args.length === 0) { + console.error("Usage: node consumer.js <search_string>"); + process.exit(1); +} + +const searchString = args[0]; +const streamName = "test_stream"; + +const kinesis = new KinesisClient({ + endpoint: "http://localhost:4568", + region: "us-east-1", + credentials: { accessKeyId: "fake", secretAccessKey: "fake" }, + requestHandler: new NodeHttpHandler({ http2: false }), +});
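+// The stream is checked shard by shard: each shard is read from the beginning (TRIM_HORIZON) with a single GetRecords call, which is enough for the small batches written by the integration tests.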
+async function getShardIterator(shardId) { + const command = new GetShardIteratorCommand({ + StreamName: streamName, + ShardId: shardId, + ShardIteratorType: "TRIM_HORIZON", + }); + const response = await kinesis.send(command); + return response.ShardIterator; +} + +async function readShardRecords(shardId) { + let shardIterator = await getShardIterator(shardId); + const getRecordsCommand = new GetRecordsCommand({ ShardIterator: shardIterator }); + const data = await kinesis.send(getRecordsCommand); + + return data.Records.map(r => Buffer.from(r.Data).toString().trim()); +} + +async function readOnce() { + try { + console.log(`Checking stream '${streamName}' for: "${searchString}"`); + + const describeCommand = new DescribeStreamCommand({ StreamName: streamName }); + const { StreamDescription } = await kinesis.send(describeCommand); + + for (let shard of StreamDescription.Shards) { + console.log(`Reading from shard: ${shard.ShardId}`); + + const records = await readShardRecords(shard.ShardId); + + if (records.includes(searchString)) { + console.log(`Found "${searchString}" in records.`); + process.exit(0); + } + } + + console.log(`"${searchString}" not found in any shard.`); + process.exit(-1); + } catch (error) { + console.error("Error reading stream:", error); + process.exit(1); + } +} + +readOnce(); diff --git a/docker/test/integration/resources/kinesis-mock/consumer/package.json b/docker/test/integration/resources/kinesis-mock/consumer/package.json new file mode 100644 index 0000000000..aead43de36 --- /dev/null +++ b/docker/test/integration/resources/kinesis-mock/consumer/package.json @@ -0,0 +1,3 @@ +{ + "type": "module" +} \ No newline at end of file diff --git a/docker/test/integration/resources/kinesis-mock/server.json b/docker/test/integration/resources/kinesis-mock/server.json new file mode 100644 index 0000000000..cce12612b0 --- /dev/null +++ b/docker/test/integration/resources/kinesis-mock/server.json @@ -0,0 +1,6 @@ +{ + "server": { + "key": "-----BEGIN PRIVATE KEY-----\nMIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDeEXM0RFWMOxkB\neBP3YcJhYhiouRvtHBnMj0M79Umst1OlfB2qqZ9P2nlVWN9E+vzbbbvtJBir/bkJ\nWMnh3sj86nY5EYUlaAwi2oQMVkivHFfweK/Usspy8bAyKBwM5BNJhnAX7GdR6cXu\nv7S2fUjGbp6bkATCypROrBC4HmOSM+GabZRQA6/EiKsYa53NxmBVgrm7twO3eoeS\ntVzH15/orWJm/8ukxq//E+WI4qb0LiRT79FjWoRK2czpAzP6+JqdQTSg1msCbsod\n/+k7nc5CVMStnkKMBOk1jniAfVoqDAtGJRlIMpWXtr6vQFiZE4z6t3bl2ZY/h6p6\nqmTkL1gXAgMBAAECggEBAMJ6v8zvZ4hXHVAnDD1jlStaEMR60NU3/fQjJzu0VqB3\nMT9FUmnrAUWazRYMrgQoVxgIo0NMkHrXypw/8RXp2VV+NKlICbY3yCEiA/EWA7Ov\n++fymfKJ3jkKJ0fVzrMPb0C+Bx88f0PCmwC7TZVgZUK7EBam6zR426eGk2Hb41He\nkcUG0Q68modQO172m3bPqNsa32bsjvAZ813KLSs8xFHhJajTjX5kXG+PmN1fenTx\n8cvzOAET62O4XEXsYqD+iENyvqkRkysfqa9uzWyhLiip6NZXJHPuNKi9usVL+ZNj\nglJXFMP0K/TFYqgMBnoJF3Y60NMBlTpTyDu1kP0/nCkCgYEA+g/cxpz+1yPEIs+y\ncX4qEp2VJpj4HuJdZKingtApg1aSAlAcNf9UZl7EsGz3vf/x/hX6YtTLF9cbyuIR\nP+CYEevWJwiLjTjMbd+Z86oeYr/sLqz4+98W3RGTAcRfLyEeM/fc7MEwTSeyIhpZ\nGi+HbxGJMvbj+8tUh6KFd9lDu8MCgYEA41dpfo2MnyA9f2OrEC2Joll/9Kgz+3l9\njZIRQ4YID3Rawi9xLi9Z6aiIudAGFT9hcFU11akIVz6yXvhRGmE6gQ6bim5ebaJe\n69EHchuBl9Pszi7/UUS4cxksVnNwJjrAHXnoESU0DTWRh3AUkBEsKOd0DkSGOma+\nMEI+JDe2cR0CgYEA9Jgfc4aNHxM0/nf6K1kk/iB1i9OEn3D7uUHe1+2VLYq4Ntr1\nPTwK6jc4XPm5Onfn1Ija6WELZr5ZyRFnnfupw53TU0rgdbpg+/gDNnvoTN89vkoj\nIPsN+h7+lHPoRsk2Kc8AofQ1ssJpU0JCdYKYDuQwN1GXnus8O4+Uza4OutECgYAr\nIeCABDcT0bgZPT2tWhZs2PIv5uHF6mzpuTbRStKoq/i0MvAURSOX80PNjSw6R8Yi\n2+fU27cbZmfNIOuyR5Qj/DOCdiIwRsgfkY8KFTHnLmwVSlFih9k+7R2+YTR77FWa\nwhBHgHl5sBomSht8oeVw9UjNlC6rUebvnQHROUjB+QKBgQDj1g/t+B5lZ03Pc00h\nNQxMqTR6wAp3hPX5Wen44+o0ktGkArRP3Iw3x5764EgUudn2p1NibCrjCnj2avCi\nGkkbl0wWcFZiOsvmBL/0ngxdPA5bfwiKKeIavltWufXXHl1fs0C4UIu9fHWpV6z7\nNRVO87Wp3BK13bCGUD0DvAdtpA==\n-----END PRIVATE KEY-----\n", + "cert": "-----BEGIN
CERTIFICATE-----\nMIIDfTCCAmWgAwIBAgIEW6AYvzANBgkqhkiG9w0BAQsFADBuMRAwDgYDVQQGEwdV\nbmtub3duMRAwDgYDVQQIEwdVbmtub3duMRAwDgYDVQQHEwdVbmtub3duMRAwDgYD\nVQQKEwdVbmtub3duMRIwEAYDVQQLEwlGUzIgVGVzdHMxEDAOBgNVBAMTB1Vua25v\nd24wIBcNMTkxMjAyMTMyOTM3WhgPMjExODA2MjYxMzI5MzdaMG4xEDAOBgNVBAYT\nB1Vua25vd24xEDAOBgNVBAgTB1Vua25vd24xEDAOBgNVBAcTB1Vua25vd24xEDAO\nBgNVBAoTB1Vua25vd24xEjAQBgNVBAsTCUZTMiBUZXN0czEQMA4GA1UEAxMHVW5r\nbm93bjCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAN4RczREVYw7GQF4\nE/dhwmFiGKi5G+0cGcyPQzv1Say3U6V8Haqpn0/aeVVY30T6/Nttu+0kGKv9uQlY\nyeHeyPzqdjkRhSVoDCLahAxWSK8cV/B4r9SyynLxsDIoHAzkE0mGcBfsZ1Hpxe6/\ntLZ9SMZunpuQBMLKlE6sELgeY5Iz4ZptlFADr8SIqxhrnc3GYFWCubu3A7d6h5K1\nXMfXn+itYmb/y6TGr/8T5YjipvQuJFPv0WNahErZzOkDM/r4mp1BNKDWawJuyh3/\n6TudzkJUxK2eQowE6TWOeIB9WioMC0YlGUgylZe2vq9AWJkTjPq3duXZlj+Hqnqq\nZOQvWBcCAwEAAaMhMB8wHQYDVR0OBBYEFMEuQ/pix1rrPmvAh/KqSfxFLKfhMA0G\nCSqGSIb3DQEBCwUAA4IBAQDX/Nr2FuVhXPkZRrFFVrzqIbNO3ROnkURXXJSXus5v\nnvmQOXWHtE5Uy1f6z1iKCYUs6l+M5+YLmxooTZvaoAC6tjPDskIyPGQGVG4MB03E\nahLSBaRGJEZXIHLU9s6lgK0YoZCnhHdD+TXalxS4Jv6ieZJCKbWpkQomYYPWyTC6\nlR5XSEBPhPNRzo0wjla9a6th+Zc3KnTzTsQHg65IU0DIQeAIuxG0v3xh4NOmGi2Z\nLX5q0qhf9uk88HUwocILO9TLshlTPAF4puf0vS4MICw46g4YonDz7k5VQmzy0ZAV\nc6ew1WF0PfBk/3o4F0plkj5nbem57iOU0znKfI0ZYXoR\n-----END CERTIFICATE-----\n" + } } diff --git a/extensions/aws/CMakeLists.txt b/extensions/aws/CMakeLists.txt index 11ae835434..355a28a64f 100644 --- a/extensions/aws/CMakeLists.txt +++ b/extensions/aws/CMakeLists.txt @@ -33,14 +33,21 @@ add_minifi_library(minifi-aws SHARED ${SOURCES}) target_link_libraries(minifi-aws PUBLIC ${LIBMINIFI} Threads::Threads) target_wholearchive_library_private(minifi-aws AWS::aws-cpp-sdk-s3) +target_wholearchive_library_private(minifi-aws AWS::aws-cpp-sdk-kinesis) if(CMAKE_SYSTEM_PROCESSOR MATCHES "(arm64)|(ARM64)|(aarch64)|(armv8)") target_wholearchive_library_private(minifi-aws AWS::aws-checksums) endif() -get_target_property(AWS_SDK_INCLUDE_DIRS AWS::aws-cpp-sdk-s3 INTERFACE_INCLUDE_DIRECTORIES) +get_target_property(AWS_SDK_S3_INCLUDE_DIRS AWS::aws-cpp-sdk-s3 INTERFACE_INCLUDE_DIRECTORIES) +get_target_property(AWS_SDK_KINESIS_INCLUDE_DIRS AWS::aws-cpp-sdk-kinesis INTERFACE_INCLUDE_DIRECTORIES) + -target_include_directories(minifi-aws INTERFACE ${AWS_SDK_INCLUDE_DIRS}) +target_include_directories(minifi-aws INTERFACE ${AWS_SDK_S3_INCLUDE_DIRS}) +target_include_directories(minifi-aws INTERFACE ${AWS_SDK_KINESIS_INCLUDE_DIRS}) + if(WIN32) target_compile_definitions(minifi-aws INTERFACE "AWS_CORE_API=__declspec(dllimport)") target_compile_definitions(minifi-aws INTERFACE "AWS_S3_API=__declspec(dllimport)") + target_compile_definitions(minifi-aws INTERFACE "AWS_KINESIS_API=__declspec(dllimport)") endif() register_extension(minifi-aws "AWS EXTENSIONS" AWS-EXTENSIONS "This enables AWS support" "extensions/aws/tests") diff --git a/extensions/aws/processors/AwsProcessor.cpp b/extensions/aws/processors/AwsProcessor.cpp new file mode 100644 index 0000000000..d90ec74682 --- /dev/null +++ b/extensions/aws/processors/AwsProcessor.cpp @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "AwsProcessor.h" + +#include <memory> +#include <string> +#include <utility> + +#include "AWSCredentialsService.h" +#include "S3Wrapper.h" +#include "core/ProcessContext.h" +#include "properties/Properties.h" +#include "range/v3/algorithm/contains.hpp" +#include "utils/HTTPUtils.h" +#include "utils/StringUtils.h" +#include "utils/ProcessorConfigUtils.h" + +namespace org::apache::nifi::minifi::aws::processors { + +AwsProcessor::AwsProcessor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger) + : core::ProcessorImpl(name, uuid), + logger_(std::move(logger)) { +} + +std::optional<Aws::Auth::AWSCredentials> AwsProcessor::getAWSCredentialsFromControllerService(core::ProcessContext& context) const { + if (const auto aws_credentials_service = minifi::utils::parseOptionalControllerService<controllers::AWSCredentialsService>(context, AWSCredentialsProviderService, getUUID())) { + return (*aws_credentials_service)->getAWSCredentials(); + } + logger_->log_error("AWS credentials service could not be found"); + + return std::nullopt; +} + +std::optional<Aws::Auth::AWSCredentials> AwsProcessor::getAWSCredentials( + core::ProcessContext& context, + const core::FlowFile* const flow_file) { + auto service_cred = getAWSCredentialsFromControllerService(context); + if (service_cred) { + logger_->log_info("AWS Credentials successfully set from controller service"); + return service_cred.value(); + } + + aws::AWSCredentialsProvider aws_credentials_provider; + if (const auto access_key = context.getProperty(AccessKey.name, flow_file)) { + aws_credentials_provider.setAccessKey(*access_key); + } + if (const auto secret_key = context.getProperty(SecretKey.name, flow_file)) { + aws_credentials_provider.setSecretKey(*secret_key); + } + if (const auto credentials_file = context.getProperty(CredentialsFile.name, flow_file)) { + aws_credentials_provider.setCredentialsFile(*credentials_file); + } + if (const auto use_credentials = context.getProperty(UseDefaultCredentials.name, flow_file) | minifi::utils::andThen(parsing::parseBool)) { + aws_credentials_provider.setUseDefaultCredentials(*use_credentials); + } + + return aws_credentials_provider.getAWSCredentials(); +} + +aws::ProxyOptions AwsProcessor::getProxy(core::ProcessContext& context, const core::FlowFile* const flow_file) { + aws::ProxyOptions proxy; + + proxy.host = minifi::utils::parseOptionalProperty(context, ProxyHost, flow_file).value_or(""); + proxy.port = gsl::narrow<uint32_t>(minifi::utils::parseOptionalU64Property(context, ProxyPort, flow_file).value_or(0)); + proxy.username = minifi::utils::parseOptionalProperty(context, ProxyUsername, flow_file).value_or(""); + proxy.password = minifi::utils::parseOptionalProperty(context, ProxyPassword, flow_file).value_or(""); + + if (!proxy.host.empty()) { + logger_->log_info("Proxy for AwsProcessor was set."); + } + return proxy; +} +
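+// Validates and caches the shared client configuration (region, communications timeout, default CA file); subclasses are expected to call this from their own onSchedule override before using client_config_.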
+void AwsProcessor::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + client_config_ = Aws::Client::ClientConfiguration(); + + client_config_->region = context.getProperty(Region) | minifi::utils::expect("Region property missing or invalid"); + logger_->log_debug("AwsProcessor: Region [{}]", client_config_->region); + + if (auto communications_timeout = minifi::utils::parseOptionalMsProperty(context, CommunicationsTimeout)) { + logger_->log_debug("AwsProcessor: Communications Timeout {}", *communications_timeout); + client_config_->connectTimeoutMs = gsl::narrow<long>(communications_timeout->count()); // NOLINT(runtime/int,google-runtime-int) + } else { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Communications Timeout missing or invalid"); + } + + static const auto default_ca_file = minifi::utils::getDefaultCAFile(); + if (default_ca_file) { + client_config_->caFile = *default_ca_file; + } +} + +std::optional<CommonProperties> AwsProcessor::getCommonELSupportedProperties( + core::ProcessContext& context, + const core::FlowFile* const flow_file) { + CommonProperties properties; + + auto credentials = getAWSCredentials(context, flow_file); + if (!credentials) { + logger_->log_error("AWS Credentials have not been set!"); + return std::nullopt; + } + properties.credentials = credentials.value(); + properties.proxy = getProxy(context, flow_file); + + const auto endpoint_override_url = context.getProperty(EndpointOverrideURL, flow_file); + if (endpoint_override_url) { + properties.endpoint_override_url = *endpoint_override_url; + logger_->log_debug("AwsProcessor: Endpoint Override URL [{}]", properties.endpoint_override_url); + } + + return properties; +} + +} // namespace org::apache::nifi::minifi::aws::processors diff --git a/extensions/aws/processors/AwsProcessor.h b/extensions/aws/processors/AwsProcessor.h new file mode 100644 index 0000000000..c6662c0c86 --- /dev/null +++ b/extensions/aws/processors/AwsProcessor.h @@ -0,0 +1,182 @@ +/** + * @file AwsProcessor.h + * Base AWS processor class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +#pragma once + +#include <array> +#include <memory> +#include <optional> +#include <string> +#include <string_view> +#include <utility> +#include <vector> + +#include "aws/core/auth/AWSCredentialsProvider.h" +#include "AWSCredentialsProvider.h" +#include "utils/ProxyOptions.h" +#include "core/PropertyDefinition.h" +#include "core/PropertyDefinitionBuilder.h" +#include "core/PropertyType.h" +#include "core/Processor.h" + + +namespace org::apache::nifi::minifi::aws::processors { + +namespace region { +inline constexpr std::string_view AF_SOUTH_1 = "af-south-1"; +inline constexpr std::string_view AP_EAST_1 = "ap-east-1"; +inline constexpr std::string_view AP_NORTHEAST_1 = "ap-northeast-1"; +inline constexpr std::string_view AP_NORTHEAST_2 = "ap-northeast-2"; +inline constexpr std::string_view AP_NORTHEAST_3 = "ap-northeast-3"; +inline constexpr std::string_view AP_SOUTH_1 = "ap-south-1"; +inline constexpr std::string_view AP_SOUTH_2 = "ap-south-2"; +inline constexpr std::string_view AP_SOUTHEAST_1 = "ap-southeast-1"; +inline constexpr std::string_view AP_SOUTHEAST_2 = "ap-southeast-2"; +inline constexpr std::string_view AP_SOUTHEAST_3 = "ap-southeast-3"; +inline constexpr std::string_view CA_CENTRAL_1 = "ca-central-1"; +inline constexpr std::string_view CN_NORTH_1 = "cn-north-1"; +inline constexpr std::string_view CN_NORTHWEST_1 = "cn-northwest-1"; +inline constexpr std::string_view EU_CENTRAL_1 = "eu-central-1"; +inline constexpr std::string_view EU_CENTRAL_2 = "eu-central-2"; +inline constexpr std::string_view EU_NORTH_1 = "eu-north-1"; +inline constexpr std::string_view EU_SOUTH_1 = "eu-south-1"; +inline constexpr std::string_view EU_SOUTH_2 = "eu-south-2"; +inline constexpr std::string_view EU_WEST_1 = "eu-west-1"; +inline constexpr std::string_view EU_WEST_2 = "eu-west-2"; +inline constexpr std::string_view EU_WEST_3 = "eu-west-3"; +inline constexpr std::string_view ME_CENTRAL_1 = "me-central-1"; +inline constexpr std::string_view ME_SOUTH_1 = "me-south-1"; +inline constexpr std::string_view SA_EAST_1 = "sa-east-1"; +inline constexpr std::string_view US_EAST_1 = "us-east-1"; +inline constexpr std::string_view US_EAST_2 = "us-east-2"; +inline constexpr std::string_view US_GOV_EAST_1 = "us-gov-east-1"; +inline constexpr std::string_view US_GOV_WEST_1 = "us-gov-west-1"; +inline constexpr std::string_view US_ISO_EAST_1 = "us-iso-east-1"; +inline constexpr std::string_view US_ISOB_EAST_1 = "us-isob-east-1"; +inline constexpr std::string_view US_ISO_WEST_1 = "us-iso-west-1"; +inline constexpr std::string_view US_WEST_1 = "us-west-1"; +inline constexpr std::string_view US_WEST_2 = "us-west-2"; + +inline constexpr auto REGIONS = std::array{ + AF_SOUTH_1, AP_EAST_1, AP_NORTHEAST_1, + AP_NORTHEAST_2, AP_NORTHEAST_3, AP_SOUTH_1, AP_SOUTH_2, AP_SOUTHEAST_1, AP_SOUTHEAST_2, + AP_SOUTHEAST_3, CA_CENTRAL_1, CN_NORTH_1, CN_NORTHWEST_1, EU_CENTRAL_1, EU_CENTRAL_2, + EU_NORTH_1, EU_SOUTH_1, EU_SOUTH_2, EU_WEST_1, EU_WEST_2, EU_WEST_3, ME_CENTRAL_1, + ME_SOUTH_1, SA_EAST_1, US_EAST_1, US_EAST_2, US_GOV_EAST_1, US_GOV_WEST_1, + US_ISO_EAST_1, US_ISOB_EAST_1, US_ISO_WEST_1, US_WEST_1, US_WEST_2 +}; +} // namespace region +
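+// Properties common to all AWS processors that can be resolved per flow file: credentials, proxy settings and the optional endpoint override.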
+struct CommonProperties { + Aws::Auth::AWSCredentials credentials; + aws::ProxyOptions proxy; + std::string endpoint_override_url; +}; + +class AwsProcessor : public core::ProcessorImpl { + public: + EXTENSIONAPI static constexpr auto AccessKey = core::PropertyDefinitionBuilder<>::createProperty("Access Key") + .withDescription("AWS account access key") + .supportsExpressionLanguage(true) + .build(); + EXTENSIONAPI static constexpr auto SecretKey = core::PropertyDefinitionBuilder<>::createProperty("Secret Key") + .withDescription("AWS account secret key") + .supportsExpressionLanguage(true) + .isSensitive(true) + .build(); + EXTENSIONAPI static constexpr auto CredentialsFile = core::PropertyDefinitionBuilder<>::createProperty("Credentials File") + .withDescription("Path to a file containing AWS access key and secret key in properties file format. Properties used: accessKey and secretKey") + .build(); + EXTENSIONAPI static constexpr auto AWSCredentialsProviderService = core::PropertyDefinitionBuilder<>::createProperty("AWS Credentials Provider service") + .withDescription("The name of the AWS Credentials Provider controller service that is used to obtain AWS credentials.") + .build(); + EXTENSIONAPI static constexpr auto Region = core::PropertyDefinitionBuilder<region::REGIONS.size()>::createProperty("Region") + .isRequired(true) + .withDefaultValue(region::US_WEST_2) + .withAllowedValues(region::REGIONS) + .withDescription("AWS Region") + .build(); + EXTENSIONAPI static constexpr auto CommunicationsTimeout = core::PropertyDefinitionBuilder<>::createProperty("Communications Timeout") + .isRequired(true) + .withValidator(core::StandardPropertyTypes::TIME_PERIOD_VALIDATOR) + .withDefaultValue("30 sec") + .withDescription("Sets the timeout of the communication between the AWS server and the client") + .build(); + EXTENSIONAPI static constexpr auto EndpointOverrideURL = core::PropertyDefinitionBuilder<>::createProperty("Endpoint Override URL") + .withDescription("Endpoint URL to use instead of the AWS default including scheme, host, " + "port, and path. The AWS libraries select an endpoint URL based on the AWS " + "region, but this property overrides the selected endpoint URL, allowing use " + "with other S3-compatible endpoints.") + .supportsExpressionLanguage(true) + .build(); + EXTENSIONAPI static constexpr auto ProxyHost = core::PropertyDefinitionBuilder<>::createProperty("Proxy Host") + .withDescription("Proxy host name or IP") + .supportsExpressionLanguage(true) + .build(); + EXTENSIONAPI static constexpr auto ProxyPort = core::PropertyDefinitionBuilder<>::createProperty("Proxy Port") + .withDescription("The port number of the proxy host") + .supportsExpressionLanguage(true) + .build(); + EXTENSIONAPI static constexpr auto ProxyUsername = core::PropertyDefinitionBuilder<>::createProperty("Proxy Username") + .withDescription("Username to set when authenticating against proxy") + .supportsExpressionLanguage(true) + .build(); + EXTENSIONAPI static constexpr auto ProxyPassword = core::PropertyDefinitionBuilder<>::createProperty("Proxy Password") + .withDescription("Password to set when authenticating against proxy") + .supportsExpressionLanguage(true) + .isSensitive(true) + .build(); + EXTENSIONAPI static constexpr auto UseDefaultCredentials = core::PropertyDefinitionBuilder<>::createProperty("Use Default Credentials") + .withDescription("If true, uses the Default Credential chain, including EC2 instance profiles or roles, environment variables, default user credentials, etc.") + .withValidator(core::StandardPropertyTypes::BOOLEAN_VALIDATOR) + .withDefaultValue("false") + .isRequired(true) + .build(); + EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyDefinition>({ + AccessKey, + SecretKey, + CredentialsFile, + AWSCredentialsProviderService, + Region, + CommunicationsTimeout, + EndpointOverrideURL, + ProxyHost, + ProxyPort, + ProxyUsername, + ProxyPassword, + UseDefaultCredentials + }); + + + explicit AwsProcessor(std::string_view name, const
minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger); + + void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; + + protected: + std::optional<Aws::Auth::AWSCredentials> getAWSCredentialsFromControllerService(core::ProcessContext& context) const; + std::optional<Aws::Auth::AWSCredentials> getAWSCredentials(core::ProcessContext& context, const core::FlowFile* flow_file); + aws::ProxyOptions getProxy(core::ProcessContext& context, const core::FlowFile* const flow_file); + std::optional<CommonProperties> getCommonELSupportedProperties(core::ProcessContext& context, const core::FlowFile* flow_file); + + std::shared_ptr<core::logging::Logger> logger_; + std::optional<Aws::Client::ClientConfiguration> client_config_; +}; + +} // namespace org::apache::nifi::minifi::aws::processors diff --git a/extensions/aws/processors/DeleteS3Object.cpp b/extensions/aws/processors/DeleteS3Object.cpp index e02efaf5b8..84b375e6e6 100644 --- a/extensions/aws/processors/DeleteS3Object.cpp +++ b/extensions/aws/processors/DeleteS3Object.cpp @@ -32,9 +32,10 @@ void DeleteS3Object::initialize() { } std::optional<aws::s3::DeleteObjectRequestParameters> DeleteS3Object::buildDeleteS3RequestParams( - core::ProcessContext& context, + const core::ProcessContext& context, const core::FlowFile& flow_file, - const CommonProperties &common_properties) const { + const CommonProperties& common_properties, + const std::string_view bucket) const { gsl_Expects(client_config_); aws::s3::DeleteObjectRequestParameters params(common_properties.credentials, *client_config_); if (const auto object_key = context.getProperty(ObjectKey, &flow_file)) { @@ -51,7 +52,7 @@ std::optional<aws::s3::DeleteObjectRequestParameters> DeleteS3Object::buildDelet } logger_->log_debug("DeleteS3Object: Version [{}]", params.version); - params.bucket = common_properties.bucket; + params.bucket = bucket; params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url); return params; } @@ -70,7 +71,25 @@ void DeleteS3Object::onTrigger(core::ProcessContext& context, core::ProcessSessi return; } - auto params = buildDeleteS3RequestParams(context, *flow_file, *common_properties); + auto bucket = context.getProperty(Bucket.name, flow_file.get()); + if (!bucket) { + logger_->log_error("Bucket is invalid due to {}", bucket.error().message()); + session.transfer(flow_file, Failure); + return; + } + logger_->log_debug("S3Processor: Bucket [{}]", *bucket); + + auto params = buildDeleteS3RequestParams(context, *flow_file, *common_properties, *bucket); if (!params) { session.transfer(flow_file, Failure); return; } if (s3_wrapper_.deleteObject(*params)) { - logger_->log_debug("Successfully deleted S3 object '{}' from bucket '{}'", params->object_key, common_properties->bucket); + logger_->log_debug("Successfully deleted S3 object '{}' from bucket '{}'", params->object_key, *bucket); session.transfer(flow_file, Success); } else { - logger_->log_error("Failed to delete S3 object '{}' from bucket '{}'", params->object_key, common_properties->bucket); + logger_->log_error("Failed to delete S3 object '{}' from bucket '{}'", params->object_key, *bucket); session.transfer(flow_file, Failure); } } diff --git a/extensions/aws/processors/DeleteS3Object.h b/extensions/aws/processors/DeleteS3Object.h index 94dbf0f4ff..2d983d2a83 100644 --- a/extensions/aws/processors/DeleteS3Object.h +++ b/extensions/aws/processors/DeleteS3Object.h @@ -81,9 +81,10 @@ class DeleteS3Object : public S3Processor { } std::optional<aws::s3::DeleteObjectRequestParameters> buildDeleteS3RequestParams( - core::ProcessContext& context, + const core::ProcessContext& context, const core::FlowFile& flow_file, - const CommonProperties &common_properties) const; + const
CommonProperties &common_properties, + std::string_view bucket) const; }; } // namespace org::apache::nifi::minifi::aws::processors diff --git a/extensions/aws/processors/FetchS3Object.cpp b/extensions/aws/processors/FetchS3Object.cpp index 194d68f542..9c7de100ac 100644 --- a/extensions/aws/processors/FetchS3Object.cpp +++ b/extensions/aws/processors/FetchS3Object.cpp @@ -43,12 +43,13 @@ void FetchS3Object::onSchedule(core::ProcessContext& context, core::ProcessSessi } std::optional<aws::s3::GetObjectRequestParameters> FetchS3Object::buildFetchS3RequestParams( - core::ProcessContext& context, + const core::ProcessContext& context, const core::FlowFile& flow_file, - const CommonProperties &common_properties) const { + const CommonProperties &common_properties, + const std::string_view bucket) const { gsl_Expects(client_config_); minifi::aws::s3::GetObjectRequestParameters get_object_params(common_properties.credentials, *client_config_); - get_object_params.bucket = common_properties.bucket; + get_object_params.bucket = bucket; get_object_params.requester_pays = requester_pays_; if (const auto object_key = context.getProperty(ObjectKey, &flow_file)) { @@ -82,7 +83,15 @@ void FetchS3Object::onTrigger(core::ProcessContext& context, core::ProcessSessio return; } - auto get_object_params = buildFetchS3RequestParams(context, *flow_file, *common_properties); + auto bucket = context.getProperty(Bucket.name, flow_file.get()); + if (!bucket) { + logger_->log_error("Bucket is invalid due to {}", bucket.error().message()); + session.transfer(flow_file, Failure); + return; + } + logger_->log_debug("S3Processor: Bucket [{}]", *bucket); + + auto get_object_params = buildFetchS3RequestParams(context, *flow_file, *common_properties, *bucket); if (!get_object_params) { session.transfer(flow_file, Failure); return; diff --git a/extensions/aws/processors/FetchS3Object.h b/extensions/aws/processors/FetchS3Object.h index 6490099aac..e64982ea3d 100644 --- a/extensions/aws/processors/FetchS3Object.h +++ b/extensions/aws/processors/FetchS3Object.h @@ -92,9 +92,10 @@ class FetchS3Object : public S3Processor { } std::optional<aws::s3::GetObjectRequestParameters> buildFetchS3RequestParams( - core::ProcessContext& context, + const core::ProcessContext& context, const core::FlowFile& flow_file, - const CommonProperties &common_properties) const; + const CommonProperties &common_properties, + std::string_view bucket) const; bool requester_pays_ = false; }; diff --git a/extensions/aws/processors/ListS3.cpp b/extensions/aws/processors/ListS3.cpp index 2ad5ad3a79..12ac80a282 100644 --- a/extensions/aws/processors/ListS3.cpp +++ b/extensions/aws/processors/ListS3.cpp @@ -47,10 +47,13 @@ void ListS3::onSchedule(core::ProcessContext& context, core::ProcessSessionFacto throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Required property is not set or invalid"); } + auto bucket = context.getProperty(Bucket.name) | minifi::utils::expect("Required property"); + logger_->log_debug("S3Processor: Bucket [{}]", bucket); + gsl_Expects(client_config_); list_request_params_ = std::make_unique<aws::s3::ListRequestParameters>(common_properties->credentials, *client_config_); list_request_params_->setClientConfig(common_properties->proxy, common_properties->endpoint_override_url); - list_request_params_->bucket = common_properties->bucket; + list_request_params_->bucket = bucket; if (const auto delimiter = context.getProperty(Delimiter)) { list_request_params_->delimiter = *delimiter; diff --git a/extensions/aws/processors/PutKinesisStream.cpp b/extensions/aws/processors/PutKinesisStream.cpp new file mode 100644 index 0000000000..4a0affbb31 ---
/dev/null +++ b/extensions/aws/processors/PutKinesisStream.cpp @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "PutKinesisStream.h" + +#include <memory> +#include <string> +#include <unordered_map> +#include <utility> +#include <vector> + +#include "aws/kinesis/KinesisClient.h" +#include "aws/kinesis/model/PutRecordsRequest.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/Resource.h" +#include "utils/ProcessorConfigUtils.h" + +namespace org::apache::nifi::minifi::aws::processors { + +void PutKinesisStream::initialize() { + setSupportedProperties(Properties); + setSupportedRelationships(Relationships); +} + +void PutKinesisStream::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) { + AwsProcessor::onSchedule(context, session_factory); + + batch_size_ = minifi::utils::parseU64Property(context, MessageBatchSize); + if (batch_size_ == 0 || batch_size_ > 500) { + logger_->log_warn("PutKinesisStream::MessageBatchSize is invalid. Setting it to the maximum value of 500."); + batch_size_ = 500; + } + batch_data_size_soft_cap_ = minifi::utils::parseDataSizeProperty(context, MaxBatchDataSize); + if (batch_data_size_soft_cap_ > 4_MB) { + logger_->log_warn("PutKinesisStream::MaxBatchDataSize is invalid. Setting it to the maximum value of 4 MB."); + batch_data_size_soft_cap_ = 4_MB; + } + + endpoint_override_url_ = context.getProperty(EndpointOverrideURL.name) | minifi::utils::toOptional(); +} +
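+// Flow files are grouped into per-stream batches: a batch holds at most MessageBatchSize records (capped at 500, the PutRecords limit) and intake stops early once a batch's payload exceeds the configured soft cap (at most 4 MB). Records larger than 1 MB are routed to failure up front.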
+struct StreamBatch { + uint64_t batch_size = 0; + std::vector<std::shared_ptr<core::FlowFile>> flow_files; +}; + +void PutKinesisStream::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { + logger_->log_trace("PutKinesisStream onTrigger"); + + constexpr uint64_t SINGLE_RECORD_MAX_SIZE = 1_MB; + std::unordered_map<std::string, StreamBatch> stream_batches; + auto credentials = getAWSCredentials(context, nullptr); + + if (!credentials) { + logger_->log_error("Failed to get credentials for PutKinesisStream"); + context.yield(); + return; + } + + for (uint64_t i = 0; i < batch_size_; i++) { + std::shared_ptr<core::FlowFile> flow_file = session.get(); + if (!flow_file) { break; } + const auto flow_file_size = flow_file->getSize(); + if (flow_file_size > SINGLE_RECORD_MAX_SIZE) { + flow_file->setAttribute(AwsKinesisErrorMessage.name, fmt::format("record too big {}, max allowed {}", flow_file_size, SINGLE_RECORD_MAX_SIZE)); + session.transfer(flow_file, Failure); + logger_->log_error("Failed to publish to kinesis record {} because the size was greater than {} bytes", flow_file->getUUID().to_string(), SINGLE_RECORD_MAX_SIZE); + continue; + } + + auto stream_name = context.getProperty(AmazonKinesisStreamName.name, flow_file.get()); + if (!stream_name) { + logger_->log_error("Stream name is invalid due to {}", stream_name.error().message()); + session.transfer(flow_file, Failure); + continue; + } + auto partition_key = context.getProperty(AmazonKinesisStreamPartitionKey.name, flow_file.get()) + | minifi::utils::valueOrElse([&flow_file]() -> std::string { return flow_file->getUUID().to_string(); }); + + stream_batches[*stream_name].flow_files.push_back(std::move(flow_file)); + stream_batches[*stream_name].batch_size += flow_file_size; + + if (stream_batches[*stream_name].batch_size > batch_data_size_soft_cap_) { + break; + } + } + + std::unique_ptr<Aws::Kinesis::KinesisClient> kinesis_client = getClient(*credentials); + + for (const auto& [stream_name, stream_batch]: stream_batches) { + Aws::Kinesis::Model::PutRecordsRequest request; + request.SetStreamName(stream_name); + Aws::Vector<Aws::Kinesis::Model::PutRecordsRequestEntry> records; + for (const auto& flow_file : stream_batch.flow_files) { + Aws::Kinesis::Model::PutRecordsRequestEntry entry; + const auto partition_key = context.getProperty(AmazonKinesisStreamPartitionKey.name, flow_file.get()) | minifi::utils::valueOrElse([&flow_file] { return flow_file->getUUID().to_string(); }); + entry.SetPartitionKey(partition_key); + const auto [status, buffer] = session.readBuffer(flow_file); + Aws::Utils::ByteBuffer aws_buffer(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size()); + entry.SetData(aws_buffer); + records.push_back(entry); + } + request.SetRecords(records); + + const auto outcome = kinesis_client->PutRecords(request); + + if (!outcome.IsSuccess()) { + for (const auto& flow_file : stream_batch.flow_files) { + flow_file->addAttribute(AwsKinesisErrorMessage.name, outcome.GetError().GetMessage()); + flow_file->addAttribute(AwsKinesisErrorCode.name, std::to_string(static_cast<int>(outcome.GetError().GetErrorType()))); + session.transfer(flow_file, Failure); + } + } else { + const auto result_records = outcome.GetResult().GetRecords(); + if (result_records.size() != stream_batch.flow_files.size()) { + logger_->log_critical("PutKinesisStream record count mismatch; cannot tell which records succeeded and which failed"); + for (const auto& flow_file : stream_batch.flow_files) { + flow_file->addAttribute(AwsKinesisErrorMessage.name, "Record count mismatch"); + session.transfer(flow_file, Failure); + } + continue; + }
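+        // PutRecords returns one result entry per request record, in request order, so results can be matched back to the batched flow files by index.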
+      for (uint64_t i = 0; i < stream_batch.flow_files.size(); i++) { + const auto& flow_file = stream_batch.flow_files[i]; + const auto& result_record = result_records[i]; + if (result_record.GetErrorCode().empty()) { + flow_file->addAttribute(AwsKinesisShardId.name, result_record.GetShardId()); + flow_file->addAttribute(AwsKinesisSequenceNumber.name, result_record.GetSequenceNumber()); + session.transfer(flow_file, Success); + } else { + flow_file->addAttribute(AwsKinesisErrorMessage.name, result_record.GetErrorMessage()); + flow_file->addAttribute(AwsKinesisErrorCode.name, result_record.GetErrorCode()); + session.transfer(flow_file, Failure); + } + } + } + } +} + +std::unique_ptr<Aws::Kinesis::KinesisClient> PutKinesisStream::getClient(const Aws::Auth::AWSCredentials& credentials) { + gsl_Expects(client_config_); + auto client = std::make_unique<Aws::Kinesis::KinesisClient>(credentials, *client_config_); + if (endpoint_override_url_) { + client->OverrideEndpoint(*endpoint_override_url_); + } + return client; +} + +REGISTER_RESOURCE(PutKinesisStream, Processor); + +} // namespace org::apache::nifi::minifi::aws::processors diff --git a/extensions/aws/processors/PutKinesisStream.h b/extensions/aws/processors/PutKinesisStream.h new file mode 100644 index 0000000000..21cc697463 --- /dev/null +++ b/extensions/aws/processors/PutKinesisStream.h @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + + +#include <array> +#include <memory> +#include <optional> +#include <string> +#include <utility> + +#include "AwsProcessor.h" +#include "core/PropertyDefinitionBuilder.h" +#include "utils/ArrayUtils.h" +#include "aws/kinesis/KinesisClient.h" + + +namespace org::apache::nifi::minifi::aws::processors { +
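+// Publishes flow files to Amazon Kinesis in per-stream batches via the PutRecords API, reusing the credential and client configuration handling of AwsProcessor.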
+class PutKinesisStream : public AwsProcessor { + public: + EXTENSIONAPI static constexpr const char* Description = "Sends the contents to a specified Amazon Kinesis. In order to send data to Kinesis, the stream name has to be specified."; + + EXTENSIONAPI static constexpr auto AmazonKinesisStreamName = core::PropertyDefinitionBuilder<>::createProperty("Amazon Kinesis Stream Name") + .withDescription("The name of Kinesis Stream") + .isRequired(true) + .withValidator(core::StandardPropertyTypes::NON_BLANK_VALIDATOR) + .supportsExpressionLanguage(true) + .build(); + EXTENSIONAPI static constexpr auto AmazonKinesisStreamPartitionKey = core::PropertyDefinitionBuilder<>::createProperty("Amazon Kinesis Stream Partition Key") + .withDescription("The partition key attribute. If it is not set, a random value is used") + .supportsExpressionLanguage(true) + .build(); + EXTENSIONAPI static constexpr auto MessageBatchSize = core::PropertyDefinitionBuilder<>::createProperty("Batch Size") + .withDescription("Batch size for messages. [1-500]") + .withValidator(core::StandardPropertyTypes::UNSIGNED_INTEGER_VALIDATOR) + .withDefaultValue("250") + .build(); + EXTENSIONAPI static constexpr auto MaxBatchDataSize = core::PropertyDefinitionBuilder<>::createProperty("Max Batch Data Size") + .withDescription("Soft cap on the data size of the batch to a single stream. (max 4MB)") + .withValidator(core::StandardPropertyTypes::DATA_SIZE_VALIDATOR) + .withDefaultValue("1 MB") + .build(); + + EXTENSIONAPI static constexpr auto Properties = minifi::utils::array_cat(AwsProcessor::Properties, std::to_array<core::PropertyDefinition>({ + AmazonKinesisStreamName, AmazonKinesisStreamPartitionKey, MessageBatchSize, MaxBatchDataSize + })); + + EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "FlowFiles are routed to success relationship"}; + EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "FlowFiles are routed to failure relationship"}; + EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure}; + + EXTENSIONAPI static constexpr auto AwsKinesisErrorMessage = core::OutputAttributeDefinition<>{"aws.kinesis.error.message", { Failure }, + "Error message on posting message to AWS Kinesis"}; + EXTENSIONAPI static constexpr auto AwsKinesisErrorCode = core::OutputAttributeDefinition<>{"aws.kinesis.error.code", { Failure }, + "Error code for the message when posting to AWS Kinesis"}; + EXTENSIONAPI static constexpr auto AwsKinesisSequenceNumber = core::OutputAttributeDefinition<>{"aws.kinesis.sequence.number", { Success }, + "Sequence number for the message when posting to AWS Kinesis"}; + EXTENSIONAPI static constexpr auto AwsKinesisShardId = core::OutputAttributeDefinition<>{"aws.kinesis.shard.id", { Success }, + "Shard id of the message posted to AWS Kinesis"}; + EXTENSIONAPI static constexpr auto OutputAttributes = std::array{AwsKinesisErrorMessage, AwsKinesisErrorCode, AwsKinesisSequenceNumber, AwsKinesisShardId}; + + EXTENSIONAPI static constexpr bool SupportsDynamicProperties = true; + EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; + EXTENSIONAPI static constexpr auto InputRequirement = core::annotation::Input::INPUT_REQUIRED; + EXTENSIONAPI static constexpr bool IsSingleThreaded = false; + + ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS + + explicit PutKinesisStream(const std::string& name, const minifi::utils::Identifier& uuid = minifi::utils::Identifier()) + : AwsProcessor(name, uuid, core::logging::LoggerFactory<PutKinesisStream>::getLogger(uuid)) { + } + + ~PutKinesisStream() override = default; + + void initialize() override; + void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; + void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; + + protected: + // Overridable (e.g. for tests) factory for the Kinesis client; applies the endpoint override when one was configured. + virtual std::unique_ptr<Aws::Kinesis::KinesisClient> getClient(const Aws::Auth::AWSCredentials& credentials); + + private: + uint64_t batch_size_ = 250; + uint64_t batch_data_size_soft_cap_ = 1_MB; + const utils::AWSInitializer& AWS_INITIALIZER = utils::AWSInitializer::get(); + std::optional<std::string> endpoint_override_url_; +}; + +} // namespace org::apache::nifi::minifi::aws::processors diff --git a/extensions/aws/processors/PutS3Object.cpp b/extensions/aws/processors/PutS3Object.cpp index 2e9413c951..881fa59cf8 100644
--- a/extensions/aws/processors/PutS3Object.cpp +++ b/extensions/aws/processors/PutS3Object.cpp @@ -111,7 +111,7 @@ std::string PutS3Object::parseAccessControlList(const std::string &comma_separat } bool PutS3Object::setCannedAcl( - core::ProcessContext& context, + const core::ProcessContext& context, const core::FlowFile& flow_file, aws::s3::PutObjectRequestParameters &put_s3_request_params) const { if (const auto canned_acl = context.getProperty(CannedACL, &flow_file)) { @@ -127,7 +127,7 @@ bool PutS3Object::setCannedAcl( } bool PutS3Object::setAccessControl( - core::ProcessContext& context, + const core::ProcessContext& context, const core::FlowFile& flow_file, aws::s3::PutObjectRequestParameters &put_s3_request_params) const { if (const auto full_control_user_list = context.getProperty(FullControlUserList, &flow_file)) { @@ -151,13 +151,14 @@ bool PutS3Object::setAccessControl( } std::optional<aws::s3::PutObjectRequestParameters> PutS3Object::buildPutS3RequestParams( - core::ProcessContext& context, + const core::ProcessContext& context, const core::FlowFile& flow_file, - const CommonProperties &common_properties) const { + const CommonProperties &common_properties, + const std::string_view bucket) const { gsl_Expects(client_config_); aws::s3::PutObjectRequestParameters params(common_properties.credentials, *client_config_); params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url); - params.bucket = common_properties.bucket; + params.bucket = bucket; params.user_metadata_map = user_metadata_map_; params.server_side_encryption = server_side_encryption_; params.storage_class = storage_class_; @@ -206,7 +207,7 @@ void PutS3Object::setAttributes( } } -void PutS3Object::ageOffMultipartUploads(const CommonProperties &common_properties) { +void PutS3Object::ageOffMultipartUploads(const CommonProperties &common_properties, const std::string_view bucket) { { std::lock_guard lock(last_ageoff_mutex_); const auto now = std::chrono::system_clock::now(); @@ -220,7 +221,7 @@ void PutS3Object::ageOffMultipartUploads(const CommonProperti logger_->log_trace("Listing aged off multipart uploads still in progress."); aws::s3::ListMultipartUploadsRequestParameters list_params(common_properties.credentials, *client_config_); list_params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url); - list_params.bucket = common_properties.bucket; + list_params.bucket = bucket; list_params.age_off_limit = multipart_upload_max_age_threshold_; list_params.use_virtual_addressing = use_virtual_addressing_; auto aged_off_uploads_in_progress = s3_wrapper_.listMultipartUploads(list_params); @@ -229,14 +230,14 @@ void PutS3Object::ageOffMultipartUploads(const CommonProperti return; } - logger_->log_info("Found {} aged off pending multipart upload jobs in bucket '{}'", aged_off_uploads_in_progress->size(), common_properties.bucket); + logger_->log_info("Found {} aged off pending multipart upload jobs in bucket '{}'", aged_off_uploads_in_progress->size(), bucket); size_t aborted = 0; for (const auto& upload : *aged_off_uploads_in_progress) { logger_->log_info("Aborting multipart upload with key '{}' and upload id '{}' in bucket '{}' due to reaching maximum upload age threshold.", - upload.key, upload.upload_id, common_properties.bucket); + upload.key, upload.upload_id, bucket); aws::s3::AbortMultipartUploadRequestParameters abort_params(common_properties.credentials, *client_config_); abort_params.setClientConfig(common_properties.proxy,
common_properties.endpoint_override_url); - abort_params.bucket = common_properties.bucket; + abort_params.bucket = bucket; abort_params.key = upload.key; abort_params.upload_id = upload.upload_id; abort_params.use_virtual_addressing = use_virtual_addressing_; @@ -247,7 +248,7 @@ void PutS3Object::ageOffMultipartUploads(const CommonProperties &common_properti ++aborted; } if (aborted > 0) { - logger_->log_info("Aborted {} pending multipart upload jobs in bucket '{}'", aborted, common_properties.bucket); + logger_->log_info("Aborted {} pending multipart upload jobs in bucket '{}'", aborted, bucket); } s3_wrapper_.ageOffLocalS3MultipartUploadStates(multipart_upload_max_age_threshold_); } @@ -266,9 +267,17 @@ void PutS3Object::onTrigger(core::ProcessContext& context, core::ProcessSession& return; } - ageOffMultipartUploads(*common_properties); + auto bucket = context.getProperty(Bucket.name, flow_file.get()); + if (!bucket) { + logger_->log_error("Bucket is invalid due to {}", bucket.error().message()); + session.transfer(flow_file, Failure); + return; + } + logger_->log_debug("S3Processor: Bucket [{}]", *bucket); + + ageOffMultipartUploads(*common_properties, *bucket); - auto put_s3_request_params = buildPutS3RequestParams(context, *flow_file, *common_properties); + auto put_s3_request_params = buildPutS3RequestParams(context, *flow_file, *common_properties, *bucket); if (!put_s3_request_params) { session.transfer(flow_file, Failure); return; diff --git a/extensions/aws/processors/PutS3Object.h b/extensions/aws/processors/PutS3Object.h index c9427e856a..a338081af0 100644 --- a/extensions/aws/processors/PutS3Object.h +++ b/extensions/aws/processors/PutS3Object.h @@ -201,18 +201,19 @@ class PutS3Object : public S3Processor { void fillUserMetadata(core::ProcessContext& context); static std::string parseAccessControlList(const std::string &comma_separated_list); - bool setCannedAcl(core::ProcessContext& context, const core::FlowFile& flow_file, aws::s3::PutObjectRequestParameters &put_s3_request_params) const; - bool setAccessControl(core::ProcessContext& context, const core::FlowFile& flow_file, aws::s3::PutObjectRequestParameters &put_s3_request_params) const; + bool setCannedAcl(const core::ProcessContext& context, const core::FlowFile& flow_file, aws::s3::PutObjectRequestParameters &put_s3_request_params) const; + bool setAccessControl(const core::ProcessContext& context, const core::FlowFile& flow_file, aws::s3::PutObjectRequestParameters &put_s3_request_params) const; void setAttributes( core::ProcessSession& session, core::FlowFile& flow_file, const aws::s3::PutObjectRequestParameters &put_s3_request_params, const minifi::aws::s3::PutObjectResult &put_object_result) const; std::optional buildPutS3RequestParams( - core::ProcessContext& context, + const core::ProcessContext& context, const core::FlowFile& flow_file, - const CommonProperties &common_properties) const; - void ageOffMultipartUploads(const CommonProperties &common_properties); + const CommonProperties &common_properties, + std::string_view bucket) const; + void ageOffMultipartUploads(const CommonProperties &common_properties, const std::string_view bucket); std::string user_metadata_; std::map user_metadata_map_; diff --git a/extensions/aws/processors/S3Processor.cpp b/extensions/aws/processors/S3Processor.cpp index ff76a548a8..6be93f626c 100644 --- a/extensions/aws/processors/S3Processor.cpp +++ b/extensions/aws/processors/S3Processor.cpp @@ -32,120 +32,19 @@ namespace org::apache::nifi::minifi::aws::processors { 
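With the bucket now resolved per flow file in onTrigger rather than once in getCommonELSupportedProperties, a single PutS3Object instance can fan out to different buckets via expression language. A hypothetical test in the style of PutKinesisStreamTests.cpp later in this diff; "PutS3ObjectMocked" and the "s3.bucket" attribute are illustrative, not part of this PR.

TEST_CASE("PutS3Object can target a different bucket per flow file") {
  minifi::test::SingleProcessorTestController controller(std::make_unique<PutS3ObjectMocked>("PutS3Object"));
  auto put_s3_object = controller.getProcessor();
  controller.plan->setProperty(put_s3_object, PutS3Object::Bucket, "${s3.bucket}");

  // Each flow file's Bucket expression is evaluated against its own attributes;
  // a flow file whose expression cannot be resolved is routed to Failure.
  const auto result = controller.trigger({
      {.content = "payload-a", .attributes = {{"s3.bucket", "bucket-a"}}},
      {.content = "payload-b", .attributes = {{"s3.bucket", "bucket-b"}}}});
}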
diff --git a/extensions/aws/processors/S3Processor.cpp b/extensions/aws/processors/S3Processor.cpp
index ff76a548a8..6be93f626c 100644
--- a/extensions/aws/processors/S3Processor.cpp
+++ b/extensions/aws/processors/S3Processor.cpp
@@ -32,120 +32,19 @@ namespace org::apache::nifi::minifi::aws::processors {
 
-S3Processor::S3Processor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger)
-    : core::ProcessorImpl(name, uuid),
-      logger_(std::move(logger)) {
-}
+S3Processor::S3Processor(const std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger)
+    : AwsProcessor(name, uuid, std::move(logger)) {}
 
-S3Processor::S3Processor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger, std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender)
-    : core::ProcessorImpl(name, uuid),
-      logger_(std::move(logger)),
+S3Processor::S3Processor(const std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger, std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender)
+    : AwsProcessor(name, uuid, std::move(logger)),
       s3_wrapper_(std::move(s3_request_sender)) {
 }
 
-std::optional<Aws::Auth::AWSCredentials> S3Processor::getAWSCredentialsFromControllerService(core::ProcessContext& context) const {
-  if (const auto aws_credentials_service = minifi::utils::parseOptionalControllerService<controllers::AWSCredentialsService>(context, AWSCredentialsProviderService, getUUID())) {
-    return (*aws_credentials_service)->getAWSCredentials();
-  }
-  logger_->log_error("AWS credentials service could not be found");
-
-  return std::nullopt;
-}
-
-std::optional<Aws::Auth::AWSCredentials> S3Processor::getAWSCredentials(
-    core::ProcessContext& context,
-    const core::FlowFile* const flow_file) {
-  auto service_cred = getAWSCredentialsFromControllerService(context);
-  if (service_cred) {
-    logger_->log_info("AWS Credentials successfully set from controller service");
-    return service_cred.value();
-  }
-
-  aws::AWSCredentialsProvider aws_credentials_provider;
-  if (const auto access_key = context.getProperty(AccessKey.name, flow_file)) {
-    aws_credentials_provider.setAccessKey(*access_key);
-  }
-  if (const auto secret_key = context.getProperty(SecretKey.name, flow_file)) {
-    aws_credentials_provider.setSecretKey(*secret_key);
-  }
-  if (const auto credentials_file = context.getProperty(CredentialsFile.name, flow_file)) {
-    aws_credentials_provider.setCredentialsFile(*credentials_file);
-  }
-  if (const auto use_credentials = context.getProperty(UseDefaultCredentials.name, flow_file) | minifi::utils::andThen(parsing::parseBool)) {
-    aws_credentials_provider.setUseDefaultCredentials(*use_credentials);
-  }
-
-  return aws_credentials_provider.getAWSCredentials();
-}
-
-std::optional<aws::s3::ProxyOptions> S3Processor::getProxy(core::ProcessContext& context, const core::FlowFile* const flow_file) {
-  aws::s3::ProxyOptions proxy;
-
-  proxy.host = minifi::utils::parseOptionalProperty(context, ProxyHost, flow_file).value_or("");
-  proxy.port = gsl::narrow<uint32_t>(minifi::utils::parseOptionalU64Property(context, ProxyPort, flow_file).value_or(0));
-  proxy.username = minifi::utils::parseOptionalProperty(context, ProxyUsername, flow_file).value_or("");
-  proxy.password = minifi::utils::parseOptionalProperty(context, ProxyPassword, flow_file).value_or("");
-
-  if (!proxy.host.empty()) {
-    logger_->log_info("Proxy for S3Processor was set.");
-  }
-  return proxy;
-}
-
-void S3Processor::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
-  client_config_ = Aws::Client::ClientConfiguration();
+void S3Processor::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) {
+  AwsProcessor::onSchedule(context, session_factory);
 
   if (!getProperty(Bucket.name)) {
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Bucket property missing or invalid");
   }
-
-  client_config_->region = context.getProperty(Region) | minifi::utils::expect("Region property missing or invalid");
-  logger_->log_debug("S3Processor: Region [{}]", client_config_->region);
-
-  if (auto communications_timeout = minifi::utils::parseOptionalMsProperty(context, CommunicationsTimeout)) {
-    logger_->log_debug("S3Processor: Communications Timeout {}", *communications_timeout);
-    client_config_->connectTimeoutMs = gsl::narrow<long>(communications_timeout->count());  // NOLINT(runtime/int,google-runtime-int)
-  } else {
-    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Communications Timeout missing or invalid");
-  }
-
-  static const auto default_ca_file = minifi::utils::getDefaultCAFile();
-  if (default_ca_file) {
-    client_config_->caFile = *default_ca_file;
-  }
-}
-
-std::optional<CommonProperties> S3Processor::getCommonELSupportedProperties(
-    core::ProcessContext& context,
-    const core::FlowFile* const flow_file) {
-  CommonProperties properties;
-  if (auto bucket = context.getProperty(Bucket, flow_file); !bucket || bucket->empty()) {
-    logger_->log_error("Bucket '{}' is invalid or empty!", properties.bucket);
-    return std::nullopt;
-  } else {
-    properties.bucket = *bucket;
-  }
-  logger_->log_debug("S3Processor: Bucket [{}]", properties.bucket);
-
-  auto credentials = getAWSCredentials(context, flow_file);
-  if (!credentials) {
-    logger_->log_error("AWS Credentials have not been set!");
-    return std::nullopt;
-  }
-  properties.credentials = credentials.value();
-
-  auto proxy = getProxy(context, flow_file);
-  if (!proxy) {
-    return std::nullopt;
-  }
-  properties.proxy = proxy.value();
-
-  const auto endpoint_override_url = context.getProperty(EndpointOverrideURL, flow_file);
-  if (endpoint_override_url) {
-    properties.endpoint_override_url = *endpoint_override_url;
-    logger_->log_debug("S3Processor: Endpoint Override URL [{}]", properties.endpoint_override_url);
-  }
-
-  return properties;
 }
 
 }  // namespace org::apache::nifi::minifi::aws::processors
"cn-northwest-1"; -inline constexpr std::string_view EU_CENTRAL_1 = "eu-central-1"; -inline constexpr std::string_view EU_CENTRAL_2 = "eu-central-2"; -inline constexpr std::string_view EU_NORTH_1 = "eu-north-1"; -inline constexpr std::string_view EU_SOUTH_1 = "eu-south-1"; -inline constexpr std::string_view EU_SOUTH_2 = "eu-south-2"; -inline constexpr std::string_view EU_WEST_1 = "eu-west-1"; -inline constexpr std::string_view EU_WEST_2 = "eu-west-2"; -inline constexpr std::string_view EU_WEST_3 = "eu-west-3"; -inline constexpr std::string_view ME_CENTRAL_1 = "me-central-1"; -inline constexpr std::string_view ME_SOUTH_1 = "me-south-1"; -inline constexpr std::string_view SA_EAST_1 = "sa-east-1"; -inline constexpr std::string_view US_EAST_1 = "us-east-1"; -inline constexpr std::string_view US_EAST_2 = "us-east-2"; -inline constexpr std::string_view US_GOV_EAST_1 = "us-gov-east-1"; -inline constexpr std::string_view US_GOV_WEST_1 = "us-gov-west-1"; -inline constexpr std::string_view US_ISO_EAST_1 = "us-iso-east-1"; -inline constexpr std::string_view US_ISOB_EAST_1 = "us-isob-east-1"; -inline constexpr std::string_view US_ISO_WEST_1 = "us-iso-west-1"; -inline constexpr std::string_view US_WEST_1 = "us-west-1"; -inline constexpr std::string_view US_WEST_2 = "us-west-2"; - -inline constexpr auto REGIONS = std::array{ - AF_SOUTH_1, AP_EAST_1, AP_NORTHEAST_1, - AP_NORTHEAST_2, AP_NORTHEAST_3, AP_SOUTH_1, AP_SOUTH_2, AP_SOUTHEAST_1, AP_SOUTHEAST_2, - AP_SOUTHEAST_3, CA_CENTRAL_1, CN_NORTH_1, CN_NORTHWEST_1, EU_CENTRAL_1, EU_CENTRAL_2, - EU_NORTH_1, EU_SOUTH_1, EU_SOUTH_2, EU_WEST_1, EU_WEST_2, EU_WEST_3, ME_CENTRAL_1, - ME_SOUTH_1, SA_EAST_1, US_EAST_1, US_EAST_2, US_GOV_EAST_1, US_GOV_WEST_1, - US_ISO_EAST_1, US_ISOB_EAST_1, US_ISO_WEST_1, US_WEST_1, US_WEST_2 -}; -} // namespace region - -struct CommonProperties { - std::string bucket; - std::string object_key; - Aws::Auth::AWSCredentials credentials; - aws::s3::ProxyOptions proxy; - std::string endpoint_override_url; -}; - -class S3Processor : public core::ProcessorImpl { +class S3Processor : public AwsProcessor { public: EXTENSIONAPI static constexpr auto Bucket = core::PropertyDefinitionBuilder<>::createProperty("Bucket") .withDescription("The S3 bucket") @@ -103,95 +51,16 @@ class S3Processor : public core::ProcessorImpl { .withValidator(core::StandardPropertyTypes::NON_BLANK_VALIDATOR) .supportsExpressionLanguage(true) .build(); - EXTENSIONAPI static constexpr auto AccessKey = core::PropertyDefinitionBuilder<>::createProperty("Access Key") - .withDescription("AWS account access key") - .supportsExpressionLanguage(true) - .build(); - EXTENSIONAPI static constexpr auto SecretKey = core::PropertyDefinitionBuilder<>::createProperty("Secret Key") - .withDescription("AWS account secret key") - .supportsExpressionLanguage(true) - .isSensitive(true) - .build(); - EXTENSIONAPI static constexpr auto CredentialsFile = core::PropertyDefinitionBuilder<>::createProperty("Credentials File") - .withDescription("Path to a file containing AWS access key and secret key in properties file format. 
Properties used: accessKey and secretKey") - .build(); - EXTENSIONAPI static constexpr auto AWSCredentialsProviderService = core::PropertyDefinitionBuilder<>::createProperty("AWS Credentials Provider service") - .withDescription("The name of the AWS Credentials Provider controller service that is used to obtain AWS credentials.") - .build(); - EXTENSIONAPI static constexpr auto Region = core::PropertyDefinitionBuilder::createProperty("Region") - .isRequired(true) - .withDefaultValue(region::US_WEST_2) - .withAllowedValues(region::REGIONS) - .withDescription("AWS Region") - .build(); - EXTENSIONAPI static constexpr auto CommunicationsTimeout = core::PropertyDefinitionBuilder<>::createProperty("Communications Timeout") - .isRequired(true) - .withValidator(core::StandardPropertyTypes::TIME_PERIOD_VALIDATOR) - .withDefaultValue("30 sec") - .withDescription("Sets the timeout of the communication between the AWS server and the client") - .build(); - EXTENSIONAPI static constexpr auto EndpointOverrideURL = core::PropertyDefinitionBuilder<>::createProperty("Endpoint Override URL") - .withDescription("Endpoint URL to use instead of the AWS default including scheme, host, " - "port, and path. The AWS libraries select an endpoint URL based on the AWS " - "region, but this property overrides the selected endpoint URL, allowing use " - "with other S3-compatible endpoints.") - .supportsExpressionLanguage(true) - .build(); - EXTENSIONAPI static constexpr auto ProxyHost = core::PropertyDefinitionBuilder<>::createProperty("Proxy Host") - .withDescription("Proxy host name or IP") - .supportsExpressionLanguage(true) - .build(); - EXTENSIONAPI static constexpr auto ProxyPort = core::PropertyDefinitionBuilder<>::createProperty("Proxy Port") - .withDescription("The port number of the proxy host") - .supportsExpressionLanguage(true) - .build(); - EXTENSIONAPI static constexpr auto ProxyUsername = core::PropertyDefinitionBuilder<>::createProperty("Proxy Username") - .withDescription("Username to set when authenticating against proxy") - .supportsExpressionLanguage(true) - .build(); - EXTENSIONAPI static constexpr auto ProxyPassword = core::PropertyDefinitionBuilder<>::createProperty("Proxy Password") - .withDescription("Password to set when authenticating against proxy") - .supportsExpressionLanguage(true) - .isSensitive(true) - .build(); - EXTENSIONAPI static constexpr auto UseDefaultCredentials = core::PropertyDefinitionBuilder<>::createProperty("Use Default Credentials") - .withDescription("If true, uses the Default Credential chain, including EC2 instance profiles or roles, environment variables, default user credentials, etc.") - .withValidator(core::StandardPropertyTypes::BOOLEAN_VALIDATOR) - .withDefaultValue("false") - .isRequired(true) - .build(); - EXTENSIONAPI static constexpr auto Properties = std::to_array({ - Bucket, - AccessKey, - SecretKey, - CredentialsFile, - AWSCredentialsProviderService, - Region, - CommunicationsTimeout, - EndpointOverrideURL, - ProxyHost, - ProxyPort, - ProxyUsername, - ProxyPassword, - UseDefaultCredentials - }); + EXTENSIONAPI static constexpr auto Properties = minifi::utils::array_cat(AwsProcessor::Properties, std::to_array({Bucket})); - explicit S3Processor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr logger); + explicit S3Processor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr logger); void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; protected: explicit 
S3Processor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr logger, std::unique_ptr s3_request_sender); - - std::optional getAWSCredentialsFromControllerService(core::ProcessContext& context) const; - std::optional getAWSCredentials(core::ProcessContext& context, const core::FlowFile* const flow_file); - std::optional getProxy(core::ProcessContext& context, const core::FlowFile* const flow_file); - std::optional getCommonELSupportedProperties(core::ProcessContext& context, const core::FlowFile* const flow_file); - - std::shared_ptr logger_; aws::s3::S3Wrapper s3_wrapper_; - std::optional client_config_; }; } // namespace org::apache::nifi::minifi::aws::processors diff --git a/extensions/aws/s3/S3RequestSender.h b/extensions/aws/s3/S3RequestSender.h index a2cd2078c5..1995031c82 100644 --- a/extensions/aws/s3/S3RequestSender.h +++ b/extensions/aws/s3/S3RequestSender.h @@ -55,13 +55,6 @@ namespace org::apache::nifi::minifi::aws::s3 { -struct ProxyOptions { - std::string host; - uint32_t port = 0; - std::string username; - std::string password; -}; - class S3RequestSender { public: virtual std::optional sendPutObjectRequest( diff --git a/extensions/aws/s3/S3Wrapper.h b/extensions/aws/s3/S3Wrapper.h index 9bd4725a76..c6efbeb8f4 100644 --- a/extensions/aws/s3/S3Wrapper.h +++ b/extensions/aws/s3/S3Wrapper.h @@ -29,24 +29,24 @@ #include #include -#include "aws/s3/model/StorageClass.h" -#include "aws/s3/model/ServerSideEncryption.h" +#include "Exception.h" +#include "MultipartUploadStateStorage.h" +#include "S3RequestSender.h" #include "aws/s3/model/ObjectCannedACL.h" - +#include "aws/s3/model/ServerSideEncryption.h" +#include "aws/s3/model/StorageClass.h" #include "core/logging/Logger.h" #include "core/logging/LoggerFactory.h" +#include "io/InputStream.h" +#include "io/OutputStream.h" +#include "range/v3/algorithm/find.hpp" #include "utils/AWSInitializer.h" +#include "utils/ListingStateManager.h" +#include "utils/Literals.h" #include "utils/OptionalUtils.h" #include "utils/StringUtils.h" -#include "utils/ListingStateManager.h" #include "utils/gsl.h" -#include "S3RequestSender.h" -#include "Exception.h" -#include "MultipartUploadStateStorage.h" -#include "range/v3/algorithm/find.hpp" -#include "utils/Literals.h" -#include "io/InputStream.h" -#include "io/OutputStream.h" +#include "utils/ProxyOptions.h" namespace org::apache::nifi::minifi::aws::s3 { @@ -109,7 +109,7 @@ struct RequestParameters { Aws::Auth::AWSCredentials credentials; Aws::Client::ClientConfiguration client_config; - void setClientConfig(const aws::s3::ProxyOptions& proxy, const std::string& endpoint_override_url) { + void setClientConfig(const aws::ProxyOptions& proxy, const std::string& endpoint_override_url) { client_config.proxyHost = proxy.host; client_config.proxyPort = proxy.port; client_config.proxyUserName = proxy.username; diff --git a/extensions/aws/tests/PutKinesisStreamTests.cpp b/extensions/aws/tests/PutKinesisStreamTests.cpp new file mode 100644 index 0000000000..0187464bf0 --- /dev/null +++ b/extensions/aws/tests/PutKinesisStreamTests.cpp @@ -0,0 +1,195 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
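ProxyOptions now lives in the plain aws namespace (utils/ProxyOptions.h, added at the end of this diff) so both the S3 and the new Kinesis code paths can share it. A sketch of how it maps onto the AWS SDK client configuration, mirroring the setClientConfig body above; proxyHost, proxyPort, proxyUserName and proxyPassword are real Aws::Client::ClientConfiguration members, while the helper function itself is illustrative.

#include "aws/core/client/ClientConfiguration.h"
#include "utils/ProxyOptions.h"

// Builds a proxied client configuration from the relocated ProxyOptions;
// mirrors RequestParameters::setClientConfig in S3Wrapper.h above.
Aws::Client::ClientConfiguration makeProxiedConfig(const org::apache::nifi::minifi::aws::ProxyOptions& proxy) {
  Aws::Client::ClientConfiguration config;
  config.proxyHost = proxy.host;
  config.proxyPort = proxy.port;
  config.proxyUserName = proxy.username;
  config.proxyPassword = proxy.password;
  return config;
}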
diff --git a/extensions/aws/tests/PutKinesisStreamTests.cpp b/extensions/aws/tests/PutKinesisStreamTests.cpp
new file mode 100644
index 0000000000..0187464bf0
--- /dev/null
+++ b/extensions/aws/tests/PutKinesisStreamTests.cpp
@@ -0,0 +1,195 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+
+#include "aws/kinesis/model/PutRecordsRequest.h"
+#include "core/Resource.h"
+#include "processors/PutKinesisStream.h"
+#include "unit/Catch.h"
+#include "unit/SingleProcessorTestController.h"
+#include "unit/TestBase.h"
+
+namespace org::apache::nifi::minifi::aws::processors::test {
+
+class MockKinesisClient final : public Aws::Kinesis::KinesisClient {
+  Aws::Kinesis::Model::PutRecordsOutcome PutRecords(const Aws::Kinesis::Model::PutRecordsRequest& request) const override {
+    Aws::Kinesis::Model::PutRecordsResult result;
+    for ([[maybe_unused]] const auto& request_entry : request.GetRecords()) {
+      Aws::Kinesis::Model::PutRecordsResultEntry result_entry;
+      result_entry.SetSequenceNumber(fmt::format("sequence_number_{}", ++sequence_number_));
+      result_entry.SetShardId("shard_id");
+      result.AddRecords(result_entry);
+    }
+    return result;
+  }
+
+  mutable uint32_t sequence_number_ = 0;
+};
+
+class PutKinesisStreamMocked final : public aws::processors::PutKinesisStream {
+ public:
+  static constexpr const char* Description = "PutKinesisStreamMocked";
+
+  explicit PutKinesisStreamMocked(const std::string& name, const minifi::utils::Identifier& uuid = minifi::utils::Identifier())
+      : PutKinesisStream(name, uuid) {
+  }
+
+  PutKinesisStreamMocked(const PutKinesisStreamMocked&) = delete;
+  PutKinesisStreamMocked(PutKinesisStreamMocked&&) = delete;
+  PutKinesisStreamMocked& operator=(const PutKinesisStreamMocked&) = delete;
+  PutKinesisStreamMocked& operator=(PutKinesisStreamMocked&&) = delete;
+
+  ~PutKinesisStreamMocked() override = default;
+
+  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+
+  std::unique_ptr<Aws::Kinesis::KinesisClient> getClient(const Aws::Auth::AWSCredentials&) override {
+    return std::make_unique<MockKinesisClient>();
+  }
+};
+REGISTER_RESOURCE(PutKinesisStreamMocked, Processor);
+
+TEST_CASE("PutKinesisStream simple happy path") {
+  minifi::test::SingleProcessorTestController controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream"));
+  auto put_kinesis_stream = controller.getProcessor();
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AccessKey, "access_key");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::SecretKey, "secret_key");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AmazonKinesisStreamName, "stream_name");
+
+  const auto result = controller.trigger({{.content = "foo"}, {.content = "bar"}});
+  CHECK(result.at(PutKinesisStream::Failure).empty());
+  CHECK(result.at(PutKinesisStream::Success).size() == 2);
+  const auto res_ff_1 = result.at(PutKinesisStream::Success).at(0);
+  const auto res_ff_2 = result.at(PutKinesisStream::Success).at(1);
+
+  CHECK(controller.plan->getContent(res_ff_1) == "foo");
+  CHECK(controller.plan->getContent(res_ff_2) == "bar");
+
+  CHECK(res_ff_1->getAttribute(PutKinesisStream::AwsKinesisSequenceNumber.name) == "sequence_number_1");
+  CHECK(res_ff_1->getAttribute(PutKinesisStream::AwsKinesisShardId.name) == "shard_id");
+  CHECK(res_ff_2->getAttribute(PutKinesisStream::AwsKinesisSequenceNumber.name) == "sequence_number_2");
+  CHECK(res_ff_2->getAttribute(PutKinesisStream::AwsKinesisShardId.name) == "shard_id");
+}
+
+TEST_CASE("PutKinesisStream smaller batch size than available ffs") {
+  minifi::test::SingleProcessorTestController controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream"));
+  auto put_kinesis_stream = controller.getProcessor();
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AccessKey, "access_key");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::SecretKey, "secret_key");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AmazonKinesisStreamName, "stream_name");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::MessageBatchSize, "10");
+
+  const auto result = controller.trigger({
+      {.content = "Lorem"},
+      {.content = "ipsum"},
+      {.content = "dolor"},
+      {.content = "sit"},
+      {.content = "amet"},
+      {.content = "consectetur"},
+      {.content = "adipiscing"},
+      {.content = "elit"},
+      {.content = "Morbi"},
+      {.content = "dapibus"},
+      {.content = "risus"},
+      {.content = "a"},
+      {.content = "bibendum"},
+      {.content = "luctus"}});
+
+  CHECK(result.at(PutKinesisStream::Success).size() == 10);
+}
+
+TEST_CASE("PutKinesisStream max batch data size fills up") {
+  minifi::test::SingleProcessorTestController controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream"));
+  auto put_kinesis_stream = controller.getProcessor();
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AccessKey, "access_key");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::SecretKey, "secret_key");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AmazonKinesisStreamName, "stream_name");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::MessageBatchSize, "10");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::MaxBatchDataSize, "12 B");
+
+  const auto result = controller.trigger({
+      {.content = "Lorem"},
+      {.content = "ipsum"},
+      {.content = "dolor"},
+      {.content = "sit"},
+      {.content = "amet"},
+      {.content = "consectetur"},
+      {.content = "adipiscing"},
+      {.content = "elit"},
+      {.content = "Morbi"},
+      {.content = "dapibus"},
+      {.content = "risus"},
+      {.content = "a"},
+      {.content = "bibendum"},
+      {.content = "luctus"}});
+
+  CHECK(result.at(PutKinesisStream::Success).size() == 3);
+}
+
+TEST_CASE("PutKinesisStream max batch data size to different streams") {
+  minifi::test::SingleProcessorTestController controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream"));
+  auto put_kinesis_stream = controller.getProcessor();
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AccessKey, "access_key");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::SecretKey, "secret_key");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AmazonKinesisStreamName, "stream_name");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::MessageBatchSize, "10");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::MaxBatchDataSize, "12 B");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AmazonKinesisStreamName, "${stream_name}");
+
+  const auto result = controller.trigger({
+      {.content = "Lorem", .attributes = {{"stream_name", "stream_one"}}},
+      {.content = "ipsum", .attributes = {{"stream_name", "stream_two"}}},
+      {.content = "dolor", .attributes = {{"stream_name", "stream_three"}}},
+      {.content = "sit", .attributes = {{"stream_name", "stream_four"}}},
+      {.content = "amet", .attributes = {{"stream_name", "stream_five"}}},
+      {.content = "consectetur", .attributes = {{"stream_name", "stream_six"}}},
+      {.content = "adipiscing", .attributes = {{"stream_name", "stream_seven"}}},
+      {.content = "elit", .attributes = {{"stream_name", "stream_eight"}}},
+      {.content = "Morbi", .attributes = {{"stream_name", "stream_nine"}}},
+      {.content = "dapibus", .attributes = {{"stream_name", "stream_ten"}}},
+      {.content = "risus", .attributes = {{"stream_name", "stream_eleven"}}},
+      {.content = "a", .attributes = {{"stream_name", "stream_twelve"}}},
+      {.content = "bibendum", .attributes = {{"stream_name", "stream_thirteen"}}},
+      {.content = "luctus", .attributes = {{"stream_name", "stream_fourteen"}}}});
+
+  CHECK(result.at(PutKinesisStream::Success).size() == 10);
+}
+
+TEST_CASE("PutKinesisStream with too large message") {
+  minifi::test::SingleProcessorTestController controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream"));
+  auto put_kinesis_stream = controller.getProcessor();
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AccessKey, "access_key");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::SecretKey, "secret_key");
+  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AmazonKinesisStreamName, "stream_name");
+
+  std::string too_large_msg((1_MB + 10), 'x');
+  const auto result = controller.trigger(too_large_msg);
+  CHECK(result.at(PutKinesisStream::Failure).size() == 1);
+  CHECK(result.at(PutKinesisStream::Success).empty());
+
+  const auto res_ff_1 = result.at(PutKinesisStream::Failure).at(0);
+
+  CHECK(controller.plan->getContent(res_ff_1) == too_large_msg);
+
+  CHECK(res_ff_1->getAttribute(PutKinesisStream::AwsKinesisErrorMessage.name) == "record too big 1000010, max allowed 1000000");
+  CHECK(res_ff_1->getAttribute(PutKinesisStream::AwsKinesisErrorCode.name) == std::nullopt);
+}
+
+}  // namespace org::apache::nifi::minifi::aws::processors::test
diff --git a/extensions/aws/utils/ProxyOptions.h b/extensions/aws/utils/ProxyOptions.h
new file mode 100644
index 0000000000..6b251cd929
--- /dev/null
+++ b/extensions/aws/utils/ProxyOptions.h
@@ -0,0 +1,35 @@
+/**
+ * @file ProxyOptions.h
+ * ProxyOptions struct declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <cstdint>
+#include <string>
+
+namespace org::apache::nifi::minifi::aws {
+
+struct ProxyOptions {
+  std::string host;
+  uint32_t port = 0;
+  std::string username;
+  std::string password;
+};
+
+}  // namespace org::apache::nifi::minifi::aws