Skip to content

Commit

Permalink
Property refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
adamdebreceni authored and martinzink committed Jan 30, 2025
1 parent 001d12c commit 0b644ce
Show file tree
Hide file tree
Showing 389 changed files with 4,545 additions and 6,703 deletions.
19 changes: 0 additions & 19 deletions docker/test/integration/features/kafka.feature
Original file line number Diff line number Diff line change
Expand Up @@ -403,25 +403,6 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
| 3 | 5 sec | 15 seconds | 6 | 12 |
| 6 | 5 sec | 15 seconds | 12 | 24 |

Scenario Outline: Unsupported encoding attributes for ConsumeKafka throw scheduling errors
Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
And the "<property name>" property of the ConsumeKafka processor is set to "<property value>"
And a PutFile processor with the "Directory" property set to "/tmp/output" in the "kafka-consumer-flow" flow
And the "success" relationship of the ConsumeKafka processor is connected to the PutFile

And a kafka broker is set up in correspondence with the third-party kafka publisher

When all instances start up
And a message with content "<message 1>" is published to the "ConsumeKafkaTest" topic
And a message with content "<message 2>" is published to the "ConsumeKafkaTest" topic

Then no files are placed in the monitored directory in 20 seconds of running time

Examples: Unsupported property values
| message 1 | message 2 | property name | property value |
| Miyamoto Musashi | Eiji Yoshikawa | Key Attribute Encoding | UTF-32 |
| Shogun | James Clavell | Message Header Encoding | UTF-32 |

Scenario: ConsumeKafka receives data via SSL
Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
And these processor properties are set:
Expand Down
8 changes: 4 additions & 4 deletions encrypt-config/FlowConfigEncryptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,15 @@ std::vector<SensitiveItem> listSensitiveItems(const minifi::core::ProcessGroup &
process_group.getAllProcessors(processors);
for (const auto* processor : processors) {
gsl_Expects(processor);
for (const auto& [_, property] : processor->getProperties()) {
for (const auto& [_, property] : processor->getSupportedProperties()) {
if (property.isSensitive()) {
sensitive_items.push_back(SensitiveItem{
.component_type = ComponentType::Processor,
.component_id = processor->getUUID(),
.component_name = processor->getName(),
.item_name = property.getName(),
.item_display_name = property.getDisplayName(),
.item_value = property.getValue().to_string()});
.item_value = std::string{property.getValue().value_or("")}});
}
}
}
Expand All @@ -104,15 +104,15 @@ std::vector<SensitiveItem> listSensitiveItems(const minifi::core::ProcessGroup &
continue;
}
processed_controller_services.insert(controller_service->getUUID());
for (const auto& [_, property] : controller_service->getProperties()) {
for (const auto& [_, property] : controller_service->getSupportedProperties()) {
if (property.isSensitive()) {
sensitive_items.push_back(SensitiveItem{
.component_type = ComponentType::ControllerService,
.component_id = controller_service->getUUID(),
.component_name = controller_service->getName(),
.item_name = property.getName(),
.item_display_name = property.getDisplayName(),
.item_value = property.getValue().to_string()});
.item_value = std::string{property.getValue().value_or("")}});
}
}
}
Expand Down
71 changes: 42 additions & 29 deletions extension-utils/include/utils/ProcessorConfigUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,61 +23,74 @@
#include <vector>

#include "core/ProcessContext.h"
#include "core/PropertyType.h"
#include "utils/Enum.h"

namespace org::apache::nifi::minifi::utils {

template<typename PropertyType = std::string>
PropertyType getRequiredPropertyOrThrow(const core::ProcessContext& context, std::string_view property_name) {
PropertyType value;
if (!context.getProperty(property_name, value)) {
throw std::runtime_error(std::string(property_name) + " property missing or invalid");
}
return value;
}
std::optional<std::string> parseOptionalProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr);
std::string parseProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr);

std::optional<bool> parseOptionalBoolProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr);
bool parseBoolProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr);

std::optional<uint64_t> parseOptionalU64Property(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr);
uint64_t parseU64Property(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr);

std::vector<std::string> listFromCommaSeparatedProperty(const core::ProcessContext& context, std::string_view property_name);
std::vector<std::string> listFromRequiredCommaSeparatedProperty(const core::ProcessContext& context, std::string_view property_name);
bool parseBooleanPropertyOrThrow(const core::ProcessContext& context, std::string_view property_name);
std::chrono::milliseconds parseTimePropertyMSOrThrow(const core::ProcessContext& context, std::string_view property_name);
std::optional<int64_t> parseOptionalI64Property(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr);
int64_t parseI64Property(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr);

