diff --git a/C2.md b/C2.md index 91a29e030d..8a2358dfc8 100644 --- a/C2.md +++ b/C2.md @@ -115,6 +115,9 @@ be requested via C2 DESCRIBE manifest command. # minimize REST heartbeat updates #nifi.c2.rest.heartbeat.minimize.updates=true + # specify the maximum number of bulletins to send in a heartbeat + # nifi.c2.flow.info.processor.bulletin.limit=1000 + #### Flow Id and URL Flow id and URL are usually retrieved from the C2 server. These identify the last updated flow version and where the flow was downloaded from. These properties are persisted in the minifi.properties file. @@ -194,6 +197,20 @@ configuration produces the following JSON: "uuid": "2438e3c8-015a-1000-79ca-83af40ec1997" } }, + "processorBulletins": [ + { + "id": 1, + "timestamp": "Mon Jan 27 12:10:47 UTC 2025", + "level": "ERROR", + "category": "Log Message", + "message": "Error connecting to localhost:8776 due to Connection refused (2438e3c8-015a-1000-79ca-83af40ec1991)", + "groupId": "2438e3c8-015a-1000-79ca-83af40ec1990", + "groupName": "MiNiFi Flow", + "groupPath": "MiNiFi Flow", + "sourceId": "2438e3c8-015a-1000-79ca-83af40ec1991", + "sourceName": "GetTCP" + } + ], "processorStatuses": [ { "id": "5128e3c8-015a-1000-79ca-83af40ec1990", @@ -536,6 +553,20 @@ Contains information about the flow the agent is running, including the versione "uuid": "8368e3c8-015a-1003-52ca-83af40ec1332" } }, + "processorBulletins": [ + { + "id": 1, + "timestamp": "Mon Jan 27 12:10:47 UTC 2025", + "level": "ERROR", + "category": "Log Message", + "message": "Error connecting to localhost:8776 due to Connection refused (2438e3c8-015a-1000-79ca-83af40ec1991)", + "groupId": "2438e3c8-015a-1000-79ca-83af40ec1990", + "groupName": "MiNiFi Flow", + "groupPath": "MiNiFi Flow", + "sourceId": "2438e3c8-015a-1000-79ca-83af40ec1991", + "sourceName": "GetTCP" + } + ], "processorStatuses": [ { "id": "5128e3c8-015a-1000-79ca-83af40ec1990", @@ -550,7 +581,7 @@ Contains information about the flow the agent is running, including the versione "processingNanos": 0, "activeThreadCount": -1, "terminatedThreadCount": -1, - "running": true + "runStatus": "Running" }, { "id": "4fe2d51d-076a-49b0-88de-5cf5adf52b8f", @@ -565,11 +596,11 @@ Contains information about the flow the agent is running, including the versione "processingNanos": 2119148, "activeThreadCount": -1, "terminatedThreadCount": -1, - "running": true + "runStatus": "Running" } ], "flowId": "96273342-b9fe-11ef-a0ad-10f60a596f64", - "running": true + "runStatus": "Running" } ``` diff --git a/conf/minifi.properties b/conf/minifi.properties index 1a18fb076d..b2f5974c99 100644 --- a/conf/minifi.properties +++ b/conf/minifi.properties @@ -120,6 +120,9 @@ nifi.c2.full.heartbeat=false # specify encoding strategy for c2 requests (gzip, none) #nifi.c2.rest.request.encoding=none +# specify the maximum number of bulletins to send in a heartbeat +#nifi.c2.flow.info.processor.bulletin.limit=1000 + ## enable the controller socket provider on port 9998 ## off by default. #controller.socket.enable=true diff --git a/extensions/python/ExecutePythonProcessor.cpp b/extensions/python/ExecutePythonProcessor.cpp index 0fd8e0c96c..d83bf69c7b 100644 --- a/extensions/python/ExecutePythonProcessor.cpp +++ b/extensions/python/ExecutePythonProcessor.cpp @@ -160,10 +160,7 @@ void ExecutePythonProcessor::reloadScriptIfUsingScriptFileProperty() { std::unique_ptr ExecutePythonProcessor::createScriptEngine() { auto engine = std::make_unique(); - - python_logger_ = core::logging::LoggerFactory::getAliasedLogger(getName()); engine->initialize(Success, Failure, Original, python_logger_); - return engine; } @@ -225,6 +222,12 @@ std::vector ExecutePythonProcessor::getPythonRelationships() return relationships; } +void ExecutePythonProcessor::setLoggerCallback(const std::function& callback) { + gsl_Expects(logger_ && python_logger_); + logger_->addLogCallback(callback); + python_logger_->addLogCallback(callback); +} + REGISTER_RESOURCE(ExecutePythonProcessor, Processor); } // namespace org::apache::nifi::minifi::extensions::python::processors diff --git a/extensions/python/ExecutePythonProcessor.h b/extensions/python/ExecutePythonProcessor.h index 64947d3118..b6e7a512ca 100644 --- a/extensions/python/ExecutePythonProcessor.h +++ b/extensions/python/ExecutePythonProcessor.h @@ -46,6 +46,7 @@ class ExecutePythonProcessor : public core::ProcessorImpl { python_dynamic_(false), reload_on_script_change_(true) { logger_ = core::logging::LoggerFactory::getLogger(uuid_); + python_logger_ = core::logging::LoggerFactory::getAliasedLogger(getName()); } EXTENSIONAPI static constexpr const char* Description = "Executes a script given the flow file and a process session. " @@ -138,8 +139,8 @@ class ExecutePythonProcessor : public core::ProcessorImpl { } std::map getProperties() const override; - std::vector getPythonRelationships(); + void setLoggerCallback(const std::function& callback) override; protected: const core::Property* findProperty(const std::string& name) const override; diff --git a/extensions/standard-processors/tests/unit/FlowJsonTests.cpp b/extensions/standard-processors/tests/unit/FlowJsonTests.cpp index 14084be803..7fe2ffb051 100644 --- a/extensions/standard-processors/tests/unit/FlowJsonTests.cpp +++ b/extensions/standard-processors/tests/unit/FlowJsonTests.cpp @@ -74,6 +74,7 @@ TEST_CASE("NiFi flow json format is correctly parsed") { "schedulingPeriod": "3 sec", "penaltyDuration": "12 sec", "yieldDuration": "4 sec", + "bulletinLevel": "ERROR", "runDurationMillis": 12, "autoTerminatedRelationships": ["one", "two"], "properties": { @@ -143,6 +144,7 @@ TEST_CASE("NiFi flow json format is correctly parsed") { REQUIRE(3s == proc->getSchedulingPeriod()); REQUIRE(12s == proc->getPenalizationPeriod()); REQUIRE(4s == proc->getYieldPeriod()); + REQUIRE(proc->getLogBulletinLevel() == logging::LOG_LEVEL::err); REQUIRE(proc->isAutoTerminated({"one", ""})); REQUIRE(proc->isAutoTerminated({"two", ""})); REQUIRE_FALSE(proc->isAutoTerminated({"three", ""})); diff --git a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp index 876167a4db..b187a69ca4 100644 --- a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp +++ b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp @@ -100,6 +100,7 @@ Security Properties: scheduling period: 1 sec penalization period: 30 sec yield period: 1 sec + bulletin level: ERROR run duration nanos: 0 auto-terminated relationships list: Properties: @@ -157,6 +158,7 @@ Provenance Reporting: REQUIRE(1s == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingPeriod()); REQUIRE(30s == rootFlowConfig->findProcessorByName("TailFile")->getPenalizationPeriod()); REQUIRE(1s == rootFlowConfig->findProcessorByName("TailFile")->getYieldPeriod()); + REQUIRE(rootFlowConfig->findProcessorByName("TailFile")->getLogBulletinLevel() == logging::LOG_LEVEL::err); REQUIRE(0s == rootFlowConfig->findProcessorByName("TailFile")->getRunDurationNano()); std::map connectionMap; diff --git a/libminifi/include/core/BulletinStore.h b/libminifi/include/core/BulletinStore.h new file mode 100644 index 0000000000..3726014548 --- /dev/null +++ b/libminifi/include/core/BulletinStore.h @@ -0,0 +1,70 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "properties/Configure.h" +#include "core/logging/LoggerFactory.h" +#include "core/Processor.h" + +namespace org::apache::nifi::minifi { +namespace test { +class BulletinStoreTestAccessor; +} // namespace test + +namespace core { + +struct Bulletin { + uint64_t id = 0; + std::chrono::time_point timestamp; + std::string level; + std::string category; + std::string message; + std::string group_id; + std::string group_name; + std::string group_path; + std::string source_id; + std::string source_name; +}; + +class BulletinStore { + public: + explicit BulletinStore(const Configure& configure); + void addProcessorBulletin(const core::Processor& processor, core::logging::LOG_LEVEL log_level, const std::string& message); + std::deque getBulletins(std::optional time_interval_to_include = {}) const; + size_t getMaxBulletinCount() const; + + private: + friend class minifi::test::BulletinStoreTestAccessor; + + static constexpr size_t DEFAULT_BULLETIN_COUNT = 1000; + size_t max_bulletin_count_; + mutable std::mutex mutex_; + uint64_t id_counter = 1; + std::deque bulletins_; + std::shared_ptr logger_{logging::LoggerFactory::getLogger()}; +}; + +} // namespace core +} // namespace org::apache::nifi::minifi diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h index 9cbe93e81d..fe53ed02a5 100644 --- a/libminifi/include/core/FlowConfiguration.h +++ b/libminifi/include/core/FlowConfiguration.h @@ -42,6 +42,7 @@ #include "utils/file/FileSystem.h" #include "utils/ChecksumCalculator.h" #include "ParameterContext.h" +#include "core/BulletinStore.h" namespace org::apache::nifi::minifi::core { @@ -60,6 +61,7 @@ struct ConfigurationContext { std::optional path{std::nullopt}; std::shared_ptr filesystem{std::make_shared()}; std::optional sensitive_values_encryptor{std::nullopt}; + core::BulletinStore* bulletin_store{nullptr}; }; enum class FlowSerializationType { Json, NifiJson, Yaml }; @@ -149,6 +151,7 @@ class FlowConfiguration : public CoreComponentImpl { std::shared_ptr filesystem_; utils::crypto::EncryptionProvider sensitive_values_encryptor_; utils::ChecksumCalculator checksum_calculator_; + core::BulletinStore* bulletin_store_ = nullptr; private: virtual std::string serialize(const ProcessGroup&) { return ""; } diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h index bb3b018b7e..1a267b04e4 100644 --- a/libminifi/include/core/ProcessGroup.h +++ b/libminifi/include/core/ProcessGroup.h @@ -73,17 +73,13 @@ class ProcessGroup : public CoreComponentImpl { ProcessGroup(ProcessGroupType type, std::string_view name); ProcessGroup(ProcessGroupType type, std::string_view name, const utils::Identifier& uuid); ProcessGroup(ProcessGroupType type, std::string_view name, const utils::Identifier& uuid, int version); - // Destructor ~ProcessGroup() override; - // Set URL void setURL(std::string url) { url_ = std::move(url); } - // Get URL std::string getURL() { return (url_); } - // SetTransmitting void setTransmitting(bool val) { transmitting_ = val; } @@ -93,7 +89,6 @@ class ProcessGroup : public CoreComponentImpl { uint64_t getTimeout() { return timeout_; } - // setInterface void setInterface(std::string &ifc) { local_network_interface_ = ifc; } @@ -124,11 +119,9 @@ class ProcessGroup : public CoreComponentImpl { http::HTTPProxy getHTTPProxy() { return proxy_; } - // Set Processor yield period in MilliSecond void setYieldPeriodMsec(std::chrono::milliseconds period) { yield_period_msec_ = period; } - // Get Processor yield period in MilliSecond std::chrono::milliseconds getYieldPeriodMsec() { return (yield_period_msec_); } @@ -147,13 +140,11 @@ class ProcessGroup : public CoreComponentImpl { const std::function& filter = nullptr); bool isRemoteProcessGroup(); - // set parent process group void setParent(ProcessGroup *parent) { std::lock_guard lock(mutex_); parent_process_group_ = parent; } - // get parent process group - ProcessGroup *getParent() { + ProcessGroup *getParent() const { std::lock_guard lock(mutex_); return parent_process_group_; } @@ -185,25 +176,17 @@ class ProcessGroup : public CoreComponentImpl { } return nullptr; } - // findProcessor based on UUID Processor* findProcessorById(const utils::Identifier& uuid, Traverse traverse = Traverse::IncludeChildren) const; - // findProcessor based on name Processor* findProcessorByName(const std::string &processorName, Traverse traverse = Traverse::IncludeChildren) const; void getAllProcessors(std::vector& processor_vec) const; - /** - * Add controller service - * @param nodeId node identifier - * @param node controller service node. - */ void addControllerService(const std::string &nodeId, const std::shared_ptr &node); core::controller::ControllerServiceNode* findControllerService(const std::string &nodeId, Traverse traverse = Traverse::ExcludeChildren) const; std::vector getAllControllerServices() const; - // update property value void updatePropertyValue(const std::string& processorName, const std::string& propertyName, const std::string& propertyValue); void getConnections(std::map& connectionMap); @@ -228,45 +211,29 @@ class ProcessGroup : public CoreComponentImpl { protected: void startProcessingProcessors(TimerDrivenSchedulingAgent& timeScheduler, EventDrivenSchedulingAgent& eventScheduler, CronDrivenSchedulingAgent& cronScheduler); - // version int config_version_; - // Process Group Type const ProcessGroupType type_; - // Processors (ProcessNode) inside this process group which include Remote Process Group input/Output port std::set> processors_; std::set failed_processors_; std::set ports_; std::set> child_process_groups_; - // Connections between the processor inside the group; std::set> connections_; - // Parent Process Group ProcessGroup* parent_process_group_; - // Yield Period in Milliseconds std::atomic yield_period_msec_; std::atomic timeout_; - - // URL std::string url_; - // local network interface std::string local_network_interface_; - // Transmitting std::atomic transmitting_; - // http proxy http::HTTPProxy proxy_; std::string transport_protocol_; - - // controller services - core::controller::ControllerServiceNodeMap controller_service_map_; - ParameterContext* parameter_context_ = nullptr; private: static Port* findPortById(const std::set& ports, const utils::Identifier& uuid); + std::string buildGroupPath() const; - // Mutex for protection mutable std::recursive_mutex mutex_; - // Logger std::shared_ptr logger_; ProcessGroup(const ProcessGroup &parent); diff --git a/libminifi/include/core/ProcessorConfig.h b/libminifi/include/core/ProcessorConfig.h index e4706204cb..dd29562aa5 100644 --- a/libminifi/include/core/ProcessorConfig.h +++ b/libminifi/include/core/ProcessorConfig.h @@ -14,8 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#ifndef LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_ -#define LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_ +#pragma once #include #include @@ -23,12 +22,7 @@ #include "core/Core.h" #include "core/Property.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace core { - +namespace org::apache::nifi::minifi::core { constexpr const char* DEFAULT_SCHEDULING_STRATEGY{"TIMER_DRIVEN"}; constexpr const char* DEFAULT_SCHEDULING_PERIOD_STR{"1 sec"}; @@ -47,16 +41,11 @@ struct ProcessorConfig { std::string schedulingPeriod; std::string penalizationPeriod; std::string yieldPeriod; + std::string bulletinLevel; std::string runDurationNanos; std::vector autoTerminatedRelationships; std::vector properties; std::string parameterContextName; }; -} // namespace core -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org - -#endif // LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_ +} // namespace org::apache::nifi::minifi::core diff --git a/libminifi/include/core/flow/FlowSchema.h b/libminifi/include/core/flow/FlowSchema.h index 6cbad427a0..238b481eaf 100644 --- a/libminifi/include/core/flow/FlowSchema.h +++ b/libminifi/include/core/flow/FlowSchema.h @@ -35,6 +35,7 @@ struct FlowSchema { Keys max_concurrent_tasks; Keys penalization_period; Keys proc_yield_period; + Keys bulletin_level; Keys runduration_nanos; Keys connections; diff --git a/libminifi/include/core/logging/LoggerBase.h b/libminifi/include/core/logging/LoggerBase.h index 69142b4738..29cd40ed51 100644 --- a/libminifi/include/core/logging/LoggerBase.h +++ b/libminifi/include/core/logging/LoggerBase.h @@ -80,6 +80,36 @@ inline LOG_LEVEL mapFromSpdLogLevel(spdlog::level::level_enum level) { throw std::invalid_argument(fmt::format("Invalid spdlog::level::level_enum {}", magic_enum::enum_underlying(level))); } +inline std::string mapLogLevelToString(LOG_LEVEL level) { + switch (level) { + case trace: return "TRACE"; + case debug: return "DEBUG"; + case info: return "INFO"; + case warn: return "WARN"; + case err: return "ERROR"; + case critical: return "CRITICAL"; + case off: return "OFF"; + } + throw std::invalid_argument(fmt::format("Invalid LOG_LEVEL {}", magic_enum::enum_underlying(level))); +} + +inline LOG_LEVEL mapStringToLogLevel(const std::string& level_str) { + if (level_str == "TRACE") { + return trace; + } else if (level_str == "DEBUG") { + return debug; + } else if (level_str == "INFO") { + return info; + } else if (level_str == "WARN") { + return warn; + } else if (level_str == "ERROR") { + return err; + } else if (level_str == "CRITICAL") { + return critical; + } + throw std::invalid_argument(fmt::format("Invalid LOG_LEVEL {}", level_str)); +} + class LoggerBase : public Logger { public: LoggerBase(LoggerBase const&) = delete; @@ -92,6 +122,7 @@ class LoggerBase : public Logger { bool should_log(LOG_LEVEL level) override; void log_string(LOG_LEVEL level, std::string str) override; LOG_LEVEL level() const override; + void addLogCallback(const std::function& callback) override; protected: LoggerBase(std::shared_ptr delegate, std::shared_ptr controller); @@ -109,6 +140,7 @@ class LoggerBase : public Logger { private: std::atomic max_log_size_{LOG_BUFFER_SIZE}; + std::vector> log_callbacks_; }; } // namespace org::apache::nifi::minifi::core::logging diff --git a/libminifi/include/core/state/MetricsPublisherStore.h b/libminifi/include/core/state/MetricsPublisherStore.h index 4be8c3504b..61cad498bf 100644 --- a/libminifi/include/core/state/MetricsPublisherStore.h +++ b/libminifi/include/core/state/MetricsPublisherStore.h @@ -28,13 +28,14 @@ #include "utils/gsl.h" #include "core/ProcessGroup.h" #include "utils/file/AssetManager.h" +#include "core/BulletinStore.h" namespace org::apache::nifi::minifi::state { class MetricsPublisherStore { public: MetricsPublisherStore(std::shared_ptr configuration, const std::vector>& repository_metric_sources, - std::shared_ptr flow_configuration, utils::file::AssetManager* asset_manager = nullptr); + std::shared_ptr flow_configuration, utils::file::AssetManager* asset_manager = nullptr, core::BulletinStore* bulletin_store = nullptr); void initialize(core::controller::ControllerServiceProvider* controller, state::StateMonitor* update_sink); void loadMetricNodes(core::ProcessGroup* root); void clearMetricNodes(); diff --git a/libminifi/include/core/state/nodes/FlowInformation.h b/libminifi/include/core/state/nodes/FlowInformation.h index 7ba9a49311..e0f69b2cae 100644 --- a/libminifi/include/core/state/nodes/FlowInformation.h +++ b/libminifi/include/core/state/nodes/FlowInformation.h @@ -27,6 +27,7 @@ #include "Connection.h" #include "core/state/ConnectionStore.h" #include "core/Processor.h" +#include "core/BulletinStore.h" namespace org::apache::nifi::minifi::state::response { @@ -121,6 +122,10 @@ class FlowInformation : public StateMonitorNode { processors_ = std::move(processors); } + void setBulletinStore(core::BulletinStore* bulletin_store) { + bulletin_store_ = bulletin_store; + } + std::vector serialize() override; std::vector calculateMetrics() override; @@ -128,6 +133,7 @@ class FlowInformation : public StateMonitorNode { std::shared_ptr flow_version_; ConnectionStore connection_store_; std::vector processors_; + core::BulletinStore* bulletin_store_ = nullptr; }; } // namespace org::apache::nifi::minifi::state::response diff --git a/libminifi/include/core/state/nodes/ResponseNodeLoader.h b/libminifi/include/core/state/nodes/ResponseNodeLoader.h index 69fc73a7b9..018bc5e08c 100644 --- a/libminifi/include/core/state/nodes/ResponseNodeLoader.h +++ b/libminifi/include/core/state/nodes/ResponseNodeLoader.h @@ -36,13 +36,14 @@ #include "core/RepositoryMetricsSource.h" #include "utils/file/AssetManager.h" #include "minifi-cpp/core/state/nodes/ResponseNodeLoader.h" +#include "core/BulletinStore.h" namespace org::apache::nifi::minifi::state::response { class ResponseNodeLoaderImpl : public ResponseNodeLoader { public: ResponseNodeLoaderImpl(std::shared_ptr configuration, std::vector> repository_metric_sources, - std::shared_ptr flow_configuration, utils::file::AssetManager* asset_manager = nullptr); + std::shared_ptr flow_configuration, utils::file::AssetManager* asset_manager = nullptr, core::BulletinStore* bulletin_store = nullptr); void setNewConfigRoot(core::ProcessGroup* root) override; void clearConfigRoot() override; @@ -81,6 +82,7 @@ class ResponseNodeLoaderImpl : public ResponseNodeLoader { utils::file::AssetManager* asset_manager_{}; core::controller::ControllerServiceProvider* controller_{}; state::StateMonitor* update_sink_{}; + core::BulletinStore* bulletin_store_{}; std::shared_ptr logger_{core::logging::LoggerFactory::getLogger()}; }; diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp index df50a7d728..b68ffde4de 100644 --- a/libminifi/src/Configuration.cpp +++ b/libminifi/src/Configuration.cpp @@ -98,6 +98,7 @@ const std::unordered_maplog_debug("Bulletin limit not set, using default value of {}", DEFAULT_BULLETIN_COUNT); + max_bulletin_count_ = DEFAULT_BULLETIN_COUNT; + return; + } + try { + max_bulletin_count_ = std::stoul(*max_bulletin_count_str); + } catch(const std::exception&) { + logger_->log_warn("Invalid value for bulletin limit, using default value of {}", DEFAULT_BULLETIN_COUNT); + max_bulletin_count_ = DEFAULT_BULLETIN_COUNT; + } +} + +void BulletinStore::addProcessorBulletin(const core::Processor& processor, core::logging::LOG_LEVEL log_level, const std::string& message) { + std::lock_guard lock(mutex_); + Bulletin bulletin; + bulletin.id = id_counter++; + bulletin.timestamp = std::chrono::system_clock::now(); + bulletin.level = core::logging::mapLogLevelToString(log_level); + bulletin.category = "Log Message"; + bulletin.message = message; + bulletin.group_id = processor.getProcessGroupUUIDStr(); + bulletin.group_name = processor.getProcessGroupName(); + bulletin.group_path = processor.getProcessGroupPath(); + bulletin.source_id = processor.getUUIDStr(); + bulletin.source_name = processor.getName(); + if (bulletins_.size() >= max_bulletin_count_) { + bulletins_.pop_front(); + } + bulletins_.push_back(std::move(bulletin)); +} + +std::deque BulletinStore::getBulletins(std::optional time_interval_to_include) const { + std::lock_guard lock(mutex_); + if (!time_interval_to_include) { + return bulletins_; + } + for (auto it = bulletins_.begin(); it != bulletins_.end(); ++it) { + if (std::chrono::system_clock::now() - it->timestamp <= *time_interval_to_include) { + return {it, bulletins_.end()}; + } + } + return {}; +} + +size_t BulletinStore::getMaxBulletinCount() const { + std::lock_guard lock(mutex_); + return max_bulletin_count_; +} + +} // namespace org::apache::nifi::minifi::core diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp index ce32dd0517..f215918c43 100644 --- a/libminifi/src/core/FlowConfiguration.cpp +++ b/libminifi/src/core/FlowConfiguration.cpp @@ -37,6 +37,7 @@ FlowConfiguration::FlowConfiguration(ConfigurationContext ctx) service_provider_(std::make_shared(std::make_unique(), configuration_)), filesystem_(std::move(ctx.filesystem)), sensitive_values_encryptor_(std::move(ctx.sensitive_values_encryptor.value())), + bulletin_store_(ctx.bulletin_store), logger_(logging::LoggerFactory::getLogger()) { std::string flowUrl; std::string bucket_id = "default"; diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp index 4f274575da..df183661c9 100644 --- a/libminifi/src/core/ProcessGroup.cpp +++ b/libminifi/src/core/ProcessGroup.cpp @@ -87,6 +87,8 @@ std::tuple ProcessGroup::addProcessor(std::unique_ptrgetName(); std::lock_guard lock(mutex_); processor->setProcessGroupUUIDStr(getUUIDStr()); + processor->setProcessGroupName(getName()); + processor->setProcessGroupPath(buildGroupPath()); const auto [iter, inserted] = processors_.insert(std::move(processor)); if (inserted) { logger_->log_debug("Add processor {} into process group {}", name, name_); @@ -485,4 +487,15 @@ ParameterContext* ProcessGroup::getParameterContext() const { return parameter_context_; } +std::string ProcessGroup::buildGroupPath() const { + std::lock_guard lock(mutex_); + std::string path = name_; + auto parent = parent_process_group_; + while (parent != nullptr) { + path.insert(0, parent->getName() + " / "); + parent = parent->getParent(); + } + return path; +} + } // namespace org::apache::nifi::minifi::core diff --git a/libminifi/src/core/flow/FlowSchema.cpp b/libminifi/src/core/flow/FlowSchema.cpp index 4ab3aba0b3..53637ec76c 100644 --- a/libminifi/src/core/flow/FlowSchema.cpp +++ b/libminifi/src/core/flow/FlowSchema.cpp @@ -31,6 +31,7 @@ FlowSchema FlowSchema::getDefault() { .max_concurrent_tasks = {"max concurrent tasks"}, .penalization_period = {"penalization period"}, .proc_yield_period = {"yield period"}, + .bulletin_level = {"bulletin level"}, .runduration_nanos = {"run duration nanos"}, .connections = {"Connections"}, @@ -96,6 +97,7 @@ FlowSchema FlowSchema::getNiFiFlowJson() { .max_concurrent_tasks = {"concurrentlySchedulableTaskCount"}, .penalization_period = {"penaltyDuration"}, .proc_yield_period = {"yieldDuration"}, + .bulletin_level = {"bulletinLevel"}, // TODO(adebreceni): MINIFICPP-2033 since this is unused the mismatch between nano and milli is not an issue .runduration_nanos = {"runDurationMillis"}, diff --git a/libminifi/src/core/flow/StructuredConfiguration.cpp b/libminifi/src/core/flow/StructuredConfiguration.cpp index d79550b460..7740a59aac 100644 --- a/libminifi/src/core/flow/StructuredConfiguration.cpp +++ b/libminifi/src/core/flow/StructuredConfiguration.cpp @@ -247,6 +247,11 @@ void StructuredConfiguration::parseProcessorNode(const Node& processors_node, co logger_->log_debug("parseProcessorNode: yield period => [{}]", procCfg.yieldPeriod); } + if (auto bulletin_level_node = procNode[schema_.bulletin_level]) { + procCfg.bulletinLevel = bulletin_level_node.getString().value(); + logger_->log_debug("parseProcessorNode: bulletin level => [{}]", procCfg.bulletinLevel); + } + if (auto runNode = procNode[schema_.runduration_nanos]) { procCfg.runDurationNanos = runNode.getIntegerAsString().value(); logger_->log_debug("parseProcessorNode: run duration nanos => [{}]", procCfg.runDurationNanos); @@ -289,6 +294,18 @@ void StructuredConfiguration::parseProcessorNode(const Node& processors_node, co processor->setYieldPeriodMsec(yield_period.value()); } + if (!procCfg.bulletinLevel.empty()) { + processor->setLogBulletinLevel(core::logging::mapStringToLogLevel(procCfg.bulletinLevel)); + } + processor->setLoggerCallback([this, processor = processor.get()](core::logging::LOG_LEVEL level, const std::string& message) { + if (level < processor->getLogBulletinLevel()) { + return; + } + if (bulletin_store_) { + bulletin_store_->addProcessorBulletin(*processor, level, message); + } + }); + // Default to running processor->setScheduledState(core::RUNNING); diff --git a/libminifi/src/core/logging/LoggerBase.cpp b/libminifi/src/core/logging/LoggerBase.cpp index f4415a891c..9df1a890ac 100644 --- a/libminifi/src/core/logging/LoggerBase.cpp +++ b/libminifi/src/core/logging/LoggerBase.cpp @@ -36,6 +36,10 @@ void LoggerControl::setEnabled(bool status) { is_enabled_ = status; } +void LoggerBase::addLogCallback(const std::function& callback) { + std::lock_guard lock(mutex_); + log_callbacks_.push_back(callback); +} bool LoggerBase::should_log(LOG_LEVEL level) { if (controller_ && !controller_->is_enabled()) @@ -46,6 +50,9 @@ bool LoggerBase::should_log(LOG_LEVEL level) { } void LoggerBase::log_string(LOG_LEVEL level, std::string str) { + for (const auto& callback : log_callbacks_) { + callback(level, str); + } delegate_->log(mapToSpdLogLevel(level), str.c_str()); } diff --git a/libminifi/src/core/state/MetricsPublisherStore.cpp b/libminifi/src/core/state/MetricsPublisherStore.cpp index 85fddb39a6..6c89e58c16 100644 --- a/libminifi/src/core/state/MetricsPublisherStore.cpp +++ b/libminifi/src/core/state/MetricsPublisherStore.cpp @@ -23,9 +23,9 @@ namespace org::apache::nifi::minifi::state { MetricsPublisherStore::MetricsPublisherStore(std::shared_ptr configuration, const std::vector>& repository_metric_sources, - std::shared_ptr flow_configuration, utils::file::AssetManager* asset_manager) + std::shared_ptr flow_configuration, utils::file::AssetManager* asset_manager, core::BulletinStore* bulletin_store) : configuration_(configuration), - response_node_loader_(std::make_shared(std::move(configuration), repository_metric_sources, std::move(flow_configuration), asset_manager)) { + response_node_loader_(std::make_shared(std::move(configuration), repository_metric_sources, std::move(flow_configuration), asset_manager, bulletin_store)) { } void MetricsPublisherStore::initialize(core::controller::ControllerServiceProvider* controller, state::StateMonitor* update_sink) { diff --git a/libminifi/src/core/state/nodes/FlowInformation.cpp b/libminifi/src/core/state/nodes/FlowInformation.cpp index de8be246fb..483c983c4d 100644 --- a/libminifi/src/core/state/nodes/FlowInformation.cpp +++ b/libminifi/src/core/state/nodes/FlowInformation.cpp @@ -17,6 +17,9 @@ #include "core/state/nodes/FlowInformation.h" #include "core/Resource.h" +#include "utils/TimeUtil.h" + +using namespace std::literals::chrono_literals; namespace org::apache::nifi::minifi::state::response { @@ -98,6 +101,30 @@ std::vector FlowInformation::serialize() { serialized.push_back(processorsStatusesNode); } + if (bulletin_store_) { + SerializedResponseNode processorBulletinsNode{.name = "processorBulletins", .array = true, .collapsible = false}; + auto bulletins = bulletin_store_->getBulletins(5min); + for (const auto& bulletin : bulletins) { + processorBulletinsNode.children.push_back({ + .name = std::to_string(bulletin.id), + .collapsible = false, + .children = { + {.name = "id", .value = bulletin.id}, + {.name = "timestamp", .value = utils::timeutils::getNiFiDateTimeFormat(std::chrono::time_point_cast(bulletin.timestamp))}, + {.name = "level", .value = bulletin.level}, + {.name = "category", .value = bulletin.category}, + {.name = "message", .value = bulletin.message}, + {.name = "groupId", .value = bulletin.group_id}, + {.name = "groupName", .value = bulletin.group_name}, + {.name = "groupPath", .value = bulletin.group_path}, + {.name = "sourceId", .value = bulletin.source_id}, + {.name = "sourceName", .value = bulletin.source_name} + } + }); + } + serialized.push_back(processorBulletinsNode); + } + return serialized; } diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp index ef8d6e867c..257eab3df5 100644 --- a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp +++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp @@ -34,11 +34,12 @@ namespace org::apache::nifi::minifi::state::response { ResponseNodeLoaderImpl::ResponseNodeLoaderImpl(std::shared_ptr configuration, std::vector> repository_metric_sources, - std::shared_ptr flow_configuration, utils::file::AssetManager* asset_manager) + std::shared_ptr flow_configuration, utils::file::AssetManager* asset_manager, core::BulletinStore* bulletin_store) : configuration_(std::move(configuration)), repository_metric_sources_(std::move(repository_metric_sources)), flow_configuration_(std::move(flow_configuration)), - asset_manager_(asset_manager) { + asset_manager_(asset_manager), + bulletin_store_(bulletin_store) { } void ResponseNodeLoaderImpl::clearConfigRoot() { @@ -233,6 +234,10 @@ void ResponseNodeLoaderImpl::initializeFlowInformation(const SharedResponseNode& flow_information->setFlowVersion(flow_configuration_->getFlowVersion()); } + if (bulletin_store_) { + flow_information->setBulletinStore(bulletin_store_); + } + if (root_) { std::vector processors; root_->getAllProcessors(processors); diff --git a/libminifi/test/integration/C2MetricsTest.cpp b/libminifi/test/integration/C2MetricsTest.cpp index 6f6ec14caa..8ea952e0cb 100644 --- a/libminifi/test/integration/C2MetricsTest.cpp +++ b/libminifi/test/integration/C2MetricsTest.cpp @@ -45,7 +45,7 @@ class VerifyC2Metrics : public VerifyC2Base { LogTestController::getInstance().setTrace(); LogTestController::getInstance().setDebug(); LogTestController::getInstance().setDebug(); - LogTestController::getInstance().setOff(); + LogTestController::getInstance().setDebug(); VerifyC2Base::testSetup(); } @@ -93,6 +93,7 @@ class MetricsHandler: public HeartbeatHandler { VERIFY_UPDATED_METRICS }; + static constexpr const char* PROCESS_GROUP_UUID = "2438e3c8-015a-1000-79ca-83af40ec1990"; static constexpr const char* GETTCP_UUID = "2438e3c8-015a-1000-79ca-83af40ec1991"; static constexpr const char* LOGATTRIBUTE1_UUID = "2438e3c8-015a-1000-79ca-83af40ec1992"; static constexpr const char* LOGATTRIBUTE2_UUID = "5128e3c8-015a-1000-79ca-83af40ec1990"; @@ -160,6 +161,27 @@ class MetricsHandler: public HeartbeatHandler { runtime_metrics["flowInfo"].HasMember("processorStatuses"); } + static bool verifyProcessorBulletins(const rapidjson::Value& runtime_metrics) { + if (!runtime_metrics["flowInfo"].HasMember("processorBulletins")) { + return false; + } + auto bulletins = runtime_metrics["flowInfo"]["processorBulletins"].GetArray(); + return std::any_of(bulletins.begin(), bulletins.end(), [](const auto& bulletin) { + std::string message = bulletin["message"].GetString(); + return bulletin["id"].GetInt() > 0 && + !std::string{bulletin["timestamp"].GetString()}.empty() && + bulletin["level"].GetString() == std::string("ERROR") && + bulletin["category"].GetString() == std::string("Log Message") && + message.find("Error connecting to") != std::string::npos && + message.find(GETTCP_UUID) != std::string::npos && + bulletin["groupId"].GetString() == std::string(PROCESS_GROUP_UUID) && + bulletin["groupName"].GetString() == std::string("MiNiFi Flow") && + bulletin["groupPath"].GetString() == std::string("MiNiFi Flow") && + bulletin["sourceId"].GetString() == std::string(GETTCP_UUID) && + bulletin["sourceName"].GetString() == std::string("GetTCP"); + }); + } + static bool verifyRuntimeMetrics(const rapidjson::Value& runtime_metrics) { return verifyCommonRuntimeMetricNodes(runtime_metrics, "2438e3c8-015a-1000-79ca-83af40ec1997") && [&]() { @@ -173,7 +195,8 @@ class MetricsHandler: public HeartbeatHandler { } return processorMetricsAreValid(processor); }); - }(); + }() && + verifyProcessorBulletins(runtime_metrics); } static bool verifyUpdatedRuntimeMetrics(const rapidjson::Value& runtime_metrics) { diff --git a/libminifi/test/libtest/integration/IntegrationBase.cpp b/libminifi/test/libtest/integration/IntegrationBase.cpp index 3652e0ccb2..a7facb26da 100644 --- a/libminifi/test/libtest/integration/IntegrationBase.cpp +++ b/libminifi/test/libtest/integration/IntegrationBase.cpp @@ -91,6 +91,7 @@ void IntegrationBase::run(const std::optional& test_file_ std::string nifi_configuration_class_name = "adaptiveconfiguration"; configuration->get(minifi::Configure::nifi_configuration_class_name, nifi_configuration_class_name); + bulletin_store_ = std::make_unique(*configuration); std::shared_ptr flow_config = core::createFlowConfiguration( core::ConfigurationContext{ .flow_file_repo = test_repo, @@ -98,7 +99,8 @@ void IntegrationBase::run(const std::optional& test_file_ .configuration = configuration, .path = test_file_location, .filesystem = filesystem, - .sensitive_values_encryptor = sensitive_values_encryptor + .sensitive_values_encryptor = sensitive_values_encryptor, + .bulletin_store = bulletin_store_.get() }, nifi_configuration_class_name); auto controller_service_provider = flow_config->getControllerServiceProvider(); @@ -119,7 +121,7 @@ void IntegrationBase::run(const std::optional& test_file_ std::vector> repo_metric_sources{test_repo, test_flow_repo, content_repo}; asset_manager_ = std::make_unique(*configuration); - auto metrics_publisher_store = std::make_unique(configuration, repo_metric_sources, flow_config, asset_manager_.get()); + auto metrics_publisher_store = std::make_unique(configuration, repo_metric_sources, flow_config, asset_manager_.get(), bulletin_store_.get()); flowController_ = std::make_unique(test_repo, test_flow_repo, configuration, std::move(flow_config), content_repo, std::move(metrics_publisher_store), filesystem, request_restart, asset_manager_.get()); flowController_->load(); diff --git a/libminifi/test/libtest/integration/IntegrationBase.h b/libminifi/test/libtest/integration/IntegrationBase.h index 76c5186063..0028204864 100644 --- a/libminifi/test/libtest/integration/IntegrationBase.h +++ b/libminifi/test/libtest/integration/IntegrationBase.h @@ -30,6 +30,7 @@ #include "properties/Configure.h" #include "utils/file/AssetManager.h" #include "utils/file/FileUtils.h" +#include "core/BulletinStore.h" namespace minifi = org::apache::nifi::minifi; namespace core = minifi::core; @@ -110,6 +111,7 @@ class IntegrationBase { void configureSecurity(); std::shared_ptr configuration; std::unique_ptr asset_manager_; + std::unique_ptr bulletin_store_; std::unique_ptr response_node_loader_; std::unique_ptr flowController_; std::chrono::milliseconds wait_time_; diff --git a/libminifi/test/resources/TestC2Metrics.yml b/libminifi/test/resources/TestC2Metrics.yml index 6a0af5e4c1..fdb8924198 100644 --- a/libminifi/test/resources/TestC2Metrics.yml +++ b/libminifi/test/resources/TestC2Metrics.yml @@ -29,6 +29,7 @@ Processors: penalization period: 30 sec yield period: 10 sec run duration nanos: 0 + bulletin level: ERROR auto-terminated relationships list: Properties: Endpoint List: localhost:8776 @@ -43,6 +44,7 @@ Processors: scheduling period: 30 sec penalization period: 30 sec yield period: 1 sec + bulletin level: ERROR run duration nanos: 0 auto-terminated relationships list: - response diff --git a/libminifi/test/unit/BulletinStoreTests.cpp b/libminifi/test/unit/BulletinStoreTests.cpp new file mode 100644 index 0000000000..bb32ca303a --- /dev/null +++ b/libminifi/test/unit/BulletinStoreTests.cpp @@ -0,0 +1,108 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +#include "unit/TestBase.h" +#include "unit/Catch.h" +#include "core/BulletinStore.h" +#include "properties/Configure.h" +#include "unit/DummyProcessor.h" + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::test { + +class BulletinStoreTestAccessor { + public: + static std::deque& getBulletins(core::BulletinStore& store) { + return store.bulletins_; + } +}; + +std::unique_ptr createDummyProcessor() { + auto processor = std::make_unique("DummyProcessor", minifi::utils::Identifier::parse("4d7fa7e6-2459-46dd-b2ba-61517239edf5").value()); + processor->setProcessGroupUUIDStr("68fa9ae4-b9fc-4873-b0d9-edab59fdb0c2"); + processor->setProcessGroupName("sub_group"); + processor->setProcessGroupPath("root/sub_group"); + return processor; +} + +TEST_CASE("Create BulletinStore with default max size of 1000", "[bulletinStore]") { + ConfigureImpl configuration; + SECTION("No limit is configured") {} + SECTION("Invalid value is configured") { + configuration.set(Configure::nifi_c2_flow_info_processor_bulletin_limit, "invalid"); + } + core::BulletinStore bulletin_store(configuration); + REQUIRE(bulletin_store.getMaxBulletinCount() == 1000); +} + +TEST_CASE("Create BulletinStore with custom max size of 10000", "[bulletinStore]") { + ConfigureImpl configuration; + configuration.set(Configure::nifi_c2_flow_info_processor_bulletin_limit, "10000"); + core::BulletinStore bulletin_store(configuration); + REQUIRE(bulletin_store.getMaxBulletinCount() == 10000); +} + +TEST_CASE("Remove oldest entries when limit is reached", "[bulletinStore]") { + ConfigureImpl configuration; + configuration.set(Configure::nifi_c2_flow_info_processor_bulletin_limit, "2"); + core::BulletinStore bulletin_store(configuration); + auto processor = createDummyProcessor(); + for (size_t i = 0; i < 3; ++i) { + bulletin_store.addProcessorBulletin(*processor, logging::LOG_LEVEL::warn, "Warning message"); + } + auto bulletins = bulletin_store.getBulletins(); + REQUIRE(bulletins.size() == 2); + REQUIRE(bulletins[0].id == 2); + REQUIRE(bulletins[1].id == 3); + REQUIRE(bulletins[0].message == "Warning message"); +} + +TEST_CASE("Return all bulletins when no time interval is defined or all entries are part of the time interval", "[bulletinStore]") { + ConfigureImpl configuration; + core::BulletinStore bulletin_store(configuration); + auto processor = createDummyProcessor(); + for (size_t i = 0; i < 3; ++i) { + bulletin_store.addProcessorBulletin(*processor, logging::LOG_LEVEL::warn, "Warning message"); + } + auto bulletins = bulletin_store.getBulletins(); + REQUIRE(bulletins.size() == 3); + REQUIRE(bulletins[0].id == 1); + REQUIRE(bulletins[1].id == 2); + REQUIRE(bulletins[2].id == 3); + REQUIRE(bulletins[2].message == "Warning message"); +} + +TEST_CASE("Return only bulletins that are inside the defined time interval", "[bulletinStore]") { + ConfigureImpl configuration; + core::BulletinStore bulletin_store(configuration); + auto processor = createDummyProcessor(); + for (size_t i = 0; i < 3; ++i) { + bulletin_store.addProcessorBulletin(*processor, logging::LOG_LEVEL::warn, "Warning message"); + } + BulletinStoreTestAccessor::getBulletins(bulletin_store)[0].timestamp -= 5min; + + auto bulletins = bulletin_store.getBulletins(3min); + REQUIRE(bulletins.size() == 2); + REQUIRE(bulletins[0].id == 2); + REQUIRE(bulletins[1].id == 3); + REQUIRE(bulletins[0].message == "Warning message"); +} + +} // namespace org::apache::nifi::minifi::test diff --git a/minifi-api/include/minifi-cpp/core/Processor.h b/minifi-api/include/minifi-cpp/core/Processor.h index 52b9ab6c90..9a41e3c036 100644 --- a/minifi-api/include/minifi-cpp/core/Processor.h +++ b/minifi-api/include/minifi-cpp/core/Processor.h @@ -30,6 +30,7 @@ #include "minifi-cpp/core/state/nodes/MetricsBase.h" #include "ProcessorMetrics.h" #include "utils/gsl.h" +#include "core/logging/Logger.h" namespace org::apache::nifi::minifi { @@ -85,6 +86,13 @@ class Processor : public virtual Connectable, public virtual ConfigurableCompone virtual gsl::not_null> getMetrics() const = 0; virtual std::string getProcessGroupUUIDStr() const = 0; virtual void setProcessGroupUUIDStr(const std::string &uuid) = 0; + virtual std::string getProcessGroupName() const = 0; + virtual void setProcessGroupName(const std::string &name) = 0; + virtual std::string getProcessGroupPath() const = 0; + virtual void setProcessGroupPath(const std::string &path) = 0; + virtual logging::LOG_LEVEL getLogBulletinLevel() const = 0; + virtual void setLogBulletinLevel(logging::LOG_LEVEL level) = 0; + virtual void setLoggerCallback(const std::function& callback) = 0; virtual void updateReachability(const std::lock_guard& graph_lock, bool force = false) = 0; virtual const std::unordered_map>& reachable_processors() const = 0; diff --git a/minifi-api/include/minifi-cpp/core/logging/Logger.h b/minifi-api/include/minifi-cpp/core/logging/Logger.h index f94a467582..d73681a91f 100644 --- a/minifi-api/include/minifi-cpp/core/logging/Logger.h +++ b/minifi-api/include/minifi-cpp/core/logging/Logger.h @@ -99,6 +99,8 @@ class Logger { virtual ~Logger() = default; + virtual void addLogCallback(const std::function& callback) = 0; + protected: virtual int getMaxLogSize() = 0; @@ -124,7 +126,8 @@ class Logger { if (!should_log(level)) { return; } - log_string(level, stringify(std::move(fmt), map_args(std::forward(args))...)); + auto message = stringify(std::move(fmt), map_args(std::forward(args))...); + log_string(level, message); } }; diff --git a/minifi-api/include/minifi-cpp/properties/Configuration.h b/minifi-api/include/minifi-cpp/properties/Configuration.h index 280771ca54..d580e8c20a 100644 --- a/minifi-api/include/minifi-cpp/properties/Configuration.h +++ b/minifi-api/include/minifi-cpp/properties/Configuration.h @@ -128,6 +128,7 @@ class Configuration : public virtual Properties { static constexpr const char *nifi_c2_rest_ssl_context_service = "nifi.c2.rest.ssl.context.service"; static constexpr const char *nifi_c2_rest_heartbeat_minimize_updates = "nifi.c2.rest.heartbeat.minimize.updates"; static constexpr const char *nifi_c2_rest_request_encoding = "nifi.c2.rest.request.encoding"; + static constexpr const char *nifi_c2_flow_info_processor_bulletin_limit = "nifi.c2.flow.info.processor.bulletin.limit"; // state management options static constexpr const char *nifi_state_storage_local = "nifi.state.storage.local"; diff --git a/minifi_main/MiNiFiMain.cpp b/minifi_main/MiNiFiMain.cpp index 2297048f8f..d24e63daec 100644 --- a/minifi_main/MiNiFiMain.cpp +++ b/minifi_main/MiNiFiMain.cpp @@ -65,10 +65,10 @@ #include "MainHelper.h" #include "agent/JsonSchema.h" #include "core/state/nodes/ResponseNodeLoader.h" -#include "c2/C2Agent.h" #include "core/state/MetricsPublisherStore.h" #include "argparse/argparse.hpp" #include "agent/agent_version.h" +#include "core/BulletinStore.h" namespace minifi = org::apache::nifi::minifi; namespace core = minifi::core; @@ -390,6 +390,8 @@ int main(int argc, char **argv) { should_encrypt_flow_config, utils::crypto::EncryptionProvider::create(minifiHome)); + std::unique_ptr bulletin_store = std::make_unique(*configure); + std::shared_ptr flow_configuration = core::createFlowConfiguration( core::ConfigurationContext{ .flow_file_repo = flow_repo, @@ -397,13 +399,14 @@ int main(int argc, char **argv) { .configuration = configure, .path = configure->get(minifi::Configure::nifi_flow_configuration_file), .filesystem = filesystem, - .sensitive_values_encryptor = utils::crypto::EncryptionProvider::createSensitivePropertiesEncryptor(minifiHome) + .sensitive_values_encryptor = utils::crypto::EncryptionProvider::createSensitivePropertiesEncryptor(minifiHome), + .bulletin_store = bulletin_store.get() }, nifi_configuration_class_name); auto asset_manager = std::make_unique(*configure); std::vector> repo_metric_sources{prov_repo, flow_repo, content_repo}; - auto metrics_publisher_store = std::make_unique(configure, repo_metric_sources, flow_configuration, asset_manager.get()); + auto metrics_publisher_store = std::make_unique(configure, repo_metric_sources, flow_configuration, asset_manager.get(), bulletin_store.get()); const auto controller = std::make_unique( prov_repo, flow_repo, configure, std::move(flow_configuration), content_repo, std::move(metrics_publisher_store), filesystem, request_restart, asset_manager.get()); diff --git a/utils/include/core/Processor.h b/utils/include/core/Processor.h index 4a17fcc9bd..6cbf2947eb 100644 --- a/utils/include/core/Processor.h +++ b/utils/include/core/Processor.h @@ -175,6 +175,22 @@ class ProcessorImpl : public virtual Processor, public ConnectableImpl, public C process_group_uuid_ = uuid; } + std::string getProcessGroupName() const override { + return process_group_name_; + } + + void setProcessGroupName(const std::string &name) override { + process_group_name_ = name; + } + + std::string getProcessGroupPath() const override { + return process_group_path_; + } + + void setProcessGroupPath(const std::string &path) override { + process_group_path_ = path; + } + void yield() override; void yield(std::chrono::steady_clock::duration delta_time) override; @@ -228,6 +244,16 @@ class ProcessorImpl : public virtual Processor, public ConnectableImpl, public C return metrics_; } + logging::LOG_LEVEL getLogBulletinLevel() const override { + return log_bulletin_level_; + } + + void setLogBulletinLevel(logging::LOG_LEVEL level) override { + log_bulletin_level_ = level; + } + + void setLoggerCallback(const std::function& callback) override; + static constexpr auto DynamicProperties = std::array{}; static constexpr auto OutputAttributes = std::array{}; @@ -249,6 +275,7 @@ class ProcessorImpl : public virtual Processor, public ConnectableImpl, public C gsl::not_null> metrics_; std::shared_ptr logger_; + logging::LOG_LEVEL log_bulletin_level_ = logging::LOG_LEVEL::warn; private: mutable std::mutex mutex_; @@ -272,6 +299,8 @@ class ProcessorImpl : public virtual Processor, public ConnectableImpl, public C std::unordered_map> reachable_processors_; std::string process_group_uuid_; + std::string process_group_name_; + std::string process_group_path_; }; } // namespace core diff --git a/utils/include/utils/TimeUtil.h b/utils/include/utils/TimeUtil.h index 974e7228b8..252d93fc8b 100644 --- a/utils/include/utils/TimeUtil.h +++ b/utils/include/utils/TimeUtil.h @@ -104,6 +104,10 @@ inline std::string getRFC2616Format(std::chrono::sys_seconds tp) { return date::format("%a, %d %b %Y %H:%M:%S %Z", tp); } +inline std::string getNiFiDateTimeFormat(std::chrono::sys_seconds tp) { + return date::format("%a %b %d %H:%M:%S %Z %Y", tp); +} + inline date::sys_seconds to_sys_time(const std::tm& t) { using date::year; using date::month; diff --git a/utils/src/core/Processor.cpp b/utils/src/core/Processor.cpp index cf2eb6b531..41d665686c 100644 --- a/utils/src/core/Processor.cpp +++ b/utils/src/core/Processor.cpp @@ -37,7 +37,6 @@ #include "range/v3/algorithm/any_of.hpp" #include "fmt/format.h" #include "Exception.h" -#include "core/Processor.h" #include "core/ProcessorMetrics.h" using namespace std::literals::chrono_literals; @@ -370,4 +369,8 @@ std::chrono::steady_clock::duration ProcessorImpl::getYieldTime() const { return std::max(yield_expiration_.load()-std::chrono::steady_clock::now(), std::chrono::steady_clock::duration{0}); } +void ProcessorImpl::setLoggerCallback(const std::function& callback) { + logger_->addLogCallback(callback); +} + } // namespace org::apache::nifi::minifi::core