Skip to content

Commit

Permalink
MINIFICPP-2278 Add custom relationship support for python processors
Browse files Browse the repository at this point in the history
Signed-off-by: Ferenc Gerlits <fgerlits@gmail.com>
This closes apache#1722
  • Loading branch information
lordgamez authored and fgerlits committed Feb 29, 2024
1 parent b0767d8 commit 21dcd4d
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 8 deletions.
13 changes: 7 additions & 6 deletions docker/test/integration/cluster/ImageStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,15 @@ def __build_minifi_cpp_image_with_nifi_python_processors(self):
{pip3_install_command}
RUN pip3 install langchain
USER minificpp
COPY RotatingForwarder.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/RotatingForwarder.py
RUN wget {parse_document_url} --directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors && \\
wget {chunk_document_url} --directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors
""".format(base_image='apacheminificpp:' + MinifiContainer.MINIFI_TAG_PREFIX + MinifiContainer.MINIFI_VERSION,
pip3_install_command=pip3_install_command,
parse_document_url=parse_document_url,
chunk_document_url=chunk_document_url))

return self.__build_image(dockerfile)
return self.__build_image(dockerfile, [os.path.join(self.test_dir, "resources", "python", "RotatingForwarder.py")])

def __build_http_proxy_image(self):
dockerfile = dedent("""\
Expand Down Expand Up @@ -178,11 +179,11 @@ def __build_image(self, dockerfile, context_files=[]):
docker_context.addfile(dockerfile_info,
fileobj=conf_dockerfile_buffer)

for context_file in context_files:
file_info = tarfile.TarInfo(context_file['name'])
file_info.size = context_file['size']
docker_context.addfile(file_info,
fileobj=context_file['file_obj'])
for context_file_path in context_files:
with open(context_file_path, 'rb') as file:
file_info = tarfile.TarInfo(os.path.basename(context_file_path))
file_info.size = os.path.getsize(context_file_path)
docker_context.addfile(file_info, file)
docker_context_buffer.seek(0)

logging.info('Creating configured image...')
Expand Down
20 changes: 20 additions & 0 deletions docker/test/integration/features/python.feature
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,23 @@ Feature: MiNiFi can use python processors in its flows
When all instances start up
Then at least one flowfile's content match the following regex: '{"text": "test_", "metadata": {"filename": "test_file.log", "uuid": "", "chunk_index": 0, "chunk_count": 3}}' in less than 30 seconds
And the Minifi logs contain the following message: "key:document.count value:3" in less than 10 seconds

@USE_NIFI_PYTHON_PROCESSORS
Scenario: MiNiFi C++ can use custom relationships in NiFi native python processors
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a file with filename "test_file.log" and content "test_data_one" is present in "/tmp/input"
And a file with filename "test_file2.log" and content "test_data_two" is present in "/tmp/input"
And a file with filename "test_file3.log" and content "test_data_three" is present in "/tmp/input"
And a file with filename "test_file4.log" and content "test_data_four" is present in "/tmp/input"
And a RotatingForwarder processor
And a PutFile processor with the "Directory" property set to "/tmp/output"

And the "success" relationship of the GetFile processor is connected to the RotatingForwarder
And the "first" relationship of the RotatingForwarder processor is connected to the PutFile
And the "second" relationship of the RotatingForwarder processor is connected to the PutFile
And the "third" relationship of the RotatingForwarder processor is connected to the PutFile
And the "fourth" relationship of the RotatingForwarder processor is connected to the PutFile

When all instances start up

Then flowfiles with these contents are placed in the monitored directory in less than 10 seconds: "test_data_one,test_data_two,test_data_three,test_data_four"
26 changes: 26 additions & 0 deletions docker/test/integration/minifi/processors/RotatingForwarder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ..core.Processor import Processor


class RotatingForwarder(Processor):
def __init__(self, context):
super(RotatingForwarder, self).__init__(
context=context,
clazz='RotatingForwarder',
class_prefix='org.apache.nifi.minifi.processors.nifi_python_processors.',
properties={},
schedule={'scheduling strategy': 'EVENT_DRIVEN'},
auto_terminate=[])
33 changes: 33 additions & 0 deletions docker/test/integration/resources/python/RotatingForwarder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult


