From a9d4669f0695e76a1e4e123daf7a9de5595c8d61 Mon Sep 17 00:00:00 2001 From: Martin Zink Date: Thu, 25 Jan 2024 17:43:23 +0100 Subject: [PATCH] substitution --- .../modbus/ReadModbusTcp.h | 50 +++++++++++++------ .../standard-processors/processors/PutTCP.cpp | 4 +- 2 files changed, 37 insertions(+), 17 deletions(-) diff --git a/extensions/standard-processors/modbus/ReadModbusTcp.h b/extensions/standard-processors/modbus/ReadModbusTcp.h index 680d9a95b04..8ee994a27e6 100644 --- a/extensions/standard-processors/modbus/ReadModbusTcp.h +++ b/extensions/standard-processors/modbus/ReadModbusTcp.h @@ -47,26 +47,47 @@ class ReadModbusTcp final : public core::Processor { .withDescription("The ip address or hostname of the destination.") .withDefaultValue("localhost") .isRequired(true) + .supportsExpressionLanguage(true) .build(); - EXTENSIONAPI static constexpr auto Port = core::PropertyDefinitionBuilder<>::createProperty("Port") .withDescription("The port or service on the destination.") - .withDefaultValue("502") .isRequired(true) + .supportsExpressionLanguage(true) .build(); - - EXTENSIONAPI static constexpr auto Timeout = core::PropertyDefinitionBuilder<>::createProperty("Timeout") - .withDescription("Request timeout") - .withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE) - .withDefaultValue("1s") + EXTENSIONAPI static constexpr auto IdleConnectionExpiration = core::PropertyDefinitionBuilder<>::createProperty("Idle Connection Expiration") + .withDescription("The amount of time a connection should be held open without being used before closing the connection. A value of 0 seconds will disable this feature.") + .withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE) + .withDefaultValue("15 seconds") + .isRequired(true) + .supportsExpressionLanguage(true) + .build(); + EXTENSIONAPI static constexpr auto ConnectionPerFlowFile = core::PropertyDefinitionBuilder<>::createProperty("Connection Per FlowFile") + .withDescription("Specifies whether to send each FlowFile's content on an individual connection.") + .withPropertyType(core::StandardPropertyTypes::BOOLEAN_TYPE) + .withDefaultValue("false") .isRequired(true) - .supportsExpressionLanguage(true) + .supportsExpressionLanguage(false) .build(); + EXTENSIONAPI static constexpr auto Timeout = core::PropertyDefinitionBuilder<>::createProperty("Timeout") + .withDescription("The timeout for connecting to and communicating with the destination.") + .withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE) + .withDefaultValue("15 seconds") + .isRequired(true) + .supportsExpressionLanguage(true) + .build(); + EXTENSIONAPI static constexpr auto SSLContextService = core::PropertyDefinitionBuilder<>::createProperty("SSL Context Service") + .withDescription("The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be sent over a secure connection.") + .isRequired(false) + .withAllowedTypes() + .build(); EXTENSIONAPI static constexpr auto Properties = std::array{ Hostname, Port, - Timeout + IdleConnectionExpiration, + ConnectionPerFlowFile, + Timeout, + SSLContextService }; EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "Successfully processed"}; @@ -82,16 +103,15 @@ class ReadModbusTcp final : public core::Processor { void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; - void initialize() final; + void initialize() override; private: + void processFlowFile(const std::shared_ptr& connection_handler, + core::ProcessSession& session, + const std::shared_ptr& flow_file); std::unordered_map getAddressMap(const core::FlowFile& flow_file) const; std::shared_ptr getFlowFile(core::ProcessSession& session) const; - bool hasUsableSocket() const; - asio::awaitable setupUsableSocket(asio::io_context& io_context); - asio::awaitable establishNewConnection(const asio::ip::tcp::resolver::results_type& endpoints); - - std::optional tcp_socket_; + void removeExpiredConnections(); asio::io_context io_context_; std::optional>> connections_; diff --git a/extensions/standard-processors/processors/PutTCP.cpp b/extensions/standard-processors/processors/PutTCP.cpp index fd7fddd0ddc..7d37211fdfe 100644 --- a/extensions/standard-processors/processors/PutTCP.cpp +++ b/extensions/standard-processors/processors/PutTCP.cpp @@ -111,8 +111,8 @@ void PutTCP::onTrigger(core::ProcessContext& context, core::ProcessSession& sess removeExpiredConnections(); - auto hostname = context.getProperty(Hostname, flow_file).value_or(std::string{}); - auto port = context.getProperty(Port, flow_file).value_or(std::string{}); + const auto hostname = context.getProperty(Hostname, flow_file).value_or(std::string{}); + const auto port = context.getProperty(Port, flow_file).value_or(std::string{}); if (hostname.empty() || port.empty()) { logger_->log_error("[{}] invalid target endpoint: hostname: {}, port: {}", flow_file->getUUIDStr(), hostname.empty() ? "(empty)" : hostname.c_str(),