MINIFICPP-2291 Fix the site-to-site transfer of large files
Previously, the site-to-site client treated the TRANSACTION_FINISHED_BUT_DESTINATION_FULL
message from the server as a failure, and kept rolling back and retrying the flow file.
Now, we treat it as success but add a yield so the server has time to clear the incoming
connection, the same way as NiFi does.

Closes apache#1720

Signed-off-by: Marton Szasz <szaszm@apache.org>
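
The behavior described in the commit message can be sketched in isolation as follows. This is a minimal illustration, not the MiNiFi C++ implementation: `ResponseCode`, `FakeContext`, and `completeTransaction` are hypothetical stand-ins, and the yield is modeled as a simple counter. The point it shows is that a "destination full" reply is reported as success, with a yield as the only extra effect.

```cpp
// Hypothetical stand-ins for illustration only; these are not MiNiFi C++ types.
enum class ResponseCode {
  TransactionFinished,
  TransactionFinishedButDestinationFull,
  BadChecksum
};

// Assumed minimal context: a real process context would pause scheduling
// of the processor; here a yield is only counted.
struct FakeContext {
  int yield_count = 0;
  void yield() { ++yield_count; }
};

// Returns true when the peer finished the transaction. A "destination full"
// reply still counts as success, but the caller backs off via yield() so the
// receiving side has time to drain its incoming connection.
bool completeTransaction(FakeContext& context, ResponseCode code) {
  if (code == ResponseCode::TransactionFinished ||
      code == ResponseCode::TransactionFinishedButDestinationFull) {
    if (code == ResponseCode::TransactionFinishedButDestinationFull) {
      context.yield();
    }
    return true;
  }
  // Any other code is treated as a failure of the transaction.
  return false;
}
```

With this shape, the caller no longer rolls back and retries a flow file that the server has already accepted; it only slows down before sending the next one.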
fgerlits authored and szaszm committed Jan 25, 2024
1 parent 9e4ef8d commit 1a8ae2f
Showing 7 changed files with 31 additions and 11 deletions.
15 changes: 15 additions & 0 deletions docker/test/integration/features/s2s.feature
@@ -34,6 +34,21 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S protocol

When both instances start up
Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds
+ And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds
+
+ Scenario: A MiNiFi instance produces and transfers a large data file to a NiFi instance via s2s
+ Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+ And a file with the content "this is a very long file we want to send by site-to-site" is present in "/tmp/input"
+ And a RemoteProcessGroup node opened on "http://nifi-${feature_id}:8080/nifi"
+ And the "success" relationship of the GetFile processor is connected to the input port on the RemoteProcessGroup
+
+ And a NiFi flow receiving data from a RemoteProcessGroup "from-minifi" on port 8080
+ And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow
+ And the "success" relationship of the from-minifi is connected to the PutFile
+
+ When both instances start up
+ Then a flowfile with the content "this is a very long file we want to send by site-to-site" is placed in the monitored directory in less than 90 seconds
+ And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds

Scenario: Zero length files are transfered between via s2s if the "drop empty" connection property is false
Given a MiNiFi CPP server with yaml config
@@ -210,8 +210,8 @@ def serialize_node(self, connectable, nifi_version, root, visited):
"labelIndex": 1,
"zIndex": 0,
"selectedRelationships": [conn_name] if not isinstance(connectable, InputPort) else [""],
- "backPressureObjectThreshold": 10000,
- "backPressureDataSizeThreshold": "1 GB",
+ "backPressureObjectThreshold": 10,
+ "backPressureDataSizeThreshold": "50 B",
"flowFileExpiration": "0 sec",
"prioritizers": [],
"bends": [],
2 changes: 1 addition & 1 deletion extensions/standard-processors/processors/GetFile.cpp
@@ -78,7 +78,7 @@ void GetFile::onSchedule(core::ProcessContext& context, core::ProcessSessionFact

if (auto directory_str = context.getProperty(Directory)) {
if (!utils::file::is_directory(*directory_str)) {
- throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Input Directory \"" + value + "\" is not a directory");
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, utils::string::join_pack("Input Directory \"", *directory_str, "\" is not a directory"));
}
request_.inputDirectory = *directory_str;
} else {
2 changes: 1 addition & 1 deletion libminifi/include/sitetosite/SiteToSiteClient.h
@@ -201,7 +201,7 @@ class SiteToSiteClient : public core::Connectable {
// Cancel the transaction
virtual void cancel(const utils::Identifier &transactionID);
// Complete the transaction
- virtual bool complete(const utils::Identifier &transactionID);
+ virtual bool complete(core::ProcessContext& context, const utils::Identifier &transactionID);
// Error the transaction
virtual void error(const utils::Identifier &transactionID);

2 changes: 1 addition & 1 deletion libminifi/src/core/FlowConfiguration.cpp
@@ -143,7 +143,7 @@ bool FlowConfiguration::persist(const std::string& serialized_flow) {
}

const bool status = filesystem_->write(*config_path_, serialized_flow);
- logger_->log_info("Result of updating the config file {}: {}", config_path_, status ? "success" : "failure");
+ logger_->log_info("Result of updating the config file {}: {}", *config_path_, status ? "success" : "failure");
checksum_calculator_.invalidateChecksum();
return status;
}
2 changes: 1 addition & 1 deletion libminifi/src/sitetosite/RawSocketProtocol.cpp
@@ -595,7 +595,7 @@ bool RawSiteToSiteClient::transmitPayload(core::ProcessContext& context, core::P
if (!confirm(transactionID)) {
throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed in transaction " + transactionID.to_string());
}
- if (!complete(transactionID)) {
+ if (!complete(context, transactionID)) {
throw Exception(SITE2SITE_EXCEPTION, "Complete Failed in transaction " + transactionID.to_string());
}
logger_->log_info("Site2Site transaction {} successfully send flow record {} content bytes {}", transactionID.to_string(), transaction->current_transfers_, transaction->_bytes);
15 changes: 10 additions & 5 deletions libminifi/src/sitetosite/SiteToSiteClient.cpp
@@ -164,7 +164,7 @@ bool SiteToSiteClient::transferFlowFiles(core::ProcessContext& context, core::Pr
if (!confirm(transactionID)) {
throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed for " + transactionID.to_string());
}
- if (!complete(transactionID)) {
+ if (!complete(context, transactionID)) {
throw Exception(SITE2SITE_EXCEPTION, "Complete Failed for " + transactionID.to_string());
}
logger_->log_debug("Site2Site transaction {} successfully sent flow record {}, content bytes {}", transactionID.to_string(), transaction->total_transfers_, transaction->_bytes);
@@ -336,7 +336,7 @@ void SiteToSiteClient::error(const utils::Identifier& transactionID) {
}

// Complete the transaction
- bool SiteToSiteClient::complete(const utils::Identifier& transactionID) {
+ bool SiteToSiteClient::complete(core::ProcessContext& context, const utils::Identifier& transactionID) {
int ret = 0;
std::shared_ptr<Transaction> transaction = nullptr;

@@ -382,12 +382,17 @@ bool SiteToSiteClient::complete(const utils::Identifier& transactionID) {
if (ret <= 0)
return false;

- if (code == TRANSACTION_FINISHED) {
+ if (code == TRANSACTION_FINISHED || code == TRANSACTION_FINISHED_BUT_DESTINATION_FULL) {
logger_->log_info("Site2Site transaction {} peer finished transaction", transactionID.to_string());
transaction->_state = TRANSACTION_COMPLETED;
+
+ if (code == TRANSACTION_FINISHED_BUT_DESTINATION_FULL) {
+ logger_->log_info("Site2Site transaction {} reported destination full, yielding", transactionID.to_string());
+ context.yield();
+ }
return true;
} else {
- logger_->log_warn("Site2Site transaction {} peer unknown respond code {}", transactionID.to_string(), magic_enum::enum_underlying(code));
+ logger_->log_warn("Site2Site transaction {} peer unexpected respond code {}: {}", transactionID.to_string(), magic_enum::enum_underlying(code), magic_enum::enum_name(code));
return false;
}
}
@@ -718,7 +723,7 @@ bool SiteToSiteClient::receiveFlowFiles(core::ProcessContext& context, core::Pro
if (transfers > 0 && !confirm(transactionID)) {
throw Exception(SITE2SITE_EXCEPTION, "Confirm Transaction Failed");
}
- if (!complete(transactionID)) {
+ if (!complete(context, transactionID)) {
std::stringstream transaction_str;
transaction_str << "Complete Transaction " << transactionID.to_string() << " Failed";
throw Exception(SITE2SITE_EXCEPTION, transaction_str.str());