Skip to content

Commit

Permalink
Rebase fix
Browse files Browse the repository at this point in the history
  • Loading branch information
adamdebreceni committed Jan 23, 2025
1 parent 7949b98 commit 001d12c
Show file tree
Hide file tree
Showing 11 changed files with 108 additions and 101 deletions.
2 changes: 2 additions & 0 deletions libminifi/include/core/ProcessSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ class ProcessSessionImpl : public ReferenceContainerImpl, public virtual Process
void importFrom(io::InputStream&& stream, const std::shared_ptr<core::FlowFile> &flow) override;

void import(const std::string& source, const std::shared_ptr<core::FlowFile> &flow, bool keepSource = true, uint64_t offset = 0) override;
void import(const std::string& source, std::vector<std::shared_ptr<FlowFile>> &flows, uint64_t offset, char inputDelimiter) override;
void import(const std::string& source, std::vector<std::shared_ptr<FlowFile>> &flows, bool keepSource, uint64_t offset, char inputDelimiter) override;

bool exportContent(const std::string &destination, const std::shared_ptr<core::FlowFile> &flow, bool keepContent) override;

Expand Down
22 changes: 11 additions & 11 deletions libminifi/src/core/ProcessSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ void ProcessSessionImpl::write(core::FlowFile &flow, const io::OutputStreamCallb
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time);
provenance_report_->modifyContent(flow, details, duration);
if (metrics_) {
metrics_->bytes_written += stream->size();
metrics_->bytesWritten() += stream->size();
}
} catch (const std::exception& exception) {
logger_->log_debug("Caught Exception during process session write, type: {}, what: {}", typeid(exception).name(), exception.what());
Expand Down Expand Up @@ -319,7 +319,7 @@ void ProcessSessionImpl::append(const std::shared_ptr<core::FlowFile> &flow, con
}
flow->setSize(flow_file_size + (stream->size() - stream_size_before_callback));
if (metrics_) {
metrics_->bytes_written += stream->size() - stream_size_before_callback;
metrics_->bytesWritten() += stream->size() - stream_size_before_callback;
}

std::stringstream details;
Expand Down Expand Up @@ -379,7 +379,7 @@ int64_t ProcessSessionImpl::read(const core::FlowFile& flow_file, const io::Inpu
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
}
if (metrics_) {
metrics_->bytes_read += ret;
metrics_->bytesRead() += ret;
}
return ret;
} catch (const std::exception& exception) {
Expand Down Expand Up @@ -428,8 +428,8 @@ int64_t ProcessSessionImpl::readWrite(const std::shared_ptr<core::FlowFile> &flo
flow->setOffset(0);
flow->setResourceClaim(output_claim);
if (metrics_) {
metrics_->bytes_written += read_write_result->bytes_written;
metrics_->bytes_read += read_write_result->bytes_read;
metrics_->bytesWritten() += read_write_result->bytes_written;
metrics_->bytesRead() += read_write_result->bytes_read;
}

return read_write_result->bytes_written;
Expand Down Expand Up @@ -498,7 +498,7 @@ void ProcessSessionImpl::importFrom(io::InputStream &stream, const std::shared_p

content_stream->close();
if (metrics_) {
metrics_->bytes_written += content_stream->size();
metrics_->bytesWritten() += content_stream->size();
}
std::stringstream details;
details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr();
Expand Down Expand Up @@ -562,7 +562,7 @@ void ProcessSessionImpl::import(const std::string& source, const std::shared_ptr

stream->close();
if (metrics_) {
metrics_->bytes_written += stream->size();
metrics_->bytesWritten() += stream->size();
}
input.close();
if (!keepSource) {
Expand Down Expand Up @@ -667,7 +667,7 @@ void ProcessSessionImpl::import(const std::string& source, std::vector<std::shar
flowFile->getOffset(), flowFile->getSize(), flowFile->getResourceClaim()->getContentFullPath(), flowFile->getUUIDStr());
stream->close();
if (metrics_) {
metrics_->bytes_written += stream->size();
metrics_->bytesWritten() += stream->size();
}
std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time);
Expand Down Expand Up @@ -953,7 +953,7 @@ void ProcessSessionImpl::commit() {
if (metrics_) {
auto time_delta = std::chrono::steady_clock::now() - commit_start_time;
metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(time_delta));
metrics_->processing_nanos += std::chrono::duration_cast<std::chrono::nanoseconds>(time_delta).count();
metrics_->processingNanos() += std::chrono::duration_cast<std::chrono::nanoseconds>(time_delta).count();
}
} catch (const std::exception& exception) {
logger_->log_debug("Caught Exception during process session commit, type: {}, what: {}", typeid(exception).name(), exception.what());
Expand Down Expand Up @@ -1165,8 +1165,8 @@ std::shared_ptr<core::FlowFile> ProcessSessionImpl::get() {
ret->setAttribute(SpecialFlowAttribute::FLOW_ID, flow_version->getFlowId());
}
if (metrics_) {
metrics_->incoming_bytes += ret->getSize();
++metrics_->incoming_flow_files;
metrics_->incomingBytes() += ret->getSize();
++metrics_->incomingFlowFiles();
}
return ret;
}
Expand Down
32 changes: 16 additions & 16 deletions libminifi/src/core/state/nodes/FlowInformation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,14 @@ std::vector<SerializedResponseNode> FlowInformation::serialize() {
.children = {
{.name = "id", .value = std::string{processor->getUUIDStr()}},
{.name = "groupId", .value = processor->getProcessGroupUUIDStr()},
{.name = "bytesRead", .value = metrics->bytes_read.load()},
{.name = "bytesWritten", .value = metrics->bytes_written.load()},
{.name = "flowFilesIn", .value = metrics->incoming_flow_files.load()},
{.name = "flowFilesOut", .value = metrics->transferred_flow_files.load()},
{.name = "bytesIn", .value = metrics->incoming_bytes.load()},
{.name = "bytesOut", .value = metrics->transferred_bytes.load()},
{.name = "invocations", .value = metrics->invocations.load()},
{.name = "processingNanos", .value = metrics->processing_nanos.load()},
{.name = "bytesRead", .value = metrics->bytesRead().load()},
{.name = "bytesWritten", .value = metrics->bytesWritten().load()},
{.name = "flowFilesIn", .value = metrics->incomingFlowFiles().load()},
{.name = "flowFilesOut", .value = metrics->transferredFlowFiles().load()},
{.name = "bytesIn", .value = metrics->incomingBytes().load()},
{.name = "bytesOut", .value = metrics->transferredBytes().load()},
{.name = "invocations", .value = metrics->invocations().load()},
{.name = "processingNanos", .value = metrics->processingNanos().load()},
{.name = "activeThreadCount", .value = -1},
{.name = "terminatedThreadCount", .value = -1},
{.name = "runStatus", .value = (processor->isRunning() ? "Running" : "Stopped")}
Expand All @@ -115,21 +115,21 @@ std::vector<PublishedMetric> FlowInformation::calculateMetrics() {
continue;
}
auto processor_metrics = processor->getMetrics();
metrics.push_back({"bytes_read", gsl::narrow<double>(processor_metrics->bytes_read.load()),
metrics.push_back({"bytes_read", gsl::narrow<double>(processor_metrics->bytesRead().load()),
{{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}});
metrics.push_back({"bytes_written", gsl::narrow<double>(processor_metrics->bytes_written.load()),
metrics.push_back({"bytes_written", gsl::narrow<double>(processor_metrics->bytesWritten().load()),
{{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}});
metrics.push_back({"flow_files_in", gsl::narrow<double>(processor_metrics->incoming_flow_files.load()),
metrics.push_back({"flow_files_in", gsl::narrow<double>(processor_metrics->incomingFlowFiles().load()),
{{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}});
metrics.push_back({"flow_files_out", gsl::narrow<double>(processor_metrics->transferred_flow_files.load()),
metrics.push_back({"flow_files_out", gsl::narrow<double>(processor_metrics->transferredFlowFiles().load()),
{{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}});
metrics.push_back({"bytes_in", gsl::narrow<double>(processor_metrics->incoming_bytes.load()),
metrics.push_back({"bytes_in", gsl::narrow<double>(processor_metrics->incomingBytes().load()),
{{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}});
metrics.push_back({"bytes_out", gsl::narrow<double>(processor_metrics->transferred_bytes.load()),
metrics.push_back({"bytes_out", gsl::narrow<double>(processor_metrics->transferredBytes().load()),
{{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}});
metrics.push_back({"invocations", gsl::narrow<double>(processor_metrics->invocations.load()),
metrics.push_back({"invocations", gsl::narrow<double>(processor_metrics->invocations().load()),
{{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}});
metrics.push_back({"processing_nanos", gsl::narrow<double>(processor_metrics->processing_nanos.load()),
metrics.push_back({"processing_nanos", gsl::narrow<double>(processor_metrics->processingNanos().load()),
{{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}});
metrics.push_back({"is_running", (processor->isRunning() ? 1.0 : 0.0),
{{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}});
Expand Down
42 changes: 21 additions & 21 deletions libminifi/test/unit/MetricsTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,12 +287,12 @@ TEST_CASE("Test commit runtime processor metrics", "[ProcessorMetrics]") {
REQUIRE(metrics.getAverageSessionCommitRuntime() == 37ms);
}

class DuplicateContentProcessor : public minifi::core::Processor {
using minifi::core::Processor::Processor;
class DuplicateContentProcessor : public minifi::core::ProcessorImpl {
using minifi::core::ProcessorImpl::ProcessorImpl;

public:
DuplicateContentProcessor(std::string_view name, const minifi::utils::Identifier& uuid) : Processor(name, uuid) {}
explicit DuplicateContentProcessor(std::string_view name) : Processor(name) {}
DuplicateContentProcessor(std::string_view name, const minifi::utils::Identifier& uuid) : ProcessorImpl(name, uuid) {}
explicit DuplicateContentProcessor(std::string_view name) : ProcessorImpl(name) {}
static constexpr const char* Description = "A processor that creates two more of the same flow file.";
static constexpr auto Properties = std::array<core::PropertyReference, 0>{};
static constexpr auto Success = core::RelationshipDefinition{"success", "Newly created FlowFiles"};
Expand Down Expand Up @@ -333,29 +333,29 @@ TEST_CASE("Test processor metrics change after trigger", "[ProcessorMetrics]") {
minifi::test::SingleProcessorTestController test_controller(std::make_unique<DuplicateContentProcessor>("DuplicateContentProcessor"));
test_controller.trigger({minifi::test::InputFlowFileData{"log line 1", {}}});
auto metrics = test_controller.getProcessor()->getMetrics();
CHECK(metrics->invocations == 1);
CHECK(metrics->incoming_flow_files == 1);
CHECK(metrics->transferred_flow_files == 2);
CHECK(metrics->invocations() == 1);
CHECK(metrics->incomingFlowFiles() == 1);
CHECK(metrics->transferredFlowFiles() == 2);
CHECK(metrics->getTransferredFlowFilesToRelationshipCount("success") == 1);
CHECK(metrics->getTransferredFlowFilesToRelationshipCount("original") == 1);
CHECK(metrics->incoming_bytes == 10);
CHECK(metrics->transferred_bytes == 30);
CHECK(metrics->bytes_read == 10);
CHECK(metrics->bytes_written == 20);
auto old_nanos = metrics->processing_nanos.load();
CHECK(metrics->processing_nanos > 0);
CHECK(metrics->incomingBytes() == 10);
CHECK(metrics->transferredBytes() == 30);
CHECK(metrics->bytesRead() == 10);
CHECK(metrics->bytesWritten() == 20);
auto old_nanos = metrics->processingNanos().load();
CHECK(metrics->processingNanos() > 0);

test_controller.trigger({minifi::test::InputFlowFileData{"new log line 2", {}}});
CHECK(metrics->invocations == 2);
CHECK(metrics->incoming_flow_files == 2);
CHECK(metrics->transferred_flow_files == 4);
CHECK(metrics->invocations() == 2);
CHECK(metrics->incomingFlowFiles() == 2);
CHECK(metrics->transferredFlowFiles() == 4);
CHECK(metrics->getTransferredFlowFilesToRelationshipCount("success") == 2);
CHECK(metrics->getTransferredFlowFilesToRelationshipCount("original") == 2);
CHECK(metrics->incoming_bytes == 24);
CHECK(metrics->transferred_bytes == 72);
CHECK(metrics->bytes_read == 24);
CHECK(metrics->bytes_written == 48);
CHECK(metrics->processing_nanos > old_nanos);
CHECK(metrics->incomingBytes() == 24);
CHECK(metrics->transferredBytes() == 72);
CHECK(metrics->bytesRead() == 24);
CHECK(metrics->bytesWritten() == 48);
CHECK(metrics->processingNanos() > old_nanos);
}


Expand Down
2 changes: 2 additions & 0 deletions minifi-api/include/minifi-cpp/core/ProcessSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ class ProcessSession : public virtual ReferenceContainer {

// import from the data source.
virtual void import(const std::string& source, const std::shared_ptr<core::FlowFile> &flow, bool keepSource = true, uint64_t offset = 0) = 0;
virtual void import(const std::string& source, std::vector<std::shared_ptr<FlowFile>> &flows, uint64_t offset, char inputDelimiter) = 0;
virtual void import(const std::string& source, std::vector<std::shared_ptr<FlowFile>> &flows, bool keepSource, uint64_t offset, char inputDelimiter) = 0;

/**
* Exports the data stream to a file
Expand Down
2 changes: 2 additions & 0 deletions minifi-api/include/minifi-cpp/core/Processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ class Processor : public virtual Connectable, public virtual ConfigurableCompone
virtual void validateAnnotations() const = 0;
virtual annotation::Input getInputRequirement() const = 0;
virtual gsl::not_null<std::shared_ptr<ProcessorMetrics>> getMetrics() const = 0;
virtual std::string getProcessGroupUUIDStr() const = 0;
virtual void setProcessGroupUUIDStr(const std::string &uuid) = 0;

virtual void updateReachability(const std::lock_guard<std::mutex>& graph_lock, bool force = false) = 0;
virtual const std::unordered_map<Connection*, std::unordered_set<Processor*>>& reachable_processors() const = 0;
Expand Down
29 changes: 15 additions & 14 deletions minifi-api/include/minifi-cpp/core/ProcessorMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,24 @@ class ProcessorMetrics : public virtual state::response::ResponseNode {
virtual std::chrono::milliseconds getAverageSessionCommitRuntime() const = 0;
virtual std::chrono::milliseconds getLastSessionCommitRuntime() const = 0;
virtual void addLastSessionCommitRuntime(std::chrono::milliseconds runtime) = 0;
virtual std::optional<size_t> getTransferredFlowFilesToRelationshipCount(const std::string& relationship) const = 0;

virtual std::atomic<size_t>& invocations() = 0;
virtual const std::atomic<size_t>& invocations() const = 0;
virtual std::atomic<size_t>& incoming_flow_files() = 0;
virtual const std::atomic<size_t>& incoming_flow_files() const = 0;
virtual std::atomic<size_t>& transferred_flow_files() = 0;
virtual const std::atomic<size_t>& transferred_flow_files() const = 0;
virtual std::atomic<uint64_t>& incoming_bytes() = 0;
virtual const std::atomic<uint64_t>& incoming_bytes() const = 0;
virtual std::atomic<uint64_t>& transferred_bytes() = 0;
virtual const std::atomic<uint64_t>& transferred_bytes() const = 0;
virtual std::atomic<uint64_t>& bytes_read() = 0;
virtual const std::atomic<uint64_t>& bytes_read() const = 0;
virtual std::atomic<uint64_t>& bytes_written() = 0;
virtual const std::atomic<uint64_t>& bytes_written() const = 0;
virtual std::atomic<uint64_t>& processing_nanos() = 0;
virtual const std::atomic<uint64_t>& processing_nanos() const = 0;
virtual std::atomic<size_t>& incomingFlowFiles() = 0;
virtual const std::atomic<size_t>& incomingFlowFiles() const = 0;
virtual std::atomic<size_t>& transferredFlowFiles() = 0;
virtual const std::atomic<size_t>& transferredFlowFiles() const = 0;
virtual std::atomic<uint64_t>& incomingBytes() = 0;
virtual const std::atomic<uint64_t>& incomingBytes() const = 0;
virtual std::atomic<uint64_t>& transferredBytes() = 0;
virtual const std::atomic<uint64_t>& transferredBytes() const = 0;
virtual std::atomic<uint64_t>& bytesRead() = 0;
virtual const std::atomic<uint64_t>& bytesRead() const = 0;
virtual std::atomic<uint64_t>& bytesWritten() = 0;
virtual const std::atomic<uint64_t>& bytesWritten() const = 0;
virtual std::atomic<uint64_t>& processingNanos() = 0;
virtual const std::atomic<uint64_t>& processingNanos() const = 0;
};

} // namespace org::apache::nifi::minifi::core
4 changes: 2 additions & 2 deletions utils/include/core/Processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,11 @@ class ProcessorImpl : public virtual Processor, public ConnectableImpl, public C
active_tasks_ = 0;
}

std::string getProcessGroupUUIDStr() const {
std::string getProcessGroupUUIDStr() const override {
return process_group_uuid_;
}

void setProcessGroupUUIDStr(const std::string &uuid) {
void setProcessGroupUUIDStr(const std::string &uuid) override {
process_group_uuid_ = uuid;
}

Expand Down
Loading

0 comments on commit 001d12c

Please sign in to comment.