Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINIFICPP-2502 Add processorBulletins C2 metric node to FlowInformation #1920

Open
wants to merge 2 commits into
base: MINIFICPP-2512
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions C2.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ be requested via C2 DESCRIBE manifest command.
# minimize REST heartbeat updates
#nifi.c2.rest.heartbeat.minimize.updates=true

# specify the maximum number of processor bulletins to send in a heartbeat
#nifi.c2.flow.info.processor.bulletin.limit=1000

#### Flow Id and URL

Flow id and URL are usually retrieved from the C2 server. These identify the last updated flow version and where the flow was downloaded from. These properties are persisted in the minifi.properties file.
Expand Down Expand Up @@ -194,6 +197,20 @@ configuration produces the following JSON:
"uuid": "2438e3c8-015a-1000-79ca-83af40ec1997"
}
},
"processorBulletins": [
{
"id": 1,
"timestamp": "Mon Jan 27 12:10:47 UTC 2025",
"level": "ERROR",
"category": "Log Message",
"message": "Error connecting to localhost:8776 due to Connection refused (2438e3c8-015a-1000-79ca-83af40ec1991)",
"groupId": "2438e3c8-015a-1000-79ca-83af40ec1990",
"groupName": "MiNiFi Flow",
"groupPath": "MiNiFi Flow",
"sourceId": "2438e3c8-015a-1000-79ca-83af40ec1991",
"sourceName": "GetTCP"
}
],
"processorStatuses": [
{
"id": "5128e3c8-015a-1000-79ca-83af40ec1990",
Expand Down Expand Up @@ -536,6 +553,20 @@ Contains information about the flow the agent is running, including the versione
"uuid": "8368e3c8-015a-1003-52ca-83af40ec1332"
}
},
"processorBulletins": [
{
"id": 1,
"timestamp": "Mon Jan 27 12:10:47 UTC 2025",
"level": "ERROR",
"category": "Log Message",
"message": "Error connecting to localhost:8776 due to Connection refused (2438e3c8-015a-1000-79ca-83af40ec1991)",
"groupId": "2438e3c8-015a-1000-79ca-83af40ec1990",
"groupName": "MiNiFi Flow",
"groupPath": "MiNiFi Flow",
"sourceId": "2438e3c8-015a-1000-79ca-83af40ec1991",
"sourceName": "GetTCP"
}
],
"processorStatuses": [
{
"id": "5128e3c8-015a-1000-79ca-83af40ec1990",
Expand Down
3 changes: 3 additions & 0 deletions conf/minifi.properties
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ nifi.c2.full.heartbeat=false
# specify encoding strategy for c2 requests (gzip, none)
#nifi.c2.rest.request.encoding=none

# specify the maximum number of processor bulletins to send in a heartbeat
#nifi.c2.flow.info.processor.bulletin.limit=1000

## enable the controller socket provider on port 9998
## off by default.
#controller.socket.enable=true
Expand Down
9 changes: 6 additions & 3 deletions extensions/python/ExecutePythonProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,7 @@ void ExecutePythonProcessor::reloadScriptIfUsingScriptFileProperty() {

// Creates a new PythonScriptEngine and initializes it with Success, Failure,
// Original and the processor's python_logger_, then hands ownership to the caller.
//
// python_logger_ is deliberately NOT re-created here: the class constructor
// already assigns it, and setLoggerCallback() registers its callback on that
// instance.
// NOTE(review): re-assigning the logger here would presumably replace the
// instance that carries the callback — confirm against
// LoggerFactory::getAliasedLogger.
std::unique_ptr<PythonScriptEngine> ExecutePythonProcessor::createScriptEngine() {
  auto engine = std::make_unique<PythonScriptEngine>();
  engine->initialize(Success, Failure, Original, python_logger_);
  return engine;
}

Expand Down Expand Up @@ -225,6 +222,12 @@ std::vector<core::Relationship> ExecutePythonProcessor::getPythonRelationships()
return relationships;
}

// Registers `callback` on both loggers held by this processor: logger_ first,
// then python_logger_. Both must already be set, which gsl_Expects enforces.
void ExecutePythonProcessor::setLoggerCallback(const std::function<void(core::logging::LOG_LEVEL level, const std::string& message)>& callback) {
  gsl_Expects(logger_ && python_logger_);
  const auto attach_callback = [&callback](const auto& target_logger) {
    target_logger->setLogCallback(callback);
  };
  attach_callback(logger_);
  attach_callback(python_logger_);
}

REGISTER_RESOURCE(ExecutePythonProcessor, Processor);

} // namespace org::apache::nifi::minifi::extensions::python::processors
3 changes: 2 additions & 1 deletion extensions/python/ExecutePythonProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class ExecutePythonProcessor : public core::ProcessorImpl {
python_dynamic_(false),
reload_on_script_change_(true) {
logger_ = core::logging::LoggerFactory<ExecutePythonProcessor>::getLogger(uuid_);
python_logger_ = core::logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
}

EXTENSIONAPI static constexpr const char* Description = "Executes a script given the flow file and a process session. "
Expand Down Expand Up @@ -138,8 +139,8 @@ class ExecutePythonProcessor : public core::ProcessorImpl {
}

std::map<std::string, core::Property> getProperties() const override;

std::vector<core::Relationship> getPythonRelationships();
void setLoggerCallback(const std::function<void(core::logging::LOG_LEVEL level, const std::string& message)>& callback) override;

protected:
const core::Property* findProperty(const std::string& name) const override;
Expand Down
2 changes: 2 additions & 0 deletions extensions/standard-processors/tests/unit/FlowJsonTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ TEST_CASE("NiFi flow json format is correctly parsed") {
"schedulingPeriod": "3 sec",
"penaltyDuration": "12 sec",
"yieldDuration": "4 sec",
"bulletinLevel": "ERROR",
"runDurationMillis": 12,
"autoTerminatedRelationships": ["one", "two"],
"properties": {
Expand Down Expand Up @@ -143,6 +144,7 @@ TEST_CASE("NiFi flow json format is correctly parsed") {
REQUIRE(3s == proc->getSchedulingPeriod());
REQUIRE(12s == proc->getPenalizationPeriod());
REQUIRE(4s == proc->getYieldPeriod());
REQUIRE(proc->getLogBulletinLevel() == logging::LOG_LEVEL::err);
REQUIRE(proc->isAutoTerminated({"one", ""}));
REQUIRE(proc->isAutoTerminated({"two", ""}));
REQUIRE_FALSE(proc->isAutoTerminated({"three", ""}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ Security Properties:
scheduling period: 1 sec
penalization period: 30 sec
yield period: 1 sec
bulletin level: ERROR
run duration nanos: 0
auto-terminated relationships list:
Properties:
Expand Down Expand Up @@ -157,6 +158,7 @@ Provenance Reporting:
REQUIRE(1s == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingPeriod());
REQUIRE(30s == rootFlowConfig->findProcessorByName("TailFile")->getPenalizationPeriod());
REQUIRE(1s == rootFlowConfig->findProcessorByName("TailFile")->getYieldPeriod());
REQUIRE(rootFlowConfig->findProcessorByName("TailFile")->getLogBulletinLevel() == logging::LOG_LEVEL::err);
REQUIRE(0s == rootFlowConfig->findProcessorByName("TailFile")->getRunDurationNano());

std::map<std::string, minifi::Connection*> connectionMap;
Expand Down
70 changes: 70 additions & 0 deletions libminifi/include/core/BulletinStore.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <string>
#include <deque>
#include <mutex>
#include <memory>
#include <optional>
#include <chrono>

#include "properties/Configure.h"
#include "core/logging/LoggerFactory.h"
#include "core/Processor.h"

namespace org::apache::nifi::minifi {
namespace test {
class BulletinStoreTestAccessor;
} // namespace test

namespace core {

// Plain data record for one bulletin. The member names mirror the keys of the
// "processorBulletins" entries in the C2.md sample (id, timestamp, level,
// category, message, groupId, groupName, groupPath, sourceId, sourceName).
struct Bulletin {
  uint64_t id{0};                                   // defaults to 0
  std::chrono::system_clock::time_point timestamp;  // defaults to the clock's epoch
  std::string level;                                // "ERROR" in the C2.md sample
  std::string category;                             // "Log Message" in the C2.md sample
  std::string message;
  std::string group_id;
  std::string group_name;
  std::string group_path;
  std::string source_id;
  std::string source_name;
};

// Holds Bulletin records in a deque, together with a maximum bulletin count,
// an id counter and a mutex. Member function definitions are not part of this
// header.
class BulletinStore {
public:
// NOTE(review): presumably reads the bulletin limit
// (nifi.c2.flow.info.processor.bulletin.limit) from `configure` and falls back
// to DEFAULT_BULLETIN_COUNT — confirm in the implementation file.
explicit BulletinStore(const Configure& configure);
// Records a bulletin for `processor` with the given log level and message.
// NOTE(review): eviction behaviour once max_bulletin_count_ is reached is not
// visible here — confirm in the implementation file.
void addProcessorBulletin(const core::Processor& processor, core::logging::LOG_LEVEL log_level, const std::string& message);
// Returns bulletins by value. `time_interval_to_include` is optional and
// defaults to empty.
// NOTE(review): presumably limits the result to bulletins newer than
// now - interval when set — confirm in the implementation file.
std::deque<Bulletin> getBulletins(std::optional<std::chrono::system_clock::duration> time_interval_to_include = {}) const;
// NOTE(review): presumably returns max_bulletin_count_ — confirm.
size_t getMaxBulletinCount() const;

private:
// Gives the test accessor class access to the private members below.
friend class minifi::test::BulletinStoreTestAccessor;

static constexpr size_t DEFAULT_BULLETIN_COUNT = 1000;
size_t max_bulletin_count_;
// mutable so that it can be locked from const member functions.
mutable std::mutex mutex_;
// Starts at 1. NOTE(review): unlike the other members this name has no
// trailing underscore; renaming needs matching changes outside this header.
uint64_t id_counter = 1;
std::deque<Bulletin> bulletins_;
std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<BulletinStore>::getLogger()};
};

} // namespace core
} // namespace org::apache::nifi::minifi
3 changes: 3 additions & 0 deletions libminifi/include/core/FlowConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "utils/file/FileSystem.h"
#include "utils/ChecksumCalculator.h"
#include "ParameterContext.h"
#include "core/BulletinStore.h"

namespace org::apache::nifi::minifi::core {

Expand All @@ -60,6 +61,7 @@ struct ConfigurationContext {
std::optional<std::filesystem::path> path{std::nullopt};
std::shared_ptr<utils::file::FileSystem> filesystem{std::make_shared<utils::file::FileSystem>()};
std::optional<utils::crypto::EncryptionProvider> sensitive_values_encryptor{std::nullopt};
core::BulletinStore* bulletin_store{nullptr};
};

enum class FlowSerializationType { Json, NifiJson, Yaml };
Expand Down Expand Up @@ -149,6 +151,7 @@ class FlowConfiguration : public CoreComponentImpl {
std::shared_ptr<utils::file::FileSystem> filesystem_;
utils::crypto::EncryptionProvider sensitive_values_encryptor_;
utils::ChecksumCalculator checksum_calculator_;
core::BulletinStore* bulletin_store_ = nullptr;

private:
// Default serialization hook: ignores the process group and returns an empty string.
// NOTE(review): presumably overridden by concrete flow configurations — confirm.
virtual std::string serialize(const ProcessGroup&) {
  return std::string{};
}
Expand Down
37 changes: 2 additions & 35 deletions libminifi/include/core/ProcessGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,13 @@ class ProcessGroup : public CoreComponentImpl {
ProcessGroup(ProcessGroupType type, std::string_view name);
ProcessGroup(ProcessGroupType type, std::string_view name, const utils::Identifier& uuid);
ProcessGroup(ProcessGroupType type, std::string_view name, const utils::Identifier& uuid, int version);
// Destructor
~ProcessGroup() override;
// Stores `url` as this group's URL; the by-value argument is moved into url_.
void setURL(std::string url) {
  url_.assign(std::move(url));
}
// Get URL
std::string getURL() {
return (url_);
}
// Updates the atomic transmitting_ flag to `val`.
void setTransmitting(bool val) {
  transmitting_.store(val);
}
Expand All @@ -93,7 +89,6 @@ class ProcessGroup : public CoreComponentImpl {
uint64_t getTimeout() {
return timeout_;
}
// Stores a copy of `ifc` as the local network interface.
// Takes the argument by const reference because it is only copied, never
// modified; this also lets callers pass const strings and temporaries.
void setInterface(const std::string &ifc) {
  local_network_interface_ = ifc;
}
Expand Down Expand Up @@ -124,11 +119,9 @@ class ProcessGroup : public CoreComponentImpl {
// Returns a copy of the stored HTTP proxy settings (proxy_).
// Const-qualified because it only reads state.
http::HTTPProxy getHTTPProxy() const {
  return proxy_;
}
// Stores `period` into the atomic yield_period_msec_ member.
void setYieldPeriodMsec(std::chrono::milliseconds period) {
  yield_period_msec_.store(period);
}
// Get Processor yield period in MilliSecond
std::chrono::milliseconds getYieldPeriodMsec() {
return (yield_period_msec_);
}
Expand All @@ -147,13 +140,11 @@ class ProcessGroup : public CoreComponentImpl {
const std::function<bool(const Processor*)>& filter = nullptr);

bool isRemoteProcessGroup();
// Records `parent` as this group's parent process group while holding mutex_.
void setParent(ProcessGroup *parent) {
  const std::lock_guard<std::recursive_mutex> guard{mutex_};
  parent_process_group_ = parent;
}
// Returns the stored parent process group pointer, read while holding mutex_.
// Only the const-qualified signature is kept; the duplicate non-const
// signature line that preceded it left the definition syntactically invalid.
// mutex_ is declared mutable, so it can be locked from this const member.
ProcessGroup *getParent() const {
  std::lock_guard<std::recursive_mutex> lock(mutex_);
  return parent_process_group_;
}
Expand Down Expand Up @@ -185,25 +176,17 @@ class ProcessGroup : public CoreComponentImpl {
}
return nullptr;
}
// findProcessor based on UUID
Processor* findProcessorById(const utils::Identifier& uuid, Traverse traverse = Traverse::IncludeChildren) const;
// findProcessor based on name
Processor* findProcessorByName(const std::string &processorName, Traverse traverse = Traverse::IncludeChildren) const;

void getAllProcessors(std::vector<Processor*>& processor_vec) const;

/**
* Add controller service
* @param nodeId node identifier
* @param node controller service node.
*/
void addControllerService(const std::string &nodeId, const std::shared_ptr<core::controller::ControllerServiceNode> &node);

core::controller::ControllerServiceNode* findControllerService(const std::string &nodeId, Traverse traverse = Traverse::ExcludeChildren) const;

std::vector<const core::controller::ControllerServiceNode*> getAllControllerServices() const;

// update property value
void updatePropertyValue(const std::string& processorName, const std::string& propertyName, const std::string& propertyValue);

void getConnections(std::map<std::string, Connection*>& connectionMap);
Expand All @@ -228,45 +211,29 @@ class ProcessGroup : public CoreComponentImpl {
protected:
void startProcessingProcessors(TimerDrivenSchedulingAgent& timeScheduler, EventDrivenSchedulingAgent& eventScheduler, CronDrivenSchedulingAgent& cronScheduler);

// version
int config_version_;
// Process Group Type
const ProcessGroupType type_;
// Processors (ProcessNode) inside this process group which include Remote Process Group input/Output port
std::set<std::unique_ptr<Processor>> processors_;
std::set<Processor*> failed_processors_;
std::set<Port*> ports_;
std::set<std::unique_ptr<ProcessGroup>> child_process_groups_;
// Connections between the processor inside the group;
std::set<std::unique_ptr<Connection>> connections_;
// Parent Process Group
ProcessGroup* parent_process_group_;
// Yield Period in Milliseconds
std::atomic<std::chrono::milliseconds> yield_period_msec_;
std::atomic<uint64_t> timeout_;

// URL
std::string url_;
// local network interface
std::string local_network_interface_;
// Transmitting
std::atomic<bool> transmitting_;
// http proxy
http::HTTPProxy proxy_;
std::string transport_protocol_;

// controller services

core::controller::ControllerServiceNodeMap controller_service_map_;

ParameterContext* parameter_context_ = nullptr;

private:
static Port* findPortById(const std::set<Port*>& ports, const utils::Identifier& uuid);
std::string buildGroupPath() const;

// Mutex for protection
mutable std::recursive_mutex mutex_;
// Logger
std::shared_ptr<logging::Logger> logger_;

ProcessGroup(const ProcessGroup &parent);
Expand Down
19 changes: 4 additions & 15 deletions libminifi/include/core/ProcessorConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_
#define LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_
#pragma once

#include <string>
#include <vector>

#include "core/Core.h"
#include "core/Property.h"

namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace core {

namespace org::apache::nifi::minifi::core {

constexpr const char* DEFAULT_SCHEDULING_STRATEGY{"TIMER_DRIVEN"};
constexpr const char* DEFAULT_SCHEDULING_PERIOD_STR{"1 sec"};
Expand All @@ -47,16 +41,11 @@ struct ProcessorConfig {
std::string schedulingPeriod;
std::string penalizationPeriod;
std::string yieldPeriod;
std::string bulletinLevel;
std::string runDurationNanos;
std::vector<std::string> autoTerminatedRelationships;
std::vector<core::Property> properties;
std::string parameterContextName;
};

} // namespace core
} // namespace minifi
} // namespace nifi
} // namespace apache
} // namespace org

#endif // LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_
} // namespace org::apache::nifi::minifi::core
Loading
Loading