std::optional<std::chrono::milliseconds> parseOptionalMsProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr);
std::chrono::milliseconds parseMsProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr);

std::optional<uint64_t> parseOptionalDataSizeProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr);
uint64_t parseDataSizeProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr);

template<typename T>
T parseEnumProperty(const core::ProcessContext& context, const core::PropertyReference& prop) {
std::string value;
if (!context.getProperty(prop.name, value)) {
T parseEnumProperty(const core::ProcessContext& context, const core::PropertyReference& prop, const core::FlowFile* flow_file = nullptr) {
const auto enum_str = context.getProperty(prop.name, flow_file);
if (!enum_str) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Property '" + std::string(prop.name) + "' is missing");
}
auto result = magic_enum::enum_cast<T>(value);
auto result = magic_enum::enum_cast<T>(*enum_str);
if (!result) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Property '" + std::string(prop.name) + "' has invalid value: '" + value + "'");
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Property '" + std::string(prop.name) + "' has invalid value: '" + *enum_str + "'");
}
return result.value();
}

template<typename T>
std::optional<T> parseOptionalEnumProperty(const core::ProcessContext& context, const core::PropertyReference& prop) {
std::string value;
if (!context.getProperty(prop.name, value)) {
const auto enum_str = context.getProperty(prop.name);

if (!enum_str) {
return std::nullopt;
}
auto result = magic_enum::enum_cast<T>(value);
auto result = magic_enum::enum_cast<T>(*enum_str);
if (!result) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Property '" + std::string(prop.name) + "' has invalid value: '" + value + "'");
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Property '" + std::string(prop.name) + "' has invalid value: '" + *enum_str + "'");
}
return result.value();
}

template<typename T>
std::optional<T> parseOptionalEnumProperty(const core::ConfigurableComponent& component, const core::PropertyReference& property) {
std::string str_value;
if (!component.getProperty(property, str_value)) {
template<typename ControllerServiceType>
std::optional<std::shared_ptr<ControllerServiceType>> parseOptionalControllerService(const core::ProcessContext& context, const core::PropertyReference& prop, const utils::Identifier& processor_uuid) {
const auto controller_service_name = context.getProperty(prop.name);
if (!controller_service_name) {
return std::nullopt;
}
auto enum_value = magic_enum::enum_cast<T>(str_value);
if (!enum_value) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Property '{}' has invalid value {}", property.name, str_value));

const std::shared_ptr<core::controller::ControllerService> service = context.getControllerService(*controller_service_name, processor_uuid);
if (!service) {
return std::nullopt;
}
return *enum_value;

auto typed_controller_service = std::dynamic_pointer_cast<ControllerServiceType>(service);
if (!typed_controller_service) {
return std::nullopt;
}

return typed_controller_service;
}

} // namespace org::apache::nifi::minifi::utils
14 changes: 0 additions & 14 deletions extension-utils/src/core/ProcessContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,7 @@
*/

#include "minifi-cpp/core/ProcessContext.h"
#include "core/TypedValues.h"

namespace org::apache::nifi::minifi::core {

bool ProcessContext::getProperty(std::string_view name, detail::NotAFlowFile auto &value) const {
if constexpr (std::is_base_of_v<TransformableValue, std::decay_t<decltype(value)>>) {
std::string prop_str;
if (!getProperty(std::string{name}, prop_str)) {
return false;
}
value = std::decay_t<decltype(value)>(std::move(prop_str));
return true;
} else {
return getProperty(name, value);
}
}

} // namespace org::apache::nifi::minifi::core
7 changes: 4 additions & 3 deletions extension-utils/src/utils/ListingStateManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ uint64_t ListingStateManager::getLatestListedKeyTimestampInMilliseconds(const st
stored_listed_key_timestamp_str = it->second;
}

int64_t stored_listed_key_timestamp = 0;
core::Property::StringToInt(stored_listed_key_timestamp_str, stored_listed_key_timestamp);
if (auto stored_listed_key_timestamp = parsing::parseIntegral<int64_t>(stored_listed_key_timestamp_str)) {
return *stored_listed_key_timestamp;
}

return stored_listed_key_timestamp;
throw std::runtime_error("Invalid listed key timestamp");
}

