diff --git a/extensions/kafka/CMakeLists.txt b/extensions/kafka/CMakeLists.txt index 08f400e2c6..ae8cc9a9bb 100644 --- a/extensions/kafka/CMakeLists.txt +++ b/extensions/kafka/CMakeLists.txt @@ -25,7 +25,7 @@ include(Fetchlibrdkafka) include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt) -file(GLOB SOURCES "*.cpp") +file(GLOB SOURCES "*.cpp" "poc/*.cpp") add_minifi_library(minifi-rdkafka-extensions SHARED ${SOURCES}) diff --git a/extensions/kafka/poc/CommitKafkaPoC.cpp b/extensions/kafka/poc/CommitKafkaPoC.cpp new file mode 100644 index 0000000000..df71545aae --- /dev/null +++ b/extensions/kafka/poc/CommitKafkaPoC.cpp @@ -0,0 +1,38 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "CommitKafkaPoC.h" +#include "core/FlowFile.h" +#include "core/ProcessSession.h" +#include "core/PropertyType.h" +#include "core/Resource.h" + +namespace org::apache::nifi::minifi::kafka { + +void CommitKafkaPoC::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + kafka_client_controller_service_ = std::dynamic_pointer_cast(context.getControllerService()); +} + +void CommitKafkaPoC::onTrigger(core::ProcessContext&, core::ProcessSession&) { +} + +void CommitKafkaPoC::initialize() { + +} + +REGISTER_RESOURCE(CommitKafkaPoC, Processor); +} // namespace org::apache::nifi::minifi::kafka diff --git a/extensions/kafka/poc/CommitKafkaPoC.h b/extensions/kafka/poc/CommitKafkaPoC.h new file mode 100644 index 0000000000..5a33277902 --- /dev/null +++ b/extensions/kafka/poc/CommitKafkaPoC.h @@ -0,0 +1,59 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "core/Processor.h" +#include "core/PropertyDefinitionBuilder.h" +#include "KafkaClientControllerService.h" + +namespace org::apache::nifi::minifi::kafka { + +class CommitKafkaPoC : public core::ProcessorImpl { + public: + EXTENSIONAPI static constexpr const char* Description = "CommitKafkaPoC"; + EXTENSIONAPI static constexpr auto KafkaClient = + core::PropertyDefinitionBuilder<>::createProperty("Kafka Client Controller Service") + .withDescription("Kafka Client Controller Service") + .isRequired(true) + .withAllowedTypes() + .build(); + + EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "success"}; + EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "failure"}; + EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure}; + + EXTENSIONAPI static constexpr auto Properties = std::to_array({KafkaClient}); + + EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; + EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; + EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED; + EXTENSIONAPI static constexpr bool IsSingleThreaded = true; + + ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS + + void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; + void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; + void initialize() override; + + using ProcessorImpl::ProcessorImpl; + +private: + std::shared_ptr kafka_client_controller_service_; +}; + +} // namespace org::apache::nifi::minifi::kafka diff --git a/extensions/kafka/poc/ConsumeKafkaPoC.cpp b/extensions/kafka/poc/ConsumeKafkaPoC.cpp new file mode 100644 index 0000000000..ff31ec103d --- /dev/null +++ b/extensions/kafka/poc/ConsumeKafkaPoC.cpp @@ -0,0 +1,37 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ConsumeKafkaPoC.h" +#include "core/FlowFile.h" +#include "core/ProcessSession.h" +#include "core/PropertyType.h" +#include "core/Resource.h" + +namespace org::apache::nifi::minifi::kafka { + +void ConsumeKafkaPoC::onSchedule(core::ProcessContext&, core::ProcessSessionFactory&) { +} + +void ConsumeKafkaPoC::onTrigger(core::ProcessContext&, core::ProcessSession&) { +} + +void ConsumeKafkaPoC::initialize() { + +} + +REGISTER_RESOURCE(ConsumeKafkaPoC, Processor); +} // namespace org::apache::nifi::minifi::kafka \ No newline at end of file diff --git a/extensions/kafka/poc/ConsumeKafkaPoC.h b/extensions/kafka/poc/ConsumeKafkaPoC.h new file mode 100644 index 0000000000..7f6bbecc92 --- /dev/null +++ b/extensions/kafka/poc/ConsumeKafkaPoC.h @@ -0,0 +1,59 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "core/Processor.h" +#include "core/PropertyDefinitionBuilder.h" +#include "KafkaClientControllerService.h" + + +namespace org::apache::nifi::minifi::kafka { + +class ConsumeKafkaPoC : public core::ProcessorImpl { + public: + EXTENSIONAPI static constexpr const char* Description = "ConsumeKafkaPoC"; + EXTENSIONAPI static constexpr auto KafkaClient = + core::PropertyDefinitionBuilder<>::createProperty("Kafka Client Controller Service") + .withDescription("Kafka Client Controller Service") + .isRequired(true) + .withAllowedTypes() + .build(); + + EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "success"}; + EXTENSIONAPI static constexpr auto Relationships = std::array{Success}; + + EXTENSIONAPI static constexpr auto Properties = std::to_array({KafkaClient}); + + EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; + EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; + EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_FORBIDDEN; + EXTENSIONAPI static constexpr bool IsSingleThreaded = true; + + ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS + + void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; + void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; + void initialize() override; + + using ProcessorImpl::ProcessorImpl; + + private: + std::shared_ptr kafka_client_controller_service_; +}; + +} // namespace org::apache::nifi::minifi::kafka diff --git a/extensions/kafka/poc/KafkaClientControllerService.cpp b/extensions/kafka/poc/KafkaClientControllerService.cpp new file mode 100644 index 0000000000..d6ab1671ba --- /dev/null +++ b/extensions/kafka/poc/KafkaClientControllerService.cpp @@ -0,0 +1,27 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "KafkaClientControllerService.h" +#include "core/Resource.h" + +namespace org::apache::nifi::minifi::kafka { + +void KafkaClientControllerService::onEnable() { +} + +REGISTER_RESOURCE(KafkaClientControllerService, ControllerService); +} // namespace org::apache::nifi::minifi::kafka diff --git a/extensions/kafka/poc/KafkaClientControllerService.h b/extensions/kafka/poc/KafkaClientControllerService.h new file mode 100644 index 0000000000..e0ee277c43 --- /dev/null +++ b/extensions/kafka/poc/KafkaClientControllerService.h @@ -0,0 +1,55 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include "core/controller/ControllerService.h" +#include "core/PropertyDefinition.h" +#include "core/PropertyDefinitionBuilder.h" +#include "core/PropertyType.h" + +namespace org::apache::nifi::minifi::kafka { + +class KafkaClientControllerService : public core::controller::ControllerServiceImpl { +public: + EXTENSIONAPI static constexpr const char* Description = "Manages the client for Apache Kafka. This allows for multiple Kafka related processors " + "to reference this single client."; + + EXTENSIONAPI static constexpr auto TopicName = + core::PropertyDefinitionBuilder<>::createProperty("Topic Name") + .withDescription( + "The name of the Kafka Topic to pull from.") + .supportsExpressionLanguage(true) + .isRequired(true) + .build(); + + EXTENSIONAPI static constexpr auto Properties = std::to_array({TopicName}); + + EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; + ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES + + using ControllerServiceImpl::ControllerServiceImpl; + + void initialize() override {} + void yield() override {} + bool isWorkAvailable() override { return false; } + bool isRunning() const override { return getState() == core::controller::ControllerServiceState::ENABLED; } + + void onEnable() override; +}; +} // namespace org::apache::nifi::minifi::kafka