class RotatingForwarder(FlowFileTransform):
"""
Forwards flow files to a different relationship each time it is called in a round robin manner.
"""
def __init__(self, **kwargs):
self.relationship_index = 0
self.relationships = ["first", "second", "third", "fourth"]

def transform(self, context, flowFile):
content = flowFile.getContentsAsBytes().decode()

relationship = self.relationships[self.relationship_index % len(self.relationships)]
self.relationship_index += 1
self.relationship_index %= len(self.relationships)
return FlowFileTransformResult(relationship, contents=content)
1 change: 0 additions & 1 deletion extensions/python/PYTHON.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ Due to some differences between the NiFi and MiNiFi C++ processors and implement
- Controller properties are not supported at the moment.
- There are some validators in NiFi that are not present in MiNiFi C++, so some property validations will be missing using the NiFi Python processors.
- Allowable values specified in NiFi Python processors are ignored in MiNiFi C++ (due to MiNiFi C++ requiring them to be specified at compile time), so the property values are not pre-verified.
- MiNiFi C++ does not support custom relationship names in Python processors, the only available relationships are "success", "failure" and "original".
- MiNiFi C++ only supports expression language with flow file attributes, so only FLOWFILE_ATTRIBUTES expression language scope is supported, otherwise the expression language will not be evaluated.
- MiNiFi C++ does not support property dependencies, so the property dependencies will be ignored. If a property depends on another property, the property will not be required.
- MiNiFi C++ does not support the use of self.jvm member in Python processors that provides JVM bindings in NiFi, it is set to None in MiNiFi C++.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,10 @@ def onTrigger(self, context: ProcessContext, session: ProcessSession):
if result_content is not None:
session.write(flow_file, WriteCallback(result_content))

session.transfer(flow_file, self.REL_SUCCESS)
if result.getRelationship() == "success":
session.transfer(flow_file, self.REL_SUCCESS)
else:
session.transferToCustomRelationship(flow_file, result.getRelationship())
session.transfer(original_flow_file, self.REL_ORIGINAL)

@abstractmethod
Expand Down
48 changes: 48 additions & 0 deletions extensions/python/types/PyProcessSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,18 @@ void PyProcessSession::transfer(const std::shared_ptr<core::FlowFile>& flow_file
session_->transfer(flow_file, relationship);
}

void PyProcessSession::transferToCustomRelationship(const std::shared_ptr<core::FlowFile>& flow_file, const std::string& relationship_name) {
if (!session_) {
throw std::runtime_error("Access of ProcessSession after it has been released");
}

if (!flow_file) {
throw std::runtime_error("Access of FlowFile after it has been released");
}

session_->transferToCustomRelationship(flow_file, relationship_name);
}

