diff --git a/PROCESSORS.md b/PROCESSORS.md index 1cd186de32..02c0de4cc2 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -916,20 +916,21 @@ Fetches OPC-UA node In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. -| Name | Default Value | Allowable Values | Description | -|---------------------------------|---------------|-------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| **OPC server endpoint** | | | Specifies the address, port and relative path of an OPC endpoint | -| Application URI | | | Application URI of the client in the format 'urn:unconfigured:application'. Mandatory, if using Secure Channel and must match the URI included in the certificate's Subject Alternative Names. | -| Username | | | Username to log in with. | -| Password | | | Password to log in with.
**Sensitive Property: true** | -| Certificate path | | | Path to the DER-encoded cert file | -| Key path | | | Path to the DER-encoded key file | -| Trusted server certificate path | | | Path to the DER-encoded trusted server certificate | -| **Node ID type** | | Path
Int
String | Specifies the type of the provided node ID | -| **Node ID** | | | Specifies the ID of the root node to traverse | -| Namespace index | 0 | | The index of the namespace. Used only if node ID type is not path. | -| Max depth | 0 | | Specifiec the max depth of browsing. 0 means unlimited. | -| **Lazy mode** | Off | On
Off | Only creates flowfiles from nodes with new timestamp from the server. | +| Name | Default Value | Allowable Values | Description | +|---------------------------------|---------------|-------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| **OPC server endpoint** | | | Specifies the address, port and relative path of an OPC endpoint | +| Application URI | | | Application URI of the client in the format 'urn:unconfigured:application'. Mandatory, if using Secure Channel and must match the URI included in the certificate's Subject Alternative Names. | +| Username | | | Username to log in with. | +| Password | | | Password to log in with.
**Sensitive Property: true** | +| Certificate path | | | Path to the DER-encoded cert file | +| Key path | | | Path to the DER-encoded key file | +| Trusted server certificate path | | | Comma separated list of paths to the DER-encoded trusted server certificates | +| Path reference types | | | Specify the reference types between nodes in the path if Path Node ID type is used. If not provided, all reference types are assumed to be Organizes. The format is 'referenceType1/referenceType2/.../referenceTypeN' and the supported reference types are Organizes, HasComponent, HasProperty, and HasSubtype. | +| **Node ID type** | | Path
Int
String | Specifies the type of the provided node ID | +| **Node ID** | | | Specifies the ID of the root node to traverse. In case of a Path Node ID Type, the path should be provided in the format of 'path/to/node'. | +| **Namespace index** | 0 | | The index of the namespace. | +| **Max depth** | 0 | | Specifiec the max depth of browsing. 0 means unlimited. | +| **Lazy mode** | Off | On
Off | Only creates flowfiles from nodes with new timestamp from the server. | ### Relationships @@ -938,6 +939,18 @@ In the list below, the names of required properties appear in bold. Any other pr | success | Successfully retrieved OPC-UA nodes | | failure | Retrieved OPC-UA nodes where value cannot be extracted (only if enabled) | +### Output Attributes + +| Attribute | Relationship | Description | +|-----------------|--------------|----------------------------------------------------------------------------------------| +| NodeID | success | ID of the node. | +| NodeID type | success | Type of the node ID. | +| Browsename | success | The browse name of the node. | +| Full path | success | The full path of the node. | +| Sourcetimestamp | success | The timestamp of when the node was created in the server as 'MM-dd-yyyy HH:mm:ss.mmm'. | +| Typename | success | The type name of the node data. | +| Datasize | success | The size of the node data. | + ## FetchS3Object @@ -2378,29 +2391,31 @@ In the list below, the names of required properties appear in bold. Any other pr ### Description -Creates/updates OPC nodes +Creates/updates OPC nodes ### Properties In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. -| Name | Default Value | Allowable Values | Description | -|---------------------------------|---------------|-----------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| **OPC server endpoint** | | | Specifies the address, port and relative path of an OPC endpoint | -| Application URI | | | Application URI of the client in the format 'urn:unconfigured:application'. Mandatory, if using Secure Channel and must match the URI included in the certificate's Subject Alternative Names. | -| Username | | | Username to log in with. | -| Password | | | Password to log in with.
**Sensitive Property: true** | -| Certificate path | | | Path to the DER-encoded cert file | -| Key path | | | Path to the DER-encoded key file | -| Trusted server certificate path | | | Path to the DER-encoded trusted server certificate | -| **Parent node ID type** | | Path
Int
String | Specifies the type of the provided node ID | -| **Parent node ID** | | | Specifies the ID of the root node to traverse | -| Parent node namespace index | 0 | | The index of the namespace. Used only if node ID type is not path. | -| **Value type** | | Int64
UInt64
Int32
UInt32
Boolean
Float
Double
String | Set the OPC value type of the created nodes | -| Target node ID type | | | ID type of target node. Allowed values are: Int, String.
**Supports Expression Language: true** | -| Target node ID | | | ID of target node.
**Supports Expression Language: true** | -| Target node browse name | | | Browse name of target node. Only used when new node is created.
**Supports Expression Language: true** | -| Target node namespace index | | | The index of the namespace. Used only if node ID type is not path.
**Supports Expression Language: true** | +| Name | Default Value | Allowable Values | Description | +|---------------------------------|---------------|-----------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| **OPC server endpoint** | | | Specifies the address, port and relative path of an OPC endpoint | +| Application URI | | | Application URI of the client in the format 'urn:unconfigured:application'. Mandatory, if using Secure Channel and must match the URI included in the certificate's Subject Alternative Names. | +| Username | | | Username to log in with. | +| Password | | | Password to log in with.
**Sensitive Property: true** | +| Certificate path | | | Path to the DER-encoded cert file | +| Key path | | | Path to the DER-encoded key file | +| Trusted server certificate path | | | Comma separated list of paths to the DER-encoded trusted server certificates | +| Path reference types | | | Specify the reference types between nodes in the path if Path Node ID type is used. If not provided, all reference types are assumed to be Organizes. The format is 'referenceType1/referenceType2/.../referenceTypeN' and the supported reference types are Organizes, HasComponent, HasProperty, and HasSubtype. | +| **Parent node ID type** | | Path
Int
String | Specifies the type of the provided node ID | +| **Parent node ID** | | | Specifies the ID of the root node to traverse | +| **Parent node namespace index** | 0 | | The index of the namespace of the parent node. | +| **Value type** | | Int64
UInt64
Int32
UInt32
Boolean
Float
Double
String | Set the OPC value type of the created nodes | +| **Target node ID type** | | Int
String | ID type of target node. Allowed values are: Int, String.
**Supports Expression Language: true** | +| **Target node ID** | | | ID of target node.
**Supports Expression Language: true** | +| Target node browse name | | | Browse name of target node. Only used when new node is created.
**Supports Expression Language: true** | +| **Target node namespace index** | 0 | | The index of the namespace of the target node.
**Supports Expression Language: true** | +| Create node reference type | HasComponent | Organizes
HasComponent
HasProperty
HasSubtype | Reference type used when a new node is created. | ### Relationships diff --git a/cmake/BundledOpen62541.cmake b/cmake/BundledOpen62541.cmake index b2aa068e4e..9a48490aab 100644 --- a/cmake/BundledOpen62541.cmake +++ b/cmake/BundledOpen62541.cmake @@ -43,8 +43,8 @@ function(use_bundled_open62541 SOURCE_DIR BINARY_DIR) # Build project ExternalProject_Add( open62541-external - URL "https://github.com/open62541/open62541/archive/v1.3.3.tar.gz" - URL_HASH "SHA256=52c049c0f107b4cc382c9e480d677a6360cdd96c472f84689af91b423bd108cb" + URL "https://github.com/open62541/open62541/archive/refs/tags/v1.4.10.tar.gz" + URL_HASH "SHA256=1a2e762e50bb6dae8d80029dfb66fdbc432876a004e62d618f7cf1bb5b4f495f" SOURCE_DIR "${BINARY_DIR}/thirdparty/open62541-src" PATCH_COMMAND ${PC} LIST_SEPARATOR % # This is needed for passing semicolon-separated lists diff --git a/docker/test/integration/cluster/containers/OPCUAServerContainer.py b/docker/test/integration/cluster/containers/OPCUAServerContainer.py index b112e9c87d..fefb8759ce 100644 --- a/docker/test/integration/cluster/containers/OPCUAServerContainer.py +++ b/docker/test/integration/cluster/containers/OPCUAServerContainer.py @@ -23,7 +23,7 @@ def __init__(self, feature_context, name, vols, network, image_store, command=No super().__init__(feature_context, name, 'opcua-server', vols, network, image_store, command) def get_startup_finished_log_entry(self): - return "TCP network layer listening on" + return "New DiscoveryUrl added: opc.tcp://" def deploy(self): if not self.set_deployed(): @@ -31,7 +31,7 @@ def deploy(self): logging.info('Creating and running OPC UA server docker container...') self.client.containers.run( - "lordgamez/open62541:1.3.5", + "lordgamez/open62541:1.4.10", detach=True, name=self.name, network=self.network.name, diff --git a/docker/test/integration/features/opcua.feature b/docker/test/integration/features/opcua.feature index e4217a53d5..8f561f3568 100644 --- a/docker/test/integration/features/opcua.feature +++ b/docker/test/integration/features/opcua.feature @@ -106,26 +106,28 @@ Feature: Putting and fetching data to OPC UA server And a FetchOPCProcessor processor in the "fetch-opc-ua-node" flow And a PutFile processor with the "Directory" property set to "/tmp/output" in the "fetch-opc-ua-node" flow And these processor properties are set: - | processor name | property name | property value | - | PutOPCProcessor | Parent node ID | 85 | - | PutOPCProcessor | Parent node ID type | Int | - | PutOPCProcessor | Target node ID | 9999 | - | PutOPCProcessor | Target node ID type | Int | - | PutOPCProcessor | Target node namespace index | 1 | - | PutOPCProcessor | Value type | String | - | PutOPCProcessor | OPC server endpoint | opc.tcp://opcua-server-${feature_id}:4840/ | - | PutOPCProcessor | Target node browse name | testnodename | - | PutOPCProcessor | Certificate path | /tmp/resources/opcua/opcua_client_cert.der | - | PutOPCProcessor | Key path | /tmp/resources/opcua/opcua_client_key.der | - | PutOPCProcessor | Application URI | urn:open62541.server.application | - | FetchOPCProcessor | Node ID | 9999 | - | FetchOPCProcessor | Node ID type | Int | - | FetchOPCProcessor | Namespace index | 1 | - | FetchOPCProcessor | OPC server endpoint | opc.tcp://opcua-server-${feature_id}:4840/ | - | FetchOPCProcessor | Max depth | 1 | - | FetchOPCProcessor | Certificate path | /tmp/resources/opcua/opcua_client_cert.der | - | FetchOPCProcessor | Key path | /tmp/resources/opcua/opcua_client_key.der | - | FetchOPCProcessor | Application URI | urn:open62541.server.application | + | processor name | property name | property value | + | PutOPCProcessor | Parent node ID | 85 | + | PutOPCProcessor | Parent node ID type | Int | + | PutOPCProcessor | Target node ID | 9999 | + | PutOPCProcessor | Target node ID type | Int | + | PutOPCProcessor | Target node namespace index | 1 | + | PutOPCProcessor | Value type | String | + | PutOPCProcessor | OPC server endpoint | opc.tcp://opcua-server-${feature_id}:4840/ | + | PutOPCProcessor | Target node browse name | testnodename | + | PutOPCProcessor | Certificate path | /tmp/resources/opcua/opcua_client_cert.der | + | PutOPCProcessor | Key path | /tmp/resources/opcua/opcua_client_key.der | + | PutOPCProcessor | Trusted server certificate path | /tmp/resources/opcua/opcua_client_cert.der | + | PutOPCProcessor | Application URI | urn:open62541.server.application | + | FetchOPCProcessor | Node ID | 9999 | + | FetchOPCProcessor | Node ID type | Int | + | FetchOPCProcessor | Namespace index | 1 | + | FetchOPCProcessor | OPC server endpoint | opc.tcp://opcua-server-${feature_id}:4840/ | + | FetchOPCProcessor | Max depth | 1 | + | FetchOPCProcessor | Certificate path | /tmp/resources/opcua/opcua_client_cert.der | + | FetchOPCProcessor | Key path | /tmp/resources/opcua/opcua_client_key.der | + | FetchOPCProcessor | Trusted server certificate path | /tmp/resources/opcua/opcua_client_cert.der | + | FetchOPCProcessor | Application URI | urn:open62541.server.application | And the "success" relationship of the GetFile processor is connected to the PutOPCProcessor And the "success" relationship of the FetchOPCProcessor processor is connected to the PutFile diff --git a/docker/test/integration/resources/opcua/opcua_client_cert.der b/docker/test/integration/resources/opcua/opcua_client_cert.der index 44be253352..d4bc99ee5f 100644 Binary files a/docker/test/integration/resources/opcua/opcua_client_cert.der and b/docker/test/integration/resources/opcua/opcua_client_cert.der differ diff --git a/docker/test/integration/resources/opcua/opcua_client_key.der b/docker/test/integration/resources/opcua/opcua_client_key.der index 91b30bfadb..acc743fbc2 100644 Binary files a/docker/test/integration/resources/opcua/opcua_client_key.der and b/docker/test/integration/resources/opcua/opcua_client_key.der differ diff --git a/extensions/opc/CMakeLists.txt b/extensions/opc/CMakeLists.txt index 0d120de83d..567799bb58 100644 --- a/extensions/opc/CMakeLists.txt +++ b/extensions/opc/CMakeLists.txt @@ -40,5 +40,5 @@ add_minifi_library(minifi-opc-extensions SHARED ${SOURCES}) target_link_libraries(minifi-opc-extensions ${LIBMINIFI} Threads::Threads) target_link_libraries(minifi-opc-extensions ${CMAKE_DL_LIBS} spdlog::spdlog open62541::open62541) -register_extension(minifi-opc-extensions "OPC EXTENSIONS" OPC-EXTENSIONS "This enables OPC-UA support") +register_extension(minifi-opc-extensions "OPC EXTENSIONS" OPC-EXTENSIONS "This enables OPC-UA support" "extensions/opc/tests") register_extension_linter(minifi-opc-extensions-linter) diff --git a/extensions/opc/include/fetchopc.h b/extensions/opc/include/fetchopc.h index 92b9032646..904e583780 100644 --- a/extensions/opc/include/fetchopc.h +++ b/extensions/opc/include/fetchopc.h @@ -39,6 +39,11 @@ namespace org::apache::nifi::minifi::processors { +enum class LazyModeOptions { + On, + Off +}; + class FetchOPCProcessor : public BaseOPCProcessor { public: explicit FetchOPCProcessor(std::string_view name, const utils::Identifier& uuid = {}) @@ -48,30 +53,32 @@ class FetchOPCProcessor : public BaseOPCProcessor { EXTENSIONAPI static constexpr const char* Description = "Fetches OPC-UA node"; - EXTENSIONAPI static constexpr auto NodeIDType = core::PropertyDefinitionBuilder<3>::createProperty("Node ID type") + EXTENSIONAPI static constexpr auto NodeIDType = core::PropertyDefinitionBuilder()>::createProperty("Node ID type") .withDescription("Specifies the type of the provided node ID") .isRequired(true) - .withAllowedValues({"Path", "Int", "String"}) + .withAllowedValues(magic_enum::enum_names()) .build(); EXTENSIONAPI static constexpr auto NodeID = core::PropertyDefinitionBuilder<>::createProperty("Node ID") - .withDescription("Specifies the ID of the root node to traverse") + .withDescription("Specifies the ID of the root node to traverse. In case of a Path Node ID Type, the path should be provided in the format of 'path/to/node'.") .isRequired(true) .build(); EXTENSIONAPI static constexpr auto NameSpaceIndex = core::PropertyDefinitionBuilder<>::createProperty("Namespace index") - .withDescription("The index of the namespace. Used only if node ID type is not path.") + .withDescription("The index of the namespace.") .withPropertyType(core::StandardPropertyTypes::INTEGER_TYPE) .withDefaultValue("0") + .isRequired(true) .build(); EXTENSIONAPI static constexpr auto MaxDepth = core::PropertyDefinitionBuilder<>::createProperty("Max depth") .withDescription("Specifiec the max depth of browsing. 0 means unlimited.") .withPropertyType(core::StandardPropertyTypes::UNSIGNED_LONG_TYPE) .withDefaultValue("0") + .isRequired(true) .build(); - EXTENSIONAPI static constexpr auto Lazy = core::PropertyDefinitionBuilder<2>::createProperty("Lazy mode") + EXTENSIONAPI static constexpr auto Lazy = core::PropertyDefinitionBuilder()>::createProperty("Lazy mode") .withDescription("Only creates flowfiles from nodes with new timestamp from the server.") - .withDefaultValue("Off") .isRequired(true) - .withAllowedValues({"On", "Off"}) + .withAllowedValues(magic_enum::enum_names()) + .withDefaultValue(magic_enum::enum_name(LazyModeOptions::Off)) .build(); EXTENSIONAPI static constexpr auto Properties = utils::array_cat(BaseOPCProcessor::Properties, std::to_array({ NodeIDType, @@ -86,6 +93,18 @@ class FetchOPCProcessor : public BaseOPCProcessor { EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "Retrieved OPC-UA nodes where value cannot be extracted (only if enabled)"}; EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure}; + EXTENSIONAPI static constexpr auto NodeIDAttr = core::OutputAttributeDefinition<>{"NodeID", { Success }, "ID of the node."}; + EXTENSIONAPI static constexpr auto NodeIDTypeAttr = core::OutputAttributeDefinition<>{"NodeID type", { Success }, "Type of the node ID."}; + EXTENSIONAPI static constexpr auto BrowsenameAttr = core::OutputAttributeDefinition<>{"Browsename", { Success }, "The browse name of the node."}; + EXTENSIONAPI static constexpr auto FullPathAttr = core::OutputAttributeDefinition<>{"Full path", { Success }, "The full path of the node."}; + EXTENSIONAPI static constexpr auto SourcetimestampAttr = core::OutputAttributeDefinition<>{"Sourcetimestamp", { Success }, + "The timestamp of when the node was created in the server as 'MM-dd-yyyy HH:mm:ss.mmm'."}; + EXTENSIONAPI static constexpr auto TypenameAttr = core::OutputAttributeDefinition<>{"Typename", { Success }, "The type name of the node data."}; + EXTENSIONAPI static constexpr auto DatasizeAttr = core::OutputAttributeDefinition<>{"Datasize", { Success }, "The size of the node data."}; + + EXTENSIONAPI static constexpr auto OutputAttributes = std::array {NodeIDAttr, NodeIDTypeAttr, BrowsenameAttr, FullPathAttr, SourcetimestampAttr, + TypenameAttr, DatasizeAttr}; + EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_FORBIDDEN; @@ -98,21 +117,17 @@ class FetchOPCProcessor : public BaseOPCProcessor { void initialize() override; protected: - bool nodeFoundCallBack(opc::Client& client, const UA_ReferenceDescription *ref, const std::string& path, - core::ProcessContext& context, core::ProcessSession& session); + bool nodeFoundCallBack(const UA_ReferenceDescription *ref, const std::string& path, + core::ProcessContext& context, core::ProcessSession& session, + size_t& nodes_found, size_t& variables_found); - void OPCData2FlowFile(const opc::NodeData& opcnode, core::ProcessContext& context, core::ProcessSession& session); + void OPCData2FlowFile(const opc::NodeData& opc_node, core::ProcessContext& context, core::ProcessSession& session); - std::string nodeID_; - int32_t nameSpaceIdx_ = 0; - opc::OPCNodeIDType idType_{}; - uint32_t nodesFound_ = 0; - uint32_t variablesFound_ = 0; - uint64_t maxDepth_ = 0; + uint64_t max_depth_ = 0; bool lazy_mode_ = false; private: - std::vector translatedNodeIDs_; // Only used when user provides path, path->nodeid translation is only done once + std::vector translated_node_ids_; // Only used when user provides path, path->nodeid translation is only done once std::unordered_map node_timestamp_; // Key = Full path, Value = Timestamp }; diff --git a/extensions/opc/include/opc.h b/extensions/opc/include/opc.h index 04cda4f85a..3f8ad07eef 100644 --- a/extensions/opc/include/opc.h +++ b/extensions/opc/include/opc.h @@ -26,6 +26,7 @@ #include #include #include +#include #include "open62541/client.h" #include "core/logging/Logger.h" @@ -35,56 +36,71 @@ namespace org::apache::nifi::minifi::opc { class OPCException : public minifi::Exception { public: - OPCException(ExceptionType type, std::string &&errorMsg) - : Exception(type, errorMsg) { + OPCException(ExceptionType type, std::string &&error_msg) + : Exception(type, error_msg) { } }; -enum class OPCNodeIDType{ Path, Int, String }; +enum class OPCNodeIDType{ + Path, + Int, + String +}; -enum class OPCNodeDataType{ Int64, UInt64, Int32, UInt32, Boolean, Float, Double, String }; +enum class OPCNodeDataType{ + Int64, + UInt64, + Int32, + UInt32, + Boolean, + Float, + Double, + String +}; struct NodeData; class Client; -using nodeFoundCallBackFunc = bool(Client& client, const UA_ReferenceDescription*, const std::string&); +using NodeFoundCallBackFunc = bool(const UA_ReferenceDescription*, const std::string&); class Client { public: + ~Client(); bool isConnected(); UA_StatusCode connect(const std::string& url, const std::string& username = "", const std::string& password = ""); - ~Client(); - NodeData getNodeData(const UA_ReferenceDescription *ref, const std::string& basePath = ""); - UA_ReferenceDescription * getNodeReference(UA_NodeId nodeId); - void traverse(UA_NodeId nodeId, const std::function& cb, const std::string& basePath = "", uint64_t maxDepth = 0, bool fetchRoot = true); - bool exists(UA_NodeId nodeId); - UA_StatusCode translateBrowsePathsToNodeIdsRequest(const std::string& path, std::vector& foundNodeIDs, const std::shared_ptr& logger); + NodeData getNodeData(const UA_ReferenceDescription *ref, const std::string& base_path = ""); + UA_ReferenceDescription * getNodeReference(UA_NodeId node_id); + void traverse(UA_NodeId node_id, const std::function& cb, const std::string& base_path = "", uint64_t max_depth = 0, bool fetch_root = true); + bool exists(UA_NodeId node_id); + UA_StatusCode translateBrowsePathsToNodeIdsRequest(const std::string& path, std::vector& found_node_ids, int32_t namespace_index, + const std::vector& path_reference_types, const std::shared_ptr& logger); template - UA_StatusCode update_node(const UA_NodeId nodeId, T value); + UA_StatusCode update_node(const UA_NodeId node_id, T value); template - UA_StatusCode add_node(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string_view browseName, T value, UA_NodeId *receivedNodeId); + UA_StatusCode add_node(const UA_NodeId parent_node_id, const UA_NodeId target_node_id, const UA_UInt32 ref_type_id, std::string_view browse_name, T value, UA_NodeId *received_node_id); - static std::unique_ptr createClient(const std::shared_ptr& logger, const std::string& applicationURI, - const std::vector& certBuffer, const std::vector& keyBuffer, - const std::vector>& trustBuffers); + static std::unique_ptr createClient(const std::shared_ptr& logger, const std::string& application_uri, + const std::vector& cert_buffer, const std::vector& key_buffer, + const std::vector>& trust_buffers); private: - Client(const std::shared_ptr& logger, const std::string& applicationURI, - const std::vector& certBuffer, const std::vector& keyBuffer, - const std::vector>& trustBuffers); + Client(const std::shared_ptr& logger, const std::string& application_uri, + const std::vector& cert_buffer, const std::vector& key_buffer, + const std::vector>& trust_buffers); UA_Client *client_; std::shared_ptr logger_; + UA_Logger minifi_ua_logger_{}; }; using ClientPtr = std::unique_ptr; struct NodeData { std::vector data; - UA_DataTypeKind dataTypeID; + UA_DataTypeKind data_type_id; std::map attributes; virtual ~NodeData() { @@ -98,7 +114,7 @@ struct NodeData { NodeData& operator= (NodeData &&) = delete; NodeData(NodeData&& rhs) : data(rhs.data), attributes(rhs.attributes) { - dataTypeID = rhs.dataTypeID; + data_type_id = rhs.data_type_id; this->var_ = rhs.var_; rhs.var_ = nullptr; } @@ -120,22 +136,13 @@ struct NodeData { friend std::string nodeValue2String(const NodeData&); }; -inline constexpr std::array, 8> StringToOPCDataTypeMap = {{ - {"Int64", OPCNodeDataType::Int64}, - {"UInt64", OPCNodeDataType::UInt64 }, - {"Int32", OPCNodeDataType::Int32}, - {"UInt32", OPCNodeDataType::UInt32}, - {"Boolean", OPCNodeDataType::Boolean}, - {"Float", OPCNodeDataType::Float}, - {"Double", OPCNodeDataType::Double}, - {"String", OPCNodeDataType::String} -}}; - std::string nodeValue2String(const NodeData& nd); std::string OPCDateTime2String(UA_DateTime raw_date); void logFunc(void *context, UA_LogLevel level, UA_LogCategory category, const char *msg, va_list args); +std::optional mapOpcReferenceType(const std::string& ref_type); + } // namespace org::apache::nifi::minifi::opc diff --git a/extensions/opc/include/opcbase.h b/extensions/opc/include/opcbase.h index c60cbd2b8d..cf54a5b061 100644 --- a/extensions/opc/include/opcbase.h +++ b/extensions/opc/include/opcbase.h @@ -56,7 +56,11 @@ class BaseOPCProcessor : public core::ProcessorImpl { .withDescription("Path to the DER-encoded key file") .build(); EXTENSIONAPI static constexpr auto TrustedPath = core::PropertyDefinitionBuilder<>::createProperty("Trusted server certificate path") - .withDescription("Path to the DER-encoded trusted server certificate") + .withDescription("Comma separated list of paths to the DER-encoded trusted server certificates") + .build(); + EXTENSIONAPI static constexpr auto PathReferenceTypes = core::PropertyDefinitionBuilder<>::createProperty("Path reference types") + .withDescription("Specify the reference types between nodes in the path if Path Node ID type is used. If not provided, all reference types are assumed to be Organizes. " + "The format is 'referenceType1/referenceType2/.../referenceTypeN' and the supported reference types are Organizes, HasComponent, HasProperty, and HasSubtype.") .build(); EXTENSIONAPI static constexpr auto Properties = std::to_array({ OPCServerEndPoint, @@ -65,7 +69,8 @@ class BaseOPCProcessor : public core::ProcessorImpl { Password, CertificatePath, KeyPath, - TrustedPath + TrustedPath, + PathReferenceTypes }); @@ -77,23 +82,30 @@ class BaseOPCProcessor : public core::ProcessorImpl { protected: virtual bool reconnect(); + void readPathReferenceTypes(core::ProcessContext& context, const std::string& node_id); + void parseIdType(core::ProcessContext& context, const core::PropertyReference& prop); std::shared_ptr logger_; + std::string node_id_; + int32_t namespace_idx_ = 0; + opc::OPCNodeIDType id_type_{}; + opc::ClientPtr connection_; - std::string endPointURL_; + std::string endpoint_url; - std::string applicationURI_; + std::string application_uri_; std::string username_; std::string password_; std::string certpath_; std::string keypath_; std::string trustpath_; - std::vector certBuffer_; - std::vector keyBuffer_; - std::vector> trustBuffers_; + std::vector cert_buffer_; + std::vector key_buffer_; + std::vector> trust_buffers_; + std::vector path_reference_types_; }; } // namespace org::apache::nifi::minifi::processors diff --git a/extensions/opc/include/putopc.h b/extensions/opc/include/putopc.h index 7e3088d8e3..83bc6d0e5e 100644 --- a/extensions/opc/include/putopc.h +++ b/extensions/opc/include/putopc.h @@ -34,12 +34,13 @@ #include "core/logging/LoggerFactory.h" #include "utils/ArrayUtils.h" #include "utils/Id.h" +#include "utils/expected.h" namespace org::apache::nifi::minifi::processors { class PutOPCProcessor : public BaseOPCProcessor { public: - EXTENSIONAPI static constexpr const char* Description = "Creates/updates OPC nodes"; + EXTENSIONAPI static constexpr const char* Description = "Creates/updates OPC nodes"; EXTENSIONAPI static constexpr auto ParentNodeIDType = core::PropertyDefinitionBuilder<3>::createProperty("Parent node ID type") .withDescription("Specifies the type of the provided node ID") @@ -51,30 +52,41 @@ class PutOPCProcessor : public BaseOPCProcessor { .isRequired(true) .build(); EXTENSIONAPI static constexpr auto ParentNameSpaceIndex = core::PropertyDefinitionBuilder<>::createProperty("Parent node namespace index") - .withDescription("The index of the namespace. Used only if node ID type is not path.") + .withDescription("The index of the namespace of the parent node.") .withPropertyType(core::StandardPropertyTypes::INTEGER_TYPE) .withDefaultValue("0") + .isRequired(true) .build(); - EXTENSIONAPI static constexpr auto ValueType = core::PropertyDefinitionBuilder::createProperty("Value type") + EXTENSIONAPI static constexpr auto ValueType = core::PropertyDefinitionBuilder()>::createProperty("Value type") .withDescription("Set the OPC value type of the created nodes") - .withAllowedValues(utils::getKeys(opc::StringToOPCDataTypeMap)) + .withAllowedValues(magic_enum::enum_names()) .isRequired(true) .build(); - EXTENSIONAPI static constexpr auto TargetNodeIDType = core::PropertyDefinitionBuilder<>::createProperty("Target node ID type") + EXTENSIONAPI static constexpr auto TargetNodeIDType = core::PropertyDefinitionBuilder<2>::createProperty("Target node ID type") .withDescription("ID type of target node. Allowed values are: Int, String.") + .withAllowedValues({"Int", "String"}) .supportsExpressionLanguage(true) + .isRequired(true) .build(); EXTENSIONAPI static constexpr auto TargetNodeID = core::PropertyDefinitionBuilder<>::createProperty("Target node ID") .withDescription("ID of target node.") .supportsExpressionLanguage(true) + .isRequired(true) .build(); EXTENSIONAPI static constexpr auto TargetNodeBrowseName = core::PropertyDefinitionBuilder<>::createProperty("Target node browse name") .withDescription("Browse name of target node. Only used when new node is created.") .supportsExpressionLanguage(true) .build(); EXTENSIONAPI static constexpr auto TargetNodeNameSpaceIndex = core::PropertyDefinitionBuilder<>::createProperty("Target node namespace index") - .withDescription("The index of the namespace. Used only if node ID type is not path.") + .withDescription("The index of the namespace of the target node.") .supportsExpressionLanguage(true) + .withDefaultValue("0") + .isRequired(true) + .build(); + EXTENSIONAPI static constexpr auto CreateNodeReferenceType = core::PropertyDefinitionBuilder<4>::createProperty("Create node reference type") + .withDescription("Reference type used when a new node is created.") + .withAllowedValues({"Organizes", "HasComponent", "HasProperty", "HasSubtype"}) + .withDefaultValue("HasComponent") .build(); EXTENSIONAPI static constexpr auto Properties = utils::array_cat(BaseOPCProcessor::Properties, std::to_array({ ParentNodeIDType, @@ -84,7 +96,8 @@ class PutOPCProcessor : public BaseOPCProcessor { TargetNodeIDType, TargetNodeID, TargetNodeBrowseName, - TargetNodeNameSpaceIndex + TargetNodeNameSpaceIndex, + CreateNodeReferenceType })); @@ -109,12 +122,14 @@ class PutOPCProcessor : public BaseOPCProcessor { void initialize() override; private: - std::string nodeID_; - int32_t nameSpaceIdx_{}; - opc::OPCNodeIDType idType_{}; - UA_NodeId parentNodeID_{}; - bool parentExists_{}; - opc::OPCNodeDataType nodeDataType_{}; + bool readParentNodeId(); + nonstd::expected, std::string> configureTargetNode(core::ProcessContext& context, core::FlowFile& flow_file) const; + void updateNode(const UA_NodeId& target_node, const std::string& contentstr, core::ProcessSession& session, const std::shared_ptr& flow_file) const; + void createNode(const UA_NodeId& target_node, const std::string& contentstr, core::ProcessContext& context, core::ProcessSession& session, const std::shared_ptr& flow_file) const; + + UA_NodeId parent_node_id_{}; + opc::OPCNodeDataType node_data_type_{}; + UA_UInt32 create_node_reference_type_ = UA_NS0ID_HASCOMPONENT; }; } // namespace org::apache::nifi::minifi::processors diff --git a/extensions/opc/src/fetchopc.cpp b/extensions/opc/src/fetchopc.cpp index 878a909e52..77496c3851 100644 --- a/extensions/opc/src/fetchopc.cpp +++ b/extensions/opc/src/fetchopc.cpp @@ -27,161 +27,142 @@ #include "utils/StringUtils.h" #include "utils/Enum.h" #include "core/ProcessContext.h" +#include "utils/ProcessorConfigUtils.h" namespace org::apache::nifi::minifi::processors { - void FetchOPCProcessor::initialize() { - setSupportedProperties(Properties); - setSupportedRelationships(Relationships); - } +void FetchOPCProcessor::initialize() { + setSupportedProperties(Properties); + setSupportedRelationships(Relationships); +} - void FetchOPCProcessor::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& factory) { - logger_->log_trace("FetchOPCProcessor::onSchedule"); +void FetchOPCProcessor::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& factory) { + logger_->log_trace("FetchOPCProcessor::onSchedule"); - translatedNodeIDs_.clear(); // Path might has changed during restart + translated_node_ids_.clear(); // Path might has changed during restart - BaseOPCProcessor::onSchedule(context, factory); + BaseOPCProcessor::onSchedule(context, factory); - std::string value; - context.getProperty(NodeID, nodeID_); - context.getProperty(NodeIDType, value); + context.getProperty(NodeID, node_id_); - maxDepth_ = 0; - context.getProperty(MaxDepth, maxDepth_); + max_depth_ = 0; + context.getProperty(MaxDepth, max_depth_); - if (value == "String") { - idType_ = opc::OPCNodeIDType::String; - } else if (value == "Int") { - idType_ = opc::OPCNodeIDType::Int; - } else if (value == "Path") { - idType_ = opc::OPCNodeIDType::Path; - } else { - // Where have our validators gone? - auto error_msg = utils::string::join_pack(value, " is not a valid node ID type!"); - throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); - } + parseIdType(context, NodeIDType); - if (idType_ == opc::OPCNodeIDType::Int) { - try { - // ensure that nodeID_ can be parsed as an int - static_cast(std::stoi(nodeID_)); - } catch(...) { - auto error_msg = utils::string::join_pack(nodeID_, " cannot be used as an int type node ID"); - throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); - } - } - if (idType_ != opc::OPCNodeIDType::Path) { - if (!context.getProperty(NameSpaceIndex, nameSpaceIdx_)) { - auto error_msg = utils::string::join_pack(NameSpaceIndex.name, " is mandatory in case ", NodeIDType.name, " is not Path"); - throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); - } - } + context.getProperty(NameSpaceIndex, namespace_idx_); + + lazy_mode_ = utils::parseEnumProperty(context, Lazy) == LazyModeOptions::On; - context.getProperty(Lazy, value); - lazy_mode_ = value == "On"; + if (id_type_ == opc::OPCNodeIDType::Path) { + readPathReferenceTypes(context, node_id_); } +} - void FetchOPCProcessor::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { - logger_->log_trace("FetchOPCProcessor::onTrigger"); +void FetchOPCProcessor::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { + logger_->log_trace("FetchOPCProcessor::onTrigger"); - if (!reconnect()) { + if (!reconnect()) { + yield(); + return; + } + + size_t nodes_found = 0; + size_t variables_found = 0; + + auto found_cb = [this, &context, &session, &nodes_found, &variables_found](const UA_ReferenceDescription* ref, const std::string& path) { + return nodeFoundCallBack(ref, path, context, session, nodes_found, variables_found); }; + + if (id_type_ != opc::OPCNodeIDType::Path) { + UA_NodeId my_id; + my_id.namespaceIndex = namespace_idx_; + if (id_type_ == opc::OPCNodeIDType::Int) { + my_id.identifierType = UA_NODEIDTYPE_NUMERIC; + my_id.identifier.numeric = std::stoi(node_id_); // NOLINT(cppcoreguidelines-pro-type-union-access) + } else if (id_type_ == opc::OPCNodeIDType::String) { + my_id.identifierType = UA_NODEIDTYPE_STRING; + my_id.identifier.string = UA_STRING_ALLOC(node_id_.c_str()); // NOLINT(cppcoreguidelines-pro-type-union-access) + } else { + logger_->log_error("Unhandled id type: '{}'. No flowfiles are generated.", magic_enum::enum_underlying(id_type_)); yield(); return; } - - nodesFound_ = 0; - variablesFound_ = 0; - - auto f = [this, &context, &session](opc::Client& client, const UA_ReferenceDescription* ref, const std::string& path) { return nodeFoundCallBack(client, ref, path, context, session); }; - if (idType_ != opc::OPCNodeIDType::Path) { - UA_NodeId myID; - myID.namespaceIndex = nameSpaceIdx_; - if (idType_ == opc::OPCNodeIDType::Int) { - myID.identifierType = UA_NODEIDTYPE_NUMERIC; - myID.identifier.numeric = std::stoi(nodeID_); // NOLINT(cppcoreguidelines-pro-type-union-access) - } else if (idType_ == opc::OPCNodeIDType::String) { - myID.identifierType = UA_NODEIDTYPE_STRING; - myID.identifier.string = UA_STRING_ALLOC(nodeID_.c_str()); // NOLINT(cppcoreguidelines-pro-type-union-access) - } else { - logger_->log_error("Unhandled id type: '{}'. No flowfiles are generated.", magic_enum::enum_underlying(idType_)); + connection_->traverse(my_id, found_cb, "", max_depth_); + } else { + if (translated_node_ids_.empty()) { + auto sc = connection_->translateBrowsePathsToNodeIdsRequest(node_id_, translated_node_ids_, namespace_idx_, path_reference_types_, logger_); + if (sc != UA_STATUSCODE_GOOD) { + logger_->log_error("Failed to translate {} to node id, no flow files will be generated ({})", node_id_.c_str(), UA_StatusCode_name(sc)); yield(); return; } - connection_->traverse(myID, f, "", maxDepth_); - } else { - if (translatedNodeIDs_.empty()) { - auto sc = connection_->translateBrowsePathsToNodeIdsRequest(nodeID_, translatedNodeIDs_, logger_); - if (sc != UA_STATUSCODE_GOOD) { - logger_->log_error("Failed to translate {} to node id, no flow files will be generated ({})", nodeID_.c_str(), UA_StatusCode_name(sc)); - yield(); - return; - } - } - for (auto& nodeID : translatedNodeIDs_) { - connection_->traverse(nodeID, f, nodeID_, maxDepth_); - } } - if (nodesFound_ == 0) { - logger_->log_warn("Connected to OPC server, but no variable nodes were found. Configuration might be incorrect! Yielding..."); - yield(); - } else if (variablesFound_ == 0) { - logger_->log_warn("Found no variables when traversing the specified node. No flowfiles are generated. Yielding..."); - yield(); + for (auto& node_id : translated_node_ids_) { + connection_->traverse(node_id, found_cb, node_id_, max_depth_); } } + if (nodes_found == 0) { + logger_->log_warn("Connected to OPC server, but no variable nodes were found. Configuration might be incorrect! Yielding..."); + yield(); + } else if (variables_found == 0) { + logger_->log_warn("Found no variables when traversing the specified node. No flowfiles are generated. Yielding..."); + yield(); + } +} - bool FetchOPCProcessor::nodeFoundCallBack(opc::Client& /*client*/, const UA_ReferenceDescription *ref, const std::string& path, - core::ProcessContext& context, core::ProcessSession& session) { - nodesFound_++; - if (ref->nodeClass == UA_NODECLASS_VARIABLE) { - try { - opc::NodeData nodedata = connection_->getNodeData(ref, path); - bool write = true; - if (lazy_mode_) { - write = false; - std::string nodeid = nodedata.attributes["Full path"]; - std::string cur_timestamp = node_timestamp_[nodeid]; - std::string new_timestamp = nodedata.attributes["Sourcetimestamp"]; - if (cur_timestamp != new_timestamp) { - node_timestamp_[nodeid] = new_timestamp; - logger_->log_debug("Node {} has new source timestamp {}", nodeid, new_timestamp); - write = true; - } - } - if (write) { - OPCData2FlowFile(nodedata, context, session); - variablesFound_++; - } - } catch (const std::exception& exception) { - std::string browse_name(reinterpret_cast(ref->browseName.name.data), ref->browseName.name.length); - logger_->log_warn("Caught Exception while trying to get data from node {}: {}", path + "/" + browse_name, exception.what()); - } - } +bool FetchOPCProcessor::nodeFoundCallBack(const UA_ReferenceDescription *ref, const std::string& path, + core::ProcessContext& context, core::ProcessSession& session, size_t& nodes_found, size_t& variables_found) { + ++nodes_found; + if (ref->nodeClass != UA_NODECLASS_VARIABLE) { return true; } - - void FetchOPCProcessor::OPCData2FlowFile(const opc::NodeData& opcnode, core::ProcessContext&, core::ProcessSession& session) { - auto flowFile = session.create(); - if (flowFile == nullptr) { - logger_->log_error("Failed to create flowfile!"); - return; + try { + opc::NodeData nodedata = connection_->getNodeData(ref, path); + bool write = true; + if (lazy_mode_) { + write = false; + std::string nodeid = nodedata.attributes["Full path"]; + std::string cur_timestamp = node_timestamp_[nodeid]; + std::string new_timestamp = nodedata.attributes["Sourcetimestamp"]; + if (cur_timestamp != new_timestamp) { + node_timestamp_[nodeid] = new_timestamp; + logger_->log_debug("Node {} has new source timestamp {}", nodeid, new_timestamp); + write = true; + } } - for (const auto& attr : opcnode.attributes) { - flowFile->setAttribute(attr.first, attr.second); + if (write) { + OPCData2FlowFile(nodedata, context, session); + ++variables_found; } - if (!opcnode.data.empty()) { - try { - session.writeBuffer(flowFile, opc::nodeValue2String(opcnode)); - } catch (const std::exception& e) { - std::string browsename; - flowFile->getAttribute("Browsename", browsename); - logger_->log_info("Failed to extract data of OPC node {}: {}", browsename, e.what()); - session.transfer(flowFile, Failure); - return; - } + } catch (const std::exception& exception) { + std::string browse_name(reinterpret_cast(ref->browseName.name.data), ref->browseName.name.length); + logger_->log_warn("Caught Exception while trying to get data from node {}: {}", path + "/" + browse_name, exception.what()); + } + return true; +} + +void FetchOPCProcessor::OPCData2FlowFile(const opc::NodeData& opc_node, core::ProcessContext&, core::ProcessSession& session) { + auto flow_file = session.create(); + if (flow_file == nullptr) { + logger_->log_error("Failed to create flowfile!"); + return; + } + for (const auto& attr : opc_node.attributes) { + flow_file->setAttribute(attr.first, attr.second); + } + if (!opc_node.data.empty()) { + try { + session.writeBuffer(flow_file, opc::nodeValue2String(opc_node)); + } catch (const std::exception& e) { + std::string browsename; + flow_file->getAttribute("Browsename", browsename); + logger_->log_info("Failed to extract data of OPC node {}: {}", browsename, e.what()); + session.transfer(flow_file, Failure); + return; } - session.transfer(flowFile, Success); } + session.transfer(flow_file, Success); +} REGISTER_RESOURCE(FetchOPCProcessor, Processor); diff --git a/extensions/opc/src/opc.cpp b/extensions/opc/src/opc.cpp index ffc466e828..3332b5daa0 100644 --- a/extensions/opc/src/opc.cpp +++ b/extensions/opc/src/opc.cpp @@ -109,44 +109,44 @@ core::logging::LOG_LEVEL MapOPCLogLevel(UA_LogLevel ualvl) { * End of internal functions */ -Client::Client(const std::shared_ptr& logger, const std::string& applicationURI, - const std::vector& certBuffer, const std::vector& keyBuffer, - const std::vector>& trustBuffers) +Client::Client(const std::shared_ptr& logger, const std::string& application_uri, + const std::vector& cert_buffer, const std::vector& key_buffer, + const std::vector>& trust_buffers) : client_(UA_Client_new()) { - if (certBuffer.empty()) { + if (cert_buffer.empty()) { UA_ClientConfig_setDefault(UA_Client_getConfig(client_)); } else { UA_ClientConfig *cc = UA_Client_getConfig(client_); cc->securityMode = UA_MESSAGESECURITYMODE_SIGNANDENCRYPT; // Certificate - UA_ByteString certByteString = UA_STRING_NULL; - certByteString.length = certBuffer.size(); - certByteString.data = reinterpret_cast(UA_malloc(certByteString.length * sizeof(UA_Byte))); // NOLINT(cppcoreguidelines-owning-memory) - memcpy(certByteString.data, certBuffer.data(), certByteString.length); + UA_ByteString cert_byte_string = UA_STRING_NULL; + cert_byte_string.length = cert_buffer.size(); + cert_byte_string.data = reinterpret_cast(UA_malloc(cert_byte_string.length * sizeof(UA_Byte))); // NOLINT(cppcoreguidelines-owning-memory) + memcpy(cert_byte_string.data, cert_buffer.data(), cert_byte_string.length); // Key - UA_ByteString keyByteString = UA_STRING_NULL; - keyByteString.length = keyBuffer.size(); - keyByteString.data = reinterpret_cast(UA_malloc(keyByteString.length * sizeof(UA_Byte))); // NOLINT(cppcoreguidelines-owning-memory) - memcpy(keyByteString.data, keyBuffer.data(), keyByteString.length); + UA_ByteString key_byte_string = UA_STRING_NULL; + key_byte_string.length = key_buffer.size(); + key_byte_string.data = reinterpret_cast(UA_malloc(key_byte_string.length * sizeof(UA_Byte))); // NOLINT(cppcoreguidelines-owning-memory) + memcpy(key_byte_string.data, key_buffer.data(), key_byte_string.length); // Trusted certificates - std::vector trustList; - trustList.resize(trustBuffers.size()); - for (size_t i = 0; i < trustBuffers.size(); i++) { - trustList[i] = UA_STRING_NULL; - trustList[i].length = trustBuffers[i].size(); - trustList[i].data = reinterpret_cast(UA_malloc(trustList[i].length * sizeof(UA_Byte))); // NOLINT(cppcoreguidelines-owning-memory) - memcpy(trustList[i].data, trustBuffers[i].data(), trustList[i].length); + std::vector trust_list; + trust_list.resize(trust_buffers.size()); + for (size_t i = 0; i < trust_buffers.size(); i++) { + trust_list[i] = UA_STRING_NULL; + trust_list[i].length = trust_buffers[i].size(); + trust_list[i].data = reinterpret_cast(UA_malloc(trust_list[i].length * sizeof(UA_Byte))); // NOLINT(cppcoreguidelines-owning-memory) + memcpy(trust_list[i].data, trust_buffers[i].data(), trust_list[i].length); } - UA_StatusCode sc = UA_ClientConfig_setDefaultEncryption(cc, certByteString, keyByteString, - trustList.data(), trustBuffers.size(), + UA_StatusCode sc = UA_ClientConfig_setDefaultEncryption(cc, cert_byte_string, key_byte_string, + trust_list.data(), trust_buffers.size(), nullptr, 0); - UA_ByteString_clear(&certByteString); - UA_ByteString_clear(&keyByteString); - for (size_t i = 0; i < trustBuffers.size(); i++) { - UA_ByteString_clear(&trustList[i]); + UA_ByteString_clear(&cert_byte_string); + UA_ByteString_clear(&key_byte_string); + for (size_t i = 0; i < trust_buffers.size(); i++) { + UA_ByteString_clear(&trust_list[i]); } if (sc != UA_STATUSCODE_GOOD) { logger->log_error("Configuring the client for encryption failed: {}", UA_StatusCode_name(sc)); @@ -155,14 +155,14 @@ Client::Client(const std::shared_ptr& logger, const std:: } } - const UA_Logger MinifiUALogger = {logFunc, logger.get(), [](void*){}}; + minifi_ua_logger_ = {logFunc, logger.get(), [](UA_Logger*){}}; - UA_ClientConfig *configPtr = UA_Client_getConfig(client_); - configPtr->logger = MinifiUALogger; + UA_ClientConfig *config_ptr = UA_Client_getConfig(client_); + config_ptr->logging = &minifi_ua_logger_; - if (applicationURI.length() > 0) { - UA_String_clear(&configPtr->clientDescription.applicationUri); - configPtr->clientDescription.applicationUri = UA_STRING_ALLOC(applicationURI.c_str()); + if (application_uri.length() > 0) { + UA_String_clear(&config_ptr->clientDescription.applicationUri); + config_ptr->clientDescription.applicationUri = UA_STRING_ALLOC(application_uri.c_str()); } logger_ = logger; @@ -202,7 +202,7 @@ UA_StatusCode Client::connect(const std::string& url, const std::string& usernam } } -NodeData Client::getNodeData(const UA_ReferenceDescription *ref, const std::string& basePath) { +NodeData Client::getNodeData(const UA_ReferenceDescription *ref, const std::string& base_path) { if (ref->nodeClass == UA_NODECLASS_VARIABLE) { opc::NodeData nodedata; std::string browsename(reinterpret_cast(ref->browseName.name.data), ref->browseName.name.length); @@ -222,8 +222,14 @@ NodeData Client::getNodeData(const UA_ReferenceDescription *ref, const std::stri nodedata.attributes["NodeID type"] = "numeric"; } nodedata.attributes["Browsename"] = browsename; - nodedata.attributes["Full path"] = basePath + "/" + browsename; - nodedata.dataTypeID = static_cast(UA_DATATYPEKINDS); + + auto splitted_base_path = utils::string::splitAndTrimRemovingEmpty(base_path, "/"); + if (!splitted_base_path.empty() && splitted_base_path.back() == browsename) { + nodedata.attributes["Full path"] = base_path; + } else { + nodedata.attributes["Full path"] = base_path + "/" + browsename; + } + nodedata.data_type_id = static_cast(UA_DATATYPEKINDS); UA_Variant* var = UA_Variant_new(); if (UA_Client_readValueAttribute(client_, ref->nodeId.nodeId, var) == UA_STATUSCODE_GOOD && var->type != nullptr && var->data != nullptr) { // Because the timestamps are eliminated in readValueAttribute for simplification @@ -245,7 +251,7 @@ NodeData Client::getNodeData(const UA_ReferenceDescription *ref, const std::stri nodedata.attributes["Sourcetimestamp"] = source_timestamp; UA_ReadResponse_clear(&response); - nodedata.dataTypeID = static_cast(var->type->typeKind); + nodedata.data_type_id = static_cast(var->type->typeKind); nodedata.addVariant(var); if (var->type->typeName) { nodedata.attributes["Typename"] = std::string(var->type->typeName); @@ -264,59 +270,60 @@ NodeData Client::getNodeData(const UA_ReferenceDescription *ref, const std::stri } } -UA_ReferenceDescription * Client::getNodeReference(UA_NodeId nodeId) { +UA_ReferenceDescription * Client::getNodeReference(UA_NodeId node_id) { UA_ReferenceDescription *ref = UA_ReferenceDescription_new(); UA_ReferenceDescription_init(ref); - UA_NodeId_copy(&nodeId, &ref->nodeId.nodeId); - auto sc = UA_Client_readNodeClassAttribute(client_, nodeId, &ref->nodeClass); + UA_NodeId_copy(&node_id, &ref->nodeId.nodeId); + auto sc = UA_Client_readNodeClassAttribute(client_, node_id, &ref->nodeClass); if (sc == UA_STATUSCODE_GOOD) { - sc = UA_Client_readBrowseNameAttribute(client_, nodeId, &ref->browseName); + sc = UA_Client_readBrowseNameAttribute(client_, node_id, &ref->browseName); } if (sc == UA_STATUSCODE_GOOD) { - UA_Client_readDisplayNameAttribute(client_, nodeId, &ref->displayName); + UA_Client_readDisplayNameAttribute(client_, node_id, &ref->displayName); } return ref; } -void Client::traverse(UA_NodeId nodeId, const std::function& cb, const std::string& basePath, uint64_t maxDepth, bool fetchRoot) { - if (fetchRoot) { - UA_ReferenceDescription *rootRef = getNodeReference(nodeId); +void Client::traverse(UA_NodeId node_id, const std::function& cb, const std::string& base_path, uint64_t max_depth, bool fetch_root) { + if (fetch_root) { + UA_ReferenceDescription *rootRef = getNodeReference(node_id); if ((rootRef->nodeClass == UA_NODECLASS_VARIABLE || rootRef->nodeClass == UA_NODECLASS_OBJECT) && rootRef->browseName.name.length > 0) { - cb(*this, rootRef, basePath); + cb(rootRef, base_path); } UA_ReferenceDescription_delete(rootRef); } - if (maxDepth != 0) { - maxDepth--; - if (maxDepth == 0) { + if (max_depth != 0) { + max_depth--; + if (max_depth == 0) { return; } } - UA_BrowseRequest bReq; - UA_BrowseRequest_init(&bReq); - bReq.requestedMaxReferencesPerNode = 0; - bReq.nodesToBrowse = UA_BrowseDescription_new(); - bReq.nodesToBrowseSize = 1; + UA_BrowseRequest browse_request; + UA_BrowseRequest_init(&browse_request); + browse_request.requestedMaxReferencesPerNode = 0; + browse_request.nodesToBrowse = UA_BrowseDescription_new(); + browse_request.nodesToBrowseSize = 1; - UA_NodeId_copy(&nodeId, &bReq.nodesToBrowse[0].nodeId); - bReq.nodesToBrowse[0].resultMask = UA_BROWSERESULTMASK_ALL; + UA_NodeId_copy(&node_id, &browse_request.nodesToBrowse[0].nodeId); + browse_request.nodesToBrowse[0].resultMask = UA_BROWSERESULTMASK_ALL; - UA_BrowseResponse bResp = UA_Client_Service_browse(client_, bReq); + UA_BrowseResponse browse_response = UA_Client_Service_browse(client_, browse_request); - const auto guard = gsl::finally([&bResp]() { - UA_BrowseResponse_clear(&bResp); + const auto guard = gsl::finally([&browse_response]() { + UA_BrowseResponse_clear(&browse_response); }); - UA_BrowseRequest_clear(&bReq); + UA_BrowseRequest_clear(&browse_request); - for (size_t i = 0; i < bResp.resultsSize; ++i) { - for (size_t j = 0; j < bResp.results[i].referencesSize; ++j) { - UA_ReferenceDescription *ref = &(bResp.results[i].references[j]); - if (cb(*this, ref, basePath)) { + for (size_t i = 0; i < browse_response.resultsSize; ++i) { + for (size_t j = 0; j < browse_response.results[i].referencesSize; ++j) { + UA_ReferenceDescription *ref = &(browse_response.results[i].references[j]); + if (cb(ref, base_path)) { if (ref->nodeClass == UA_NODECLASS_VARIABLE || ref->nodeClass == UA_NODECLASS_OBJECT) { - std::string browse_name(reinterpret_cast(ref->browseName.name.data), ref->browseName.name.length); - traverse(ref->nodeId.nodeId, cb, basePath + browse_name, maxDepth, false); + std::string new_base_path = base_path; + new_base_path.append("/").append(reinterpret_cast(ref->browseName.name.data), ref->browseName.name.length); + traverse(ref->nodeId.nodeId, cb, new_base_path, max_depth, false); } } else { return; @@ -325,72 +332,80 @@ void Client::traverse(UA_NodeId nodeId, const std::function bool { + auto callback = [&retval](const UA_ReferenceDescription* /*ref*/, const std::string& /*pat*/) -> bool { retval = true; return false; // If any node is found, the given node exists, so traverse can be stopped }; - traverse(nodeId, callback, "", 1); + traverse(node_id, callback, "", 1); return retval; } -UA_StatusCode Client::translateBrowsePathsToNodeIdsRequest(const std::string& path, std::vector& foundNodeIDs, const std::shared_ptr& logger) { - logger->log_trace("Trying to find node id for {}", path.c_str()); +UA_StatusCode Client::translateBrowsePathsToNodeIdsRequest(const std::string& path, std::vector& found_node_ids, int32_t namespace_index, + const std::vector& path_reference_types, const std::shared_ptr& logger) { + logger->log_trace("Trying to find node ids for {}", path.c_str()); - auto tokens = utils::string::split(path, "/"); - std::vector ids; - for (size_t i = 0; i < tokens.size(); ++i) { - UA_UInt32 val = (i ==0) ? UA_NS0ID_ORGANIZES : UA_NS0ID_HASCOMPONENT; - ids.push_back(val); - } + auto tokens = utils::string::splitAndTrimRemovingEmpty(path, "/"); - UA_BrowsePath browsePath; - UA_BrowsePath_init(&browsePath); - browsePath.startingNode = UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER); + UA_BrowsePath browse_path; + UA_BrowsePath_init(&browse_path); + browse_path.startingNode = UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER); - browsePath.relativePath.elements = reinterpret_cast(UA_Array_new(tokens.size(), &UA_TYPES[UA_TYPES_RELATIVEPATHELEMENT])); - browsePath.relativePath.elementsSize = tokens.size(); + browse_path.relativePath.elements = reinterpret_cast(UA_Array_new(tokens.size(), &UA_TYPES[UA_TYPES_RELATIVEPATHELEMENT])); + browse_path.relativePath.elementsSize = tokens.size(); + + std::vector ids; + ids.push_back(UA_NS0ID_ORGANIZES); + for (size_t i = 0; i < tokens.size() - 1; ++i) { + if (!path_reference_types.empty()) { + ids.push_back(path_reference_types[i]); + } else { + ids.push_back(UA_NS0ID_ORGANIZES); + } + } for (size_t i = 0; i < tokens.size(); ++i) { - UA_RelativePathElement *elem = &browsePath.relativePath.elements[i]; + UA_RelativePathElement *elem = &browse_path.relativePath.elements[i]; elem->referenceTypeId = UA_NODEID_NUMERIC(0, ids[i]); - elem->targetName = UA_QUALIFIEDNAME_ALLOC(0, tokens[i].c_str()); + elem->targetName = UA_QUALIFIEDNAME_ALLOC(namespace_index, tokens[i].c_str()); } UA_TranslateBrowsePathsToNodeIdsRequest request; UA_TranslateBrowsePathsToNodeIdsRequest_init(&request); - request.browsePaths = &browsePath; + request.browsePaths = &browse_path; request.browsePathsSize = 1; UA_TranslateBrowsePathsToNodeIdsResponse response = UA_Client_Service_translateBrowsePathsToNodeIds(client_, request); - const auto guard = gsl::finally([&browsePath]() { - UA_BrowsePath_clear(&browsePath); + const auto guard = gsl::finally([&browse_path, &tokens, &response]() { + for (size_t i = 0; i < tokens.size(); ++i) { + UA_RelativePathElement *elem = &browse_path.relativePath.elements[i]; + UA_QualifiedName_clear(&elem->targetName); + } + UA_BrowsePath_clear(&browse_path); + UA_TranslateBrowsePathsToNodeIdsResponse_clear(&response); }); - if (response.resultsSize < 1) { + if (response.resultsSize < 1 || response.results[0].statusCode != UA_STATUSCODE_GOOD) { logger->log_warn("No node id in response for {}", path.c_str()); return UA_STATUSCODE_BADNODATAAVAILABLE; } - bool foundData = false; + bool found_data = false; for (size_t i = 0; i < response.resultsSize; ++i) { UA_BrowsePathResult res = response.results[i]; for (size_t j = 0; j < res.targetsSize; ++j) { - foundData = true; + found_data = true; UA_NodeId resultId; UA_NodeId_copy(&res.targets[j].targetId.nodeId, &resultId); - foundNodeIDs.push_back(resultId); - std::string namespaceUri(reinterpret_cast(res.targets[j].targetId.namespaceUri.data), res.targets[j].targetId.namespaceUri.length); + found_node_ids.push_back(resultId); } } - UA_TranslateBrowsePathsToNodeIdsResponse_clear(&response); - - if (foundData) { - logger->log_debug("Found {} nodes for path {}", foundNodeIDs.size(), path.c_str()); + if (found_data) { + logger->log_debug("Found {} nodes for path {}", found_node_ids.size(), path.c_str()); return UA_STATUSCODE_GOOD; } else { logger->log_warn("No node id found for path {}", path.c_str()); @@ -399,67 +414,74 @@ UA_StatusCode Client::translateBrowsePathsToNodeIdsRequest(const std::string& pa } template -UA_StatusCode Client::add_node(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string_view browseName, T value, UA_NodeId *receivedNodeId) { +UA_StatusCode Client::add_node(const UA_NodeId parent_node_id, const UA_NodeId target_node_id, const UA_UInt32 ref_type_id, std::string_view browse_name, T value, UA_NodeId *received_node_id) { UA_VariableAttributes attr = UA_VariableAttributes_default; add_value_to_variant(&attr.value, value); char local[6] = "en-US"; // NOLINT(cppcoreguidelines-avoid-c-arrays) - attr.displayName = UA_LOCALIZEDTEXT(local, const_cast(browseName.data())); + attr.displayName = UA_LOCALIZEDTEXT(local, const_cast(browse_name.data())); UA_StatusCode sc = UA_Client_addVariableNode(client_, - targetNodeId, - parentNodeId, - UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES), - UA_QUALIFIEDNAME(1, const_cast(browseName.data())), + target_node_id, + parent_node_id, + UA_NODEID_NUMERIC(0, ref_type_id), + UA_QUALIFIEDNAME(target_node_id.namespaceIndex, const_cast(browse_name.data())), UA_NODEID_NULL, - attr, receivedNodeId); + attr, received_node_id); UA_Variant_clear(&attr.value); return sc; } template -UA_StatusCode Client::update_node(const UA_NodeId nodeId, T value) { +UA_StatusCode Client::update_node(const UA_NodeId node_id, T value) { UA_Variant *variant = UA_Variant_new(); add_value_to_variant(variant, value); - UA_StatusCode sc = UA_Client_writeValueAttribute(client_, nodeId, variant); + UA_StatusCode sc = UA_Client_writeValueAttribute(client_, node_id, variant); UA_Variant_delete(variant); return sc; } -std::unique_ptr Client::createClient(const std::shared_ptr& logger, const std::string& applicationURI, - const std::vector& certBuffer, const std::vector& keyBuffer, - const std::vector>& trustBuffers) { +std::unique_ptr Client::createClient(const std::shared_ptr& logger, const std::string& application_uri, + const std::vector& cert_buffer, const std::vector& key_buffer, + const std::vector>& trust_buffers) { try { - return ClientPtr(new Client(logger, applicationURI, certBuffer, keyBuffer, trustBuffers)); + return ClientPtr(new Client(logger, application_uri, cert_buffer, key_buffer, trust_buffers)); } catch (const std::exception& exception) { logger->log_error("Failed to create client: {}", exception.what()); } return nullptr; } -template UA_StatusCode Client::update_node(const UA_NodeId nodeId, int64_t value); -template UA_StatusCode Client::update_node(const UA_NodeId nodeId, uint64_t value); -template UA_StatusCode Client::update_node(const UA_NodeId nodeId, int32_t value); -template UA_StatusCode Client::update_node(const UA_NodeId nodeId, uint32_t value); -template UA_StatusCode Client::update_node(const UA_NodeId nodeId, float value); -template UA_StatusCode Client::update_node(const UA_NodeId nodeId, double value); -template UA_StatusCode Client::update_node(const UA_NodeId nodeId, bool value); -template UA_StatusCode Client::update_node(const UA_NodeId nodeId, const char * value); -template UA_StatusCode Client::update_node(const UA_NodeId nodeId, std::string value); - -template UA_StatusCode Client::add_node(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string_view browseName, int64_t value, UA_NodeId *receivedNodeId); -template UA_StatusCode Client::add_node(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string_view browseName, uint64_t value, UA_NodeId *receivedNodeId); -template UA_StatusCode Client::add_node(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string_view browseName, int32_t value, UA_NodeId *receivedNodeId); -template UA_StatusCode Client::add_node(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string_view browseName, uint32_t value, UA_NodeId *receivedNodeId); -template UA_StatusCode Client::add_node(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string_view browseName, float value, UA_NodeId *receivedNodeId); -template UA_StatusCode Client::add_node(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string_view browseName, double value, UA_NodeId *receivedNodeId); -template UA_StatusCode Client::add_node(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string_view browseName, bool value, UA_NodeId *receivedNodeId); -template UA_StatusCode Client::add_node(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string_view browseName, - const char * value, UA_NodeId *receivedNodeId); -template UA_StatusCode Client::add_node(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string_view browseName, - std::string value, UA_NodeId *receivedNodeId); +template UA_StatusCode Client::update_node(const UA_NodeId node_id, int64_t value); +template UA_StatusCode Client::update_node(const UA_NodeId node_id, uint64_t value); +template UA_StatusCode Client::update_node(const UA_NodeId node_id, int32_t value); +template UA_StatusCode Client::update_node(const UA_NodeId node_id, uint32_t value); +template UA_StatusCode Client::update_node(const UA_NodeId node_id, float value); +template UA_StatusCode Client::update_node(const UA_NodeId node_id, double value); +template UA_StatusCode Client::update_node(const UA_NodeId node_id, bool value); +template UA_StatusCode Client::update_node(const UA_NodeId node_id, const char * value); +template UA_StatusCode Client::update_node(const UA_NodeId node_id, std::string value); + +template UA_StatusCode Client::add_node(const UA_NodeId parent_node_id, const UA_NodeId target_node_id, const UA_UInt32 ref_type_id, std::string_view browse_name, + int64_t value, UA_NodeId *received_node_id); +template UA_StatusCode Client::add_node(const UA_NodeId parent_node_id, const UA_NodeId target_node_id, const UA_UInt32 ref_type_id, std::string_view browse_name, + uint64_t value, UA_NodeId *received_node_id); +template UA_StatusCode Client::add_node(const UA_NodeId parent_node_id, const UA_NodeId target_node_id, const UA_UInt32 ref_type_id, std::string_view browse_name, + int32_t value, UA_NodeId *received_node_id); +template UA_StatusCode Client::add_node(const UA_NodeId parent_node_id, const UA_NodeId target_node_id, const UA_UInt32 ref_type_id, std::string_view browse_name, + uint32_t value, UA_NodeId *received_node_id); +template UA_StatusCode Client::add_node(const UA_NodeId parent_node_id, const UA_NodeId target_node_id, const UA_UInt32 ref_type_id, std::string_view browse_name, + float value, UA_NodeId *received_node_id); +template UA_StatusCode Client::add_node(const UA_NodeId parent_node_id, const UA_NodeId target_node_id, const UA_UInt32 ref_type_id, std::string_view browse_name, + double value, UA_NodeId *received_node_id); +template UA_StatusCode Client::add_node(const UA_NodeId parent_node_id, const UA_NodeId target_node_id, const UA_UInt32 ref_type_id, std::string_view browse_name, + bool value, UA_NodeId *received_node_id); +template UA_StatusCode Client::add_node(const UA_NodeId parent_node_id, const UA_NodeId target_node_id, const UA_UInt32 ref_type_id, std::string_view browse_name, + const char * value, UA_NodeId *received_node_id); +template UA_StatusCode Client::add_node(const UA_NodeId parent_node_id, const UA_NodeId target_node_id, const UA_UInt32 ref_type_id, std::string_view browse_name, + std::string value, UA_NodeId *received_node_id); std::string nodeValue2String(const NodeData& nd) { std::string ret_val; - switch (nd.dataTypeID) { + switch (nd.data_type_id) { case UA_DATATYPEKIND_STRING: case UA_DATATYPEKIND_LOCALIZEDTEXT: case UA_DATATYPEKIND_BYTESTRING: { @@ -569,4 +591,18 @@ void logFunc(void *context, UA_LogLevel level, UA_LogCategory /*category*/, cons loggerPtr->log_string(MapOPCLogLevel(level), buffer.data()); } +std::optional mapOpcReferenceType(const std::string& ref_type) { + if (ref_type == "Organizes") { + return UA_NS0ID_ORGANIZES; + } else if (ref_type == "HasComponent") { + return UA_NS0ID_HASCOMPONENT; + } else if (ref_type == "HasProperty") { + return UA_NS0ID_HASPROPERTY; + } else if (ref_type == "HasSubtype") { + return UA_NS0ID_HASSUBTYPE; + } + + return std::nullopt; +} + } // namespace org::apache::nifi::minifi::opc diff --git a/extensions/opc/src/opcbase.cpp b/extensions/opc/src/opcbase.cpp index a0d09255ee..64be81685a 100644 --- a/extensions/opc/src/opcbase.cpp +++ b/extensions/opc/src/opcbase.cpp @@ -24,85 +24,121 @@ #include "core/Processor.h" #include "core/ProcessSession.h" #include "core/Core.h" +#include "utils/ProcessorConfigUtils.h" namespace org::apache::nifi::minifi::processors { - void BaseOPCProcessor::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { - logger_->log_trace("BaseOPCProcessor::onSchedule"); +void BaseOPCProcessor::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + logger_->log_trace("BaseOPCProcessor::onSchedule"); - applicationURI_.clear(); - certBuffer_.clear(); - keyBuffer_.clear(); - password_.clear(); - username_.clear(); - trustBuffers_.clear(); + application_uri_.clear(); + cert_buffer_.clear(); + key_buffer_.clear(); + password_.clear(); + username_.clear(); + trust_buffers_.clear(); - context.getProperty(OPCServerEndPoint, endPointURL_); - context.getProperty(ApplicationURI, applicationURI_); + context.getProperty(OPCServerEndPoint, endpoint_url); + context.getProperty(ApplicationURI, application_uri_); + context.getProperty(Username, username_); + context.getProperty(Password, password_); - if (context.getProperty(Username, username_) != context.getProperty(Password, password_)) { - throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Both or neither of Username and Password should be provided!"); - } + if (username_.empty() != password_.empty()) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Both or neither of Username and Password should be provided!"); + } - auto certificatePathRes = context.getProperty(CertificatePath, certpath_); - auto keyPathRes = context.getProperty(KeyPath, keypath_); - context.getProperty(TrustedPath, trustpath_); - if (certificatePathRes != keyPathRes) { - throw Exception(PROCESS_SCHEDULE_EXCEPTION, "All or none of Certificate path and Key path should be provided!"); - } + context.getProperty(CertificatePath, certpath_); + context.getProperty(KeyPath, keypath_); + context.getProperty(TrustedPath, trustpath_); + if (certpath_.empty() != keypath_.empty()) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "All or none of Certificate path and Key path should be provided!"); + } - if (certpath_.empty()) { - return; - } - if (applicationURI_.empty()) { - throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Application URI must be provided if Certificate path is provided!"); - } + if (certpath_.empty()) { + return; + } + if (application_uri_.empty()) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Application URI must be provided if Certificate path is provided!"); + } - std::ifstream input_cert(certpath_, std::ios::binary); - if (input_cert.good()) { - certBuffer_ = std::vector(std::istreambuf_iterator(input_cert), {}); - } - std::ifstream input_key(keypath_, std::ios::binary); - if (input_key.good()) { - keyBuffer_ = std::vector(std::istreambuf_iterator(input_key), {}); - } + std::ifstream input_cert(certpath_, std::ios::binary); + if (input_cert.good()) { + cert_buffer_ = std::vector(std::istreambuf_iterator(input_cert), {}); + } + std::ifstream input_key(keypath_, std::ios::binary); + if (input_key.good()) { + key_buffer_ = std::vector(std::istreambuf_iterator(input_key), {}); + } - if (certBuffer_.empty()) { - auto error_msg = utils::string::join_pack("Failed to load cert from path: ", certpath_); - throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); - } - if (keyBuffer_.empty()) { - auto error_msg = utils::string::join_pack("Failed to load key from path: ", keypath_); + if (cert_buffer_.empty()) { + auto error_msg = utils::string::join_pack("Failed to load cert from path: ", certpath_); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); + } + if (key_buffer_.empty()) { + auto error_msg = utils::string::join_pack("Failed to load key from path: ", keypath_); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); + } + + auto trusted_cert_paths = utils::string::splitAndTrimRemovingEmpty(trustpath_, ","); + for (const auto& trust_path : trusted_cert_paths) { + std::ifstream input_trust(trust_path, std::ios::binary); + if (input_trust.good()) { + trust_buffers_.push_back(std::vector(std::istreambuf_iterator(input_trust), {})); + } else { + auto error_msg = utils::string::join_pack("Failed to load trusted server certs from path: ", trust_path); throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } + } +} - if (!trustpath_.empty()) { - std::ifstream input_trust(trustpath_, std::ios::binary); - if (input_trust.good()) { - trustBuffers_.push_back(std::vector(std::istreambuf_iterator(input_trust), {})); - } else { - auto error_msg = utils::string::join_pack("Failed to load trusted server certs from path: ", trustpath_); - throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); - } - } +bool BaseOPCProcessor::reconnect() { + if (connection_ == nullptr) { + connection_ = opc::Client::createClient(logger_, application_uri_, cert_buffer_, key_buffer_, trust_buffers_); } - bool BaseOPCProcessor::reconnect() { - if (connection_ == nullptr) { - connection_ = opc::Client::createClient(logger_, applicationURI_, certBuffer_, keyBuffer_, trustBuffers_); - } + if (connection_->isConnected()) { + return true; + } - if (connection_->isConnected()) { - return true; + auto sc = connection_->connect(endpoint_url, username_, password_); + if (sc != UA_STATUSCODE_GOOD) { + logger_->log_error("Failed to connect: {}!", UA_StatusCode_name(sc)); + return false; + } + logger_->log_debug("Successfully connected."); + return true; +} + +void BaseOPCProcessor::readPathReferenceTypes(core::ProcessContext& context, const std::string& node_id) { + std::string value; + context.getProperty(PathReferenceTypes, value); + if (value.empty()) { + return; + } + auto path_reference_types = utils::string::splitAndTrimRemovingEmpty(value, "/"); + if (path_reference_types.size() != utils::string::splitAndTrimRemovingEmpty(node_id, "/").size() - 1) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Path reference types must be provided for each node pair in the path!"); + } + for (const auto& reference_type : path_reference_types) { + if (auto ua_ref_type = opc::mapOpcReferenceType(reference_type)) { + path_reference_types_.push_back(*ua_ref_type); + } else { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Unsupported reference type set in 'Path reference types' property: '{}'.", reference_type)); } + } +} - auto sc = connection_->connect(endPointURL_, username_, password_); - if (sc != UA_STATUSCODE_GOOD) { - logger_->log_error("Failed to connect: {}!", UA_StatusCode_name(sc)); - return false; +void BaseOPCProcessor::parseIdType(core::ProcessContext& context, const core::PropertyReference& prop) { + id_type_ = utils::parseEnumProperty(context, prop); + + if (id_type_ == opc::OPCNodeIDType::Int) { + try { + static_cast(std::stoi(node_id_)); + } catch(const std::exception&) { + auto error_msg = utils::string::join_pack(node_id_, " cannot be used as an int type node ID"); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } - logger_->log_debug("Successfully connected."); - return true; } +} } // namespace org::apache::nifi::minifi::processors diff --git a/extensions/opc/src/putopc.cpp b/extensions/opc/src/putopc.cpp index 1ca976a047..404904cb78 100644 --- a/extensions/opc/src/putopc.cpp +++ b/extensions/opc/src/putopc.cpp @@ -24,311 +24,290 @@ #include "core/Resource.h" #include "utils/StringUtils.h" #include "core/ProcessContext.h" +#include "utils/ProcessorConfigUtils.h" namespace org::apache::nifi::minifi::processors { - void PutOPCProcessor::initialize() { - setSupportedProperties(Properties); - setSupportedRelationships(Relationships); - } +void PutOPCProcessor::initialize() { + setSupportedProperties(Properties); + setSupportedRelationships(Relationships); +} + +void PutOPCProcessor::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) { + logger_->log_trace("PutOPCProcessor::onSchedule"); - void PutOPCProcessor::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) { - logger_->log_trace("PutOPCProcessor::onSchedule"); + BaseOPCProcessor::onSchedule(context, session_factory); - parentExists_ = false; + std::string value; + context.getProperty(ParentNodeID, node_id_); - BaseOPCProcessor::onSchedule(context, session_factory); + parseIdType(context, ParentNodeIDType); - std::string value; - context.getProperty(ParentNodeID, nodeID_); - context.getProperty(ParentNodeIDType, value); + context.getProperty(ParentNameSpaceIndex, namespace_idx_); + node_data_type_ = utils::parseEnumProperty(context, ValueType); + + if (id_type_ == opc::OPCNodeIDType::Path) { + readPathReferenceTypes(context, node_id_); + } - if (value == "String") { - idType_ = opc::OPCNodeIDType::String; - } else if (value == "Int") { - idType_ = opc::OPCNodeIDType::Int; - } else if (value == "Path") { - idType_ = opc::OPCNodeIDType::Path; + if (context.getProperty(CreateNodeReferenceType, value)) { + if (auto ref_type = opc::mapOpcReferenceType(value)) { + create_node_reference_type_ = ref_type.value(); } else { - // Where have our validators gone? - auto error_msg = utils::string::join_pack(value, " is not a valid node ID type!"); - throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); + logger_->log_error("Invalid reference type: {}", value); } + } +} - if (idType_ == opc::OPCNodeIDType::Int) { - try { - // ensure that nodeID_ can be parsed as an int - static_cast(std::stoi(nodeID_)); - } catch(...) { - auto error_msg = utils::string::join_pack(nodeID_, " cannot be used as an int type node ID"); - throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); - } +bool PutOPCProcessor::readParentNodeId() { + if (id_type_ == opc::OPCNodeIDType::Path) { + std::vector translated_node_ids; + if (connection_->translateBrowsePathsToNodeIdsRequest(node_id_, translated_node_ids, namespace_idx_, path_reference_types_, logger_) != + UA_STATUSCODE_GOOD) { + logger_->log_error("Failed to translate {} to node id, no flow files will be put", node_id_.c_str()); + return false; + } else if (translated_node_ids.size() != 1) { + logger_->log_error("{} was translated to multiple node ids, no flow files will be put", node_id_.c_str()); + return false; + } else { + parent_node_id_ = translated_node_ids[0]; } - if (idType_ != opc::OPCNodeIDType::Path) { - if (!context.getProperty(ParentNameSpaceIndex, nameSpaceIdx_)) { - auto error_msg = utils::string::join_pack(ParentNameSpaceIndex.name, " is mandatory in case ", ParentNodeIDType.name, " is not Path"); - throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); - } + } else { + parent_node_id_.namespaceIndex = namespace_idx_; + if (id_type_ == opc::OPCNodeIDType::Int) { + parent_node_id_.identifierType = UA_NODEIDTYPE_NUMERIC; + parent_node_id_.identifier.numeric = std::stoi(node_id_); // NOLINT(cppcoreguidelines-pro-type-union-access) + } else { // idType_ == opc::OPCNodeIDType::String + parent_node_id_.identifierType = UA_NODEIDTYPE_STRING; + parent_node_id_.identifier.string = UA_STRING_ALLOC(node_id_.c_str()); // NOLINT(cppcoreguidelines-pro-type-union-access) + } + if (!connection_->exists(parent_node_id_)) { + logger_->log_error("Parent node doesn't exist, no flow files will be put"); + return false; } + } + return true; +} - std::string typestr; - context.getProperty(ValueType, typestr); - nodeDataType_ = utils::at(opc::StringToOPCDataTypeMap, typestr); // This throws, but allowed values are generated based on this map -> that's a really unexpected error +nonstd::expected, std::string> PutOPCProcessor::configureTargetNode(core::ProcessContext& context, core::FlowFile& flow_file) const { + std::string namespaceidx; + if (!context.getProperty(TargetNodeNameSpaceIndex, namespaceidx, &flow_file) || namespaceidx.empty()) { + return nonstd::make_unexpected(fmt::format("Flowfile {} had no target namespace index specified, routing to failure!", flow_file.getUUIDStr())); + } + int32_t nsi = 0; + try { + nsi = std::stoi(namespaceidx); + } catch (const std::exception&) { + return nonstd::make_unexpected(fmt::format("Flowfile {} has invalid namespace index ({}), routing to failure!", + flow_file.getUUIDStr(), namespaceidx)); } - void PutOPCProcessor::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { - logger_->log_trace("PutOPCProcessor::onTrigger"); + std::string target_id_type; + if (!context.getProperty(TargetNodeIDType, target_id_type, &flow_file) || target_id_type.empty()) { + return nonstd::make_unexpected(fmt::format("Flowfile {} has invalid target node id type ({}), routing to failure!", + flow_file.getUUIDStr(), target_id_type)); + } - if (!reconnect()) { - yield(); - return; + std::string target_id; + if (!context.getProperty(TargetNodeID, target_id, &flow_file) || target_id.empty()) { + return nonstd::make_unexpected(fmt::format("Flowfile {} had target node ID type specified ({}) without ID, routing to failure!", + flow_file.getUUIDStr(), target_id_type)); + } + + UA_NodeId target_node; + target_node.namespaceIndex = nsi; + if (target_id_type == "Int") { + target_node.identifierType = UA_NODEIDTYPE_NUMERIC; + try { + target_node.identifier.numeric = std::stoi(target_id); // NOLINT(cppcoreguidelines-pro-type-union-access) + } catch (const std::exception&) { + return nonstd::make_unexpected(fmt::format("Flowfile {}: target node ID is not a valid integer: {}. Routing to failure!", + flow_file.getUUIDStr(), target_id)); } + } else if (target_id_type == "String") { + target_node.identifierType = UA_NODEIDTYPE_STRING; + target_node.identifier.string = UA_STRING_ALLOC(target_id.c_str()); // NOLINT(cppcoreguidelines-pro-type-union-access) + } else { + return nonstd::make_unexpected(fmt::format("Flowfile {}: target node ID type is invalid: {}. Routing to failure!", + flow_file.getUUIDStr(), target_id_type)); + } + return std::make_pair(connection_->exists(target_node), target_node); +} - if (!parentExists_) { - if (idType_ == opc::OPCNodeIDType::Path) { - std::vector translatedNodeIDs; - if (connection_->translateBrowsePathsToNodeIdsRequest(nodeID_, translatedNodeIDs, logger_) != - UA_STATUSCODE_GOOD) { - logger_->log_error("Failed to translate {} to node id, no flow files will be put", nodeID_.c_str()); - yield(); - return; - } else if (translatedNodeIDs.size() != 1) { - logger_->log_error("{} was translated to multiple node ids, no flow files will be put", nodeID_.c_str()); - yield(); - return; +void PutOPCProcessor::updateNode(const UA_NodeId& target_node, const std::string& contentstr, core::ProcessSession& session, const std::shared_ptr& flow_file) const { + logger_->log_trace("Node exists, trying to update it"); + try { + UA_StatusCode sc = 0; + switch (node_data_type_) { + case opc::OPCNodeDataType::Int64: { + int64_t value = std::stoll(contentstr); + sc = connection_->update_node(target_node, value); + break; + } + case opc::OPCNodeDataType::UInt64: { + uint64_t value = std::stoull(contentstr); + sc = connection_->update_node(target_node, value); + break; + } + case opc::OPCNodeDataType::Int32: { + int32_t value = std::stoi(contentstr); + sc = connection_->update_node(target_node, value); + break; + } + case opc::OPCNodeDataType::UInt32: { + uint32_t value = std::stoul(contentstr); + sc = connection_->update_node(target_node, value); + break; + } + case opc::OPCNodeDataType::Boolean: { + if (auto contentstr_parsed = utils::string::toBool(contentstr)) { + sc = connection_->update_node(target_node, contentstr_parsed.value()); } else { - parentNodeID_ = translatedNodeIDs[0]; - parentExists_ = true; - } - } else { - parentNodeID_.namespaceIndex = nameSpaceIdx_; - if (idType_ == opc::OPCNodeIDType::Int) { - parentNodeID_.identifierType = UA_NODEIDTYPE_NUMERIC; - parentNodeID_.identifier.numeric = std::stoi(nodeID_); // NOLINT(cppcoreguidelines-pro-type-union-access) - } else { // idType_ == opc::OPCNodeIDType::String - parentNodeID_.identifierType = UA_NODEIDTYPE_STRING; - parentNodeID_.identifier.string = UA_STRING_ALLOC(nodeID_.c_str()); // NOLINT(cppcoreguidelines-pro-type-union-access) + throw std::runtime_error("Content cannot be converted to bool"); } - if (!connection_->exists(parentNodeID_)) { - logger_->log_error("Parent node doesn't exist, no flow files will be put"); - yield(); - return; - } - parentExists_ = true; + break; + } + case opc::OPCNodeDataType::Float: { + float value = std::stof(contentstr); + sc = connection_->update_node(target_node, value); + break; + } + case opc::OPCNodeDataType::Double: { + double value = std::stod(contentstr); + sc = connection_->update_node(target_node, value); + break; + } + case opc::OPCNodeDataType::String: { + sc = connection_->update_node(target_node, contentstr); + break; } + default: + logger_->log_error("Unhandled data type: {}", magic_enum::enum_name(node_data_type_)); + gsl_Assert(false); } - - auto flowFile = session.get(); - - // Do nothing if there are no incoming files - if (!flowFile) { + if (sc != UA_STATUSCODE_GOOD) { + logger_->log_error("Failed to update node: {}", UA_StatusCode_name(sc)); + session.transfer(flow_file, Failure); return; } - std::string targetidtype; - - bool targetNodeExists = false; - bool targetNodeValid = false; - UA_NodeId targetnode; - - if (context.getProperty(TargetNodeIDType, targetidtype, flowFile.get())) { - std::string targetid; - std::string namespaceidx; + logger_->log_trace("Node successfully updated!"); + session.transfer(flow_file, Success); + } catch (const std::exception&) { + logger_->log_error("Failed to convert {} to data type {}", contentstr, magic_enum::enum_name(node_data_type_)); + session.transfer(flow_file, Failure); + } +} +void PutOPCProcessor::createNode(const UA_NodeId& target_node, const std::string& contentstr, core::ProcessContext& context, core::ProcessSession& session, + const std::shared_ptr& flow_file) const { + logger_->log_trace("Node doesn't exist, trying to create new node"); + std::string browse_name; + if (!context.getProperty(TargetNodeBrowseName, browse_name, flow_file.get()) || browse_name.empty()) { + logger_->log_error("Target node browse name is required for flowfile ({}) as new node is to be created", + flow_file->getUUIDStr()); + session.transfer(flow_file, Failure); + return; + } - if (!context.getProperty(TargetNodeID, targetid, flowFile.get())) { - logger_->log_error("Flowfile {} had target node ID type specified ({}) without ID, routing to failure!", - flowFile->getUUIDStr(), targetidtype); - session.transfer(flowFile, Failure); - return; + try { + UA_StatusCode sc = 0; + UA_NodeId result_node; + switch (node_data_type_) { + case opc::OPCNodeDataType::Int64: { + int64_t value = std::stoll(contentstr); + sc = connection_->add_node(parent_node_id_, target_node, create_node_reference_type_, browse_name, value, &result_node); + break; } - - if (!context.getProperty(TargetNodeNameSpaceIndex, namespaceidx, flowFile.get())) { - logger_->log_error("Flowfile {} had target node ID type specified ({}) without namespace index, routing to failure!", flowFile->getUUIDStr(), targetidtype); - session.transfer(flowFile, Failure); - return; + case opc::OPCNodeDataType::UInt64: { + uint64_t value = std::stoull(contentstr); + sc = connection_->add_node(parent_node_id_, target_node, create_node_reference_type_, browse_name, value, &result_node); + break; } - int32_t nsi = 0; - try { - nsi = std::stoi(namespaceidx); - } catch (...) { - logger_->log_error("Flowfile {} has invalid namespace index ({}), routing to failure!", - flowFile->getUUIDStr(), namespaceidx); - session.transfer(flowFile, Failure); - return; + case opc::OPCNodeDataType::Int32: { + int32_t value = std::stoi(contentstr); + sc = connection_->add_node(parent_node_id_, target_node, create_node_reference_type_, browse_name, value, &result_node); + break; } - - targetnode.namespaceIndex = nsi; - if (targetidtype == "Int") { - targetnode.identifierType = UA_NODEIDTYPE_NUMERIC; - try { - targetnode.identifier.numeric = std::stoi(targetid); // NOLINT(cppcoreguidelines-pro-type-union-access) - targetNodeValid = true; - } catch (...) { - logger_->log_error("Flowfile {}: target node ID is not a valid integer: {}. Routing to failure!", - flowFile->getUUIDStr(), targetid); - session.transfer(flowFile, Failure); - return; - } - } else if (targetidtype == "String") { - targetnode.identifierType = UA_NODEIDTYPE_STRING; - targetnode.identifier.string = UA_STRING_ALLOC(targetid.c_str()); // NOLINT(cppcoreguidelines-pro-type-union-access) - targetNodeValid = true; - } else { - logger_->log_error("Flowfile {}: target node ID type is invalid: {}. Routing to failure!", - flowFile->getUUIDStr(), targetidtype); - session.transfer(flowFile, Failure); - return; + case opc::OPCNodeDataType::UInt32: { + uint32_t value = std::stoul(contentstr); + sc = connection_->add_node(parent_node_id_, target_node, create_node_reference_type_, browse_name, value, &result_node); + break; } - targetNodeExists = connection_->exists(targetnode); - } - - const auto contentstr = to_string(session.readBuffer(flowFile)); - if (targetNodeExists) { - logger_->log_trace("Node exists, trying to update it"); - try { - UA_StatusCode sc = 0; - switch (nodeDataType_) { - case opc::OPCNodeDataType::Int64: { - int64_t value = std::stoll(contentstr); - sc = connection_->update_node(targetnode, value); - break; - } - case opc::OPCNodeDataType::UInt64: { - uint64_t value = std::stoull(contentstr); - sc = connection_->update_node(targetnode, value); - break; - } - case opc::OPCNodeDataType::Int32: { - int32_t value = std::stoi(contentstr); - sc = connection_->update_node(targetnode, value); - break; - } - case opc::OPCNodeDataType::UInt32: { - uint32_t value = std::stoul(contentstr); - sc = connection_->update_node(targetnode, value); - break; - } - case opc::OPCNodeDataType::Boolean: { - const auto contentstr_parsed = utils::string::toBool(contentstr); - if (contentstr_parsed) { - sc = connection_->update_node(targetnode, contentstr_parsed.value()); - } else { - throw opc::OPCException(GENERAL_EXCEPTION, "Content cannot be converted to bool"); - } - break; - } - case opc::OPCNodeDataType::Float: { - float value = std::stof(contentstr); - sc = connection_->update_node(targetnode, value); - break; - } - case opc::OPCNodeDataType::Double: { - double value = std::stod(contentstr); - sc = connection_->update_node(targetnode, value); - break; - } - case opc::OPCNodeDataType::String: { - sc = connection_->update_node(targetnode, contentstr); - break; - } - default: - throw opc::OPCException(GENERAL_EXCEPTION, "This should never happen!"); - } - if (sc != UA_STATUSCODE_GOOD) { - logger_->log_error("Failed to update node: {}", UA_StatusCode_name(sc)); - session.transfer(flowFile, Failure); - return; + case opc::OPCNodeDataType::Boolean: { + if (auto contentstr_parsed = utils::string::toBool(contentstr)) { + sc = connection_->add_node(parent_node_id_, target_node, create_node_reference_type_, browse_name, contentstr_parsed.value(), &result_node); + } else { + throw std::runtime_error("Content cannot be converted to bool"); } - } catch (...) { - std::string typestr; - context.getProperty(ValueType, typestr); - logger_->log_error("Failed to convert {} to data type {}", contentstr, typestr); - session.transfer(flowFile, Failure); - return; + break; } - logger_->log_trace("Node successfully updated!"); - session.transfer(flowFile, Success); - return; - } else { - logger_->log_trace("Node doesn't exist, trying to create new node"); - std::string browsename; - if (!context.getProperty(TargetNodeBrowseName, browsename, flowFile.get())) { - logger_->log_error("Target node browse name is required for flowfile ({}) as new node is to be created", - flowFile->getUUIDStr()); - session.transfer(flowFile, Failure); - return; + case opc::OPCNodeDataType::Float: { + float value = std::stof(contentstr); + sc = connection_->add_node(parent_node_id_, target_node, create_node_reference_type_, browse_name, value, &result_node); + break; } - if (!targetNodeValid) { - targetnode = UA_NODEID_NUMERIC(1, 0); + case opc::OPCNodeDataType::Double: { + double value = std::stod(contentstr); + sc = connection_->add_node(parent_node_id_, target_node, create_node_reference_type_, browse_name, value, &result_node); + break; } - try { - UA_StatusCode sc = 0; - UA_NodeId resultnode; - switch (nodeDataType_) { - case opc::OPCNodeDataType::Int64: { - int64_t value = std::stoll(contentstr); - sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, &resultnode); - break; - } - case opc::OPCNodeDataType::UInt64: { - uint64_t value = std::stoull(contentstr); - sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, &resultnode); - break; - } - case opc::OPCNodeDataType::Int32: { - int32_t value = std::stoi(contentstr); - sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, &resultnode); - break; - } - case opc::OPCNodeDataType::UInt32: { - uint32_t value = std::stoul(contentstr); - sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, &resultnode); - break; - } - case opc::OPCNodeDataType::Boolean: { - const auto contentstr_parsed = utils::string::toBool(contentstr); - if (contentstr_parsed) { - sc = connection_->add_node(parentNodeID_, targetnode, browsename, contentstr_parsed.value(), &resultnode); - } else { - throw opc::OPCException(GENERAL_EXCEPTION, "Content cannot be converted to bool"); - } - break; - } - case opc::OPCNodeDataType::Float: { - float value = std::stof(contentstr); - sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, &resultnode); - break; - } - case opc::OPCNodeDataType::Double: { - double value = std::stod(contentstr); - sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, &resultnode); - break; - } - case opc::OPCNodeDataType::String: { - sc = connection_->add_node(parentNodeID_, targetnode, browsename, contentstr, &resultnode); - break; - } - default: - throw opc::OPCException(GENERAL_EXCEPTION, "This should never happen!"); - } - if (sc != UA_STATUSCODE_GOOD) { - logger_->log_error("Failed to create node: {}", UA_StatusCode_name(sc)); - session.transfer(flowFile, Failure); - return; - } - } catch (...) { - std::string typestr; - context.getProperty(ValueType, typestr); - logger_->log_error("Failed to convert {} to data type {}", contentstr, typestr); - session.transfer(flowFile, Failure); - return; + case opc::OPCNodeDataType::String: { + sc = connection_->add_node(parent_node_id_, target_node, create_node_reference_type_, browse_name, contentstr, &result_node); + break; } - logger_->log_trace("Node successfully created!"); - session.transfer(flowFile, Success); + default: + logger_->log_error("Unhandled data type: {}", magic_enum::enum_name(node_data_type_)); + gsl_Assert(false); + } + if (sc != UA_STATUSCODE_GOOD) { + logger_->log_error("Failed to create node: {}", UA_StatusCode_name(sc)); + session.transfer(flow_file, Failure); return; } + + logger_->log_trace("Node successfully created!"); + session.transfer(flow_file, Success); + } catch (const std::exception&) { + logger_->log_error("Failed to convert {} to data type {}", contentstr, magic_enum::enum_name(node_data_type_)); + session.transfer(flow_file, Failure); + } +} + +void PutOPCProcessor::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { + logger_->log_trace("PutOPCProcessor::onTrigger"); + + if (!reconnect()) { + logger_->log_warn("Could not connect to OPC server, yielding"); + yield(); + return; + } + + if (!readParentNodeId()) { + yield(); + return; + } + + auto flow_file = session.get(); + if (!flow_file) { + return; + } + + auto target_node_result = configureTargetNode(context, *flow_file); + if (!target_node_result.has_value()) { + logger_->log_error("{}", target_node_result.error()); + session.transfer(flow_file, Failure); + return; + } + + const auto& [target_node_exists, target_node] = target_node_result.value(); + const auto contentstr = to_string(session.readBuffer(flow_file)); + if (target_node_exists) { + updateNode(target_node, contentstr, session, flow_file); + } else { + createNode(target_node, contentstr, context, session, flow_file); } +} REGISTER_RESOURCE(PutOPCProcessor, Processor); diff --git a/extensions/opc/tests/CMakeLists.txt b/extensions/opc/tests/CMakeLists.txt new file mode 100644 index 0000000000..df99e09929 --- /dev/null +++ b/extensions/opc/tests/CMakeLists.txt @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +file(GLOB OPC_UA_TESTS "*.cpp") + +SET(OPC_UA_TEST_COUNT 0) +FOREACH(testfile ${OPC_UA_TESTS}) + get_filename_component(testfilename "${testfile}" NAME_WE) + add_minifi_executable("${testfilename}" "${testfile}") + target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/opc") + createTests("${testfilename}") + target_link_libraries(${testfilename} Catch2WithMain) + target_link_libraries(${testfilename} minifi-opc-extensions) + MATH(EXPR OPC_UA_TEST_COUNT "${OPC_UA_TEST_COUNT}+1") + add_test(NAME "${testfilename}" COMMAND "${testfilename}") + set_tests_properties("${testfilename}" PROPERTIES LABELS "opc;memchecked") +ENDFOREACH() +message("-- Finished building ${OPC_UA_TEST_COUNT} OPC UA related test file(s)...") diff --git a/extensions/opc/tests/FetchOpcProcessorTests.cpp b/extensions/opc/tests/FetchOpcProcessorTests.cpp new file mode 100644 index 0000000000..3ea436f69d --- /dev/null +++ b/extensions/opc/tests/FetchOpcProcessorTests.cpp @@ -0,0 +1,182 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "unit/TestBase.h" +#include "unit/Catch.h" +#include "OpcUaTestServer.h" +#include "unit/SingleProcessorTestController.h" +#include "include/fetchopc.h" + +namespace org::apache::nifi::minifi::test { + +TEST_CASE("Test fetching using path node id", "[fetchopcprocessor]") { + OpcUaTestServer server; + server.start(); + SingleProcessorTestController controller{std::make_unique("FetchOPCProcessor")}; + auto fetch_opc_processor = controller.getProcessor(); + fetch_opc_processor->setProperty(processors::FetchOPCProcessor::OPCServerEndPoint, "opc.tcp://127.0.0.1:4840/"); + fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NodeIDType, "Path"); + fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NodeID, "Simulator/Default/Device1"); + fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NameSpaceIndex, std::to_string(server.getNamespaceIndex())); + + const auto results = controller.trigger(); + REQUIRE(results.at(processors::FetchOPCProcessor::Failure).empty()); + REQUIRE(results.at(processors::FetchOPCProcessor::Success).size() == 4); + for (size_t i = 0; i < 3; i++) { + auto flow_file = results.at(processors::FetchOPCProcessor::Success)[i]; + CHECK(flow_file->getAttribute("Browsename") == "INT" + std::to_string(i + 1)); + CHECK(flow_file->getAttribute("Datasize") == "4"); + CHECK(flow_file->getAttribute("Full path") == "Simulator/Default/Device1/INT" + std::to_string(i + 1)); + CHECK(flow_file->getAttribute("NodeID")); + CHECK(flow_file->getAttribute("NodeID type") == "numeric"); + CHECK(flow_file->getAttribute("Typename") == "Int32"); + CHECK(flow_file->getAttribute("Sourcetimestamp")); + CHECK(controller.plan->getContent(flow_file) == std::to_string(i + 1)); + } + + auto flow_file = results.at(processors::FetchOPCProcessor::Success)[3]; + CHECK(flow_file->getAttribute("Browsename") == "INT4"); + CHECK(flow_file->getAttribute("Datasize") == "4"); + CHECK(flow_file->getAttribute("Full path") == "Simulator/Default/Device1/INT3/INT4"); + CHECK(flow_file->getAttribute("NodeID")); + CHECK(flow_file->getAttribute("NodeID type") == "numeric"); + CHECK(flow_file->getAttribute("Typename") == "Int32"); + CHECK(flow_file->getAttribute("Sourcetimestamp")); + CHECK(controller.plan->getContent(flow_file) == "4"); +} + +TEST_CASE("Test fetching using custom reference type id path", "[fetchopcprocessor]") { + OpcUaTestServer server; + server.start(); + SingleProcessorTestController controller{std::make_unique("FetchOPCProcessor")}; + auto fetch_opc_processor = controller.getProcessor(); + fetch_opc_processor->setProperty(processors::FetchOPCProcessor::OPCServerEndPoint, "opc.tcp://127.0.0.1:4840/"); + fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NodeIDType, "Path"); + fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NodeID, "Simulator/Default/Device1/INT3"); + fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NameSpaceIndex, std::to_string(server.getNamespaceIndex())); + fetch_opc_processor->setProperty(processors::FetchOPCProcessor::PathReferenceTypes, "Organizes/Organizes/HasComponent"); + + const auto results = controller.trigger(); + REQUIRE(results.at(processors::FetchOPCProcessor::Failure).empty()); + REQUIRE(results.at(processors::FetchOPCProcessor::Success).size() == 2); + auto flow_file = results.at(processors::FetchOPCProcessor::Success)[0]; + CHECK(flow_file->getAttribute("Browsename") == "INT3"); + CHECK(flow_file->getAttribute("Datasize") == "4"); + CHECK(flow_file->getAttribute("Full path") == "Simulator/Default/Device1/INT3"); + CHECK(flow_file->getAttribute("NodeID")); + CHECK(flow_file->getAttribute("NodeID type") == "numeric"); + CHECK(flow_file->getAttribute("Typename") == "Int32"); + CHECK(flow_file->getAttribute("Sourcetimestamp")); + CHECK(controller.plan->getContent(flow_file) == "3"); + flow_file = results.at(processors::FetchOPCProcessor::Success)[1]; + CHECK(flow_file->getAttribute("Browsename") == "INT4"); + CHECK(flow_file->getAttribute("Datasize") == "4"); + CHECK(flow_file->getAttribute("Full path") == "Simulator/Default/Device1/INT3/INT4"); + CHECK(flow_file->getAttribute("NodeID")); + CHECK(flow_file->getAttribute("NodeID type") == "numeric"); + CHECK(flow_file->getAttribute("Typename") == "Int32"); + CHECK(flow_file->getAttribute("Sourcetimestamp")); + CHECK(controller.plan->getContent(flow_file) == "4"); +} + +TEST_CASE("Test missing path reference types", "[fetchopcprocessor]") { + SingleProcessorTestController controller{std::make_unique("FetchOPCProcessor")}; + auto fetch_opc_processor = controller.getProcessor(); + fetch_opc_processor->setProperty(processors::FetchOPCProcessor::OPCServerEndPoint, "opc.tcp://127.0.0.1:4840/"); + fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NodeIDType, "Path"); + fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NodeID, "Simulator/Default/Device1/INT3"); + fetch_opc_processor->setProperty(processors::FetchOPCProcessor::PathReferenceTypes, "Organizes/Organizes"); + REQUIRE_THROWS_WITH(controller.trigger(), "Process Schedule Operation: Path reference types must be provided for each node pair in the path!"); +} + +TEST_CASE("Test username and password should both be provided", "[fetchopcprocessor]") { + SingleProcessorTestController controller{std::make_unique("FetchOPCProcessor")}; + auto put_opc_processor = controller.getProcessor(); + put_opc_processor->setProperty(processors::FetchOPCProcessor::OPCServerEndPoint, "opc.tcp://127.0.0.1:4840/"); + put_opc_processor->setProperty(processors::FetchOPCProcessor::Username, "user"); + put_opc_processor->setProperty(processors::FetchOPCProcessor::Password, ""); + + REQUIRE_THROWS_WITH(controller.trigger("42"), "Process Schedule Operation: Both or neither of Username and Password should be provided!"); +} + +TEST_CASE("Test certificate path and key path should both be provided", "[fetchopcprocessor]") { + SingleProcessorTestController controller{std::make_unique("FetchOPCProcessor")}; + auto put_opc_processor = controller.getProcessor(); + put_opc_processor->setProperty(processors::FetchOPCProcessor::OPCServerEndPoint, "opc.tcp://127.0.0.1:4840/"); + put_opc_processor->setProperty(processors::FetchOPCProcessor::CertificatePath, "cert"); + put_opc_processor->setProperty(processors::FetchOPCProcessor::KeyPath, ""); + + REQUIRE_THROWS_WITH(controller.trigger("42"), "Process Schedule Operation: All or none of Certificate path and Key path should be provided!"); +} + +TEST_CASE("Test application uri should be provided if certificate is provided", "[fetchopcprocessor]") { + SingleProcessorTestController controller{std::make_unique("FetchOPCProcessor")}; + auto put_opc_processor = controller.getProcessor(); + put_opc_processor->setProperty(processors::FetchOPCProcessor::OPCServerEndPoint, "opc.tcp://127.0.0.1:4840/"); + put_opc_processor->setProperty(processors::FetchOPCProcessor::CertificatePath, "cert"); + put_opc_processor->setProperty(processors::FetchOPCProcessor::KeyPath, "key"); + + REQUIRE_THROWS_WITH(controller.trigger("42"), "Process Schedule Operation: Application URI must be provided if Certificate path is provided!"); +} + +TEST_CASE("Test certificate path must be valid", "[fetchopcprocessor]") { + SingleProcessorTestController controller{std::make_unique("FetchOPCProcessor")}; + auto put_opc_processor = controller.getProcessor(); + put_opc_processor->setProperty(processors::FetchOPCProcessor::OPCServerEndPoint, "opc.tcp://127.0.0.1:4840/"); + put_opc_processor->setProperty(processors::FetchOPCProcessor::CertificatePath, "/invalid/cert/path"); + put_opc_processor->setProperty(processors::FetchOPCProcessor::KeyPath, "key"); + put_opc_processor->setProperty(processors::FetchOPCProcessor::ApplicationURI, "appuri"); + + REQUIRE_THROWS_WITH(controller.trigger("42"), "Process Schedule Operation: Failed to load cert from path: /invalid/cert/path"); +} + +TEST_CASE("Test key path must be valid", "[fetchopcprocessor]") { + SingleProcessorTestController controller{std::make_unique("FetchOPCProcessor")}; + auto put_opc_processor = controller.getProcessor(); + auto test_cert_path = controller.createTempDirectory() / "test_cert.pem"; + { + std::ofstream cert_file(test_cert_path); + cert_file << "test"; + cert_file.close(); + } + put_opc_processor->setProperty(processors::FetchOPCProcessor::OPCServerEndPoint, "opc.tcp://127.0.0.1:4840/"); + put_opc_processor->setProperty(processors::FetchOPCProcessor::CertificatePath, test_cert_path.string()); + put_opc_processor->setProperty(processors::FetchOPCProcessor::KeyPath, "/invalid/key"); + put_opc_processor->setProperty(processors::FetchOPCProcessor::ApplicationURI, "appuri"); + + REQUIRE_THROWS_WITH(controller.trigger("42"), "Process Schedule Operation: Failed to load key from path: /invalid/key"); +} + +TEST_CASE("Test trusted certs path must be valid", "[fetchopcprocessor]") { + SingleProcessorTestController controller{std::make_unique("FetchOPCProcessor")}; + auto put_opc_processor = controller.getProcessor(); + auto test_cert_path = controller.createTempDirectory() / "test_cert.pem"; + { + std::ofstream cert_file(test_cert_path); + cert_file << "test"; + cert_file.close(); + } + put_opc_processor->setProperty(processors::FetchOPCProcessor::OPCServerEndPoint, "opc.tcp://127.0.0.1:4840/"); + put_opc_processor->setProperty(processors::FetchOPCProcessor::CertificatePath, test_cert_path.string()); + put_opc_processor->setProperty(processors::FetchOPCProcessor::KeyPath, test_cert_path.string()); + put_opc_processor->setProperty(processors::FetchOPCProcessor::TrustedPath, "/invalid/trusted"); + put_opc_processor->setProperty(processors::FetchOPCProcessor::ApplicationURI, "appuri"); + + REQUIRE_THROWS_WITH(controller.trigger("42"), "Process Schedule Operation: Failed to load trusted server certs from path: /invalid/trusted"); +} + +} // namespace org::apache::nifi::minifi::test diff --git a/extensions/opc/tests/OpcUaTestServer.h b/extensions/opc/tests/OpcUaTestServer.h new file mode 100644 index 0000000000..8672024836 --- /dev/null +++ b/extensions/opc/tests/OpcUaTestServer.h @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include +#include +#include "unit/TestUtils.h" +#include "unit/Catch.h" + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::test { + +class OpcUaTestServer { + public: + OpcUaTestServer() : server_(UA_Server_new()) { + UA_ServerConfig_setDefault(UA_Server_getConfig(server_)); + + auto config = UA_Server_getConfig(server_); + config->logging->log = [] (void *log_context, UA_LogLevel level, UA_LogCategory /*category*/, const char *msg, va_list args) { + char buffer[1024]; + vsnprintf(buffer, sizeof(buffer), msg, args); + + std::string level_str; + switch (level) { + case UA_LOGLEVEL_TRACE: return; + case UA_LOGLEVEL_DEBUG: level_str = "DEBUG"; break; + case UA_LOGLEVEL_INFO: level_str = "INFO"; break; + case UA_LOGLEVEL_WARNING: level_str = "WARNING"; break; + case UA_LOGLEVEL_ERROR: level_str = "ERROR"; break; + case UA_LOGLEVEL_FATAL: level_str = "FATAL"; break; + default: level_str = "UNKNOWN"; break; + } + + std::string log_message = "[" + level_str + "] " + buffer + "\n"; + auto server = static_cast(log_context); + server->addLog(log_message); + }; + + config->logging->context = this; + + ns_index_ = UA_Server_addNamespace(server_, "custom.namespace"); + + UA_NodeId simulator_node = addObject("Simulator", UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER)); + UA_NodeId default_node = addObject("Default", simulator_node); + UA_NodeId device1_node = addObject("Device1", default_node); + + addIntVariable("INT1", device1_node, 1); + addIntVariable("INT2", device1_node, 2); + auto int3_node = addIntVariable("INT3", device1_node, 3); + addIntVariable("INT4", int3_node, 4); + } + + void start() { + std::lock_guard lock(mutex_); + running_ = true; + server_thread_ = std::thread([this]() { + UA_Server_run(server_, &running_); + }); + ensureConnection(); + } + + void stop() { + std::lock_guard lock(mutex_); + if (!running_) { + return; + } + running_ = false; + if (server_thread_.joinable()) { + server_thread_.join(); + } + } + + ~OpcUaTestServer() { + stop(); + UA_Server_delete(server_); + } + + UA_UInt16 getNamespaceIndex() const { + return ns_index_; + } + + void addLog(const std::string& log) { + std::lock_guard lock(server_logs_mutex_); + server_logs_.push_back(log); + } + + std::vector getLogs() const { + std::lock_guard lock(server_logs_mutex_); + return server_logs_; + } + + private: + UA_NodeId addObject(const char *name, UA_NodeId parent) { + UA_NodeId object_id; + UA_ObjectAttributes attr = UA_ObjectAttributes_default; + attr.displayName = UA_LOCALIZEDTEXT_ALLOC("en-US", name); + + auto status = UA_Server_addObjectNode( + server_, UA_NODEID_NULL, parent, + UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES), + UA_QUALIFIEDNAME(ns_index_, const_cast(name)), + UA_NODEID_NUMERIC(0, UA_NS0ID_BASEOBJECTTYPE), + attr, nullptr, &object_id); + + if (status != UA_STATUSCODE_GOOD) { + UA_LocalizedText_clear(&attr.displayName); + throw std::runtime_error("Failed to add object node"); + } + + UA_LocalizedText_clear(&attr.displayName); + return object_id; + } + + UA_NodeId addIntVariable(const char *name, UA_NodeId parent, UA_Int32 value) { + UA_VariableAttributes attr = UA_VariableAttributes_default; + attr.displayName = UA_LOCALIZEDTEXT_ALLOC("en-US", name); + attr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE; + + UA_Variant_setScalar(&attr.value, &value, &UA_TYPES[UA_TYPES_INT32]); + + UA_NodeId node_id; + auto status = UA_Server_addVariableNode( + server_, UA_NODEID_NULL, parent, + UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT), + UA_QUALIFIEDNAME(ns_index_, const_cast(name)), + UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE), + attr, nullptr, &node_id); + + if (status != UA_STATUSCODE_GOOD) { + UA_LocalizedText_clear(&attr.displayName); + throw std::runtime_error("Failed to add variable node"); + } + + UA_LocalizedText_clear(&attr.displayName); + return node_id; + } + + void ensureConnection() { + REQUIRE(utils::verifyEventHappenedInPollTime( + 5s, + [&]() { + auto logs = getLogs(); + return std::find_if(logs.begin(), logs.end(), [](const std::string& message) { return message.find("New DiscoveryUrl added") != std::string::npos;}) != logs.end(); + }, + 100ms)); + } + + UA_Server* server_; + UA_UInt16 ns_index_; + UA_Boolean running_ = false; + std::mutex mutex_; + std::thread server_thread_; + mutable std::mutex server_logs_mutex_; + std::vector server_logs_; +}; + +} // namespace org::apache::nifi::minifi::test diff --git a/extensions/opc/tests/PutOpcProcessorTests.cpp b/extensions/opc/tests/PutOpcProcessorTests.cpp new file mode 100644 index 0000000000..d9326e2be0 --- /dev/null +++ b/extensions/opc/tests/PutOpcProcessorTests.cpp @@ -0,0 +1,441 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "unit/TestBase.h" +#include "unit/Catch.h" +#include "OpcUaTestServer.h" +#include "unit/SingleProcessorTestController.h" +#include "include/putopc.h" +#include "utils/StringUtils.h" +#include "unit/TestUtils.h" + +namespace org::apache::nifi::minifi::test { + +struct NodeData { + uint8_t data; + uint16_t namespace_index; + uint32_t node_id; + std::string browse_name; + std::string path; + std::string path_reference_types; + std::string target_reference_type = "HasComponent"; +}; + +void verifyCreatedNode(const NodeData& expected_node, SingleProcessorTestController& controller) { + auto client = minifi::opc::Client::createClient(controller.getLogger(), "", {}, {}, {}); + REQUIRE(client->connect("opc.tcp://127.0.0.1:4840/") == UA_STATUSCODE_GOOD); + std::vector found_node_ids; + std::vector reference_types; + + if (!expected_node.path_reference_types.empty()) { + auto ref_types = minifi::utils::string::split(expected_node.path_reference_types, "/"); + for (const auto& ref_type : ref_types) { + reference_types.push_back(opc::mapOpcReferenceType(ref_type).value()); + } + } else { + for (size_t i = 0; i < minifi::utils::string::split(expected_node.path, "/").size() - 1; ++i) { + reference_types.push_back(UA_NS0ID_ORGANIZES); + } + } + reference_types.push_back(opc::mapOpcReferenceType(expected_node.target_reference_type).value()); + + REQUIRE(utils::verifyEventHappenedInPollTime(5s, [&] { + client->translateBrowsePathsToNodeIdsRequest(expected_node.path + "/" + expected_node.browse_name, found_node_ids, expected_node.namespace_index, reference_types, controller.getLogger()); + return !found_node_ids.empty(); + }, 100ms)); + + REQUIRE(found_node_ids.size() == 1); + REQUIRE(found_node_ids[0].namespaceIndex == expected_node.namespace_index); + REQUIRE(found_node_ids[0].identifierType == UA_NODEIDTYPE_NUMERIC); + REQUIRE(found_node_ids[0].identifier.numeric == expected_node.node_id); // NOLINT(cppcoreguidelines-pro-type-union-access) + + UA_ReferenceDescription ref_desc; + ref_desc.isForward = true; + ref_desc.referenceTypeId = UA_NODEID_NUMERIC(0, UA_NODEIDTYPE_NUMERIC); + ref_desc.nodeId.nodeId = found_node_ids[0]; + ref_desc.browseName = UA_QUALIFIEDNAME_ALLOC(expected_node.namespace_index, expected_node.browse_name.c_str()); + ref_desc.displayName = UA_LOCALIZEDTEXT_ALLOC("en-US", expected_node.browse_name.c_str()); + ref_desc.nodeClass = UA_NODECLASS_VARIABLE; + ref_desc.typeDefinition.nodeId = UA_NODEID_NUMERIC(0, UA_NODEIDTYPE_NUMERIC); + auto data = client->getNodeData(&ref_desc, expected_node.path); + CHECK(data.data[0] == expected_node.data); +} + +TEST_CASE("Test creating a new node with path node id", "[putopcprocessor]") { + OpcUaTestServer server; + server.start(); + SingleProcessorTestController controller{std::make_unique("PutOPCProcessor")}; + auto put_opc_processor = controller.getProcessor(); + + NodeData expected_node{42, server.getNamespaceIndex(), 9999, "everything", "Simulator/Default/Device1", {}}; + put_opc_processor->setProperty(processors::PutOPCProcessor::OPCServerEndPoint, "opc.tcp://127.0.0.1:4840/"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNodeIDType, "Path"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNodeID, expected_node.path); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNameSpaceIndex, std::to_string(expected_node.namespace_index)); + put_opc_processor->setProperty(processors::PutOPCProcessor::ValueType, "Int32"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeIDType, "Int"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeID, std::to_string(expected_node.node_id)); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeNameSpaceIndex, std::to_string(expected_node.namespace_index)); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeBrowseName, expected_node.browse_name); + + const auto results = controller.trigger(std::to_string(expected_node.data)); + REQUIRE(results.at(processors::PutOPCProcessor::Failure).empty()); + REQUIRE(results.at(processors::PutOPCProcessor::Success).size() == 1); + auto flow_file = results.at(processors::PutOPCProcessor::Success)[0]; + CHECK(controller.plan->getContent(flow_file) == std::to_string(expected_node.data)); + verifyCreatedNode(expected_node, controller); +} + +TEST_CASE("Test fetching using custom reference type id path", "[putopcprocessor]") { + OpcUaTestServer server; + server.start(); + SingleProcessorTestController controller{std::make_unique("PutOPCProcessor")}; + auto put_opc_processor = controller.getProcessor(); + + NodeData expected_node{42, server.getNamespaceIndex(), 9999, "everything", "Simulator/Default/Device1/INT3/INT4", "Organizes/Organizes/HasComponent/HasComponent"}; + put_opc_processor->setProperty(processors::PutOPCProcessor::OPCServerEndPoint, "opc.tcp://127.0.0.1:4840/"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNodeIDType, "Path"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNodeID, expected_node.path); + put_opc_processor->setProperty(processors::PutOPCProcessor::PathReferenceTypes, expected_node.path_reference_types); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNameSpaceIndex, std::to_string(expected_node.namespace_index)); + put_opc_processor->setProperty(processors::PutOPCProcessor::ValueType, "Int32"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeIDType, "Int"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeID, std::to_string(expected_node.node_id)); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeNameSpaceIndex, std::to_string(expected_node.namespace_index)); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeBrowseName, expected_node.browse_name); + + const auto results = controller.trigger(std::to_string(expected_node.data)); + REQUIRE(results.at(processors::PutOPCProcessor::Failure).empty()); + REQUIRE(results.at(processors::PutOPCProcessor::Success).size() == 1); + auto flow_file = results.at(processors::PutOPCProcessor::Success)[0]; + CHECK(controller.plan->getContent(flow_file) == std::to_string(expected_node.data)); + verifyCreatedNode(expected_node, controller); +} + +TEST_CASE("Test fetching using custom target reference type id", "[putopcprocessor]") { + OpcUaTestServer server; + server.start(); + SingleProcessorTestController controller{std::make_unique("PutOPCProcessor")}; + auto put_opc_processor = controller.getProcessor(); + + NodeData expected_node{42, server.getNamespaceIndex(), 9999, "everything", "Simulator/Default/Device1/INT3", "Organizes/Organizes/HasComponent", "Organizes"}; + put_opc_processor->setProperty(processors::PutOPCProcessor::OPCServerEndPoint, "opc.tcp://127.0.0.1:4840/"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNodeIDType, "Path"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNodeID, expected_node.path); + put_opc_processor->setProperty(processors::PutOPCProcessor::PathReferenceTypes, expected_node.path_reference_types); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNameSpaceIndex, std::to_string(expected_node.namespace_index)); + put_opc_processor->setProperty(processors::PutOPCProcessor::ValueType, "Int32"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeIDType, "Int"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeID, std::to_string(expected_node.node_id)); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeNameSpaceIndex, std::to_string(expected_node.namespace_index)); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeBrowseName, expected_node.browse_name); + put_opc_processor->setProperty(processors::PutOPCProcessor::CreateNodeReferenceType, expected_node.target_reference_type); + + const auto results = controller.trigger(std::to_string(expected_node.data)); + REQUIRE(results.at(processors::PutOPCProcessor::Failure).empty()); + REQUIRE(results.at(processors::PutOPCProcessor::Success).size() == 1); + auto flow_file = results.at(processors::PutOPCProcessor::Success)[0]; + CHECK(controller.plan->getContent(flow_file) == std::to_string(expected_node.data)); + verifyCreatedNode(expected_node, controller); +} + +TEST_CASE("Test missing path reference types", "[putopcprocessor]") { + SingleProcessorTestController controller{std::make_unique("PutOPCProcessor")}; + auto put_opc_processor = controller.getProcessor(); + put_opc_processor->setProperty(processors::PutOPCProcessor::OPCServerEndPoint, "opc.tcp://127.0.0.1:4840/"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNodeIDType, "Path"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNodeID, "Simulator/Default/Device1/INT3/INT4"); + put_opc_processor->setProperty(processors::PutOPCProcessor::PathReferenceTypes, "Organizes/Organizes/HasComponent"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ValueType, "Int32"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeIDType, "Int"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeID, "9999"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeNameSpaceIndex, "2"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeBrowseName, "everything"); + + REQUIRE_THROWS_WITH(controller.trigger("42"), "Process Schedule Operation: Path reference types must be provided for each node pair in the path!"); +} + +TEST_CASE("Test namespace cannot be empty", "[putopcprocessor]") { + OpcUaTestServer server; + server.start(); + LogTestController::getInstance().setTrace(); + SingleProcessorTestController controller{std::make_unique("PutOPCProcessor")}; + auto put_opc_processor = controller.getProcessor(); + put_opc_processor->setProperty(processors::PutOPCProcessor::OPCServerEndPoint, "opc.tcp://127.0.0.1:4840/"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNodeIDType, "Path"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNodeID, "Simulator/Default/Device1/INT3/INT4"); + put_opc_processor->setProperty(processors::PutOPCProcessor::PathReferenceTypes, "Organizes/Organizes/HasComponent/HasComponent"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNameSpaceIndex, "2"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ValueType, "Int32"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeBrowseName, "everything"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeNameSpaceIndex, "${missing}"); + + const auto results = controller.trigger("42"); + REQUIRE(results.at(processors::PutOPCProcessor::Success).empty()); + REQUIRE(results.at(processors::PutOPCProcessor::Failure).size() == 1); + auto flow_file = results.at(processors::PutOPCProcessor::Failure)[0]; + CHECK(controller.plan->getContent(flow_file) == "42"); + REQUIRE(LogTestController::getInstance().contains("had no target namespace index specified, routing to failure")); +} + +TEST_CASE("Test valid namespace being required", "[putopcprocessor]") { + OpcUaTestServer server; + server.start(); + LogTestController::getInstance().setTrace(); + SingleProcessorTestController controller{std::make_unique("PutOPCProcessor")}; + auto put_opc_processor = controller.getProcessor(); + put_opc_processor->setProperty(processors::PutOPCProcessor::OPCServerEndPoint, "opc.tcp://127.0.0.1:4840/"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNodeIDType, "Path"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNodeID, "Simulator/Default/Device1/INT3/INT4"); + put_opc_processor->setProperty(processors::PutOPCProcessor::PathReferenceTypes, "Organizes/Organizes/HasComponent/HasComponent"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNameSpaceIndex, std::to_string(server.getNamespaceIndex())); + put_opc_processor->setProperty(processors::PutOPCProcessor::ValueType, "Int32"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeBrowseName, "everything"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeNameSpaceIndex, "invalid_index"); + + const auto results = controller.trigger("42"); + REQUIRE(results.at(processors::PutOPCProcessor::Success).empty()); + REQUIRE(results.at(processors::PutOPCProcessor::Failure).size() == 1); + auto flow_file = results.at(processors::PutOPCProcessor::Failure)[0]; + CHECK(controller.plan->getContent(flow_file) == "42"); + REQUIRE(LogTestController::getInstance().contains("has invalid namespace index (invalid_index), routing to failure")); +} + +TEST_CASE("Test username and password should both be provided", "[putopcprocessor]") { + SingleProcessorTestController controller{std::make_unique("PutOPCProcessor")}; + auto put_opc_processor = controller.getProcessor(); + put_opc_processor->setProperty(processors::PutOPCProcessor::OPCServerEndPoint, "opc.tcp://127.0.0.1:4840/"); + put_opc_processor->setProperty(processors::PutOPCProcessor::Username, "user"); + put_opc_processor->setProperty(processors::PutOPCProcessor::Password, ""); + + REQUIRE_THROWS_WITH(controller.trigger("42"), "Process Schedule Operation: Both or neither of Username and Password should be provided!"); +} + +TEST_CASE("Test certificate path and key path should both be provided", "[putopcprocessor]") { + SingleProcessorTestController controller{std::make_unique("PutOPCProcessor")}; + auto put_opc_processor = controller.getProcessor(); + put_opc_processor->setProperty(processors::PutOPCProcessor::OPCServerEndPoint, "opc.tcp://127.0.0.1:4840/"); + put_opc_processor->setProperty(processors::PutOPCProcessor::CertificatePath, "cert"); + put_opc_processor->setProperty(processors::PutOPCProcessor::KeyPath, ""); + + REQUIRE_THROWS_WITH(controller.trigger("42"), "Process Schedule Operation: All or none of Certificate path and Key path should be provided!"); +} + +TEST_CASE("Test application uri should be provided if certificate is provided", "[putopcprocessor]") { + SingleProcessorTestController controller{std::make_unique("PutOPCProcessor")}; + auto put_opc_processor = controller.getProcessor(); + put_opc_processor->setProperty(processors::PutOPCProcessor::OPCServerEndPoint, "opc.tcp://127.0.0.1:4840/"); + put_opc_processor->setProperty(processors::PutOPCProcessor::CertificatePath, "cert"); + put_opc_processor->setProperty(processors::PutOPCProcessor::KeyPath, "key"); + + REQUIRE_THROWS_WITH(controller.trigger("42"), "Process Schedule Operation: Application URI must be provided if Certificate path is provided!"); +} + +TEST_CASE("Test certificate path must be valid", "[putopcprocessor]") { + SingleProcessorTestController controller{std::make_unique("PutOPCProcessor")}; + auto put_opc_processor = controller.getProcessor(); + put_opc_processor->setProperty(processors::PutOPCProcessor::OPCServerEndPoint, "opc.tcp://127.0.0.1:4840/"); + put_opc_processor->setProperty(processors::PutOPCProcessor::CertificatePath, "/invalid/cert/path"); + put_opc_processor->setProperty(processors::PutOPCProcessor::KeyPath, "key"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ApplicationURI, "appuri"); + + REQUIRE_THROWS_WITH(controller.trigger("42"), "Process Schedule Operation: Failed to load cert from path: /invalid/cert/path"); +} + +TEST_CASE("Test key path must be valid", "[putopcprocessor]") { + SingleProcessorTestController controller{std::make_unique("PutOPCProcessor")}; + auto put_opc_processor = controller.getProcessor(); + auto test_cert_path = controller.createTempDirectory() / "test_cert.pem"; + { + std::ofstream cert_file(test_cert_path); + cert_file << "test"; + cert_file.close(); + } + put_opc_processor->setProperty(processors::PutOPCProcessor::OPCServerEndPoint, "opc.tcp://127.0.0.1:4840/"); + put_opc_processor->setProperty(processors::PutOPCProcessor::CertificatePath, test_cert_path.string()); + put_opc_processor->setProperty(processors::PutOPCProcessor::KeyPath, "/invalid/key"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ApplicationURI, "appuri"); + + REQUIRE_THROWS_WITH(controller.trigger("42"), "Process Schedule Operation: Failed to load key from path: /invalid/key"); +} + +TEST_CASE("Test trusted certs path must be valid", "[putopcprocessor]") { + SingleProcessorTestController controller{std::make_unique("PutOPCProcessor")}; + auto put_opc_processor = controller.getProcessor(); + auto test_cert_path = controller.createTempDirectory() / "test_cert.pem"; + { + std::ofstream cert_file(test_cert_path); + cert_file << "test"; + cert_file.close(); + } + put_opc_processor->setProperty(processors::PutOPCProcessor::OPCServerEndPoint, "opc.tcp://127.0.0.1:4840/"); + put_opc_processor->setProperty(processors::PutOPCProcessor::CertificatePath, test_cert_path.string()); + put_opc_processor->setProperty(processors::PutOPCProcessor::KeyPath, test_cert_path.string()); + put_opc_processor->setProperty(processors::PutOPCProcessor::TrustedPath, "/invalid/trusted"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ApplicationURI, "appuri"); + + REQUIRE_THROWS_WITH(controller.trigger("42"), "Process Schedule Operation: Failed to load trusted server certs from path: /invalid/trusted"); +} + +TEST_CASE("Test invalid int node id", "[putopcprocessor]") { + SingleProcessorTestController controller{std::make_unique("PutOPCProcessor")}; + auto put_opc_processor = controller.getProcessor(); + put_opc_processor->setProperty(processors::PutOPCProcessor::OPCServerEndPoint, "opc.tcp://127.0.0.1:4840/"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNodeIDType, "Int"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNodeID, "Simulator/Default/Device1"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ValueType, "Int32"); + + REQUIRE_THROWS_WITH(controller.trigger("42"), "Process Schedule Operation: Simulator/Default/Device1 cannot be used as an int type node ID"); +} + +TEST_CASE("Test invalid parent node id path", "[putopcprocessor]") { + OpcUaTestServer server; + server.start(); + SingleProcessorTestController controller{std::make_unique("PutOPCProcessor")}; + auto put_opc_processor = controller.getProcessor(); + put_opc_processor->setProperty(processors::PutOPCProcessor::OPCServerEndPoint, "opc.tcp://127.0.0.1:4840/"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNodeIDType, "Path"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNodeID, "Simulator/Default/Device1/INT99"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNameSpaceIndex, std::to_string(server.getNamespaceIndex())); + put_opc_processor->setProperty(processors::PutOPCProcessor::ValueType, "Int32"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeIDType, "Int"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeID, "9999"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeNameSpaceIndex, std::to_string(server.getNamespaceIndex())); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeBrowseName, "everything"); + + const auto results = controller.trigger("42"); + REQUIRE(results.at(processors::PutOPCProcessor::Success).empty()); + REQUIRE(results.at(processors::PutOPCProcessor::Failure).empty()); + REQUIRE(LogTestController::getInstance().contains("to node id, no flow files will be put")); +} + +TEST_CASE("Test missing target node id", "[putopcprocessor]") { + OpcUaTestServer server; + server.start(); + SingleProcessorTestController controller{std::make_unique("PutOPCProcessor")}; + auto put_opc_processor = controller.getProcessor(); + put_opc_processor->setProperty(processors::PutOPCProcessor::OPCServerEndPoint, "opc.tcp://127.0.0.1:4840/"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNodeIDType, "Path"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNodeID, "Simulator/Default/Device1"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNameSpaceIndex, std::to_string(server.getNamespaceIndex())); + put_opc_processor->setProperty(processors::PutOPCProcessor::ValueType, "Int32"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeIDType, "Int"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeID, "${missing}"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeNameSpaceIndex, std::to_string(server.getNamespaceIndex())); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeBrowseName, "everything"); + + const auto results = controller.trigger("42"); + REQUIRE(results.at(processors::PutOPCProcessor::Success).empty()); + REQUIRE(results.at(processors::PutOPCProcessor::Failure).size() == 1); + auto flow_file = results.at(processors::PutOPCProcessor::Failure)[0]; + CHECK(controller.plan->getContent(flow_file) == "42"); + REQUIRE(LogTestController::getInstance().contains("had target node ID type specified (Int) without ID, routing to failure")); +} + +TEST_CASE("Test invalid target node id", "[putopcprocessor]") { + OpcUaTestServer server; + server.start(); + SingleProcessorTestController controller{std::make_unique("PutOPCProcessor")}; + auto put_opc_processor = controller.getProcessor(); + put_opc_processor->setProperty(processors::PutOPCProcessor::OPCServerEndPoint, "opc.tcp://127.0.0.1:4840/"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNodeIDType, "Path"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNodeID, "Simulator/Default/Device1"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNameSpaceIndex, std::to_string(server.getNamespaceIndex())); + put_opc_processor->setProperty(processors::PutOPCProcessor::ValueType, "Int32"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeIDType, "Int"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeID, "invalid_int"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeNameSpaceIndex, std::to_string(server.getNamespaceIndex())); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeBrowseName, "everything"); + + const auto results = controller.trigger("42"); + REQUIRE(results.at(processors::PutOPCProcessor::Success).empty()); + REQUIRE(results.at(processors::PutOPCProcessor::Failure).size() == 1); + auto flow_file = results.at(processors::PutOPCProcessor::Failure)[0]; + CHECK(controller.plan->getContent(flow_file) == "42"); + REQUIRE(LogTestController::getInstance().contains("target node ID is not a valid integer: invalid_int. Routing to failure")); +} + +TEST_CASE("Test missing target node type", "[putopcprocessor]") { + OpcUaTestServer server; + server.start(); + SingleProcessorTestController controller{std::make_unique("PutOPCProcessor")}; + auto put_opc_processor = controller.getProcessor(); + put_opc_processor->setProperty(processors::PutOPCProcessor::OPCServerEndPoint, "opc.tcp://127.0.0.1:4840/"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNodeIDType, "Path"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNodeID, "Simulator/Default/Device1"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNameSpaceIndex, std::to_string(server.getNamespaceIndex())); + put_opc_processor->setProperty(processors::PutOPCProcessor::ValueType, "Int32"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeIDType, "${missing}"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeID, "9999"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeNameSpaceIndex, std::to_string(server.getNamespaceIndex())); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeBrowseName, "everything"); + + const auto results = controller.trigger("42", {{"invalid_type", "invalid"}}); + REQUIRE(results.at(processors::PutOPCProcessor::Success).empty()); + REQUIRE(results.at(processors::PutOPCProcessor::Failure).size() == 1); + auto flow_file = results.at(processors::PutOPCProcessor::Failure)[0]; + CHECK(controller.plan->getContent(flow_file) == "42"); + REQUIRE(LogTestController::getInstance().contains("has invalid target node id type (), routing to failure")); +} + +TEST_CASE("Test invalid target node type", "[putopcprocessor]") { + OpcUaTestServer server; + server.start(); + SingleProcessorTestController controller{std::make_unique("PutOPCProcessor")}; + auto put_opc_processor = controller.getProcessor(); + put_opc_processor->setProperty(processors::PutOPCProcessor::OPCServerEndPoint, "opc.tcp://127.0.0.1:4840/"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNodeIDType, "Path"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNodeID, "Simulator/Default/Device1"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNameSpaceIndex, std::to_string(server.getNamespaceIndex())); + put_opc_processor->setProperty(processors::PutOPCProcessor::ValueType, "Int32"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeIDType, "${invalid_type}"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeID, "9999"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeNameSpaceIndex, std::to_string(server.getNamespaceIndex())); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeBrowseName, "everything"); + + const auto results = controller.trigger("42", {{"invalid_type", "invalid"}}); + REQUIRE(results.at(processors::PutOPCProcessor::Success).empty()); + REQUIRE(results.at(processors::PutOPCProcessor::Failure).size() == 1); + auto flow_file = results.at(processors::PutOPCProcessor::Failure)[0]; + CHECK(controller.plan->getContent(flow_file) == "42"); + REQUIRE(LogTestController::getInstance().contains("target node ID type is invalid: invalid. Routing to failure")); +} + +TEST_CASE("Test value type mismatch", "[putopcprocessor]") { + OpcUaTestServer server; + server.start(); + SingleProcessorTestController controller{std::make_unique("PutOPCProcessor")}; + auto put_opc_processor = controller.getProcessor(); + put_opc_processor->setProperty(processors::PutOPCProcessor::OPCServerEndPoint, "opc.tcp://127.0.0.1:4840/"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNodeIDType, "Path"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNodeID, "Simulator/Default/Device1"); + put_opc_processor->setProperty(processors::PutOPCProcessor::ParentNameSpaceIndex, std::to_string(server.getNamespaceIndex())); + put_opc_processor->setProperty(processors::PutOPCProcessor::ValueType, "Boolean"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeIDType, "Int"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeID, "9999"); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeNameSpaceIndex, std::to_string(server.getNamespaceIndex())); + put_opc_processor->setProperty(processors::PutOPCProcessor::TargetNodeBrowseName, "everything"); + + const auto results = controller.trigger("42"); + REQUIRE(results.at(processors::PutOPCProcessor::Success).empty()); + REQUIRE(results.at(processors::PutOPCProcessor::Failure).size() == 1); + auto flow_file = results.at(processors::PutOPCProcessor::Failure)[0]; + CHECK(controller.plan->getContent(flow_file) == "42"); + REQUIRE(LogTestController::getInstance().contains("Failed to convert 42 to data type Boolean")); +} + +} // namespace org::apache::nifi::minifi::test diff --git a/thirdparty/open62541/open62541.patch b/thirdparty/open62541/open62541.patch index 3ad611fe0c..a2cfee5ee8 100644 --- a/thirdparty/open62541/open62541.patch +++ b/thirdparty/open62541/open62541.patch @@ -1,18 +1,18 @@ diff --git a/CMakeLists.txt b/CMakeLists.txt -index 1934374e..4babdd20 100644 +index d0e9d75e2..bbf1b4c9a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -11,7 +11,7 @@ endif() - + string(TOLOWER "${CMAKE_BUILD_TYPE}" BUILD_TYPE_LOWER_CASE) - + -set(CMAKE_MODULE_PATH "${PROJECT_SOURCE_DIR}/tools/cmake") +list(APPEND CMAKE_MODULE_PATH "${PROJECT_SOURCE_DIR}/tools/cmake") - find_package(Python3 REQUIRED) - set(PYTHON_EXECUTABLE ${Python3_EXECUTABLE}) - find_package(Git) -@@ -721,17 +721,17 @@ if(NOT UA_FORCE_CPP AND (CMAKE_COMPILER_IS_GNUCC OR "x${CMAKE_C_COMPILER_ID}" ST - + if(${CMAKE_VERSION} VERSION_LESS 3.12) + set(CMAKE_MODULE_PATH "${CMAKE_MODULE_PATH};${PROJECT_SOURCE_DIR}/tools/cmake3.12") + endif() +@@ -642,17 +642,17 @@ if(CMAKE_COMPILER_IS_GNUCC OR CMAKE_C_COMPILER_ID STREQUAL "Clang") + # IPO requires too much memory for unit tests # GCC docu recommends to compile all files with the same options, therefore ignore it completely - if(NOT UA_BUILD_UNIT_TESTS AND NOT DEFINED CMAKE_INTERPROCEDURAL_OPTIMIZATION) @@ -26,17 +26,17 @@ index 1934374e..4babdd20 100644 - endif() - endif() - endif() -+ # if(NOT UA_BUILD_UNIT_TESTS AND NOT DEFINED CMAKE_INTERPROCEDURAL_OPTIMIZATION) -+ # # needed to check if IPO is supported (check needs cmake > 3.9) -+ # if("${CMAKE_VERSION}" VERSION_GREATER 3.9) -+ # cmake_policy(SET CMP0069 NEW) # needed as long as required cmake < 3.9 -+ # include(CheckIPOSupported) -+ # check_ipo_supported(RESULT CC_HAS_IPO) # Inter Procedural Optimization / Link Time Optimization (should be same as -flto) -+ # if(CC_HAS_IPO) -+ # set(CMAKE_INTERPROCEDURAL_OPTIMIZATION ON) -+ # endif() -+ # endif() -+ # endif() ++ # if(NOT UA_BUILD_UNIT_TESTS AND NOT DEFINED CMAKE_INTERPROCEDURAL_OPTIMIZATION) ++ # # needed to check if IPO is supported (check needs cmake > 3.9) ++ # if("${CMAKE_VERSION}" VERSION_GREATER 3.9) ++ # cmake_policy(SET CMP0069 NEW) # needed as long as required cmake < 3.9 ++ # include(CheckIPOSupported) ++ # check_ipo_supported(RESULT CC_HAS_IPO) # Inter Procedural Optimization / Link Time Optimization (should be same as -flto) ++ # if(CC_HAS_IPO) ++ # set(CMAKE_INTERPROCEDURAL_OPTIMIZATION ON) ++ # endif() ++ # endif() ++ # endif() endif() - + if(UA_ENABLE_AMALGAMATION)