diff --git a/docker/test/integration/features/s2s.feature b/docker/test/integration/features/s2s.feature index 6b5f2e6f06..c236f2ac7b 100644 --- a/docker/test/integration/features/s2s.feature +++ b/docker/test/integration/features/s2s.feature @@ -34,6 +34,21 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S protocol When both instances start up Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds + And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds + + Scenario: A MiNiFi instance produces and transfers a large data file to a NiFi instance via s2s + Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a file with the content "this is a very long file we want to send by site-to-site" is present in "/tmp/input" + And a RemoteProcessGroup node opened on "http://nifi-${feature_id}:8080/nifi" + And the "success" relationship of the GetFile processor is connected to the input port on the RemoteProcessGroup + + And a NiFi flow receiving data from a RemoteProcessGroup "from-minifi" on port 8080 + And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow + And the "success" relationship of the from-minifi is connected to the PutFile + + When both instances start up + Then a flowfile with the content "this is a very long file we want to send by site-to-site" is placed in the monitored directory in less than 90 seconds + And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds Scenario: Zero length files are transfered between via s2s if the "drop empty" connection property is false Given a MiNiFi CPP server with yaml config diff --git a/docker/test/integration/minifi/flow_serialization/Nifi_flow_json_serializer.py b/docker/test/integration/minifi/flow_serialization/Nifi_flow_json_serializer.py index 2c24f2d366..fa3f36ac66 100644 --- a/docker/test/integration/minifi/flow_serialization/Nifi_flow_json_serializer.py +++ b/docker/test/integration/minifi/flow_serialization/Nifi_flow_json_serializer.py @@ -210,8 +210,8 @@ def serialize_node(self, connectable, nifi_version, root, visited): "labelIndex": 1, "zIndex": 0, "selectedRelationships": [conn_name] if not isinstance(connectable, InputPort) else [""], - "backPressureObjectThreshold": 10000, - "backPressureDataSizeThreshold": "1 GB", + "backPressureObjectThreshold": 10, + "backPressureDataSizeThreshold": "50 B", "flowFileExpiration": "0 sec", "prioritizers": [], "bends": [], diff --git a/extensions/standard-processors/processors/GetFile.cpp b/extensions/standard-processors/processors/GetFile.cpp index b7c5e174bd..3189967308 100644 --- a/extensions/standard-processors/processors/GetFile.cpp +++ b/extensions/standard-processors/processors/GetFile.cpp @@ -78,7 +78,7 @@ void GetFile::onSchedule(core::ProcessContext& context, core::ProcessSessionFact if (auto directory_str = context.getProperty(Directory)) { if (!utils::file::is_directory(*directory_str)) { - throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Input Directory \"" + value + "\" is not a directory"); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, utils::string::join_pack("Input Directory \"", *directory_str, "\" is not a directory")); } request_.inputDirectory = *directory_str; } else { diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h index a39e4d008e..53f4b2f295 100644 --- a/libminifi/include/sitetosite/SiteToSiteClient.h +++ b/libminifi/include/sitetosite/SiteToSiteClient.h @@ -201,7 +201,7 @@ class SiteToSiteClient : public core::Connectable { // Cancel the transaction virtual void cancel(const utils::Identifier &transactionID); // Complete the transaction - virtual bool complete(const utils::Identifier &transactionID); + virtual bool complete(core::ProcessContext& context, const utils::Identifier &transactionID); // Error the transaction virtual void error(const utils::Identifier &transactionID); diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp index f955372b07..81729a1fb5 100644 --- a/libminifi/src/core/FlowConfiguration.cpp +++ b/libminifi/src/core/FlowConfiguration.cpp @@ -143,7 +143,7 @@ bool FlowConfiguration::persist(const std::string& serialized_flow) { } const bool status = filesystem_->write(*config_path_, serialized_flow); - logger_->log_info("Result of updating the config file {}: {}", config_path_, status ? "success" : "failure"); + logger_->log_info("Result of updating the config file {}: {}", *config_path_, status ? "success" : "failure"); checksum_calculator_.invalidateChecksum(); return status; } diff --git a/libminifi/src/sitetosite/RawSocketProtocol.cpp b/libminifi/src/sitetosite/RawSocketProtocol.cpp index 040029888e..1a28cfc355 100644 --- a/libminifi/src/sitetosite/RawSocketProtocol.cpp +++ b/libminifi/src/sitetosite/RawSocketProtocol.cpp @@ -595,7 +595,7 @@ bool RawSiteToSiteClient::transmitPayload(core::ProcessContext& context, core::P if (!confirm(transactionID)) { throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed in transaction " + transactionID.to_string()); } - if (!complete(transactionID)) { + if (!complete(context, transactionID)) { throw Exception(SITE2SITE_EXCEPTION, "Complete Failed in transaction " + transactionID.to_string()); } logger_->log_info("Site2Site transaction {} successfully send flow record {} content bytes {}", transactionID.to_string(), transaction->current_transfers_, transaction->_bytes); diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp index 429ac70f58..725e200f3d 100644 --- a/libminifi/src/sitetosite/SiteToSiteClient.cpp +++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp @@ -164,7 +164,7 @@ bool SiteToSiteClient::transferFlowFiles(core::ProcessContext& context, core::Pr if (!confirm(transactionID)) { throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed for " + transactionID.to_string()); } - if (!complete(transactionID)) { + if (!complete(context, transactionID)) { throw Exception(SITE2SITE_EXCEPTION, "Complete Failed for " + transactionID.to_string()); } logger_->log_debug("Site2Site transaction {} successfully sent flow record {}, content bytes {}", transactionID.to_string(), transaction->total_transfers_, transaction->_bytes); @@ -336,7 +336,7 @@ void SiteToSiteClient::error(const utils::Identifier& transactionID) { } // Complete the transaction -bool SiteToSiteClient::complete(const utils::Identifier& transactionID) { +bool SiteToSiteClient::complete(core::ProcessContext& context, const utils::Identifier& transactionID) { int ret = 0; std::shared_ptr transaction = nullptr; @@ -382,12 +382,17 @@ bool SiteToSiteClient::complete(const utils::Identifier& transactionID) { if (ret <= 0) return false; - if (code == TRANSACTION_FINISHED) { + if (code == TRANSACTION_FINISHED || code == TRANSACTION_FINISHED_BUT_DESTINATION_FULL) { logger_->log_info("Site2Site transaction {} peer finished transaction", transactionID.to_string()); transaction->_state = TRANSACTION_COMPLETED; + + if (code == TRANSACTION_FINISHED_BUT_DESTINATION_FULL) { + logger_->log_info("Site2Site transaction {} reported destination full, yielding", transactionID.to_string()); + context.yield(); + } return true; } else { - logger_->log_warn("Site2Site transaction {} peer unknown respond code {}", transactionID.to_string(), magic_enum::enum_underlying(code)); + logger_->log_warn("Site2Site transaction {} peer unexpected respond code {}: {}", transactionID.to_string(), magic_enum::enum_underlying(code), magic_enum::enum_name(code)); return false; } } @@ -718,7 +723,7 @@ bool SiteToSiteClient::receiveFlowFiles(core::ProcessContext& context, core::Pro if (transfers > 0 && !confirm(transactionID)) { throw Exception(SITE2SITE_EXCEPTION, "Confirm Transaction Failed"); } - if (!complete(transactionID)) { + if (!complete(context, transactionID)) { std::stringstream transaction_str; transaction_str << "Complete Transaction " << transactionID.to_string() << " Failed"; throw Exception(SITE2SITE_EXCEPTION, transaction_str.str());