std::unordered_set<std::string> ListingStateManager::getLatestListedKeys(const std::unordered_map<std::string, std::string> &state) {
Expand Down
94 changes: 75 additions & 19 deletions extension-utils/src/utils/ProcessorConfigUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,93 @@
*/
#include "utils/ProcessorConfigUtils.h"

#include <vector>
#include <string>
#include <string_view>

#include "range/v3/algorithm/contains.hpp"
#include "utils/StringUtils.h"
#include "utils/expected.h"

namespace org::apache::nifi::minifi::utils {

std::vector<std::string> listFromCommaSeparatedProperty(const core::ProcessContext& context, std::string_view property_name) {
std::string property_string;
context.getProperty(property_name, property_string);
return utils::string::splitAndTrim(property_string, ",");
std::string parseProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) {
return ctx.getProperty(property, flow_file) | utils::expect(fmt::format("Expected valid value from {}::{}", ctx.getProcessor().getName(), property.name));
}

std::vector<std::string> listFromRequiredCommaSeparatedProperty(const core::ProcessContext& context, std::string_view property_name) {
return utils::string::splitAndTrim(getRequiredPropertyOrThrow(context, property_name), ",");
std::optional<std::string> parseOptionalProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) {
if (const auto property_str = ctx.getProperty(property, flow_file)) {
return *property_str;
}
return std::nullopt;
}

std::optional<bool> parseOptionalBoolProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) {
if (const auto property_str = ctx.getProperty(property, flow_file)) {
return parsing::parseBool(*property_str) | utils::expect(fmt::format("Expected parsable bool from {}::{}", ctx.getProcessor().getName(), property.name));
}
return std::nullopt;
}

bool parseBoolProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) {
return ctx.getProperty(property, flow_file) | utils::andThen(parsing::parseBool) | utils::expect(fmt::format("Expected parsable bool from {}::{}", ctx.getProcessor().getName(), property.name));
}

std::optional<uint64_t> parseOptionalU64Property(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) {
if (const auto property_str = ctx.getProperty(property, flow_file)) {
if (property_str->empty()) {
return std::nullopt;
}
return parsing::parseIntegral<uint64_t>(*property_str) | utils::expect(fmt::format("Expected parsable uint64_t from {}::{}", ctx.getProcessor().getName(), property.name));
}

return std::nullopt;
}

uint64_t parseU64Property(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) {
return ctx.getProperty(property, flow_file) | utils::andThen(parsing::parseIntegral<uint64_t>) | utils::expect(fmt::format("Expected parsable uint64_t from {}::{}", ctx.getProcessor().getName(), property.name));
}

std::optional<int64_t> parseOptionalI64Property(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) {
if (const auto property_str = ctx.getProperty(property, flow_file)) {
if (property_str->empty()) {
return std::nullopt;
}
return parsing::parseIntegral<int64_t>(*property_str) | utils::expect(fmt::format("Expected parsable int64_t from {}::{}", ctx.getProcessor().getName(), property.name));
}

return std::nullopt;
}

bool parseBooleanPropertyOrThrow(const core::ProcessContext& context, std::string_view property_name) {
const std::string value_str = getRequiredPropertyOrThrow(context, property_name);
const auto maybe_value = utils::string::toBool(value_str);
if (!maybe_value) {
throw std::runtime_error(std::string(property_name) + " property is invalid: value is " + value_str);
int64_t parseI64Property(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) {
return ctx.getProperty(property, flow_file) | utils::andThen(parsing::parseIntegral<int64_t>) | utils::expect(fmt::format("Expected parsable int64_t from {}::{}", ctx.getProcessor().getName(), property.name));
}

std::optional<std::chrono::milliseconds> parseOptionalMsProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) {
if (const auto property_str = ctx.getProperty(property, flow_file)) {
if (property_str->empty()) {
return std::nullopt;
}
return parsing::parseDuration(*property_str) | utils::expect(fmt::format("Expected parsable duration from {}::{}", ctx.getProcessor().getName(), property.name));
}
return maybe_value.value();

return std::nullopt;
}

std::chrono::milliseconds parseTimePropertyMSOrThrow(const core::ProcessContext& context, std::string_view property_name) {
const auto time_property = getRequiredPropertyOrThrow<core::TimePeriodValue>(context, property_name);
return time_property.getMilliseconds();
std::chrono::milliseconds parseMsProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) {
return ctx.getProperty(property, flow_file) | utils::andThen(parsing::parseDuration<std::chrono::milliseconds>) | utils::expect(fmt::format("Expected parsable duration from {}::{}", ctx.getProcessor().getName(), property.name));
}

std::optional<uint64_t> parseOptionalDataSizeProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) {
if (const auto property_str = ctx.getProperty(property, flow_file)) {
if (property_str->empty()) {
return std::nullopt;
}
return parsing::parseDataSize(*property_str) | utils::expect(fmt::format("Expected parsable data size from {}::{}", ctx.getProcessor().getName(), property.name));
}

return std::nullopt;
}

