Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINIFICPP-2519 Property refactor #1923

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
19 changes: 0 additions & 19 deletions docker/test/integration/features/kafka.feature
Original file line number Diff line number Diff line change
Expand Up @@ -403,25 +403,6 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
| 3 | 5 sec | 15 seconds | 6 | 12 |
| 6 | 5 sec | 15 seconds | 12 | 24 |

Scenario Outline: Unsupported encoding attributes for ConsumeKafka throw scheduling errors
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We dont allow setting unsupported properties anymore

Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
And the "<property name>" property of the ConsumeKafka processor is set to "<property value>"
And a PutFile processor with the "Directory" property set to "/tmp/output" in the "kafka-consumer-flow" flow
And the "success" relationship of the ConsumeKafka processor is connected to the PutFile

And a kafka broker is set up in correspondence with the third-party kafka publisher

When all instances start up
And a message with content "<message 1>" is published to the "ConsumeKafkaTest" topic
And a message with content "<message 2>" is published to the "ConsumeKafkaTest" topic

Then no files are placed in the monitored directory in 20 seconds of running time

Examples: Unsupported property values
| message 1 | message 2 | property name | property value |
| Miyamoto Musashi | Eiji Yoshikawa | Key Attribute Encoding | UTF-32 |
| Shogun | James Clavell | Message Header Encoding | UTF-32 |

Scenario: ConsumeKafka receives data via SSL
Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
And these processor properties are set:
Expand Down
8 changes: 4 additions & 4 deletions encrypt-config/FlowConfigEncryptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,15 @@ std::vector<SensitiveItem> listSensitiveItems(const minifi::core::ProcessGroup &
process_group.getAllProcessors(processors);
for (const auto* processor : processors) {
gsl_Expects(processor);
for (const auto& [_, property] : processor->getProperties()) {
for (const auto& [_, property] : processor->getSupportedProperties()) {
if (property.isSensitive()) {
sensitive_items.push_back(SensitiveItem{
.component_type = ComponentType::Processor,
.component_id = processor->getUUID(),
.component_name = processor->getName(),
.item_name = property.getName(),
.item_display_name = property.getDisplayName(),
.item_value = property.getValue().to_string()});
.item_value = std::string{property.getValue().value_or("")}});
}
}
}
Expand All @@ -104,15 +104,15 @@ std::vector<SensitiveItem> listSensitiveItems(const minifi::core::ProcessGroup &
continue;
}
processed_controller_services.insert(controller_service->getUUID());
for (const auto& [_, property] : controller_service->getProperties()) {
for (const auto& [_, property] : controller_service->getSupportedProperties()) {
if (property.isSensitive()) {
sensitive_items.push_back(SensitiveItem{
.component_type = ComponentType::ControllerService,
.component_id = controller_service->getUUID(),
.component_name = controller_service->getName(),
.item_name = property.getName(),
.item_display_name = property.getDisplayName(),
.item_value = property.getValue().to_string()});
.item_value = std::string{property.getValue().value_or("")}});
}
}
}
Expand Down
71 changes: 42 additions & 29 deletions extension-utils/include/utils/ProcessorConfigUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,61 +23,74 @@
#include <vector>

#include "core/ProcessContext.h"
#include "core/PropertyType.h"
#include "utils/Enum.h"

namespace org::apache::nifi::minifi::utils {

template<typename PropertyType = std::string>
PropertyType getRequiredPropertyOrThrow(const core::ProcessContext& context, std::string_view property_name) {
PropertyType value;
if (!context.getProperty(property_name, value)) {
throw std::runtime_error(std::string(property_name) + " property missing or invalid");
}
return value;
}
std::optional<std::string> parseOptionalProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr);
std::string parseProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr);

std::optional<bool> parseOptionalBoolProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr);
bool parseBoolProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr);

std::optional<uint64_t> parseOptionalU64Property(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr);
uint64_t parseU64Property(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr);

std::vector<std::string> listFromCommaSeparatedProperty(const core::ProcessContext& context, std::string_view property_name);
std::vector<std::string> listFromRequiredCommaSeparatedProperty(const core::ProcessContext& context, std::string_view property_name);
bool parseBooleanPropertyOrThrow(const core::ProcessContext& context, std::string_view property_name);
std::chrono::milliseconds parseTimePropertyMSOrThrow(const core::ProcessContext& context, std::string_view property_name);
std::optional<int64_t> parseOptionalI64Property(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr);
int64_t parseI64Property(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr);