void PyProcessSession::read(const std::shared_ptr<core::FlowFile>& flow_file, BorrowedObject input_stream_callback) {
if (!session_) {
throw std::runtime_error("Access of ProcessSession after it has been released");
Expand Down Expand Up @@ -153,6 +165,7 @@ static PyMethodDef PyProcessSessionObject_methods[] = { // NOLINT(cppcoreguidel
{"read", (PyCFunction) PyProcessSessionObject::read, METH_VARARGS, nullptr},
{"write", (PyCFunction) PyProcessSessionObject::write, METH_VARARGS, nullptr},
{"transfer", (PyCFunction) PyProcessSessionObject::transfer, METH_VARARGS, nullptr},
{"transferToCustomRelationship", (PyCFunction) PyProcessSessionObject::transferToCustomRelationship, METH_VARARGS, nullptr},
{"remove", (PyCFunction) PyProcessSessionObject::remove, METH_VARARGS, nullptr},
{"getContentsAsBytes", (PyCFunction) PyProcessSessionObject::getContentsAsBytes, METH_VARARGS, nullptr},
{} /* Sentinel */
Expand Down Expand Up @@ -308,6 +321,41 @@ PyObject* PyProcessSessionObject::transfer(PyProcessSessionObject* self, PyObjec
Py_RETURN_NONE;
}

PyObject* PyProcessSessionObject::transferToCustomRelationship(PyProcessSessionObject* self, PyObject* args) {
auto session = self->process_session_.lock();
if (!session) {
PyErr_SetString(PyExc_AttributeError, "tried reading process session outside 'on_trigger'");
Py_RETURN_NONE;
}

PyObject* script_flow_file = nullptr;
const char* relationship_name = nullptr;
if (!PyArg_ParseTuple(args, "O!s", PyScriptFlowFile::typeObject(), &script_flow_file, &relationship_name)) {
throw PyException();
}

if (!relationship_name) {
PyErr_SetString(PyExc_AttributeError, "Custom relationship name is invalid!");
return nullptr;
}

std::string relationship_name_str(relationship_name);
if (relationship_name_str.empty()) {
PyErr_SetString(PyExc_AttributeError, "Custom relationship name is empty!");
return nullptr;
}

const auto flow_file = reinterpret_cast<PyScriptFlowFile*>(script_flow_file)->script_flow_file_.lock();
if (!flow_file) {
PyErr_SetString(PyExc_AttributeError, "tried reading FlowFile outside 'on_trigger'");
Py_RETURN_NONE;
}

BorrowedStr name = BorrowedStr::fromTuple(args, 0);
session->transferToCustomRelationship(flow_file, relationship_name_str);
Py_RETURN_NONE;
}

PyObject* PyProcessSessionObject::getContentsAsBytes(PyProcessSessionObject* self, PyObject* args) {
auto session = self->process_session_.lock();
if (!session) {
Expand Down
2 changes: 2 additions & 0 deletions extensions/python/types/PyProcessSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class PyProcessSession {
std::shared_ptr<core::FlowFile> create(const std::shared_ptr<core::FlowFile>& flow_file = nullptr);
std::shared_ptr<core::FlowFile> clone(const std::shared_ptr<core::FlowFile>& flow_file);
void transfer(const std::shared_ptr<core::FlowFile>& flow_file, const core::Relationship& relationship);
void transferToCustomRelationship(const std::shared_ptr<core::FlowFile>& flow_file, const std::string& relationship_name);
void read(const std::shared_ptr<core::FlowFile>& flow_file, BorrowedObject input_stream_callback);
void write(const std::shared_ptr<core::FlowFile>& flow_file, BorrowedObject output_stream_callback);
void remove(const std::shared_ptr<core::FlowFile>& flow_file);
Expand All @@ -59,6 +60,7 @@ struct PyProcessSessionObject {
static PyObject* read(PyProcessSessionObject* self, PyObject* args);
static PyObject* write(PyProcessSessionObject* self, PyObject* args);
static PyObject* transfer(PyProcessSessionObject* self, PyObject* args);
static PyObject* transferToCustomRelationship(PyProcessSessionObject* self, PyObject* args);
static PyObject* remove(PyProcessSessionObject* self, PyObject* args);
static PyObject* getContentsAsBytes(PyProcessSessionObject* self, PyObject* args);

Expand Down
1 change: 1 addition & 0 deletions libminifi/include/core/ProcessSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class ProcessSession : public ReferenceContainer {
std::shared_ptr<core::FlowFile> clone(const core::FlowFile& parent, int64_t offset, int64_t size);
// Transfer the FlowFile to the relationship
virtual void transfer(const std::shared_ptr<core::FlowFile>& flow, const Relationship& relationship);
void transferToCustomRelationship(const std::shared_ptr<core::FlowFile>& flow, const std::string& relationship_name);

void putAttribute(core::FlowFile& flow, std::string_view key, const std::string& value);
void removeAttribute(core::FlowFile& flow, std::string_view key);
Expand Down
4 changes: 4 additions & 0 deletions libminifi/src/core/ProcessSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ void ProcessSession::transfer(const std::shared_ptr<core::FlowFile>& flow, const
flow->setDeleted(false);
}

void ProcessSession::transferToCustomRelationship(const std::shared_ptr<core::FlowFile>& flow, const std::string& relationship_name) {
transfer(flow, Relationship{relationship_name, relationship_name});
}

void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, const io::OutputStreamCallback& callback) {
gsl_ExpectsAudit(updated_flowfiles_.contains(flow->getUUID())
|| added_flowfiles_.contains(flow->getUUID())
Expand Down

0 comments on commit 21dcd4d

Please sign in to comment.