uint64_t parseDataSizeProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) {
return ctx.getProperty(property, flow_file) | utils::andThen(parsing::parseDataSize) | utils::expect(fmt::format("Expected parsable data size from {}::{}", ctx.getProcessor().getName(), property.name));
}


} // namespace org::apache::nifi::minifi::utils
2 changes: 1 addition & 1 deletion extension-utils/src/utils/net/Ssl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace org::apache::nifi::minifi::utils::net {
std::optional<utils::net::SslData> getSslData(const core::ProcessContext& context, const core::PropertyReference& ssl_prop, const std::shared_ptr<core::logging::Logger>& logger) {
auto getSslContextService = [&]() -> std::shared_ptr<minifi::controllers::SSLContextService> {
if (auto ssl_service_name = context.getProperty(ssl_prop); ssl_service_name && !ssl_service_name->empty()) {
if (auto service = context.getControllerService(*ssl_service_name, context.getProcessorNode()->getUUID())) {
if (auto service = context.getControllerService(*ssl_service_name, context.getProcessor().getUUID())) {
if (auto ssl_service = std::dynamic_pointer_cast<org::apache::nifi::minifi::controllers::SSLContextService>(service)) {
return ssl_service;
} else {
Expand Down
19 changes: 9 additions & 10 deletions extensions/aws/controllerservices/AWSCredentialsService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "AWSCredentialsService.h"

#include "core/Resource.h"
#include "utils/expected.h"

namespace org::apache::nifi::minifi::aws::controllers {

Expand All @@ -27,19 +28,17 @@ void AWSCredentialsService::initialize() {
}

void AWSCredentialsService::onEnable() {
std::string value;
if (getProperty(AccessKey, value)) {
aws_credentials_provider_.setAccessKey(value);
if (const auto access_key = getProperty(AccessKey.name)) {
aws_credentials_provider_.setAccessKey(*access_key);
}
if (getProperty(SecretKey, value)) {
aws_credentials_provider_.setSecretKey(value);
if (const auto secret_key = getProperty(SecretKey.name)) {
aws_credentials_provider_.setSecretKey(*secret_key);
}
if (getProperty(CredentialsFile, value)) {
aws_credentials_provider_.setCredentialsFile(value);
if (const auto credentials_file = getProperty(CredentialsFile.name)) {
aws_credentials_provider_.setCredentialsFile(*credentials_file);
}
bool use_default_credentials = false;
if (getProperty(UseDefaultCredentials, use_default_credentials)) {
aws_credentials_provider_.setUseDefaultCredentials(use_default_credentials);
if (const auto use_credentials = getProperty(UseDefaultCredentials.name) | minifi::utils::andThen(parsing::parseBool)) {
aws_credentials_provider_.setUseDefaultCredentials(*use_credentials);
}
}

Expand Down
2 changes: 1 addition & 1 deletion extensions/aws/controllerservices/AWSCredentialsService.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class AWSCredentialsService : public core::controller::ControllerServiceImpl {

EXTENSIONAPI static constexpr auto UseDefaultCredentials = core::PropertyDefinitionBuilder<>::createProperty("Use Default Credentials")
.withDescription("If true, uses the Default Credential chain, including EC2 instance profiles or roles, environment variables, default user credentials, etc.")
.withPropertyType(core::StandardPropertyTypes::BOOLEAN_TYPE)
.withValidator(core::StandardPropertyTypes::BOOLEAN_VALIDATOR)
.withDefaultValue("false")
.isRequired(true)
.build();
Expand Down
Loading

0 comments on commit 0b644ce

Please sign in to comment.