std::optional<std::chrono::milliseconds> parseOptionalMsProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr);
std::chrono::milliseconds parseMsProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr);

std::optional<uint64_t> parseOptionalDataSizeProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr);
uint64_t parseDataSizeProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr);

template<typename T>
T parseEnumProperty(const core::ProcessContext& context, const core::PropertyReference& prop) {
std::string value;
if (!context.getProperty(prop.name, value)) {
T parseEnumProperty(const core::ProcessContext& context, const core::PropertyReference& prop, const core::FlowFile* flow_file = nullptr) {
const auto enum_str = context.getProperty(prop.name, flow_file);
if (!enum_str) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Property '" + std::string(prop.name) + "' is missing");
}
auto result = magic_enum::enum_cast<T>(value);
auto result = magic_enum::enum_cast<T>(*enum_str);
if (!result) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Property '" + std::string(prop.name) + "' has invalid value: '" + value + "'");
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Property '" + std::string(prop.name) + "' has invalid value: '" + *enum_str + "'");
}
return result.value();
}

template<typename T>
std::optional<T> parseOptionalEnumProperty(const core::ProcessContext& context, const core::PropertyReference& prop) {
std::string value;
if (!context.getProperty(prop.name, value)) {
const auto enum_str = context.getProperty(prop.name);

if (!enum_str) {
return std::nullopt;
}
auto result = magic_enum::enum_cast<T>(value);
auto result = magic_enum::enum_cast<T>(*enum_str);
if (!result) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Property '" + std::string(prop.name) + "' has invalid value: '" + value + "'");
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Property '" + std::string(prop.name) + "' has invalid value: '" + *enum_str + "'");
}
return result.value();
}

template<typename T>
std::optional<T> parseOptionalEnumProperty(const core::ConfigurableComponent& component, const core::PropertyReference& property) {
std::string str_value;
if (!component.getProperty(property, str_value)) {
template<typename ControllerServiceType>
std::optional<std::shared_ptr<ControllerServiceType>> parseOptionalControllerService(const core::ProcessContext& context, const core::PropertyReference& prop, const utils::Identifier& processor_uuid) {
const auto controller_service_name = context.getProperty(prop.name);
if (!controller_service_name) {
return std::nullopt;
}
auto enum_value = magic_enum::enum_cast<T>(str_value);
if (!enum_value) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Property '{}' has invalid value {}", property.name, str_value));

const std::shared_ptr<core::controller::ControllerService> service = context.getControllerService(*controller_service_name, processor_uuid);
if (!service) {
return std::nullopt;
}
return *enum_value;

auto typed_controller_service = std::dynamic_pointer_cast<ControllerServiceType>(service);
if (!typed_controller_service) {
return std::nullopt;
}

return typed_controller_service;
}

} // namespace org::apache::nifi::minifi::utils
14 changes: 0 additions & 14 deletions extension-utils/src/core/ProcessContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,7 @@
*/

#include "minifi-cpp/core/ProcessContext.h"
#include "core/TypedValues.h"

namespace org::apache::nifi::minifi::core {

bool ProcessContext::getProperty(std::string_view name, detail::NotAFlowFile auto &value) const {
if constexpr (std::is_base_of_v<TransformableValue, std::decay_t<decltype(value)>>) {
std::string prop_str;
if (!getProperty(std::string{name}, prop_str)) {
return false;
}
value = std::decay_t<decltype(value)>(std::move(prop_str));
return true;
} else {
return getProperty(name, value);
}
}

} // namespace org::apache::nifi::minifi::core
7 changes: 4 additions & 3 deletions extension-utils/src/utils/ListingStateManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ uint64_t ListingStateManager::getLatestListedKeyTimestampInMilliseconds(const st
stored_listed_key_timestamp_str = it->second;
}

int64_t stored_listed_key_timestamp = 0;
core::Property::StringToInt(stored_listed_key_timestamp_str, stored_listed_key_timestamp);
if (auto stored_listed_key_timestamp = parsing::parseIntegral<int64_t>(stored_listed_key_timestamp_str)) {
return *stored_listed_key_timestamp;
}

return stored_listed_key_timestamp;
throw std::runtime_error("Invalid listed key timestamp");
}

