Skip to content

Commit

Permalink
asd
Browse files Browse the repository at this point in the history
  • Loading branch information
martinzink committed Feb 21, 2025
1 parent c43307d commit 785e230
Show file tree
Hide file tree
Showing 7 changed files with 276 additions and 1 deletion.
2 changes: 1 addition & 1 deletion extensions/kafka/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ include(Fetchlibrdkafka)

include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)

file(GLOB SOURCES "*.cpp")
file(GLOB SOURCES "*.cpp" "poc/*.cpp")

add_minifi_library(minifi-rdkafka-extensions SHARED ${SOURCES})

Expand Down
38 changes: 38 additions & 0 deletions extensions/kafka/poc/CommitKafkaPoC.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "CommitKafkaPoC.h"
#include "core/FlowFile.h"
#include "core/ProcessSession.h"
#include "core/PropertyType.h"
#include "core/Resource.h"

namespace org::apache::nifi::minifi::kafka {

void CommitKafkaPoC::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
kafka_client_controller_service_ = std::dynamic_pointer_cast<KafkaClientControllerService>(context.getControllerService());
}

void CommitKafkaPoC::onTrigger(core::ProcessContext&, core::ProcessSession&) {
}

void CommitKafkaPoC::initialize() {

}

REGISTER_RESOURCE(CommitKafkaPoC, Processor);
} // namespace org::apache::nifi::minifi::kafka
59 changes: 59 additions & 0 deletions extensions/kafka/poc/CommitKafkaPoC.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "core/Processor.h"
#include "core/PropertyDefinitionBuilder.h"
#include "KafkaClientControllerService.h"

namespace org::apache::nifi::minifi::kafka {

class CommitKafkaPoC : public core::ProcessorImpl {
public:
EXTENSIONAPI static constexpr const char* Description = "CommitKafkaPoC";
EXTENSIONAPI static constexpr auto KafkaClient =
core::PropertyDefinitionBuilder<>::createProperty("Kafka Client Controller Service")
.withDescription("Kafka Client Controller Service")
.isRequired(true)
.withAllowedTypes<KafkaClientControllerService>()
.build();

EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "success"};
EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "failure"};
EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure};

EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({KafkaClient});

EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
EXTENSIONAPI static constexpr bool IsSingleThreaded = true;

ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS

void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override;
void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override;
void initialize() override;

using ProcessorImpl::ProcessorImpl;

private:
std::shared_ptr<KafkaClientControllerService> kafka_client_controller_service_;
};

} // namespace org::apache::nifi::minifi::kafka
37 changes: 37 additions & 0 deletions extensions/kafka/poc/ConsumeKafkaPoC.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "ConsumeKafkaPoC.h"
#include "core/FlowFile.h"
#include "core/ProcessSession.h"
#include "core/PropertyType.h"
#include "core/Resource.h"

namespace org::apache::nifi::minifi::kafka {

void ConsumeKafkaPoC::onSchedule(core::ProcessContext&, core::ProcessSessionFactory&) {
}

void ConsumeKafkaPoC::onTrigger(core::ProcessContext&, core::ProcessSession&) {
}

void ConsumeKafkaPoC::initialize() {

}

REGISTER_RESOURCE(ConsumeKafkaPoC, Processor);
} // namespace org::apache::nifi::minifi::kafka
59 changes: 59 additions & 0 deletions extensions/kafka/poc/ConsumeKafkaPoC.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "core/Processor.h"
#include "core/PropertyDefinitionBuilder.h"
#include "KafkaClientControllerService.h"


namespace org::apache::nifi::minifi::kafka {

class ConsumeKafkaPoC : public core::ProcessorImpl {
public:
EXTENSIONAPI static constexpr const char* Description = "ConsumeKafkaPoC";
EXTENSIONAPI static constexpr auto KafkaClient =
core::PropertyDefinitionBuilder<>::createProperty("Kafka Client Controller Service")
.withDescription("Kafka Client Controller Service")
.isRequired(true)
.withAllowedTypes<KafkaClientControllerService>()
.build();

EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "success"};
EXTENSIONAPI static constexpr auto Relationships = std::array{Success};

EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({KafkaClient});

EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_FORBIDDEN;
EXTENSIONAPI static constexpr bool IsSingleThreaded = true;

ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS

void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override;
void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override;
void initialize() override;

using ProcessorImpl::ProcessorImpl;

private:
std::shared_ptr<KafkaClientControllerService> kafka_client_controller_service_;
};

} // namespace org::apache::nifi::minifi::kafka
27 changes: 27 additions & 0 deletions extensions/kafka/poc/KafkaClientControllerService.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "KafkaClientControllerService.h"
#include "core/Resource.h"

namespace org::apache::nifi::minifi::kafka {

void KafkaClientControllerService::onEnable() {
}

REGISTER_RESOURCE(KafkaClientControllerService, ControllerService);
} // namespace org::apache::nifi::minifi::kafka
55 changes: 55 additions & 0 deletions extensions/kafka/poc/KafkaClientControllerService.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <array>
#include "core/controller/ControllerService.h"
#include "core/PropertyDefinition.h"
#include "core/PropertyDefinitionBuilder.h"
#include "core/PropertyType.h"

namespace org::apache::nifi::minifi::kafka {

class KafkaClientControllerService : public core::controller::ControllerServiceImpl {
public:
EXTENSIONAPI static constexpr const char* Description = "Manages the client for Apache Kafka. This allows for multiple Kafka related processors "
"to reference this single client.";

EXTENSIONAPI static constexpr auto TopicName =
core::PropertyDefinitionBuilder<>::createProperty("Topic Name")
.withDescription(
"The name of the Kafka Topic to pull from.")
.supportsExpressionLanguage(true)
.isRequired(true)
.build();

EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({TopicName});

EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES

using ControllerServiceImpl::ControllerServiceImpl;

void initialize() override {}
void yield() override {}
bool isWorkAvailable() override { return false; }
bool isRunning() const override { return getState() == core::controller::ControllerServiceState::ENABLED; }

void onEnable() override;
};
} // namespace org::apache::nifi::minifi::kafka

0 comments on commit 785e230

Please sign in to comment.