Skip to content

Commit

Permalink
Review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
adamdebreceni committed Jan 6, 2025
1 parent 2656f32 commit 7033b8b
Show file tree
Hide file tree
Showing 50 changed files with 68 additions and 457 deletions.
2 changes: 0 additions & 2 deletions extensions/rocksdb-repos/ProvenanceRepository.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ class ProvenanceRepository : public core::repository::RocksDbRepository {

void destroy();

// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
ProvenanceRepository(const ProvenanceRepository &parent) = delete;

ProvenanceRepository &operator=(const ProvenanceRepository &parent) = delete;
Expand Down
3 changes: 1 addition & 2 deletions libminifi/include/ResourceClaim.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ class ResourceClaimImpl : public ResourceClaim {
private:
// Logger
std::shared_ptr<core::logging::Logger> logger_;
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer

ResourceClaimImpl(const ResourceClaimImpl &parent);
ResourceClaimImpl &operator=(const ResourceClaimImpl &parent);

Expand Down
3 changes: 1 addition & 2 deletions libminifi/include/core/ProcessGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,7 @@ class ProcessGroup : public CoreComponentImpl {
mutable std::recursive_mutex mutex_;
// Logger
std::shared_ptr<logging::Logger> logger_;
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer

ProcessGroup(const ProcessGroup &parent);
ProcessGroup &operator=(const ProcessGroup &parent);
static std::shared_ptr<utils::IdGenerator> id_generator_;
Expand Down
54 changes: 12 additions & 42 deletions libminifi/include/core/ProcessSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,29 +49,20 @@ std::string to_string(const ReadBufferResult& read_buffer_result);

namespace org::apache::nifi::minifi::core {

// ProcessSession Class
class ProcessSessionImpl : public ReferenceContainerImpl, public virtual ProcessSession {
public:
// Constructor
/*!
* Create a new process session
*/
explicit ProcessSessionImpl(std::shared_ptr<ProcessContext> processContext);

// Destructor
~ProcessSessionImpl() override;

// Commit the session
void commit() override;
// Roll Back the session
void rollback() override;

nonstd::expected<void, std::exception_ptr> rollbackNoThrow() noexcept override;
// Get Provenance Report

std::shared_ptr<provenance::ProvenanceReporter> getProvenanceReporter() override {
return provenance_report_;
}
// writes the created contents to the underlying repository

void flushContent() override;

std::shared_ptr<core::FlowFile> get() override;
Expand All @@ -80,65 +71,52 @@ class ProcessSessionImpl : public ReferenceContainerImpl, public virtual Process
void add(const std::shared_ptr<core::FlowFile> &record) override;
std::shared_ptr<core::FlowFile> clone(const core::FlowFile& parent) override;
std::shared_ptr<core::FlowFile> clone(const core::FlowFile& parent, int64_t offset, int64_t size) override;
// Transfer the FlowFile to the relationship

void transfer(const std::shared_ptr<core::FlowFile>& flow, const Relationship& relationship) override;
void transferToCustomRelationship(const std::shared_ptr<core::FlowFile>& flow, const std::string& relationship_name) override;

void putAttribute(core::FlowFile& flow, std::string_view key, const std::string& value) override;
void removeAttribute(core::FlowFile& flow, std::string_view key) override;

void remove(const std::shared_ptr<core::FlowFile> &flow) override;
// Access the contents of the flow file as an input stream; returns null if the flow file has no content claim

std::shared_ptr<io::InputStream> getFlowFileContentStream(const core::FlowFile& flow_file) override;
// Execute the given read callback against the content

int64_t read(const std::shared_ptr<core::FlowFile>& flow_file, const io::InputStreamCallback& callback) override;

int64_t read(const core::FlowFile& flow_file, const io::InputStreamCallback& callback) override;
// Read content into buffer

detail::ReadBufferResult readBuffer(const std::shared_ptr<core::FlowFile>& flow) override;
// Execute the given write callback against the content

void write(const std::shared_ptr<core::FlowFile> &flow, const io::OutputStreamCallback& callback) override;

void write(core::FlowFile& flow, const io::OutputStreamCallback& callback) override;
// Read and write the flow file at the same time (eg. for processing it line by line)
int64_t readWrite(const std::shared_ptr<core::FlowFile> &flow, const io::InputOutputStreamCallback& callback) override;
// Replace content with buffer

void writeBuffer(const std::shared_ptr<core::FlowFile>& flow_file, std::span<const char> buffer) override;
void writeBuffer(const std::shared_ptr<core::FlowFile>& flow_file, std::span<const std::byte> buffer) override;
// Execute the given write/append callback against the content

void append(const std::shared_ptr<core::FlowFile> &flow, const io::OutputStreamCallback& callback) override;
// Append buffer to content

void appendBuffer(const std::shared_ptr<core::FlowFile>& flow, std::span<const char> buffer) override;
void appendBuffer(const std::shared_ptr<core::FlowFile>& flow, std::span<const std::byte> buffer) override;
// Penalize the flow

void penalize(const std::shared_ptr<core::FlowFile> &flow) override;

bool outgoingConnectionsFull(const std::string& relationship) override;

/**
* Imports a file from the data stream
* @param stream incoming data stream that contains the data to store into a file
* @param flow flow file
*/
void importFrom(io::InputStream &stream, const std::shared_ptr<core::FlowFile> &flow) override;
void importFrom(io::InputStream&& stream, const std::shared_ptr<core::FlowFile> &flow) override;

// import from the data source.
void import(const std::string& source, const std::shared_ptr<core::FlowFile> &flow, bool keepSource = true, uint64_t offset = 0) override;

/**
* Exports the data stream to a file
* @param string file to export stream to
* @param flow flow file
* @param bool whether or not to keep the content in the flow file
*/
bool exportContent(const std::string &destination, const std::shared_ptr<core::FlowFile> &flow, bool keepContent) override;

bool exportContent(const std::string &destination, const std::string &tmpFileName, const std::shared_ptr<core::FlowFile> &flow, bool keepContent) override;

// Stash the content to a key
void stash(const std::string &key, const std::shared_ptr<core::FlowFile> &flow) override;
// Restore content previously stashed to a key

void restore(const std::string &key, const std::shared_ptr<core::FlowFile> &flow) override;

bool existsFlowFileInRelationship(const Relationship &relationship) override;
Expand All @@ -149,8 +127,6 @@ class ProcessSessionImpl : public ReferenceContainerImpl, public virtual Process

bool hasBeenTransferred(const core::FlowFile &flow) const override;

// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
ProcessSessionImpl(const ProcessSessionImpl &parent) = delete;
ProcessSessionImpl &operator=(const ProcessSessionImpl &parent) = delete;

Expand Down Expand Up @@ -202,17 +178,11 @@ class ProcessSessionImpl : public ReferenceContainerImpl, public virtual Process
void ensureNonNullResourceClaim(
const std::map<Connectable*, std::vector<std::shared_ptr<core::FlowFile>>>& transactionMap);

// Clone the flow file during transfer to multiple connections for a relationship
std::shared_ptr<core::FlowFile> cloneDuringTransfer(const core::FlowFile& parent);
// ProcessContext
std::shared_ptr<ProcessContext> process_context_;
// Logger
std::shared_ptr<logging::Logger> logger_;
// Provenance Report
std::shared_ptr<provenance::ProvenanceReporter> provenance_report_;

std::shared_ptr<ContentSession> content_session_;

StateManager* stateManager_;

static std::shared_ptr<utils::IdGenerator> id_generator_;
Expand Down
21 changes: 2 additions & 19 deletions libminifi/include/core/ProcessSessionFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,40 +25,23 @@
#include "ProcessSession.h"
#include "minifi-cpp/core/ProcessSessionFactory.h"

namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace core {
namespace org::apache::nifi::minifi::core {

// ProcessSessionFactory Class
class ProcessSessionFactoryImpl : public virtual ProcessSessionFactory {
public:
// Constructor
/*!
* Create a new process session factory
*/
explicit ProcessSessionFactoryImpl(std::shared_ptr<ProcessContext> processContext)
: process_context_(processContext) {
}

// Create the session
std::shared_ptr<ProcessSession> createSession() override;

// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
ProcessSessionFactoryImpl(const ProcessSessionFactoryImpl &parent) = delete;
ProcessSessionFactoryImpl &operator=(const ProcessSessionFactoryImpl &parent) = delete;

~ProcessSessionFactoryImpl() override = default;

private:
// ProcessContext
std::shared_ptr<ProcessContext> process_context_;
};

} // namespace core
} // namespace minifi
} // namespace nifi
} // namespace apache
} // namespace org
} // namespace org::apache::nifi::minifi::core
12 changes: 2 additions & 10 deletions libminifi/include/core/WeakReference.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,7 @@

#include "minifi-cpp/core/WeakReference.h"

namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace core {
namespace org::apache::nifi::minifi::core {

/**
* Reference container is a vector of weak references that enables
Expand Down Expand Up @@ -69,8 +65,4 @@ class ReferenceContainerImpl : public virtual ReferenceContainer {
std::vector<std::shared_ptr<WeakReference> > references;
};

} // namespace core
} // namespace minifi
} // namespace nifi
} // namespace apache
} // namespace org
} // namespace org::apache::nifi::minifi::core
8 changes: 0 additions & 8 deletions libminifi/include/core/controller/ControllerServiceNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@ namespace org::apache::nifi::minifi::core::controller {

class ControllerServiceNodeImpl : public CoreComponentImpl, public ConfigurableComponentImpl, public virtual ControllerServiceNode {
public:
/**
* Constructor for the controller service node.
* @param service controller service reference
* @param id identifier for this node.
* @param configuration shared pointer configuration.
*/
explicit ControllerServiceNodeImpl(std::shared_ptr<ControllerService> service, std::string id, std::shared_ptr<Configure> configuration)
: CoreComponentImpl(std::move(id)),
active(false),
Expand Down Expand Up @@ -105,9 +99,7 @@ class ControllerServiceNodeImpl : public CoreComponentImpl, public ConfigurableC

std::atomic<bool> active;
std::shared_ptr<Configure> configuration_;
// controller service.
std::shared_ptr<ControllerService> controller_service_;
// linked controller services.
std::vector<ControllerServiceNode*> linked_controller_services_;
};

Expand Down
15 changes: 2 additions & 13 deletions libminifi/include/core/state/FlowIdentifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@
#include "minifi-cpp/core/state/FlowIdentifier.h"


namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace state {
namespace org::apache::nifi::minifi::state {

/**
* Purpose: Represents a flow identifier for a given flow update or instance.
Expand All @@ -36,9 +32,6 @@ class FlowIdentifierImpl : public virtual FlowIdentifier {
public:
FlowIdentifierImpl() = delete;

/**
* Constructor accepts the url, bucket id, and flow id.
*/
explicit FlowIdentifierImpl(const std::string &url, const std::string &bucket_id, const std::string &flow_id) {
registry_url_ = url;
bucket_id_ = bucket_id;
Expand Down Expand Up @@ -74,8 +67,4 @@ class FlowIdentifierImpl : public virtual FlowIdentifier {
};


} // namespace state
} // namespace minifi
} // namespace nifi
} // namespace apache
} // namespace org
} // namespace org::apache::nifi::minifi::state
6 changes: 0 additions & 6 deletions libminifi/include/core/state/nodes/MetricsBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@