std::unordered_set<std::string> ListingStateManager::getLatestListedKeys(const std::unordered_map<std::string, std::string> &state) {
Expand Down
94 changes: 75 additions & 19 deletions extension-utils/src/utils/ProcessorConfigUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,93 @@
*/
#include "utils/ProcessorConfigUtils.h"

#include <vector>
#include <string>
#include <string_view>

#include "range/v3/algorithm/contains.hpp"
#include "utils/StringUtils.h"
#include "utils/expected.h"

namespace org::apache::nifi::minifi::utils {

std::vector<std::string> listFromCommaSeparatedProperty(const core::ProcessContext& context, std::string_view property_name) {
std::string property_string;
context.getProperty(property_name, property_string);
return utils::string::splitAndTrim(property_string, ",");
std::string parseProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) {
return ctx.getProperty(property, flow_file) | utils::expect(fmt::format("Expected valid value from {}::{}", ctx.getProcessor().getName(), property.name));
}

std::vector<std::string> listFromRequiredCommaSeparatedProperty(const core::ProcessContext& context, std::string_view property_name) {
return utils::string::splitAndTrim(getRequiredPropertyOrThrow(context, property_name), ",");
std::optional<std::string> parseOptionalProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) {
if (const auto property_str = ctx.getProperty(property, flow_file)) {
return *property_str;
}
return std::nullopt;
}

std::optional<bool> parseOptionalBoolProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) {
if (const auto property_str = ctx.getProperty(property, flow_file)) {
return parsing::parseBool(*property_str) | utils::expect(fmt::format("Expected parsable bool from {}::{}", ctx.getProcessor().getName(), property.name));
}
return std::nullopt;
}

bool parseBoolProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) {
return ctx.getProperty(property, flow_file) | utils::andThen(parsing::parseBool) | utils::expect(fmt::format("Expected parsable bool from {}::{}", ctx.getProcessor().getName(), property.name));
}

std::optional<uint64_t> parseOptionalU64Property(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) {
if (const auto property_str = ctx.getProperty(property, flow_file)) {
if (property_str->empty()) {
return std::nullopt;
}
return parsing::parseIntegral<uint64_t>(*property_str) | utils::expect(fmt::format("Expected parsable uint64_t from {}::{}", ctx.getProcessor().getName(), property.name));
}

return std::nullopt;
}

uint64_t parseU64Property(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) {
return ctx.getProperty(property, flow_file) | utils::andThen(parsing::parseIntegral<uint64_t>) | utils::expect(fmt::format("Expected parsable uint64_t from {}::{}", ctx.getProcessor().getName(), property.name));
}

std::optional<int64_t> parseOptionalI64Property(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) {
if (const auto property_str = ctx.getProperty(property, flow_file)) {
if (property_str->empty()) {
return std::nullopt;
}
return parsing::parseIntegral<int64_t>(*property_str) | utils::expect(fmt::format("Expected parsable int64_t from {}::{}", ctx.getProcessor().getName(), property.name));
}

return std::nullopt;
}

bool parseBooleanPropertyOrThrow(const core::ProcessContext& context, std::string_view property_name) {
const std::string value_str = getRequiredPropertyOrThrow(context, property_name);
const auto maybe_value = utils::string::toBool(value_str);
if (!maybe_value) {
throw std::runtime_error(std::string(property_name) + " property is invalid: value is " + value_str);
int64_t parseI64Property(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) {
return ctx.getProperty(property, flow_file) | utils::andThen(parsing::parseIntegral<int64_t>) | utils::expect(fmt::format("Expected parsable int64_t from {}::{}", ctx.getProcessor().getName(), property.name));
}

std::optional<std::chrono::milliseconds> parseOptionalMsProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) {
if (const auto property_str = ctx.getProperty(property, flow_file)) {
if (property_str->empty()) {
return std::nullopt;
}
return parsing::parseDuration(*property_str) | utils::expect(fmt::format("Expected parsable duration from {}::{}", ctx.getProcessor().getName(), property.name));
}
return maybe_value.value();

return std::nullopt;
}

std::chrono::milliseconds parseTimePropertyMSOrThrow(const core::ProcessContext& context, std::string_view property_name) {
const auto time_property = getRequiredPropertyOrThrow<core::TimePeriodValue>(context, property_name);
return time_property.getMilliseconds();
std::chrono::milliseconds parseMsProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) {
return ctx.getProperty(property, flow_file) | utils::andThen(parsing::parseDuration<std::chrono::milliseconds>) | utils::expect(fmt::format("Expected parsable duration from {}::{}", ctx.getProcessor().getName(), property.name));
}

std::optional<uint64_t> parseOptionalDataSizeProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) {
if (const auto property_str = ctx.getProperty(property, flow_file)) {
if (property_str->empty()) {
return std::nullopt;
}
return parsing::parseDataSize(*property_str) | utils::expect(fmt::format("Expected parsable data size from {}::{}", ctx.getProcessor().getName(), property.name));
}

return std::nullopt;
}

