Skip to content

Commit

Permalink
POC Kafka separate consume/commit
Browse files Browse the repository at this point in the history
  • Loading branch information
martinzink committed Feb 21, 2025
1 parent 785e230 commit bc5f33e
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 10 deletions.
6 changes: 6 additions & 0 deletions extension-utils/include/utils/ProcessorConfigUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include "core/ProcessContext.h"
#include "core/PropertyType.h"
#include "utils/Enum.h"
#include "utils/expected.h"
#include "utils/OptionalUtils.h"

namespace org::apache::nifi::minifi::utils {

Expand Down Expand Up @@ -93,4 +95,8 @@ std::optional<std::shared_ptr<ControllerServiceType>> parseOptionalControllerSer
return typed_controller_service;
}

template<typename ControllerServiceType>
std::shared_ptr<ControllerServiceType> parseControllerService(const core::ProcessContext& context, const core::PropertyReference& prop, const utils::Identifier& processor_uuid) {
return parseOptionalControllerService<ControllerServiceType>(context, prop, processor_uuid) | utils::expect("Required Controller Service");
}
} // namespace org::apache::nifi::minifi::utils
22 changes: 19 additions & 3 deletions extensions/kafka/poc/CommitKafkaPoC.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,34 @@
#include "core/ProcessSession.h"
#include "core/PropertyType.h"
#include "core/Resource.h"
#include "utils/ProcessorConfigUtils.h"
#include "utils/gsl.h"

namespace org::apache::nifi::minifi::kafka {

void CommitKafkaPoC::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
kafka_client_controller_service_ = std::dynamic_pointer_cast<KafkaClientControllerService>(context.getControllerService());
kafka_client_controller_service_ = minifi::utils::parseControllerService<KafkaClientControllerService>(context, KafkaClient, getUUID());
}

void CommitKafkaPoC::onTrigger(core::ProcessContext&, core::ProcessSession&) {
void CommitKafkaPoC::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
gsl_Expects(kafka_client_controller_service_);
std::vector<std::shared_ptr<core::FlowFile>> flow_files;
while (auto ff = session.get()) {
flow_files.push_back(std::move(ff));
}
if (flow_files.empty()) {
context.yield();
return;
}
gsl_Assert(kafka_client_controller_service_->commit(*flow_files.back()));
for (auto ff : flow_files) {
session.transfer(ff, Success);
}
}

void CommitKafkaPoC::initialize() {

setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}

REGISTER_RESOURCE(CommitKafkaPoC, Processor);
Expand Down
3 changes: 1 addition & 2 deletions extensions/kafka/poc/CommitKafkaPoC.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ class CommitKafkaPoC : public core::ProcessorImpl {
.build();

EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "success"};
EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "failure"};
EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure};
EXTENSIONAPI static constexpr auto Relationships = std::array{Success};

EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({KafkaClient});

Expand Down
18 changes: 15 additions & 3 deletions extensions/kafka/poc/ConsumeKafkaPoC.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,29 @@
#include "core/ProcessSession.h"
#include "core/PropertyType.h"
#include "core/Resource.h"
#include "utils/ProcessorConfigUtils.h"

