Skip to content

Commit

Permalink
substitution
Browse files Browse the repository at this point in the history
  • Loading branch information
martinzink committed Jan 25, 2024
1 parent 300b94a commit a9d4669
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 17 deletions.
50 changes: 35 additions & 15 deletions extensions/standard-processors/modbus/ReadModbusTcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,47 @@ class ReadModbusTcp final : public core::Processor {
.withDescription("The ip address or hostname of the destination.")
.withDefaultValue("localhost")
.isRequired(true)
.supportsExpressionLanguage(true)
.build();

EXTENSIONAPI static constexpr auto Port = core::PropertyDefinitionBuilder<>::createProperty("Port")
.withDescription("The port or service on the destination.")
.withDefaultValue("502")
.isRequired(true)
.supportsExpressionLanguage(true)
.build();

EXTENSIONAPI static constexpr auto Timeout = core::PropertyDefinitionBuilder<>::createProperty("Timeout")
.withDescription("Request timeout")
.withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE)
.withDefaultValue("1s")
EXTENSIONAPI static constexpr auto IdleConnectionExpiration = core::PropertyDefinitionBuilder<>::createProperty("Idle Connection Expiration")
.withDescription("The amount of time a connection should be held open without being used before closing the connection. A value of 0 seconds will disable this feature.")
.withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE)
.withDefaultValue("15 seconds")
.isRequired(true)
.supportsExpressionLanguage(true)
.build();
EXTENSIONAPI static constexpr auto ConnectionPerFlowFile = core::PropertyDefinitionBuilder<>::createProperty("Connection Per FlowFile")
.withDescription("Specifies whether to send each FlowFile's content on an individual connection.")
.withPropertyType(core::StandardPropertyTypes::BOOLEAN_TYPE)
.withDefaultValue("false")
.isRequired(true)
.supportsExpressionLanguage(true)
.supportsExpressionLanguage(false)
.build();
EXTENSIONAPI static constexpr auto Timeout = core::PropertyDefinitionBuilder<>::createProperty("Timeout")
.withDescription("The timeout for connecting to and communicating with the destination.")
.withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE)
.withDefaultValue("15 seconds")
.isRequired(true)
.supportsExpressionLanguage(true)
.build();
EXTENSIONAPI static constexpr auto SSLContextService = core::PropertyDefinitionBuilder<>::createProperty("SSL Context Service")
.withDescription("The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be sent over a secure connection.")
.isRequired(false)
.withAllowedTypes<minifi::controllers::SSLContextService>()
.build();

EXTENSIONAPI static constexpr auto Properties = std::array<core::PropertyReference, 8>{
Hostname,
Port,
Timeout
IdleConnectionExpiration,
ConnectionPerFlowFile,
Timeout,
SSLContextService
};

EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "Successfully processed"};
Expand All @@ -82,16 +103,15 @@ class ReadModbusTcp final : public core::Processor {

void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override;
void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override;
void initialize() final;
void initialize() override;

private:
void processFlowFile(const std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler,
core::ProcessSession& session,
const std::shared_ptr<core::FlowFile>& flow_file);
std::unordered_map<std::string, std::string> getAddressMap(const core::FlowFile& flow_file) const;
std::shared_ptr<core::FlowFile> getFlowFile(core::ProcessSession& session) const;
bool hasUsableSocket() const;
asio::awaitable<std::error_code> setupUsableSocket(asio::io_context& io_context);
asio::awaitable<std::error_code> establishNewConnection(const asio::ip::tcp::resolver::results_type& endpoints);

std::optional<utils::net::TcpSocket> tcp_socket_;
void removeExpiredConnections();

asio::io_context io_context_;
std::optional<std::unordered_map<utils::net::ConnectionId, std::shared_ptr<utils::net::ConnectionHandlerBase>>> connections_;
Expand Down
4 changes: 2 additions & 2 deletions extensions/standard-processors/processors/PutTCP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ void PutTCP::onTrigger(core::ProcessContext& context, core::ProcessSession& sess

removeExpiredConnections();

auto hostname = context.getProperty(Hostname, flow_file).value_or(std::string{});
auto port = context.getProperty(Port, flow_file).value_or(std::string{});
const auto hostname = context.getProperty(Hostname, flow_file).value_or(std::string{});
const auto port = context.getProperty(Port, flow_file).value_or(std::string{});
if (hostname.empty() || port.empty()) {
logger_->log_error("[{}] invalid target endpoint: hostname: {}, port: {}", flow_file->getUUIDStr(),
hostname.empty() ? "(empty)" : hostname.c_str(),
Expand Down

0 comments on commit a9d4669

Please sign in to comment.