diff --git a/asyncua/ua/uatypes.py b/asyncua/ua/uatypes.py index f29d42af5..a691ed124 100644 --- a/asyncua/ua/uatypes.py +++ b/asyncua/ua/uatypes.py @@ -3,20 +3,21 @@ """ import binascii -import sys -from typing import Optional, Any, Union, Generic, List import collections +import itertools import logging -from enum import Enum, IntEnum -import uuid import re -import itertools -from datetime import datetime, timedelta, timezone +import sys +import uuid +from base64 import b64decode, b64encode from dataclasses import dataclass, field -from base64 import b64encode, b64decode +from datetime import datetime, timedelta, timezone +from enum import IntEnum +from typing import Any, Generic, List, Optional, Union # hack to support python < 3.8 if sys.version_info.minor < 10: + def get_origin(tp): if hasattr(tp, "__origin__"): return tp.__origin__ @@ -32,12 +33,12 @@ def get_args(tp): return res return () else: - from typing import get_origin, get_args # type: ignore + from typing import get_args, get_origin # type: ignore from asyncua.ua import status_codes -from .uaerrors import UaError, UaStatusCodeError, UaStringParsingError +from .uaerrors import UaError, UaStatusCodeError, UaStringParsingError _logger = logging.getLogger(__name__) @@ -48,7 +49,7 @@ def get_args(tp): MAX_FILETIME_EPOCH_DATETIME = datetime(9999, 12, 31, 23, 59, 59) MAX_FILETIME_EPOCH_AS_UTC_DATETIME = MAX_FILETIME_EPOCH_DATETIME.replace(tzinfo=timezone.utc) MAX_OPC_FILETIME = int((MAX_FILETIME_EPOCH_DATETIME - FILETIME_EPOCH_AS_DATETIME).total_seconds()) * HUNDREDS_OF_NANOSECONDS -MAX_INT64 = 2 ** 63 - 1 +MAX_INT64 = 2**63 - 1 def type_is_union(uatype): @@ -67,14 +68,14 @@ def types_or_list_from_union(uatype): # returns the type of a union or the list of type if a list is inside the union types = [] for subtype in get_args(uatype): - if hasattr(subtype, '_paramspec_tvars'): + if hasattr(subtype, "_paramspec_tvars"): # @hack how to check if a parameter is a list: # check if have _paramspec_tvars works for type[X] return True, subtype - elif hasattr(subtype, '_name'): + elif hasattr(subtype, "_name"): # @hack how to check if parameter is union or list # if _name is not List, it is Union - if subtype._name == 'List': + if subtype._name == "List": return True, subtype elif not isinstance(None, subtype): types.append(subtype) @@ -119,7 +120,8 @@ def type_string_from_type(uatype): @dataclass class UaUnion: - ''' class to identify unions ''' + """class to identify unions""" + pass @@ -253,7 +255,7 @@ class ValueRank(IntEnum): class _MaskEnum(IntEnum): @classmethod def parse_bitfield(cls, the_int): - """ Take an integer and interpret it as a set of enum values. """ + """Take an integer and interpret it as a set of enum values.""" if not isinstance(the_int, int): raise ValueError( f"Argument should be an int, we received {the_int} fo type {type(the_int)}" @@ -262,7 +264,7 @@ def parse_bitfield(cls, the_int): @classmethod def to_bitfield(cls, collection): - """ Takes some enum values and creates an integer from them. """ + """Takes some enum values and creates an integer from them.""" # make sure all elements are of the correct type (use itertools.tee in case we get passed an # iterator) iter1, iter2 = itertools.tee(iter(collection)) @@ -391,7 +393,7 @@ def is_bad(self): return True if status is Bad (10) or (11). """ mask = 3 << 30 - if mask & self.value in (0x80000000, 0xc0000000): + if mask & self.value in (0x80000000, 0xC0000000): return True return False @@ -458,7 +460,7 @@ def __post_init__(self): if isinstance(self.Identifier, int): if self.Identifier < 255 and self.NamespaceIndex == 0: object.__setattr__(self, "NodeIdType", NodeIdType.TwoByte) - elif self.Identifier < 2 ** 16 and self.NamespaceIndex < 255: + elif self.Identifier < 2**16 and self.NamespaceIndex < 255: object.__setattr__(self, "NodeIdType", NodeIdType.FourByte) else: object.__setattr__(self, "NodeIdType", NodeIdType.Numeric) @@ -554,7 +556,7 @@ def _from_string(string): identifier = uuid.UUID(f"urn:uuid:{v}") elif k == "b": ntype = NodeIdType.ByteString - if v[0:2] == '0x': + if v[0:2] == "0x": # our custom handling, normaly all bytestring nodes are base64 identifier = bytes.fromhex(v[2:]) else: @@ -563,7 +565,7 @@ def _from_string(string): identifier = b64decode(v.encode("ascii")) except (binascii.Error, ValueError): # fallback for backwards compatiblity just use the bytestring - identifier = v.encode('ascii') + identifier = v.encode("ascii") elif k == "srv": srv = int(v.strip()) elif k == "nsu": @@ -592,7 +594,7 @@ def to_string(self): ntype = "g" elif self.NodeIdType == NodeIdType.ByteString: ntype = "b" - identifier = b64encode(identifier).decode('ascii') + identifier = b64encode(identifier).decode("ascii") string.append(f"{ntype}={identifier}") return ";".join(string) @@ -819,7 +821,7 @@ def __bool__(self): return self.Body is not None -class VariantType(Enum): +class VariantType(IntEnum): """ The possible types of a variant. @@ -932,7 +934,6 @@ class Variant: is_array: Optional[bool] = None def __post_init__(self): - if self.is_array is None: if isinstance(self.Value, (list, tuple)) or self.Dimensions: object.__setattr__(self, "is_array", True) @@ -958,11 +959,11 @@ def __post_init__(self): # why do we rewrite this case and not the others? object.__setattr__(self, "Value", NodeId(0, 0)) elif self.VariantType not in ( - VariantType.Null, - VariantType.String, - VariantType.DateTime, - VariantType.ExtensionObject, - VariantType.ByteString, + VariantType.Null, + VariantType.String, + VariantType.DateTime, + VariantType.ExtensionObject, + VariantType.ByteString, ): raise UaError( f"Non array Variant of type {self.VariantType} cannot have value None" @@ -1274,21 +1275,21 @@ def get_extensionobject_class_type(typeid): return None -class SecurityPolicyType(Enum): +class SecurityPolicyType(IntEnum): """ The supported types of SecurityPolicy. - "None" + "NoSecurity" "Basic128Rsa15_Sign" "Basic128Rsa15_SignAndEncrypt" "Basic256_Sign" "Basic256_SignAndEncrypt" "Basic256Sha256_Sign" "Basic256Sha256_SignAndEncrypt" - "Aes128_Sha256_RsaOaep_Sign" - "Aes128_Sha256_RsaOaep_SignAndEncrypt" - "Aes256_Sha256_RsaPss_Sign" - "Aes256_Sha256_RsaPss_SignAndEncrypt" + "Aes128Sha256RsaOaep_Sign" + "Aes128Sha256RsaOaep_SignAndEncrypt" + "Aes256Sha256RsaPss_Sign" + "Aes256Sha256RsaPss_SignAndEncrypt" """ NoSecurity = 0