namespace org::apache::nifi::minifi::kafka {

void ConsumeKafkaPoC::onSchedule(core::ProcessContext&, core::ProcessSessionFactory&) {
void ConsumeKafkaPoC::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
kafka_client_controller_service_ = minifi::utils::parseControllerService<KafkaClientControllerService>(context, KafkaClient, getUUID());
}

void ConsumeKafkaPoC::onTrigger(core::ProcessContext&, core::ProcessSession&) {
void ConsumeKafkaPoC::onTrigger(core::ProcessContext&, core::ProcessSession& session) {
gsl_Expects(kafka_client_controller_service_);
constexpr uint32_t batch_size = 100;
for (uint32_t i = 0; i < batch_size; i++) {
if (auto msg_ff = kafka_client_controller_service_->poll(session)) {
session.transfer(msg_ff, Success);
} else {
break;
}
};
}

void ConsumeKafkaPoC::initialize() {

setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}

REGISTER_RESOURCE(ConsumeKafkaPoC, Processor);
Expand Down
46 changes: 46 additions & 0 deletions extensions/kafka/poc/KafkaClientControllerService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,56 @@

#include "KafkaClientControllerService.h"
#include "core/Resource.h"
#include "utils/gsl.h"
#include "utils/ParsingUtils.h"

using namespace std::literals::chrono_literals;

namespace org::apache::nifi::minifi::kafka {

void KafkaClientControllerService::onEnable() {
conf_ = {rd_kafka_conf_new(), utils::rd_kafka_conf_deleter()};
constexpr std::string_view KAFKA_BROKER = "localhost:9092"; // TODO(mzink)
topic_ = getProperty(TopicName.name) | utils::expect("Required property");
consumer_group_ = getProperty(ConsumerGroup.name) | utils::expect("Required property");

// TODO(mzink) error handling
gsl_Assert(rd_kafka_conf_set(conf_.get(), "bootstrap.servers", KAFKA_BROKER.data(), nullptr, 0) == RD_KAFKA_CONF_OK);
gsl_Assert(rd_kafka_conf_set(conf_.get(), "group.id", consumer_group_.data(), nullptr, 0) == RD_KAFKA_CONF_OK);
gsl_Assert(rd_kafka_conf_set(conf_.get(), "enable.auto.commit", "false", nullptr, 0) == RD_KAFKA_CONF_OK);
gsl_Assert(rd_kafka_conf_set(conf_.get(), "session.timeout.ms", "45000", nullptr, 0) == RD_KAFKA_CONF_OK);
gsl_Assert(rd_kafka_conf_set(conf_.get(), "max.poll.interval.ms", "300000", nullptr, 0) == RD_KAFKA_CONF_OK);

consumer_ = {rd_kafka_new(RD_KAFKA_CONSUMER, conf_.get(), nullptr, 0), utils::rd_kafka_consumer_deleter()};

gsl_Assert(consumer_);

partition_list_ = utils::rd_kafka_topic_partition_list_unique_ptr{rd_kafka_topic_partition_list_new(1)};

gsl_Assert(partition_list_);

rd_kafka_topic_partition_list_t* topics = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(topics, topic_.c_str(), RD_KAFKA_PARTITION_UA);
gsl_Assert(rd_kafka_subscribe(consumer_.get(), topics) == RD_KAFKA_RESP_ERR_NO_ERROR);
}

std::shared_ptr<core::FlowFile> KafkaClientControllerService::poll(core::ProcessSession& session) {
constexpr std::chrono::milliseconds poll_time_out = 100ms;
utils::rd_kafka_message_unique_ptr message{rd_kafka_consumer_poll(consumer_.get(), poll_time_out.count())};
if (message) {
auto ff = session.create();
ff->setAttribute("kafka.offset", std::to_string(message->offset));
return ff;
}
return nullptr;
}

bool KafkaClientControllerService::commit(core::FlowFile& ff) {
const auto offset = parsing::parseIntegral<uint64_t>(ff.getAttribute("kafka.offset") | utils::expect("Missing kafka.offset attribute")) | utils::expect("Invalid offset");
auto partition = (partition_list_->elems[0]).partition;
rd_kafka_topic_partition_list_add(partition_list_.get(), topic_.c_str(), partition)->offset = offset + 1;
rd_kafka_commit(consumer_.get(), partition_list_.get(), 0); // Synchronous commit
return true;
}

REGISTER_RESOURCE(KafkaClientControllerService, ControllerService);
Expand Down
26 changes: 24 additions & 2 deletions extensions/kafka/poc/KafkaClientControllerService.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include "core/PropertyDefinition.h"
#include "core/PropertyDefinitionBuilder.h"
#include "core/PropertyType.h"
#include "core/ProcessSession.h"
#include "../rdkafka_utils.h"

namespace org::apache::nifi::minifi::kafka {

Expand All @@ -38,18 +40,38 @@ class KafkaClientControllerService : public core::controller::ControllerServiceI
.isRequired(true)
.build();

EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({TopicName});
EXTENSIONAPI static constexpr auto ConsumerGroup =
core::PropertyDefinitionBuilder<>::createProperty("Consumer Group")
.withDescription(
"Consumer Group")
.supportsExpressionLanguage(true)
.isRequired(true)
.build();

EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({TopicName, ConsumerGroup});

EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES

using ControllerServiceImpl::ControllerServiceImpl;

void initialize() override {}
void initialize() override {
setSupportedProperties(Properties);
}
void yield() override {}
bool isWorkAvailable() override { return false; }
bool isRunning() const override { return getState() == core::controller::ControllerServiceState::ENABLED; }

void onEnable() override;

std::shared_ptr<core::FlowFile> poll(core::ProcessSession& session);
bool commit(core::FlowFile& file);

private:
std::string topic_;
std::string consumer_group_;
std::unique_ptr<rd_kafka_conf_t, utils::rd_kafka_conf_deleter> conf_ = nullptr;
std::unique_ptr<rd_kafka_t, utils::rd_kafka_consumer_deleter> consumer_ = nullptr;
std::unique_ptr<rd_kafka_topic_partition_list_t, utils::rd_kafka_topic_partition_list_deleter> partition_list_ = nullptr;
};
} // namespace org::apache::nifi::minifi::kafka
8 changes: 8 additions & 0 deletions utils/include/utils/OptionalUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ nonstd::expected<T, E> operator|(std::optional<T> object, to_expected_wrapper<E>
}
return std::move(*object);
}

template<typename T>
auto operator|(std::optional<T>&& object, const expect_wrapper e) {
if (object) {
return std::move(*object);
}
throw std::runtime_error(e.reason);
}
} // namespace detail
} // namespace org::apache::nifi::minifi::utils

Expand Down

0 comments on commit bc5f33e

Please sign in to comment.