uint64_t parseDataSizeProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) {
return ctx.getProperty(property, flow_file) | utils::andThen(parsing::parseDataSize) | utils::expect(fmt::format("Expected parsable data size from {}::{}", ctx.getProcessor().getName(), property.name));
}


} // namespace org::apache::nifi::minifi::utils
2 changes: 1 addition & 1 deletion extension-utils/src/utils/net/Ssl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace org::apache::nifi::minifi::utils::net {
std::optional<utils::net::SslData> getSslData(const core::ProcessContext& context, const core::PropertyReference& ssl_prop, const std::shared_ptr<core::logging::Logger>& logger) {
auto getSslContextService = [&]() -> std::shared_ptr<minifi::controllers::SSLContextService> {
if (auto ssl_service_name = context.getProperty(ssl_prop); ssl_service_name && !ssl_service_name->empty()) {
if (auto service = context.getControllerService(*ssl_service_name, context.getProcessorNode()->getUUID())) {
if (auto service = context.getControllerService(*ssl_service_name, context.getProcessor().getUUID())) {
if (auto ssl_service = std::dynamic_pointer_cast<org::apache::nifi::minifi::controllers::SSLContextService>(service)) {
return ssl_service;
} else {
Expand Down
19 changes: 9 additions & 10 deletions extensions/aws/controllerservices/AWSCredentialsService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "AWSCredentialsService.h"

#include "core/Resource.h"
#include "utils/expected.h"

namespace org::apache::nifi::minifi::aws::controllers {

Expand All @@ -27,19 +28,17 @@ void AWSCredentialsService::initialize() {
}

void AWSCredentialsService::onEnable() {
std::string value;
if (getProperty(AccessKey, value)) {
aws_credentials_provider_.setAccessKey(value);
if (const auto access_key = getProperty(AccessKey.name)) {
aws_credentials_provider_.setAccessKey(*access_key);
}
if (getProperty(SecretKey, value)) {
aws_credentials_provider_.setSecretKey(value);
if (const auto secret_key = getProperty(SecretKey.name)) {
aws_credentials_provider_.setSecretKey(*secret_key);
}
if (getProperty(CredentialsFile, value)) {
aws_credentials_provider_.setCredentialsFile(value);
if (const auto credentials_file = getProperty(CredentialsFile.name)) {
aws_credentials_provider_.setCredentialsFile(*credentials_file);
}
bool use_default_credentials = false;
if (getProperty(UseDefaultCredentials, use_default_credentials)) {
aws_credentials_provider_.setUseDefaultCredentials(use_default_credentials);
if (const auto use_credentials = getProperty(UseDefaultCredentials.name) | minifi::utils::andThen(parsing::parseBool)) {
aws_credentials_provider_.setUseDefaultCredentials(*use_credentials);
}
}

Expand Down
2 changes: 1 addition & 1 deletion extensions/aws/controllerservices/AWSCredentialsService.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class AWSCredentialsService : public core::controller::ControllerServiceImpl {

EXTENSIONAPI static constexpr auto UseDefaultCredentials = core::PropertyDefinitionBuilder<>::createProperty("Use Default Credentials")
.withDescription("If true, uses the Default Credential chain, including EC2 instance profiles or roles, environment variables, default user credentials, etc.")
.withPropertyType(core::StandardPropertyTypes::BOOLEAN_TYPE)
.withValidator(core::StandardPropertyTypes::BOOLEAN_VALIDATOR)
.withDefaultValue("false")
.isRequired(true)
.build();
Expand Down
Loading
Loading