From 47a6468b5c0fc9ea5f1355f4f658e5c5571b610d Mon Sep 17 00:00:00 2001 From: Martin Zink Date: Thu, 23 Jan 2025 16:13:39 +0100 Subject: [PATCH] Rebase fix --- libminifi/include/core/ProcessSession.h | 2 + libminifi/src/core/ProcessSession.cpp | 22 +++++----- .../src/core/state/nodes/FlowInformation.cpp | 32 +++++++------- libminifi/test/unit/MetricsTests.cpp | 42 +++++++++---------- .../include/minifi-cpp/core/ProcessSession.h | 2 + .../include/minifi-cpp/core/Processor.h | 2 + .../minifi-cpp/core/ProcessorMetrics.h | 29 ++++++------- utils/include/core/Processor.h | 4 +- utils/include/core/ProcessorMetrics.h | 42 +++++++++---------- utils/src/core/Processor.cpp | 2 +- utils/src/core/ProcessorMetrics.cpp | 30 ++++++------- 11 files changed, 108 insertions(+), 101 deletions(-) diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h index b424acbce4..38665c6626 100644 --- a/libminifi/include/core/ProcessSession.h +++ b/libminifi/include/core/ProcessSession.h @@ -110,6 +110,8 @@ class ProcessSessionImpl : public ReferenceContainerImpl, public virtual Process void importFrom(io::InputStream&& stream, const std::shared_ptr &flow) override; void import(const std::string& source, const std::shared_ptr &flow, bool keepSource = true, uint64_t offset = 0) override; + void import(const std::string& source, std::vector> &flows, uint64_t offset, char inputDelimiter) override; + void import(const std::string& source, std::vector> &flows, bool keepSource, uint64_t offset, char inputDelimiter) override; bool exportContent(const std::string &destination, const std::shared_ptr &flow, bool keepContent) override; diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index 0aa27752a0..5526cda691 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -267,7 +267,7 @@ void ProcessSessionImpl::write(core::FlowFile &flow, const io::OutputStreamCallb auto duration = std::chrono::duration_cast(std::chrono::steady_clock::now() - start_time); provenance_report_->modifyContent(flow, details, duration); if (metrics_) { - metrics_->bytes_written += stream->size(); + metrics_->bytesWritten() += stream->size(); } } catch (const std::exception& exception) { logger_->log_debug("Caught Exception during process session write, type: {}, what: {}", typeid(exception).name(), exception.what()); @@ -319,7 +319,7 @@ void ProcessSessionImpl::append(const std::shared_ptr &flow, con } flow->setSize(flow_file_size + (stream->size() - stream_size_before_callback)); if (metrics_) { - metrics_->bytes_written += stream->size() - stream_size_before_callback; + metrics_->bytesWritten() += stream->size() - stream_size_before_callback; } std::stringstream details; @@ -379,7 +379,7 @@ int64_t ProcessSessionImpl::read(const core::FlowFile& flow_file, const io::Inpu throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content"); } if (metrics_) { - metrics_->bytes_read += ret; + metrics_->bytesRead() += ret; } return ret; } catch (const std::exception& exception) { @@ -428,8 +428,8 @@ int64_t ProcessSessionImpl::readWrite(const std::shared_ptr &flo flow->setOffset(0); flow->setResourceClaim(output_claim); if (metrics_) { - metrics_->bytes_written += read_write_result->bytes_written; - metrics_->bytes_read += read_write_result->bytes_read; + metrics_->bytesWritten() += read_write_result->bytes_written; + metrics_->bytesRead() += read_write_result->bytes_read; } return read_write_result->bytes_written; @@ -498,7 +498,7 @@ void ProcessSessionImpl::importFrom(io::InputStream &stream, const std::shared_p content_stream->close(); if (metrics_) { - metrics_->bytes_written += content_stream->size(); + metrics_->bytesWritten() += content_stream->size(); } std::stringstream details; details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr(); @@ -562,7 +562,7 @@ void ProcessSessionImpl::import(const std::string& source, const std::shared_ptr stream->close(); if (metrics_) { - metrics_->bytes_written += stream->size(); + metrics_->bytesWritten() += stream->size(); } input.close(); if (!keepSource) { @@ -667,7 +667,7 @@ void ProcessSessionImpl::import(const std::string& source, std::vectorgetOffset(), flowFile->getSize(), flowFile->getResourceClaim()->getContentFullPath(), flowFile->getUUIDStr()); stream->close(); if (metrics_) { - metrics_->bytes_written += stream->size(); + metrics_->bytesWritten() += stream->size(); } std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr(); auto duration = std::chrono::duration_cast(std::chrono::steady_clock::now() - start_time); @@ -953,7 +953,7 @@ void ProcessSessionImpl::commit() { if (metrics_) { auto time_delta = std::chrono::steady_clock::now() - commit_start_time; metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast(time_delta)); - metrics_->processing_nanos += std::chrono::duration_cast(time_delta).count(); + metrics_->processingNanos() += std::chrono::duration_cast(time_delta).count(); } } catch (const std::exception& exception) { logger_->log_debug("Caught Exception during process session commit, type: {}, what: {}", typeid(exception).name(), exception.what()); @@ -1165,8 +1165,8 @@ std::shared_ptr ProcessSessionImpl::get() { ret->setAttribute(SpecialFlowAttribute::FLOW_ID, flow_version->getFlowId()); } if (metrics_) { - metrics_->incoming_bytes += ret->getSize(); - ++metrics_->incoming_flow_files; + metrics_->incomingBytes() += ret->getSize(); + ++metrics_->incomingFlowFiles(); } return ret; } diff --git a/libminifi/src/core/state/nodes/FlowInformation.cpp b/libminifi/src/core/state/nodes/FlowInformation.cpp index f31b26c297..de8be246fb 100644 --- a/libminifi/src/core/state/nodes/FlowInformation.cpp +++ b/libminifi/src/core/state/nodes/FlowInformation.cpp @@ -81,14 +81,14 @@ std::vector FlowInformation::serialize() { .children = { {.name = "id", .value = std::string{processor->getUUIDStr()}}, {.name = "groupId", .value = processor->getProcessGroupUUIDStr()}, - {.name = "bytesRead", .value = metrics->bytes_read.load()}, - {.name = "bytesWritten", .value = metrics->bytes_written.load()}, - {.name = "flowFilesIn", .value = metrics->incoming_flow_files.load()}, - {.name = "flowFilesOut", .value = metrics->transferred_flow_files.load()}, - {.name = "bytesIn", .value = metrics->incoming_bytes.load()}, - {.name = "bytesOut", .value = metrics->transferred_bytes.load()}, - {.name = "invocations", .value = metrics->invocations.load()}, - {.name = "processingNanos", .value = metrics->processing_nanos.load()}, + {.name = "bytesRead", .value = metrics->bytesRead().load()}, + {.name = "bytesWritten", .value = metrics->bytesWritten().load()}, + {.name = "flowFilesIn", .value = metrics->incomingFlowFiles().load()}, + {.name = "flowFilesOut", .value = metrics->transferredFlowFiles().load()}, + {.name = "bytesIn", .value = metrics->incomingBytes().load()}, + {.name = "bytesOut", .value = metrics->transferredBytes().load()}, + {.name = "invocations", .value = metrics->invocations().load()}, + {.name = "processingNanos", .value = metrics->processingNanos().load()}, {.name = "activeThreadCount", .value = -1}, {.name = "terminatedThreadCount", .value = -1}, {.name = "runStatus", .value = (processor->isRunning() ? "Running" : "Stopped")} @@ -115,21 +115,21 @@ std::vector FlowInformation::calculateMetrics() { continue; } auto processor_metrics = processor->getMetrics(); - metrics.push_back({"bytes_read", gsl::narrow(processor_metrics->bytes_read.load()), + metrics.push_back({"bytes_read", gsl::narrow(processor_metrics->bytesRead().load()), {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); - metrics.push_back({"bytes_written", gsl::narrow(processor_metrics->bytes_written.load()), + metrics.push_back({"bytes_written", gsl::narrow(processor_metrics->bytesWritten().load()), {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); - metrics.push_back({"flow_files_in", gsl::narrow(processor_metrics->incoming_flow_files.load()), + metrics.push_back({"flow_files_in", gsl::narrow(processor_metrics->incomingFlowFiles().load()), {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); - metrics.push_back({"flow_files_out", gsl::narrow(processor_metrics->transferred_flow_files.load()), + metrics.push_back({"flow_files_out", gsl::narrow(processor_metrics->transferredFlowFiles().load()), {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); - metrics.push_back({"bytes_in", gsl::narrow(processor_metrics->incoming_bytes.load()), + metrics.push_back({"bytes_in", gsl::narrow(processor_metrics->incomingBytes().load()), {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); - metrics.push_back({"bytes_out", gsl::narrow(processor_metrics->transferred_bytes.load()), + metrics.push_back({"bytes_out", gsl::narrow(processor_metrics->transferredBytes().load()), {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); - metrics.push_back({"invocations", gsl::narrow(processor_metrics->invocations.load()), + metrics.push_back({"invocations", gsl::narrow(processor_metrics->invocations().load()), {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); - metrics.push_back({"processing_nanos", gsl::narrow(processor_metrics->processing_nanos.load()), + metrics.push_back({"processing_nanos", gsl::narrow(processor_metrics->processingNanos().load()), {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); metrics.push_back({"is_running", (processor->isRunning() ? 1.0 : 0.0), {{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}}); diff --git a/libminifi/test/unit/MetricsTests.cpp b/libminifi/test/unit/MetricsTests.cpp index f8c986af48..806970b071 100644 --- a/libminifi/test/unit/MetricsTests.cpp +++ b/libminifi/test/unit/MetricsTests.cpp @@ -287,12 +287,12 @@ TEST_CASE("Test commit runtime processor metrics", "[ProcessorMetrics]") { REQUIRE(metrics.getAverageSessionCommitRuntime() == 37ms); } -class DuplicateContentProcessor : public minifi::core::Processor { - using minifi::core::Processor::Processor; +class DuplicateContentProcessor : public minifi::core::ProcessorImpl { + using minifi::core::ProcessorImpl::ProcessorImpl; public: - DuplicateContentProcessor(std::string_view name, const minifi::utils::Identifier& uuid) : Processor(name, uuid) {} - explicit DuplicateContentProcessor(std::string_view name) : Processor(name) {} + DuplicateContentProcessor(std::string_view name, const minifi::utils::Identifier& uuid) : ProcessorImpl(name, uuid) {} + explicit DuplicateContentProcessor(std::string_view name) : ProcessorImpl(name) {} static constexpr const char* Description = "A processor that creates two more of the same flow file."; static constexpr auto Properties = std::array{}; static constexpr auto Success = core::RelationshipDefinition{"success", "Newly created FlowFiles"}; @@ -333,29 +333,29 @@ TEST_CASE("Test processor metrics change after trigger", "[ProcessorMetrics]") { minifi::test::SingleProcessorTestController test_controller(std::make_unique("DuplicateContentProcessor")); test_controller.trigger({minifi::test::InputFlowFileData{"log line 1", {}}}); auto metrics = test_controller.getProcessor()->getMetrics(); - CHECK(metrics->invocations == 1); - CHECK(metrics->incoming_flow_files == 1); - CHECK(metrics->transferred_flow_files == 2); + CHECK(metrics->invocations() == 1); + CHECK(metrics->incomingFlowFiles() == 1); + CHECK(metrics->transferredFlowFiles() == 2); CHECK(metrics->getTransferredFlowFilesToRelationshipCount("success") == 1); CHECK(metrics->getTransferredFlowFilesToRelationshipCount("original") == 1); - CHECK(metrics->incoming_bytes == 10); - CHECK(metrics->transferred_bytes == 30); - CHECK(metrics->bytes_read == 10); - CHECK(metrics->bytes_written == 20); - auto old_nanos = metrics->processing_nanos.load(); - CHECK(metrics->processing_nanos > 0); + CHECK(metrics->incomingBytes() == 10); + CHECK(metrics->transferredBytes() == 30); + CHECK(metrics->bytesRead() == 10); + CHECK(metrics->bytesWritten() == 20); + auto old_nanos = metrics->processingNanos().load(); + CHECK(metrics->processingNanos() > 0); test_controller.trigger({minifi::test::InputFlowFileData{"new log line 2", {}}}); - CHECK(metrics->invocations == 2); - CHECK(metrics->incoming_flow_files == 2); - CHECK(metrics->transferred_flow_files == 4); + CHECK(metrics->invocations() == 2); + CHECK(metrics->incomingFlowFiles() == 2); + CHECK(metrics->transferredFlowFiles() == 4); CHECK(metrics->getTransferredFlowFilesToRelationshipCount("success") == 2); CHECK(metrics->getTransferredFlowFilesToRelationshipCount("original") == 2); - CHECK(metrics->incoming_bytes == 24); - CHECK(metrics->transferred_bytes == 72); - CHECK(metrics->bytes_read == 24); - CHECK(metrics->bytes_written == 48); - CHECK(metrics->processing_nanos > old_nanos); + CHECK(metrics->incomingBytes() == 24); + CHECK(metrics->transferredBytes() == 72); + CHECK(metrics->bytesRead() == 24); + CHECK(metrics->bytesWritten() == 48); + CHECK(metrics->processingNanos() > old_nanos); } diff --git a/minifi-api/include/minifi-cpp/core/ProcessSession.h b/minifi-api/include/minifi-cpp/core/ProcessSession.h index e19af3b188..196357e215 100644 --- a/minifi-api/include/minifi-cpp/core/ProcessSession.h +++ b/minifi-api/include/minifi-cpp/core/ProcessSession.h @@ -97,6 +97,8 @@ class ProcessSession : public virtual ReferenceContainer { // import from the data source. virtual void import(const std::string& source, const std::shared_ptr &flow, bool keepSource = true, uint64_t offset = 0) = 0; + virtual void import(const std::string& source, std::vector> &flows, uint64_t offset, char inputDelimiter) = 0; + virtual void import(const std::string& source, std::vector> &flows, bool keepSource, uint64_t offset, char inputDelimiter) = 0; /** * Exports the data stream to a file diff --git a/minifi-api/include/minifi-cpp/core/Processor.h b/minifi-api/include/minifi-cpp/core/Processor.h index 092df14261..52b9ab6c90 100644 --- a/minifi-api/include/minifi-cpp/core/Processor.h +++ b/minifi-api/include/minifi-cpp/core/Processor.h @@ -83,6 +83,8 @@ class Processor : public virtual Connectable, public virtual ConfigurableCompone virtual void validateAnnotations() const = 0; virtual annotation::Input getInputRequirement() const = 0; virtual gsl::not_null> getMetrics() const = 0; + virtual std::string getProcessGroupUUIDStr() const = 0; + virtual void setProcessGroupUUIDStr(const std::string &uuid) = 0; virtual void updateReachability(const std::lock_guard& graph_lock, bool force = false) = 0; virtual const std::unordered_map>& reachable_processors() const = 0; diff --git a/minifi-api/include/minifi-cpp/core/ProcessorMetrics.h b/minifi-api/include/minifi-cpp/core/ProcessorMetrics.h index 5e4825731d..8d19c1cd19 100644 --- a/minifi-api/include/minifi-cpp/core/ProcessorMetrics.h +++ b/minifi-api/include/minifi-cpp/core/ProcessorMetrics.h @@ -34,23 +34,24 @@ class ProcessorMetrics : public virtual state::response::ResponseNode { virtual std::chrono::milliseconds getAverageSessionCommitRuntime() const = 0; virtual std::chrono::milliseconds getLastSessionCommitRuntime() const = 0; virtual void addLastSessionCommitRuntime(std::chrono::milliseconds runtime) = 0; + virtual std::optional getTransferredFlowFilesToRelationshipCount(const std::string& relationship) const = 0; virtual std::atomic& invocations() = 0; virtual const std::atomic& invocations() const = 0; - virtual std::atomic& incoming_flow_files() = 0; - virtual const std::atomic& incoming_flow_files() const = 0; - virtual std::atomic& transferred_flow_files() = 0; - virtual const std::atomic& transferred_flow_files() const = 0; - virtual std::atomic& incoming_bytes() = 0; - virtual const std::atomic& incoming_bytes() const = 0; - virtual std::atomic& transferred_bytes() = 0; - virtual const std::atomic& transferred_bytes() const = 0; - virtual std::atomic& bytes_read() = 0; - virtual const std::atomic& bytes_read() const = 0; - virtual std::atomic& bytes_written() = 0; - virtual const std::atomic& bytes_written() const = 0; - virtual std::atomic& processing_nanos() = 0; - virtual const std::atomic& processing_nanos() const = 0; + virtual std::atomic& incomingFlowFiles() = 0; + virtual const std::atomic& incomingFlowFiles() const = 0; + virtual std::atomic& transferredFlowFiles() = 0; + virtual const std::atomic& transferredFlowFiles() const = 0; + virtual std::atomic& incomingBytes() = 0; + virtual const std::atomic& incomingBytes() const = 0; + virtual std::atomic& transferredBytes() = 0; + virtual const std::atomic& transferredBytes() const = 0; + virtual std::atomic& bytesRead() = 0; + virtual const std::atomic& bytesRead() const = 0; + virtual std::atomic& bytesWritten() = 0; + virtual const std::atomic& bytesWritten() const = 0; + virtual std::atomic& processingNanos() = 0; + virtual const std::atomic& processingNanos() const = 0; }; } // namespace org::apache::nifi::minifi::core diff --git a/utils/include/core/Processor.h b/utils/include/core/Processor.h index 336f13a59d..4a17fcc9bd 100644 --- a/utils/include/core/Processor.h +++ b/utils/include/core/Processor.h @@ -167,11 +167,11 @@ class ProcessorImpl : public virtual Processor, public ConnectableImpl, public C active_tasks_ = 0; } - std::string getProcessGroupUUIDStr() const { + std::string getProcessGroupUUIDStr() const override { return process_group_uuid_; } - void setProcessGroupUUIDStr(const std::string &uuid) { + void setProcessGroupUUIDStr(const std::string &uuid) override { process_group_uuid_ = uuid; } diff --git a/utils/include/core/ProcessorMetrics.h b/utils/include/core/ProcessorMetrics.h index 3d1ebfb45e..030d2ae342 100644 --- a/utils/include/core/ProcessorMetrics.h +++ b/utils/include/core/ProcessorMetrics.h @@ -49,27 +49,27 @@ class ProcessorMetricsImpl : public state::response::ResponseNodeImpl, public vi std::chrono::milliseconds getLastOnTriggerRuntime() const override; void addLastOnTriggerRuntime(std::chrono::milliseconds runtime) override; - std::chrono::milliseconds getAverageSessionCommitRuntime() const; - std::chrono::milliseconds getLastSessionCommitRuntime() const; - void addLastSessionCommitRuntime(std::chrono::milliseconds runtime); - std::optional getTransferredFlowFilesToRelationshipCount(const std::string& relationship) const; - - std::atomic& invocations() {return invocations_;} - const std::atomic& invocations() const {return invocations_;} - std::atomic& incoming_flow_files() {return incoming_flow_files_;} - const std::atomic& incoming_flow_files() const {return incoming_flow_files_;} - std::atomic& transferred_flow_files() {return transferred_flow_files_;} - const std::atomic& transferred_flow_files() const {return transferred_flow_files_;} - std::atomic& incoming_bytes() {return incoming_bytes_;} - const std::atomic& incoming_bytes() const {return incoming_bytes_;} - std::atomic& transferred_bytes() {return transferred_bytes_;} - const std::atomic& transferred_bytes() const {return transferred_bytes_;} - std::atomic& bytes_read() {return bytes_read_;} - const std::atomic& bytes_read() const {return bytes_read_;} - std::atomic& bytes_written() {return bytes_written_;} - const std::atomic& bytes_written() const {return bytes_written_;} - std::atomic& processing_nanos() {return processing_nanos_;} - const std::atomic& processing_nanos() const {return processing_nanos_;} + std::chrono::milliseconds getAverageSessionCommitRuntime() const override; + std::chrono::milliseconds getLastSessionCommitRuntime() const override; + void addLastSessionCommitRuntime(std::chrono::milliseconds runtime) override; + std::optional getTransferredFlowFilesToRelationshipCount(const std::string& relationship) const override; + + std::atomic& invocations() override {return invocations_;} + const std::atomic& invocations() const override {return invocations_;} + std::atomic& incomingFlowFiles() override {return incoming_flow_files_;} + const std::atomic& incomingFlowFiles() const override {return incoming_flow_files_;} + std::atomic& transferredFlowFiles() override {return transferred_flow_files_;} + const std::atomic& transferredFlowFiles() const override {return transferred_flow_files_;} + std::atomic& incomingBytes() override {return incoming_bytes_;} + const std::atomic& incomingBytes() const override {return incoming_bytes_;} + std::atomic& transferredBytes() override {return transferred_bytes_;} + const std::atomic& transferredBytes() const override {return transferred_bytes_;} + std::atomic& bytesRead() override {return bytes_read_;} + const std::atomic& bytesRead() const override {return bytes_read_;} + std::atomic& bytesWritten() override {return bytes_written_;} + const std::atomic& bytesWritten() const override {return bytes_written_;} + std::atomic& processingNanos() override {return processing_nanos_;} + const std::atomic& processingNanos() const override {return processing_nanos_;} protected: template diff --git a/utils/src/core/Processor.cpp b/utils/src/core/Processor.cpp index 1e17f80d41..cf2eb6b531 100644 --- a/utils/src/core/Processor.cpp +++ b/utils/src/core/Processor.cpp @@ -201,7 +201,7 @@ void ProcessorImpl::triggerAndCommit(const std::shared_ptr& cont } void ProcessorImpl::trigger(const std::shared_ptr& context, const std::shared_ptr& process_session) { - ++metrics_->invocations; + ++metrics_->invocations(); const auto start = std::chrono::steady_clock::now(); onTrigger(*context, *process_session); metrics_->addLastOnTriggerRuntime(std::chrono::duration_cast(std::chrono::steady_clock::now() - start)); diff --git a/utils/src/core/ProcessorMetrics.cpp b/utils/src/core/ProcessorMetrics.cpp index 81e66c2c90..f615be769c 100644 --- a/utils/src/core/ProcessorMetrics.cpp +++ b/utils/src/core/ProcessorMetrics.cpp @@ -49,13 +49,13 @@ std::vector ProcessorMetricsImpl::seria {.name = "LastOnTriggerRunTime", .value = static_cast(getLastOnTriggerRuntime().count())}, {.name = "AverageSessionCommitRunTime", .value = static_cast(getAverageSessionCommitRuntime().count())}, {.name = "LastSessionCommitRunTime", .value = static_cast(getLastSessionCommitRuntime().count())}, - {.name = "TransferredFlowFiles", .value = static_cast(transferred_flow_files())}, - {.name = "TransferredBytes", .value = static_cast(transferred_bytes())}, - {.name = "IncomingFlowFiles", .value = static_cast(incoming_flow_files())}, - {.name = "IncomingBytes", .value = static_cast(incoming_bytes())}, - {.name = "BytesRead", .value = static_cast(bytes_read())}, - {.name = "BytesWritten", .value = static_cast(bytes_written())}, - {.name = "ProcessingNanos", .value = static_cast(processing_nanos())} + {.name = "TransferredFlowFiles", .value = static_cast(transferredFlowFiles())}, + {.name = "TransferredBytes", .value = static_cast(transferredBytes())}, + {.name = "IncomingFlowFiles", .value = static_cast(incomingFlowFiles())}, + {.name = "IncomingBytes", .value = static_cast(incomingBytes())}, + {.name = "BytesRead", .value = static_cast(bytesRead())}, + {.name = "BytesWritten", .value = static_cast(bytesWritten())}, + {.name = "ProcessingNanos", .value = static_cast(processingNanos())} } }; @@ -83,13 +83,13 @@ std::vector ProcessorMetricsImpl::calculateMetrics() { {"last_onTrigger_runtime_milliseconds", static_cast(getLastOnTriggerRuntime().count()), getCommonLabels()}, {"average_session_commit_runtime_milliseconds", static_cast(getAverageSessionCommitRuntime().count()), getCommonLabels()}, {"last_session_commit_runtime_milliseconds", static_cast(getLastSessionCommitRuntime().count()), getCommonLabels()}, - {"transferred_flow_files", static_cast(transferred_flow_files()), getCommonLabels()}, - {"transferred_bytes", static_cast(transferred_bytes()), getCommonLabels()}, - {"incoming_flow_files", static_cast(incoming_flow_files()), getCommonLabels()}, - {"incoming_bytes", static_cast(incoming_bytes()), getCommonLabels()}, - {"bytes_read", static_cast(bytes_read()), getCommonLabels()}, - {"bytes_written", static_cast(bytes_written()), getCommonLabels()}, - {"processing_nanos", static_cast(processing_nanos()), getCommonLabels()} + {"transferred_flow_files", static_cast(transferredFlowFiles()), getCommonLabels()}, + {"transferred_bytes", static_cast(transferredBytes()), getCommonLabels()}, + {"incoming_flow_files", static_cast(incomingFlowFiles()), getCommonLabels()}, + {"incoming_bytes", static_cast(incomingBytes()), getCommonLabels()}, + {"bytes_read", static_cast(bytesRead()), getCommonLabels()}, + {"bytes_written", static_cast(bytesWritten()), getCommonLabels()}, + {"processing_nanos", static_cast(processingNanos()), getCommonLabels()} }; { @@ -132,7 +132,7 @@ std::chrono::milliseconds ProcessorMetricsImpl::getLastSessionCommitRuntime() co return session_commit_runtime_averager_.getLastValue(); } -std::optional ProcessorMetrics::getTransferredFlowFilesToRelationshipCount(const std::string& relationship) const { +std::optional ProcessorMetricsImpl::getTransferredFlowFilesToRelationshipCount(const std::string& relationship) const { std::lock_guard lock(transferred_relationships_mutex_); const auto relationship_it = transferred_relationships_.find(relationship); if (relationship_it != std::end(transferred_relationships_)) {