Skip to content

Commit

Permalink
MINIFICPP-2530 Fix various issue and cleanup OPC processors
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez committed Mar 5, 2025
1 parent d46804a commit 0536d7d
Show file tree
Hide file tree
Showing 20 changed files with 1,655 additions and 725 deletions.
79 changes: 47 additions & 32 deletions PROCESSORS.md

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions cmake/BundledOpen62541.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ function(use_bundled_open62541 SOURCE_DIR BINARY_DIR)
# Build project
ExternalProject_Add(
open62541-external
URL "https://github.com/open62541/open62541/archive/v1.3.3.tar.gz"
URL_HASH "SHA256=52c049c0f107b4cc382c9e480d677a6360cdd96c472f84689af91b423bd108cb"
URL "https://github.com/open62541/open62541/archive/refs/tags/v1.4.10.tar.gz"
URL_HASH "SHA256=1a2e762e50bb6dae8d80029dfb66fdbc432876a004e62d618f7cf1bb5b4f495f"
SOURCE_DIR "${BINARY_DIR}/thirdparty/open62541-src"
PATCH_COMMAND ${PC}
LIST_SEPARATOR % # This is needed for passing semicolon-separated lists
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ def __init__(self, feature_context, name, vols, network, image_store, command=No
super().__init__(feature_context, name, 'opcua-server', vols, network, image_store, command)

def get_startup_finished_log_entry(self):
return "TCP network layer listening on"
return "New DiscoveryUrl added: opc.tcp://"

def deploy(self):
if not self.set_deployed():
return

logging.info('Creating and running OPC UA server docker container...')
self.client.containers.run(
"lordgamez/open62541:1.3.5",
"lordgamez/open62541:1.4.10",
detach=True,
name=self.name,
network=self.network.name,
Expand Down
42 changes: 22 additions & 20 deletions docker/test/integration/features/opcua.feature
Original file line number Diff line number Diff line change
Expand Up @@ -106,26 +106,28 @@ Feature: Putting and fetching data to OPC UA server
And a FetchOPCProcessor processor in the "fetch-opc-ua-node" flow
And a PutFile processor with the "Directory" property set to "/tmp/output" in the "fetch-opc-ua-node" flow
And these processor properties are set:
| processor name | property name | property value |
| PutOPCProcessor | Parent node ID | 85 |
| PutOPCProcessor | Parent node ID type | Int |
| PutOPCProcessor | Target node ID | 9999 |
| PutOPCProcessor | Target node ID type | Int |
| PutOPCProcessor | Target node namespace index | 1 |
| PutOPCProcessor | Value type | String |
| PutOPCProcessor | OPC server endpoint | opc.tcp://opcua-server-${feature_id}:4840/ |
| PutOPCProcessor | Target node browse name | testnodename |
| PutOPCProcessor | Certificate path | /tmp/resources/opcua/opcua_client_cert.der |
| PutOPCProcessor | Key path | /tmp/resources/opcua/opcua_client_key.der |
| PutOPCProcessor | Application URI | urn:open62541.server.application |
| FetchOPCProcessor | Node ID | 9999 |
| FetchOPCProcessor | Node ID type | Int |
| FetchOPCProcessor | Namespace index | 1 |
| FetchOPCProcessor | OPC server endpoint | opc.tcp://opcua-server-${feature_id}:4840/ |
| FetchOPCProcessor | Max depth | 1 |
| FetchOPCProcessor | Certificate path | /tmp/resources/opcua/opcua_client_cert.der |
| FetchOPCProcessor | Key path | /tmp/resources/opcua/opcua_client_key.der |
| FetchOPCProcessor | Application URI | urn:open62541.server.application |
| processor name | property name | property value |
| PutOPCProcessor | Parent node ID | 85 |
| PutOPCProcessor | Parent node ID type | Int |
| PutOPCProcessor | Target node ID | 9999 |
| PutOPCProcessor | Target node ID type | Int |
| PutOPCProcessor | Target node namespace index | 1 |
| PutOPCProcessor | Value type | String |
| PutOPCProcessor | OPC server endpoint | opc.tcp://opcua-server-${feature_id}:4840/ |
| PutOPCProcessor | Target node browse name | testnodename |
| PutOPCProcessor | Certificate path | /tmp/resources/opcua/opcua_client_cert.der |
| PutOPCProcessor | Key path | /tmp/resources/opcua/opcua_client_key.der |
| PutOPCProcessor | Trusted server certificate path | /tmp/resources/opcua/opcua_client_cert.der |
| PutOPCProcessor | Application URI | urn:open62541.server.application |
| FetchOPCProcessor | Node ID | 9999 |
| FetchOPCProcessor | Node ID type | Int |
| FetchOPCProcessor | Namespace index | 1 |
| FetchOPCProcessor | OPC server endpoint | opc.tcp://opcua-server-${feature_id}:4840/ |
| FetchOPCProcessor | Max depth | 1 |
| FetchOPCProcessor | Certificate path | /tmp/resources/opcua/opcua_client_cert.der |
| FetchOPCProcessor | Key path | /tmp/resources/opcua/opcua_client_key.der |
| FetchOPCProcessor | Trusted server certificate path | /tmp/resources/opcua/opcua_client_cert.der |
| FetchOPCProcessor | Application URI | urn:open62541.server.application |

And the "success" relationship of the GetFile processor is connected to the PutOPCProcessor
And the "success" relationship of the FetchOPCProcessor processor is connected to the PutFile
Expand Down
Binary file modified docker/test/integration/resources/opcua/opcua_client_cert.der
Binary file not shown.
Binary file modified docker/test/integration/resources/opcua/opcua_client_key.der
Binary file not shown.
2 changes: 1 addition & 1 deletion extensions/opc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ add_minifi_library(minifi-opc-extensions SHARED ${SOURCES})
target_link_libraries(minifi-opc-extensions ${LIBMINIFI} Threads::Threads)
target_link_libraries(minifi-opc-extensions ${CMAKE_DL_LIBS} spdlog::spdlog open62541::open62541)

register_extension(minifi-opc-extensions "OPC EXTENSIONS" OPC-EXTENSIONS "This enables OPC-UA support")
register_extension(minifi-opc-extensions "OPC EXTENSIONS" OPC-EXTENSIONS "This enables OPC-UA support" "extensions/opc/tests")
register_extension_linter(minifi-opc-extensions-linter)
49 changes: 32 additions & 17 deletions extensions/opc/include/fetchopc.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@

namespace org::apache::nifi::minifi::processors {

enum class LazyModeOptions {
On,
Off
};

class FetchOPCProcessor : public BaseOPCProcessor {
public:
explicit FetchOPCProcessor(std::string_view name, const utils::Identifier& uuid = {})
Expand All @@ -48,30 +53,32 @@ class FetchOPCProcessor : public BaseOPCProcessor {

EXTENSIONAPI static constexpr const char* Description = "Fetches OPC-UA node";

EXTENSIONAPI static constexpr auto NodeIDType = core::PropertyDefinitionBuilder<3>::createProperty("Node ID type")
EXTENSIONAPI static constexpr auto NodeIDType = core::PropertyDefinitionBuilder<magic_enum::enum_count<opc::OPCNodeIDType>()>::createProperty("Node ID type")
.withDescription("Specifies the type of the provided node ID")
.isRequired(true)
.withAllowedValues({"Path", "Int", "String"})
.withAllowedValues(magic_enum::enum_names<opc::OPCNodeIDType>())
.build();
EXTENSIONAPI static constexpr auto NodeID = core::PropertyDefinitionBuilder<>::createProperty("Node ID")
.withDescription("Specifies the ID of the root node to traverse")
.withDescription("Specifies the ID of the root node to traverse. In case of a Path Node ID Type, the path should be provided in the format of 'path/to/node'.")
.isRequired(true)
.build();
EXTENSIONAPI static constexpr auto NameSpaceIndex = core::PropertyDefinitionBuilder<>::createProperty("Namespace index")
.withDescription("The index of the namespace. Used only if node ID type is not path.")
.withDescription("The index of the namespace.")
.withPropertyType(core::StandardPropertyTypes::INTEGER_TYPE)
.withDefaultValue("0")
.isRequired(true)
.build();
EXTENSIONAPI static constexpr auto MaxDepth = core::PropertyDefinitionBuilder<>::createProperty("Max depth")
.withDescription("Specifiec the max depth of browsing. 0 means unlimited.")
.withPropertyType(core::StandardPropertyTypes::UNSIGNED_LONG_TYPE)
.withDefaultValue("0")
.isRequired(true)
.build();
EXTENSIONAPI static constexpr auto Lazy = core::PropertyDefinitionBuilder<2>::createProperty("Lazy mode")
EXTENSIONAPI static constexpr auto Lazy = core::PropertyDefinitionBuilder<magic_enum::enum_count<LazyModeOptions>()>::createProperty("Lazy mode")
.withDescription("Only creates flowfiles from nodes with new timestamp from the server.")
.withDefaultValue("Off")
.isRequired(true)
.withAllowedValues({"On", "Off"})
.withAllowedValues(magic_enum::enum_names<LazyModeOptions>())
.withDefaultValue(magic_enum::enum_name(LazyModeOptions::Off))
.build();
EXTENSIONAPI static constexpr auto Properties = utils::array_cat(BaseOPCProcessor::Properties, std::to_array<core::PropertyReference>({
NodeIDType,
Expand All @@ -86,6 +93,18 @@ class FetchOPCProcessor : public BaseOPCProcessor {
EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "Retrieved OPC-UA nodes where value cannot be extracted (only if enabled)"};
EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure};

EXTENSIONAPI static constexpr auto NodeIDAttr = core::OutputAttributeDefinition<>{"NodeID", { Success }, "ID of the node."};
EXTENSIONAPI static constexpr auto NodeIDTypeAttr = core::OutputAttributeDefinition<>{"NodeID type", { Success }, "Type of the node ID."};
EXTENSIONAPI static constexpr auto BrowsenameAttr = core::OutputAttributeDefinition<>{"Browsename", { Success }, "The browse name of the node."};
EXTENSIONAPI static constexpr auto FullPathAttr = core::OutputAttributeDefinition<>{"Full path", { Success }, "The full path of the node."};
EXTENSIONAPI static constexpr auto SourcetimestampAttr = core::OutputAttributeDefinition<>{"Sourcetimestamp", { Success },
"The timestamp of when the node was created in the server as 'MM-dd-yyyy HH:mm:ss.mmm'."};
EXTENSIONAPI static constexpr auto TypenameAttr = core::OutputAttributeDefinition<>{"Typename", { Success }, "The type name of the node data."};
EXTENSIONAPI static constexpr auto DatasizeAttr = core::OutputAttributeDefinition<>{"Datasize", { Success }, "The size of the node data."};

EXTENSIONAPI static constexpr auto OutputAttributes = std::array<core::OutputAttributeReference, 7> {NodeIDAttr, NodeIDTypeAttr, BrowsenameAttr, FullPathAttr, SourcetimestampAttr,
TypenameAttr, DatasizeAttr};

EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_FORBIDDEN;
Expand All @@ -98,21 +117,17 @@ class FetchOPCProcessor : public BaseOPCProcessor {
void initialize() override;

protected:
bool nodeFoundCallBack(opc::Client& client, const UA_ReferenceDescription *ref, const std::string& path,
core::ProcessContext& context, core::ProcessSession& session);
bool nodeFoundCallBack(const UA_ReferenceDescription *ref, const std::string& path,
core::ProcessContext& context, core::ProcessSession& session,
size_t& nodes_found, size_t& variables_found);

void OPCData2FlowFile(const opc::NodeData& opcnode, core::ProcessContext& context, core::ProcessSession& session);
void OPCData2FlowFile(const opc::NodeData& opc_node, core::ProcessContext& context, core::ProcessSession& session);

std::string nodeID_;
int32_t nameSpaceIdx_ = 0;
opc::OPCNodeIDType idType_{};
uint32_t nodesFound_ = 0;
uint32_t variablesFound_ = 0;
uint64_t maxDepth_ = 0;
uint64_t max_depth_ = 0;
bool lazy_mode_ = false;

private:
std::vector<UA_NodeId> translatedNodeIDs_; // Only used when user provides path, path->nodeid translation is only done once
std::vector<UA_NodeId> translated_node_ids_; // Only used when user provides path, path->nodeid translation is only done once
std::unordered_map<std::string, std::string> node_timestamp_; // Key = Full path, Value = Timestamp
};

Expand Down
71 changes: 39 additions & 32 deletions extensions/opc/include/opc.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <memory>
#include <string_view>
#include <utility>
#include <optional>

#include "open62541/client.h"
#include "core/logging/Logger.h"
Expand All @@ -35,56 +36,71 @@ namespace org::apache::nifi::minifi::opc {

class OPCException : public minifi::Exception {
public:
OPCException(ExceptionType type, std::string &&errorMsg)
: Exception(type, errorMsg) {
OPCException(ExceptionType type, std::string &&error_msg)
: Exception(type, error_msg) {
}
};

enum class OPCNodeIDType{ Path, Int, String };
enum class OPCNodeIDType{
Path,
Int,
String
};

enum class OPCNodeDataType{ Int64, UInt64, Int32, UInt32, Boolean, Float, Double, String };
enum class OPCNodeDataType{
Int64,
UInt64,
Int32,
UInt32,
Boolean,
Float,
Double,
String
};

struct NodeData;

class Client;

using nodeFoundCallBackFunc = bool(Client& client, const UA_ReferenceDescription*, const std::string&);
using NodeFoundCallBackFunc = bool(const UA_ReferenceDescription*, const std::string&);

class Client {
public:
~Client();
bool isConnected();
UA_StatusCode connect(const std::string& url, const std::string& username = "", const std::string& password = "");
~Client();
NodeData getNodeData(const UA_ReferenceDescription *ref, const std::string& basePath = "");
UA_ReferenceDescription * getNodeReference(UA_NodeId nodeId);
void traverse(UA_NodeId nodeId, const std::function<nodeFoundCallBackFunc>& cb, const std::string& basePath = "", uint64_t maxDepth = 0, bool fetchRoot = true);
bool exists(UA_NodeId nodeId);
UA_StatusCode translateBrowsePathsToNodeIdsRequest(const std::string& path, std::vector<UA_NodeId>& foundNodeIDs, const std::shared_ptr<core::logging::Logger>& logger);
NodeData getNodeData(const UA_ReferenceDescription *ref, const std::string& base_path = "");
UA_ReferenceDescription * getNodeReference(UA_NodeId node_id);
void traverse(UA_NodeId node_id, const std::function<NodeFoundCallBackFunc>& cb, const std::string& base_path = "", uint64_t max_depth = 0, bool fetch_root = true);
bool exists(UA_NodeId node_id);
UA_StatusCode translateBrowsePathsToNodeIdsRequest(const std::string& path, std::vector<UA_NodeId>& found_node_ids, int32_t namespace_index,
const std::vector<UA_UInt32>& path_reference_types, const std::shared_ptr<core::logging::Logger>& logger);

template<typename T>
UA_StatusCode update_node(const UA_NodeId nodeId, T value);
UA_StatusCode update_node(const UA_NodeId node_id, T value);

template<typename T>
UA_StatusCode add_node(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string_view browseName, T value, UA_NodeId *receivedNodeId);
UA_StatusCode add_node(const UA_NodeId parent_node_id, const UA_NodeId target_node_id, const UA_UInt32 ref_type_id, std::string_view browse_name, T value, UA_NodeId *received_node_id);

static std::unique_ptr<Client> createClient(const std::shared_ptr<core::logging::Logger>& logger, const std::string& applicationURI,
const std::vector<char>& certBuffer, const std::vector<char>& keyBuffer,
const std::vector<std::vector<char>>& trustBuffers);
static std::unique_ptr<Client> createClient(const std::shared_ptr<core::logging::Logger>& logger, const std::string& application_uri,
const std::vector<char>& cert_buffer, const std::vector<char>& key_buffer,
const std::vector<std::vector<char>>& trust_buffers);

private:
Client(const std::shared_ptr<core::logging::Logger>& logger, const std::string& applicationURI,
const std::vector<char>& certBuffer, const std::vector<char>& keyBuffer,
const std::vector<std::vector<char>>& trustBuffers);
Client(const std::shared_ptr<core::logging::Logger>& logger, const std::string& application_uri,
const std::vector<char>& cert_buffer, const std::vector<char>& key_buffer,
const std::vector<std::vector<char>>& trust_buffers);

UA_Client *client_;
std::shared_ptr<core::logging::Logger> logger_;
UA_Logger minifi_ua_logger_{};
};

using ClientPtr = std::unique_ptr<Client>;

struct NodeData {
std::vector<uint8_t> data;
UA_DataTypeKind dataTypeID;
UA_DataTypeKind data_type_id;
std::map<std::string, std::string> attributes;

virtual ~NodeData() {
Expand All @@ -98,7 +114,7 @@ struct NodeData {
NodeData& operator= (NodeData &&) = delete;

NodeData(NodeData&& rhs) : data(rhs.data), attributes(rhs.attributes) {
dataTypeID = rhs.dataTypeID;
data_type_id = rhs.data_type_id;
this->var_ = rhs.var_;
rhs.var_ = nullptr;
}
Expand All @@ -120,22 +136,13 @@ struct NodeData {
friend std::string nodeValue2String(const NodeData&);
};

inline constexpr std::array<std::pair<std::string_view, OPCNodeDataType>, 8> StringToOPCDataTypeMap = {{
{"Int64", OPCNodeDataType::Int64},
{"UInt64", OPCNodeDataType::UInt64 },
{"Int32", OPCNodeDataType::Int32},
{"UInt32", OPCNodeDataType::UInt32},
{"Boolean", OPCNodeDataType::Boolean},
{"Float", OPCNodeDataType::Float},
{"Double", OPCNodeDataType::Double},
{"String", OPCNodeDataType::String}
}};

std::string nodeValue2String(const NodeData& nd);

std::string OPCDateTime2String(UA_DateTime raw_date);

void logFunc(void *context, UA_LogLevel level, UA_LogCategory category, const char *msg, va_list args);

std::optional<UA_UInt32> mapOpcReferenceType(const std::string& ref_type);

} // namespace org::apache::nifi::minifi::opc

Loading

0 comments on commit 0536d7d

Please sign in to comment.