Skip to content

Commit

Permalink
MINIFICPP-2502 Add processorBulletins C2 metric node to FlowInformation
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez committed Jan 28, 2025
1 parent 4d5fc3d commit 90baa4e
Show file tree
Hide file tree
Showing 37 changed files with 522 additions and 72 deletions.
37 changes: 34 additions & 3 deletions C2.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ be requested via C2 DESCRIBE manifest command.
# minimize REST heartbeat updates
#nifi.c2.rest.heartbeat.minimize.updates=true

# specify the maximum number of bulletins to send in a heartbeat
# nifi.c2.flow.info.processor.bulletin.limit=1000

#### Flow Id and URL

Flow id and URL are usually retrieved from the C2 server. These identify the last updated flow version and where the flow was downloaded from. These properties are persisted in the minifi.properties file.
Expand Down Expand Up @@ -194,6 +197,20 @@ configuration produces the following JSON:
"uuid": "2438e3c8-015a-1000-79ca-83af40ec1997"
}
},
"processorBulletins": [
{
"id": 1,
"timestamp": "Mon Jan 27 12:10:47 UTC 2025",
"level": "ERROR",
"category": "Log Message",
"message": "Error connecting to localhost:8776 due to Connection refused (2438e3c8-015a-1000-79ca-83af40ec1991)",
"groupId": "2438e3c8-015a-1000-79ca-83af40ec1990",
"groupName": "MiNiFi Flow",
"groupPath": "MiNiFi Flow",
"sourceId": "2438e3c8-015a-1000-79ca-83af40ec1991",
"sourceName": "GetTCP"
}
],
"processorStatuses": [
{
"id": "5128e3c8-015a-1000-79ca-83af40ec1990",
Expand Down Expand Up @@ -536,6 +553,20 @@ Contains information about the flow the agent is running, including the versione
"uuid": "8368e3c8-015a-1003-52ca-83af40ec1332"
}
},
"processorBulletins": [
{
"id": 1,
"timestamp": "Mon Jan 27 12:10:47 UTC 2025",
"level": "ERROR",
"category": "Log Message",
"message": "Error connecting to localhost:8776 due to Connection refused (2438e3c8-015a-1000-79ca-83af40ec1991)",
"groupId": "2438e3c8-015a-1000-79ca-83af40ec1990",
"groupName": "MiNiFi Flow",
"groupPath": "MiNiFi Flow",
"sourceId": "2438e3c8-015a-1000-79ca-83af40ec1991",
"sourceName": "GetTCP"
}
],
"processorStatuses": [
{
"id": "5128e3c8-015a-1000-79ca-83af40ec1990",
Expand All @@ -550,7 +581,7 @@ Contains information about the flow the agent is running, including the versione
"processingNanos": 0,
"activeThreadCount": -1,
"terminatedThreadCount": -1,
"running": true
"runStatus": "Running"
},
{
"id": "4fe2d51d-076a-49b0-88de-5cf5adf52b8f",
Expand All @@ -565,11 +596,11 @@ Contains information about the flow the agent is running, including the versione
"processingNanos": 2119148,
"activeThreadCount": -1,
"terminatedThreadCount": -1,
"running": true
"runStatus": "Running"
}
],
"flowId": "96273342-b9fe-11ef-a0ad-10f60a596f64",
"running": true
"runStatus": "Running"
}
```

Expand Down
3 changes: 3 additions & 0 deletions conf/minifi.properties
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ nifi.c2.full.heartbeat=false
# specify encoding strategy for c2 requests (gzip, none)
#nifi.c2.rest.request.encoding=none

# specify the maximum number of bulletins to send in a heartbeat
#nifi.c2.flow.info.processor.bulletin.limit=1000

## enable the controller socket provider on port 9998
## off by default.
#controller.socket.enable=true
Expand Down
9 changes: 6 additions & 3 deletions extensions/python/ExecutePythonProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,7 @@ void ExecutePythonProcessor::reloadScriptIfUsingScriptFileProperty() {

std::unique_ptr<PythonScriptEngine> ExecutePythonProcessor::createScriptEngine() {
auto engine = std::make_unique<PythonScriptEngine>();

python_logger_ = core::logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
engine->initialize(Success, Failure, Original, python_logger_);

return engine;
}

Expand Down Expand Up @@ -225,6 +222,12 @@ std::vector<core::Relationship> ExecutePythonProcessor::getPythonRelationships()
return relationships;
}

void ExecutePythonProcessor::setLoggerCallback(const std::function<void(core::logging::LOG_LEVEL level, const std::string& message)>& callback) {
gsl_Expects(logger_ && python_logger_);
logger_->addLogCallback(callback);
python_logger_->addLogCallback(callback);
}

REGISTER_RESOURCE(ExecutePythonProcessor, Processor);

} // namespace org::apache::nifi::minifi::extensions::python::processors
3 changes: 2 additions & 1 deletion extensions/python/ExecutePythonProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class ExecutePythonProcessor : public core::ProcessorImpl {
python_dynamic_(false),
reload_on_script_change_(true) {
logger_ = core::logging::LoggerFactory<ExecutePythonProcessor>::getLogger(uuid_);
python_logger_ = core::logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
}

EXTENSIONAPI static constexpr const char* Description = "Executes a script given the flow file and a process session. "
Expand Down Expand Up @@ -138,8 +139,8 @@ class ExecutePythonProcessor : public core::ProcessorImpl {
}

std::map<std::string, core::Property> getProperties() const override;

std::vector<core::Relationship> getPythonRelationships();
void setLoggerCallback(const std::function<void(core::logging::LOG_LEVEL level, const std::string& message)>& callback) override;

protected:
const core::Property* findProperty(const std::string& name) const override;
Expand Down
2 changes: 2 additions & 0 deletions extensions/standard-processors/tests/unit/FlowJsonTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ TEST_CASE("NiFi flow json format is correctly parsed") {
"schedulingPeriod": "3 sec",
"penaltyDuration": "12 sec",
"yieldDuration": "4 sec",
"bulletinLevel": "ERROR",
"runDurationMillis": 12,
"autoTerminatedRelationships": ["one", "two"],
"properties": {
Expand Down Expand Up @@ -143,6 +144,7 @@ TEST_CASE("NiFi flow json format is correctly parsed") {
REQUIRE(3s == proc->getSchedulingPeriod());
REQUIRE(12s == proc->getPenalizationPeriod());
REQUIRE(4s == proc->getYieldPeriod());
REQUIRE(proc->getLogBulletinLevel() == logging::LOG_LEVEL::err);
REQUIRE(proc->isAutoTerminated({"one", ""}));
REQUIRE(proc->isAutoTerminated({"two", ""}));
REQUIRE_FALSE(proc->isAutoTerminated({"three", ""}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ Security Properties:
scheduling period: 1 sec
penalization period: 30 sec
yield period: 1 sec
bulletin level: ERROR
run duration nanos: 0
auto-terminated relationships list:
Properties:
Expand Down Expand Up @@ -157,6 +158,7 @@ Provenance Reporting:
REQUIRE(1s == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingPeriod());
REQUIRE(30s == rootFlowConfig->findProcessorByName("TailFile")->getPenalizationPeriod());
REQUIRE(1s == rootFlowConfig->findProcessorByName("TailFile")->getYieldPeriod());
REQUIRE(rootFlowConfig->findProcessorByName("TailFile")->getLogBulletinLevel() == logging::LOG_LEVEL::err);
REQUIRE(0s == rootFlowConfig->findProcessorByName("TailFile")->getRunDurationNano());

std::map<std::string, minifi::Connection*> connectionMap;
Expand Down
70 changes: 70 additions & 0 deletions libminifi/include/core/BulletinStore.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <string>
#include <deque>
#include <mutex>
#include <memory>
#include <optional>
#include <chrono>

#include "properties/Configure.h"
#include "core/logging/LoggerFactory.h"
#include "core/Processor.h"

namespace org::apache::nifi::minifi {
namespace test {
class BulletinStoreTestAccessor;
} // namespace test

namespace core {

struct Bulletin {
uint64_t id = 0;
std::chrono::time_point<std::chrono::system_clock> timestamp;
std::string level;
std::string category;
std::string message;
std::string group_id;
std::string group_name;
std::string group_path;
std::string source_id;
std::string source_name;
};

class BulletinStore {
public:
explicit BulletinStore(const Configure& configure);
void addProcessorBulletin(const core::Processor& processor, core::logging::LOG_LEVEL log_level, const std::string& message);
std::deque<Bulletin> getBulletins(std::optional<std::chrono::system_clock::duration> time_interval_to_include = {}) const;
size_t getMaxBulletinCount() const;

private:
friend class minifi::test::BulletinStoreTestAccessor;

static constexpr size_t DEFAULT_BULLETIN_COUNT = 1000;
size_t max_bulletin_count_;
mutable std::mutex mutex_;
uint64_t id_counter = 1;
std::deque<Bulletin> bulletins_;
std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<BulletinStore>::getLogger()};
};

} // namespace core
} // namespace org::apache::nifi::minifi
3 changes: 3 additions & 0 deletions libminifi/include/core/FlowConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "utils/file/FileSystem.h"
#include "utils/ChecksumCalculator.h"
#include "ParameterContext.h"
#include "core/BulletinStore.h"

namespace org::apache::nifi::minifi::core {

Expand All @@ -60,6 +61,7 @@ struct ConfigurationContext {
std::optional<std::filesystem::path> path{std::nullopt};
std::shared_ptr<utils::file::FileSystem> filesystem{std::make_shared<utils::file::FileSystem>()};
std::optional<utils::crypto::EncryptionProvider> sensitive_values_encryptor{std::nullopt};
core::BulletinStore* bulletin_store{nullptr};
};

enum class FlowSerializationType { Json, NifiJson, Yaml };
Expand Down Expand Up @@ -149,6 +151,7 @@ class FlowConfiguration : public CoreComponentImpl {
std::shared_ptr<utils::file::FileSystem> filesystem_;
utils::crypto::EncryptionProvider sensitive_values_encryptor_;
utils::ChecksumCalculator checksum_calculator_;
core::BulletinStore* bulletin_store_ = nullptr;

private:
virtual std::string serialize(const ProcessGroup&) { return ""; }
Expand Down
37 changes: 2 additions & 35 deletions libminifi/include/core/ProcessGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,13 @@ class ProcessGroup : public CoreComponentImpl {
ProcessGroup(ProcessGroupType type, std::string_view name);
ProcessGroup(ProcessGroupType type, std::string_view name, const utils::Identifier& uuid);
ProcessGroup(ProcessGroupType type, std::string_view name, const utils::Identifier& uuid, int version);
// Destructor
~ProcessGroup() override;
// Set URL
void setURL(std::string url) {
url_ = std::move(url);
}
// Get URL
std::string getURL() {
return (url_);
}
// SetTransmitting
void setTransmitting(bool val) {
transmitting_ = val;
}
Expand All @@ -93,7 +89,6 @@ class ProcessGroup : public CoreComponentImpl {
uint64_t getTimeout() {
return timeout_;
}
// setInterface
void setInterface(std::string &ifc) {
local_network_interface_ = ifc;
}
Expand Down Expand Up @@ -124,11 +119,9 @@ class ProcessGroup : public CoreComponentImpl {
http::HTTPProxy getHTTPProxy() {
return proxy_;
}
// Set Processor yield period in MilliSecond
void setYieldPeriodMsec(std::chrono::milliseconds period) {
yield_period_msec_ = period;
}
// Get Processor yield period in MilliSecond
std::chrono::milliseconds getYieldPeriodMsec() {
return (yield_period_msec_);
}
Expand All @@ -147,13 +140,11 @@ class ProcessGroup : public CoreComponentImpl {
const std::function<bool(const Processor*)>& filter = nullptr);

bool isRemoteProcessGroup();
// set parent process group
void setParent(ProcessGroup *parent) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
parent_process_group_ = parent;
}
// get parent process group
ProcessGroup *getParent() {
ProcessGroup *getParent() const {
std::lock_guard<std::recursive_mutex> lock(mutex_);
return parent_process_group_;
}
Expand Down Expand Up @@ -185,25 +176,17 @@ class ProcessGroup : public CoreComponentImpl {
}
return nullptr;
}
// findProcessor based on UUID
Processor* findProcessorById(const utils::Identifier& uuid, Traverse traverse = Traverse::IncludeChildren) const;
// findProcessor based on name
Processor* findProcessorByName(const std::string &processorName, Traverse traverse = Traverse::IncludeChildren) const;

void getAllProcessors(std::vector<Processor*>& processor_vec) const;

/**
* Add controller service
* @param nodeId node identifier
* @param node controller service node.
*/
void addControllerService(const std::string &nodeId, const std::shared_ptr<core::controller::ControllerServiceNode> &node);

core::controller::ControllerServiceNode* findControllerService(const std::string &nodeId, Traverse traverse = Traverse::ExcludeChildren) const;

std::vector<const core::controller::ControllerServiceNode*> getAllControllerServices() const;

// update property value
void updatePropertyValue(const std::string& processorName, const std::string& propertyName, const std::string& propertyValue);

void getConnections(std::map<std::string, Connection*>& connectionMap);
Expand All @@ -228,45 +211,29 @@ class ProcessGroup : public CoreComponentImpl {
protected:
void startProcessingProcessors(TimerDrivenSchedulingAgent& timeScheduler, EventDrivenSchedulingAgent& eventScheduler, CronDrivenSchedulingAgent& cronScheduler);

// version
int config_version_;
// Process Group Type
const ProcessGroupType type_;
// Processors (ProcessNode) inside this process group which include Remote Process Group input/Output port
std::set<std::unique_ptr<Processor>> processors_;
std::set<Processor*> failed_processors_;
std::set<Port*> ports_;
std::set<std::unique_ptr<ProcessGroup>> child_process_groups_;
// Connections between the processor inside the group;
std::set<std::unique_ptr<Connection>> connections_;
// Parent Process Group
ProcessGroup* parent_process_group_;
// Yield Period in Milliseconds
std::atomic<std::chrono::milliseconds> yield_period_msec_;
std::atomic<uint64_t> timeout_;

// URL
std::string url_;
// local network interface
std::string local_network_interface_;
// Transmitting
std::atomic<bool> transmitting_;
// http proxy
http::HTTPProxy proxy_;
std::string transport_protocol_;

// controller services

core::controller::ControllerServiceNodeMap controller_service_map_;

ParameterContext* parameter_context_ = nullptr;

private:
static Port* findPortById(const std::set<Port*>& ports, const utils::Identifier& uuid);
std::string buildGroupPath() const;

// Mutex for protection
mutable std::recursive_mutex mutex_;
// Logger
std::shared_ptr<logging::Logger> logger_;

ProcessGroup(const ProcessGroup &parent);
Expand Down
Loading

0 comments on commit 90baa4e

Please sign in to comment.