namespace org::apache::nifi::minifi::state::response {

/**
* Purpose: Defines a metric that
*/
class DeviceInformation : public ResponseNodeImpl {
public:
DeviceInformation(std::string_view name, const utils::Identifier& uuid)
Expand All @@ -47,9 +44,6 @@ class DeviceInformation : public ResponseNodeImpl {
}
};

/**
* Purpose: Defines a metric that
*/
class ObjectNode : public ResponseNodeImpl {
public:
explicit ObjectNode(const std::string_view name, const utils::Identifier& uuid = {})
Expand Down
4 changes: 0 additions & 4 deletions libminifi/include/properties/Properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,13 @@ class PropertiesImpl : public virtual Properties {
return name_;
}

// Clear the load config
void clear() override {
std::lock_guard<std::mutex> lock(mutex_);
properties_.clear();
}
void set(const std::string& key, const std::string& value) override {
set(key, value, PropertyChangeLifetime::PERSISTENT);
}
// Set the config value
void set(const std::string &key, const std::string &value, PropertyChangeLifetime lifetime) override {
auto active_value = utils::string::replaceEnvironmentVariables(value);
std::lock_guard<std::mutex> lock(mutex_);
Expand All @@ -78,7 +76,6 @@ class PropertiesImpl : public virtual Properties {
dirty_ = true;
}
}
// Check whether the config value existed
bool has(const std::string& key) const override {
std::lock_guard<std::mutex> lock(mutex_);
return properties_.count(key) > 0;
Expand Down Expand Up @@ -148,7 +145,6 @@ class PropertiesImpl : public virtual Properties {

// Mutex for protection
mutable std::mutex mutex_;
// Logger
std::shared_ptr<core::logging::Logger> logger_;
// Home location for this executable
std::filesystem::path minifi_home_;
Expand Down
4 changes: 0 additions & 4 deletions libminifi/include/provenance/Provenance.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,6 @@ class ProvenanceEventRecordImpl : public core::SerializableComponentImpl, public
std::string _alternateIdentifierUri;

private:
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
ProvenanceEventRecordImpl(const ProvenanceEventRecordImpl &parent);
ProvenanceEventRecordImpl &operator=(const ProvenanceEventRecordImpl &parent);
static std::shared_ptr<core::logging::Logger> logger_;
Expand Down Expand Up @@ -337,8 +335,6 @@ class ProvenanceReporterImpl : public virtual ProvenanceReporter {
std::set<std::shared_ptr<ProvenanceEventRecord>> _events;
std::shared_ptr<core::Repository> repo_;

// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
ProvenanceReporterImpl(const ProvenanceReporterImpl &parent);
ProvenanceReporterImpl &operator=(const ProvenanceReporterImpl &parent);
};
Expand Down
3 changes: 1 addition & 2 deletions libminifi/include/sitetosite/HTTPProtocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,7 @@ class HttpSiteToSiteClient : public sitetosite::SiteToSiteClient {
private:
sitetosite::RespondCode current_code;
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<HttpSiteToSiteClient>::getLogger();
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer

HttpSiteToSiteClient(const HttpSiteToSiteClient &parent);
HttpSiteToSiteClient &operator=(const HttpSiteToSiteClient &parent);
static std::shared_ptr<utils::IdGenerator> id_generator_;
Expand Down
2 changes: 0 additions & 2 deletions libminifi/include/sitetosite/RawSocketProtocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,6 @@ class RawSiteToSiteClient : public sitetosite::SiteToSiteClient {
// commsIdentifier
utils::Identifier _commsIdentifier;

// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
RawSiteToSiteClient(const RawSiteToSiteClient &parent);
RawSiteToSiteClient &operator=(const RawSiteToSiteClient &parent);
static std::shared_ptr<utils::IdGenerator> id_generator_;
Expand Down
4 changes: 2 additions & 2 deletions libminifi/src/core/ProcessSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -785,8 +785,8 @@ void ProcessSessionImpl::commit() {

if (metrics_) {
for (const auto& [relationship_name, transfer_metrics] : transfers) {
metrics_->transferred_bytes() += transfer_metrics.transfer_size;
metrics_->transferred_flow_files() += transfer_metrics.transfer_count;
metrics_->transferredBytes() += transfer_metrics.transfer_size;
metrics_->transferredFlowFiles() += transfer_metrics.transfer_count;
metrics_->increaseRelationshipTransferCount(relationship_name, transfer_metrics.transfer_count);
}
}
Expand Down
1 change: 1 addition & 0 deletions libminifi/test/flow-tests/FlowControllerTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ TEST_CASE("Flow shutdown drains connections", "[TestFlow1]") {
testController.configuration_->set(minifi::Configure::nifi_flowcontroller_drain_timeout, "100 ms");

auto sinkProc = dynamic_cast<minifi::processors::TestProcessor*>(root->findProcessorByName("TestProcessor"));
gsl_Assert(sinkProc);
// prevent execution of the consumer processor
sinkProc->yield(10s);

Expand Down
Loading

0 comments on commit 7033b8b

Please sign in to comment.