From a5ca68a6f56e7a9e67cb813f28e49b4d02af3a64 Mon Sep 17 00:00:00 2001
From: Benjamin Himes
Date: Wed, 19 Mar 2025 12:45:34 +0200
Subject: [PATCH 1/5] Experimental: multiprocessing for speeding up query map
 decodes.

---
 async_substrate_interface/async_substrate.py | 119 +++++++++---------
 async_substrate_interface/sync_substrate.py  |   4 +-
 .../utils/decoding_attempt.py                |  99 +++++++++++++++
 3 files changed, 159 insertions(+), 63 deletions(-)
 create mode 100644 async_substrate_interface/utils/decoding_attempt.py

diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py
index 502b743..3f04485 100644
--- a/async_substrate_interface/async_substrate.py
+++ b/async_substrate_interface/async_substrate.py
@@ -5,6 +5,7 @@
 """

 import asyncio
+from concurrent.futures import ProcessPoolExecutor
 import inspect
 import logging
 import random
@@ -56,6 +57,7 @@
 )
 from async_substrate_interface.utils.storage import StorageKey
 from async_substrate_interface.type_registry import _TYPE_REGISTRY
+from async_substrate_interface.utils.decoding_attempt import _decode_query_map, _decode_scale_with_runtime

 if TYPE_CHECKING:
     from websockets.asyncio.client import ClientConnection
@@ -413,6 +415,7 @@ def __init__(
         last_key: Optional[str] = None,
         max_results: Optional[int] = None,
         ignore_decoding_errors: bool = False,
+        executor: Optional["ProcessPoolExecutor"] = None
     ):
         self.records = records
         self.page_size = page_size
@@ -425,6 +428,7 @@ def __init__(
         self.params = params
         self.ignore_decoding_errors = ignore_decoding_errors
         self.loading_complete = False
+        self.executor = executor
         self._buffer = iter(self.records)  # Initialize the buffer with initial records

     async def retrieve_next_page(self, start_key) -> list:
@@ -437,6 +441,7 @@ async def retrieve_next_page(self, start_key) -> list:
             start_key=start_key,
             max_results=self.max_results,
             ignore_decoding_errors=self.ignore_decoding_errors,
+            executor=self.executor
         )
         if len(result.records) < self.page_size:
             self.loading_complete = True
@@ -862,6 +867,7 @@ async def encode_scale(
             await self._wait_for_registry(_attempt, _retries)
         return self._encode_scale(type_string, value)

+
     async def decode_scale(
         self,
         type_string: str,
@@ -898,7 +904,7 @@ async def decode_scale(
         else:
             return obj

-    async def load_runtime(self, runtime):
+    def load_runtime(self, runtime):
         self.runtime = runtime

         # Update type registry
@@ -954,7 +960,7 @@ async def init_runtime(
             )

         if self.runtime and runtime_version == self.runtime.runtime_version:
-            return
+            return self.runtime

         runtime = self.runtime_cache.retrieve(runtime_version=runtime_version)
         if not runtime:
@@ -990,7 +996,7 @@ async def init_runtime(
                 runtime_version=runtime_version, runtime=runtime
             )

-        await self.load_runtime(runtime)
+        self.load_runtime(runtime)

         if self.ss58_format is None:
             # Check and apply runtime constants
@@ -1000,6 +1006,7 @@ async def init_runtime(
             if ss58_prefix_constant:
                 self.ss58_format = ss58_prefix_constant

+        return runtime

     async def create_storage_key(
         self,
@@ -2858,6 +2865,7 @@ async def query_map(
         page_size: int = 100,
         ignore_decoding_errors: bool = False,
         reuse_block_hash: bool = False,
+        executor: Optional["ProcessPoolExecutor"] = None
     ) -> AsyncQueryMapResult:
         """
         Iterates over all key-pairs located at the given module and storage_function. The storage
@@ -2892,12 +2900,11 @@ async def query_map(
         Returns:
             AsyncQueryMapResult object
         """
-        hex_to_bytes_ = hex_to_bytes
         params = params or []
         block_hash = await self._get_current_block_hash(block_hash, reuse_block_hash)
         if block_hash:
             self.last_block_hash = block_hash
-        await self.init_runtime(block_hash=block_hash)
+        runtime = await self.init_runtime(block_hash=block_hash)

         metadata_pallet = self.runtime.metadata.get_metadata_pallet(module)
         if not metadata_pallet:
@@ -2952,19 +2959,6 @@ async def query_map(
         result = []
         last_key = None

-        def concat_hash_len(key_hasher: str) -> int:
-            """
-            Helper function to avoid if statements
-            """
-            if key_hasher == "Blake2_128Concat":
-                return 16
-            elif key_hasher == "Twox64Concat":
-                return 8
-            elif key_hasher == "Identity":
-                return 0
-            else:
-                raise ValueError("Unsupported hash type")
-
         if len(result_keys) > 0:
             last_key = result_keys[-1]

@@ -2975,51 +2969,51 @@ def concat_hash_len(key_hasher: str) -> int:
             if "error" in response:
                 raise SubstrateRequestException(response["error"]["message"])

             for result_group in response["result"]:
-                for item in result_group["changes"]:
-                    try:
-                        # Determine type string
-                        key_type_string = []
-                        for n in range(len(params), len(param_types)):
-                            key_type_string.append(
-                                f"[u8; {concat_hash_len(key_hashers[n])}]"
-                            )
-                            key_type_string.append(param_types[n])
-
-                        item_key_obj = await self.decode_scale(
-                            type_string=f"({', '.join(key_type_string)})",
-                            scale_bytes=bytes.fromhex(item[0][len(prefix) :]),
-                            return_scale_obj=True,
-                        )
-
-                        # strip key_hashers to use as item key
-                        if len(param_types) - len(params) == 1:
-                            item_key = item_key_obj[1]
-                        else:
-                            item_key = tuple(
-                                item_key_obj[key + 1]
-                                for key in range(len(params), len(param_types) + 1, 2)
-                            )
-
-                    except Exception as _:
-                        if not ignore_decoding_errors:
-                            raise
-                        item_key = None
-
-                    try:
-                        item_bytes = hex_to_bytes_(item[1])
-
-                        item_value = await self.decode_scale(
-                            type_string=value_type,
-                            scale_bytes=item_bytes,
-                            return_scale_obj=True,
-                        )
-                    except Exception as _:
-                        if not ignore_decoding_errors:
-                            raise
-                        item_value = None
-                    result.append([item_key, item_value])
+                if executor:
+                    # print(
+                    #     ("prefix", type("prefix")),
+                    #     ("runtime_registry", type(runtime.registry)),
+                    #     ("param_types", type(param_types)),
+                    #     ("params", type(params)),
+                    #     ("value_type", type(value_type)),
+                    #     ("key_hasher", type(key_hashers)),
+                    #     ("ignore_decoding_errors", type(ignore_decoding_errors)),
+                    # )
+                    result = await asyncio.get_running_loop().run_in_executor(
+                        executor,
+                        _decode_query_map,
+                        result_group["changes"],
+                        prefix,
+                        runtime.registry.registry,
+                        param_types,
+                        params,
+                        value_type, key_hashers, ignore_decoding_errors
+                    )
+                    # max_workers = executor._max_workers
+                    # result_group_changes_groups = [result_group["changes"][i:i + max_workers] for i in range(0, len(result_group["changes"]), max_workers)]
+                    # all_results = executor.map(
+                    #     self._decode_query_map,
+                    #     result_group["changes"],
+                    #     repeat(prefix),
+                    #     repeat(runtime.registry),
+                    #     repeat(param_types),
+                    #     repeat(params),
+                    #     repeat(value_type),
+                    #     repeat(key_hashers),
+                    #     repeat(ignore_decoding_errors)
+                    # )
+                    # for r in all_results:
+                    #     result.extend(r)
+                else:
+                    result = _decode_query_map(
+                        result_group["changes"],
+                        prefix,
+                        runtime.registry.registry,
+                        param_types,
+                        params,
+                        value_type, key_hashers, ignore_decoding_errors
+                    )
         return AsyncQueryMapResult(
             records=result,
             page_size=page_size,
@@ -3031,6 +3025,7 @@ def concat_hash_len(key_hasher: str) -> int:
             last_key=last_key,
             max_results=max_results,
             ignore_decoding_errors=ignore_decoding_errors,
+            executor=executor
         )

     async def submit_extrinsic(
diff --git a/async_substrate_interface/sync_substrate.py b/async_substrate_interface/sync_substrate.py
index c2c9b3c..9ff45a9 100644
--- a/async_substrate_interface/sync_substrate.py
+++ b/async_substrate_interface/sync_substrate.py
@@ -525,7 +525,9 @@ def __enter__(self):
         return self

     def __del__(self):
-        self.close()
+        self.ws.close()
+        print("DELETING SUBSTATE")
+        # self.ws.protocol.fail(code=1006)  # ABNORMAL_CLOSURE

     def initialize(self):
         """
diff --git a/async_substrate_interface/utils/decoding_attempt.py b/async_substrate_interface/utils/decoding_attempt.py
new file mode 100644
index 0000000..c9d3718
--- /dev/null
+++ b/async_substrate_interface/utils/decoding_attempt.py
@@ -0,0 +1,99 @@
+from scalecodec import ss58_encode
+
+from async_substrate_interface.utils import hex_to_bytes
+from bt_decode import decode as decode_by_type_string, PortableRegistry
+from bittensor_wallet.utils import SS58_FORMAT
+
+
+class ScaleObj:
+    def __init__(self, value):
+        self.value = value
+
+def _decode_scale_with_runtime(
+    type_string: str,
+    scale_bytes: bytes,
+    runtime_registry: "Runtime",
+    return_scale_obj: bool = False
+):
+    if scale_bytes == b"":
+        return None
+    if type_string == "scale_info::0":  # Is an AccountId
+        # Decode AccountId bytes to SS58 address
+        return ss58_encode(scale_bytes, SS58_FORMAT)
+    else:
+        obj = decode_by_type_string(type_string, runtime_registry, scale_bytes)
+        if return_scale_obj:
+            return ScaleObj(obj)
+        else:
+            return obj
+
+def _decode_query_map(result_group_changes, prefix, runtime_registry,
+                      param_types, params, value_type, key_hashers, ignore_decoding_errors):
+    def concat_hash_len(key_hasher: str) -> int:
+        """
+        Helper function to avoid if statements
+        """
+        if key_hasher == "Blake2_128Concat":
+            return 16
+        elif key_hasher == "Twox64Concat":
+            return 8
+        elif key_hasher == "Identity":
+            return 0
+        else:
+            raise ValueError("Unsupported hash type")
+
+    hex_to_bytes_ = hex_to_bytes
+    runtime_registry = PortableRegistry.from_json(runtime_registry)
+
+    result = []
+    for item in result_group_changes:
+        try:
+            # Determine type string
+            key_type_string = []
+            for n in range(len(params), len(param_types)):
+                key_type_string.append(
+                    f"[u8; {concat_hash_len(key_hashers[n])}]"
+                )
+                key_type_string.append(param_types[n])
+
+            item_key_obj = _decode_scale_with_runtime(
+                f"({', '.join(key_type_string)})",
+                bytes.fromhex(item[0][len(prefix):]),
+                runtime_registry,
+                False
+            )
+
+            # strip key_hashers to use as item key
+            if len(param_types) - len(params) == 1:
+                item_key = item_key_obj[1]
+            else:
+                item_key = tuple(
+                    item_key_obj[key + 1]
+                    for key in range(len(params), len(param_types) + 1, 2)
+                )
+
+        except Exception as _:
+            if not ignore_decoding_errors:
+                raise
+            item_key = None
+
+        try:
+            item_bytes = hex_to_bytes_(item[1])
+
+            item_value = _decode_scale_with_runtime(
+                value_type,
+                item_bytes,
+                runtime_registry,
+                True
+            )
+        except Exception as _:
+            if not ignore_decoding_errors:
+                raise
+            item_value = None
+        result.append([item_key, item_value])
+    return result
+
+
+if __name__ == "__main__":
+    pass

From c13dc44d3c16f1d65426615fb22d4af62f7ffa36 Mon Sep 17 00:00:00 2001
From: Benjamin Himes
Date: Wed, 19 Mar 2025 12:52:53 +0200
Subject: [PATCH 2/5] Formatting

---
 async_substrate_interface/async_substrate.py | 28 +++++++++------
 .../utils/decoding_attempt.py                | 35 +++++++++++--------
 2 files changed, 37 insertions(+), 26 deletions(-)

diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py
index 3f04485..21be027 100644
--- a/async_substrate_interface/async_substrate.py
+++ b/async_substrate_interface/async_substrate.py
@@ -5,7 +5,6 @@
 """

 import asyncio
-from concurrent.futures import ProcessPoolExecutor
 import inspect
 import logging
 import random
@@ -57,10 +56,14 @@
 )
 from async_substrate_interface.utils.storage import StorageKey
 from async_substrate_interface.type_registry import _TYPE_REGISTRY
-from async_substrate_interface.utils.decoding_attempt import _decode_query_map, _decode_scale_with_runtime
+from async_substrate_interface.utils.decoding_attempt import (
+    decode_query_map,
+    _decode_scale_with_runtime,
+)

 if TYPE_CHECKING:
     from websockets.asyncio.client import ClientConnection
+    from concurrent.futures import ProcessPoolExecutor

 ResultHandler = Callable[[dict, Any], Awaitable[tuple[dict, bool]]]
@@ -415,7 +418,7 @@ def __init__(
         last_key: Optional[str] = None,
         max_results: Optional[int] = None,
         ignore_decoding_errors: bool = False,
-        executor: Optional["ProcessPoolExecutor"] = None
+        executor: Optional["ProcessPoolExecutor"] = None,
     ):
         self.records = records
         self.page_size = page_size
@@ -441,7 +444,7 @@ async def retrieve_next_page(self, start_key) -> list:
             start_key=start_key,
             max_results=self.max_results,
             ignore_decoding_errors=self.ignore_decoding_errors,
-            executor=self.executor
+            executor=self.executor,
         )
         if len(result.records) < self.page_size:
             self.loading_complete = True
@@ -867,7 +870,6 @@ async def encode_scale(
             await self._wait_for_registry(_attempt, _retries)
         return self._encode_scale(type_string, value)

-
     async def decode_scale(
         self,
         type_string: str,
@@ -2865,7 +2867,7 @@ async def query_map(
         page_size: int = 100,
         ignore_decoding_errors: bool = False,
         reuse_block_hash: bool = False,
-        executor: Optional["ProcessPoolExecutor"] = None
+        executor: Optional["ProcessPoolExecutor"] = None,
     ) -> AsyncQueryMapResult:
         """
         Iterates over all key-pairs located at the given module and storage_function. The storage
@@ -2982,13 +2984,15 @@ async def query_map(
                     # )
                     result = await asyncio.get_running_loop().run_in_executor(
                         executor,
-                        _decode_query_map,
+                        decode_query_map,
                         result_group["changes"],
                         prefix,
                         runtime.registry.registry,
                         param_types,
                         params,
-                        value_type, key_hashers, ignore_decoding_errors
+                        value_type,
+                        key_hashers,
+                        ignore_decoding_errors,
                     )
                     # max_workers = executor._max_workers
                     # result_group_changes_groups = [result_group["changes"][i:i + max_workers] for i in range(0, len(result_group["changes"]), max_workers)]
                     # all_results = executor.map(
                     #     self._decode_query_map,
                     #     result_group["changes"],
                     #     repeat(prefix),
                     #     repeat(runtime.registry),
                     #     repeat(param_types),
                     #     repeat(params),
                     #     repeat(value_type),
                     #     repeat(key_hashers),
                     #     repeat(ignore_decoding_errors)
                     # )
                     # for r in all_results:
                     #     result.extend(r)
                 else:
-                    result = _decode_query_map(
+                    result = decode_query_map(
                         result_group["changes"],
                         prefix,
                         runtime.registry.registry,
                         param_types,
                         params,
-                        value_type, key_hashers, ignore_decoding_errors
+                        value_type,
+                        key_hashers,
+                        ignore_decoding_errors,
                     )
         return AsyncQueryMapResult(
             records=result,
@@ -3025,7 +3031,7 @@ async def query_map(
             last_key=last_key,
             max_results=max_results,
             ignore_decoding_errors=ignore_decoding_errors,
-            executor=executor
+            executor=executor,
         )

     async def submit_extrinsic(
diff --git a/async_substrate_interface/utils/decoding_attempt.py b/async_substrate_interface/utils/decoding_attempt.py
index c9d3718..259b4cc 100644
--- a/async_substrate_interface/utils/decoding_attempt.py
+++ b/async_substrate_interface/utils/decoding_attempt.py
@@ -9,11 +9,12 @@ class ScaleObj:
     def __init__(self, value):
         self.value = value

+
 def _decode_scale_with_runtime(
-    type_string: str,
-    scale_bytes: bytes,
-    runtime_registry: "Runtime",
-    return_scale_obj: bool = False
+    type_string: str,
+    scale_bytes: bytes,
+    runtime_registry: "Runtime",
+    return_scale_obj: bool = False,
 ):
     if scale_bytes == b"":
         return None
@@ -27,8 +28,17 @@ def _decode_scale_with_runtime(
         else:
             return obj

-def _decode_query_map(result_group_changes, prefix, runtime_registry,
-                      param_types, params, value_type, key_hashers, ignore_decoding_errors):
+
+def decode_query_map(
+    result_group_changes,
+    prefix,
+    runtime_registry,
+    param_types,
+    params,
+    value_type,
+    key_hashers,
+    ignore_decoding_errors,
+):
     def concat_hash_len(key_hasher: str) -> int:
         """
         Helper function to avoid if statements
@@ -51,16 +61,14 @@ def concat_hash_len(key_hasher: str) -> int:
             # Determine type string
             key_type_string = []
             for n in range(len(params), len(param_types)):
-                key_type_string.append(
-                    f"[u8; {concat_hash_len(key_hashers[n])}]"
-                )
+                key_type_string.append(f"[u8; {concat_hash_len(key_hashers[n])}]")
                 key_type_string.append(param_types[n])

             item_key_obj = _decode_scale_with_runtime(
                 f"({', '.join(key_type_string)})",
-                bytes.fromhex(item[0][len(prefix):]),
+                bytes.fromhex(item[0][len(prefix) :]),
                 runtime_registry,
-                False
+                False,
             )

             # strip key_hashers to use as item key
@@ -81,10 +89,7 @@ def concat_hash_len(key_hasher: str) -> int:
             item_bytes = hex_to_bytes_(item[1])

             item_value = _decode_scale_with_runtime(
-                value_type,
-                item_bytes,
-                runtime_registry,
-                True
+                value_type, item_bytes, runtime_registry, True
             )

         except Exception as _:

From 05ddb4ef7ed5d3289b944f3337f074a8d69c8cf6 Mon Sep 17 00:00:00 2001
From: Benjamin Himes
Date: Mon, 31 Mar 2025 22:46:16 +0200
Subject: [PATCH 3/5] Scrapped the multiprocessing idea. Now using decode_list
 from bt-decode

---
 async_substrate_interface/async_substrate.py |  67 ++---------
 async_substrate_interface/utils/decoding.py  | 100 ++++++++++++++++-
 .../utils/decoding_attempt.py                | 104 ------------------
 pyproject.toml                               |   2 +-
 4 files changed, 110 insertions(+), 163 deletions(-)
 delete mode 100644 async_substrate_interface/utils/decoding_attempt.py

diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py
index 21be027..e8a95aa 100644
--- a/async_substrate_interface/async_substrate.py
+++ b/async_substrate_interface/async_substrate.py
@@ -56,14 +56,12 @@
 )
 from async_substrate_interface.utils.storage import StorageKey
 from async_substrate_interface.type_registry import _TYPE_REGISTRY
-from async_substrate_interface.utils.decoding_attempt import (
+from async_substrate_interface.utils.decoding import (
     decode_query_map,
-    _decode_scale_with_runtime,
 )

 if TYPE_CHECKING:
     from websockets.asyncio.client import ClientConnection
-    from concurrent.futures import ProcessPoolExecutor

 ResultHandler = Callable[[dict, Any], Awaitable[tuple[dict, bool]]]
@@ -418,7 +416,6 @@ def __init__(
         last_key: Optional[str] = None,
         max_results: Optional[int] = None,
         ignore_decoding_errors: bool = False,
-        executor: Optional["ProcessPoolExecutor"] = None,
     ):
         self.records = records
         self.page_size = page_size
@@ -431,7 +428,6 @@ def __init__(
         self.params = params
         self.ignore_decoding_errors = ignore_decoding_errors
         self.loading_complete = False
-        self.executor = executor
         self._buffer = iter(self.records)  # Initialize the buffer with initial records

     async def retrieve_next_page(self, start_key) -> list:
@@ -444,7 +440,6 @@ async def retrieve_next_page(self, start_key) -> list:
             start_key=start_key,
             max_results=self.max_results,
             ignore_decoding_errors=self.ignore_decoding_errors,
-            executor=self.executor,
         )
         if len(result.records) < self.page_size:
             self.loading_complete = True
@@ -2867,7 +2862,6 @@ async def query_map(
         page_size: int = 100,
         ignore_decoding_errors: bool = False,
         reuse_block_hash: bool = False,
-        executor: Optional["ProcessPoolExecutor"] = None,
     ) -> AsyncQueryMapResult:
         """
         Iterates over all key-pairs located at the given module and storage_function. The storage
@@ -2972,54 +2966,16 @@ async def query_map(
             if "error" in response:
                 raise SubstrateRequestException(response["error"]["message"])

             for result_group in response["result"]:
-                if executor:
-                    # print(
-                    #     ("prefix", type("prefix")),
-                    #     ("runtime_registry", type(runtime.registry)),
-                    #     ("param_types", type(param_types)),
-                    #     ("params", type(params)),
-                    #     ("value_type", type(value_type)),
-                    #     ("key_hasher", type(key_hashers)),
-                    #     ("ignore_decoding_errors", type(ignore_decoding_errors)),
-                    # )
-                    result = await asyncio.get_running_loop().run_in_executor(
-                        executor,
-                        decode_query_map,
-                        result_group["changes"],
-                        prefix,
-                        runtime.registry.registry,
-                        param_types,
-                        params,
-                        value_type,
-                        key_hashers,
-                        ignore_decoding_errors,
-                    )
-                    # max_workers = executor._max_workers
-                    # result_group_changes_groups = [result_group["changes"][i:i + max_workers] for i in range(0, len(result_group["changes"]), max_workers)]
-                    # all_results = executor.map(
-                    #     self._decode_query_map,
-                    #     result_group["changes"],
-                    #     repeat(prefix),
-                    #     repeat(runtime.registry),
-                    #     repeat(param_types),
-                    #     repeat(params),
-                    #     repeat(value_type),
-                    #     repeat(key_hashers),
-                    #     repeat(ignore_decoding_errors)
-                    # )
-                    # for r in all_results:
-                    #     result.extend(r)
-                else:
-                    result = decode_query_map(
-                        result_group["changes"],
-                        prefix,
-                        runtime.registry.registry,
-                        param_types,
-                        params,
-                        value_type,
-                        key_hashers,
-                        ignore_decoding_errors,
-                    )
+                result = decode_query_map(
+                    result_group["changes"],
+                    prefix,
+                    runtime,
+                    param_types,
+                    params,
+                    value_type,
+                    key_hashers,
+                    ignore_decoding_errors,
+                )
         return AsyncQueryMapResult(
             records=result,
             page_size=page_size,
@@ -3031,7 +2987,6 @@ async def query_map(
             last_key=last_key,
             max_results=max_results,
             ignore_decoding_errors=ignore_decoding_errors,
-            executor=executor,
         )

     async def submit_extrinsic(
diff --git a/async_substrate_interface/utils/decoding.py b/async_substrate_interface/utils/decoding.py
index f0ce439..db05b39 100644
--- a/async_substrate_interface/utils/decoding.py
+++ b/async_substrate_interface/utils/decoding.py
@@ -1,6 +1,14 @@
-from typing import Union
+from typing import Union, TYPE_CHECKING

-from bt_decode import AxonInfo, PrometheusInfo
+from bt_decode import AxonInfo, PrometheusInfo, decode_list
+from scalecodec import ss58_encode
+from bittensor_wallet.utils import SS58_FORMAT
+
+from async_substrate_interface.utils import hex_to_bytes
+from async_substrate_interface.types import ScaleObj
+
+if TYPE_CHECKING:
+    from async_substrate_interface.types import Runtime


 def _determine_if_old_runtime_call(runtime_call_def, metadata_v15_value) -> bool:
@@ -44,3 +52,91 @@ def _bt_decode_to_dict_or_list(obj) -> Union[dict, list[dict]]:
         else:
             as_dict[key] = val
     return as_dict
+
+
+def _decode_scale_list_with_runtime(
+    type_string: list[str],
+    scale_bytes: list[bytes],
+    runtime_registry,
+    return_scale_obj: bool = False,
+):
+    if scale_bytes == b"":
+        return None
+    if type_string == "scale_info::0":  # Is an AccountId
+        # Decode AccountId bytes to SS58 address
+        return ss58_encode(scale_bytes, SS58_FORMAT)
+    else:
+        obj = decode_list(type_string, runtime_registry, scale_bytes)
+    if return_scale_obj:
+        return [ScaleObj(x) for x in obj]
+    else:
+        return obj
+
+
+def decode_query_map(
+    result_group_changes,
+    prefix,
+    runtime: "Runtime",
+    param_types,
+    params,
+    value_type,
+    key_hashers,
+    ignore_decoding_errors,
+):
+    def concat_hash_len(key_hasher: str) -> int:
+        """
+        Helper function to avoid if statements
+        """
+        if key_hasher == "Blake2_128Concat":
+            return 16
+        elif key_hasher == "Twox64Concat":
+            return 8
+        elif key_hasher == "Identity":
+            return 0
+        else:
+            raise ValueError("Unsupported hash type")

+
+    hex_to_bytes_ = hex_to_bytes
+
+    result = []
+    # Determine type string
+    key_type_string_ = []
+    for n in range(len(params), len(param_types)):
+        key_type_string_.append(f"[u8; {concat_hash_len(key_hashers[n])}]")
+        key_type_string_.append(param_types[n])
+    key_type_string = f"({', '.join(key_type_string_)})"
+
+    pre_decoded_keys = []
+    pre_decoded_key_types = [key_type_string] * len(result_group_changes)
+    pre_decoded_values = []
+    pre_decoded_value_types = [value_type] * len(result_group_changes)
+
+    for item in result_group_changes:
+        pre_decoded_keys.append(bytes.fromhex(item[0][len(prefix) :]))
+        pre_decoded_values.append(hex_to_bytes_(item[1]))
+    all_decoded = _decode_scale_list_with_runtime(
+        pre_decoded_key_types + pre_decoded_value_types,
+        pre_decoded_keys + pre_decoded_values,
+        runtime.registry,
+    )
+    middl_index = len(all_decoded) // 2
+    decoded_keys = all_decoded[:middl_index]
+    decoded_values = [ScaleObj(x) for x in all_decoded[middl_index:]]
+    for dk, dv in zip(decoded_keys, decoded_values):
+        try:
+            # strip key_hashers to use as item key
+            if len(param_types) - len(params) == 1:
+                item_key = dk[1]
+            else:
+                item_key = tuple(
+                    dk[key + 1] for key in range(len(params), len(param_types) + 1, 2)
+                )
+
+        except Exception as _:
+            if not ignore_decoding_errors:
+                raise
+            item_key = None
+
+        item_value = dv
+        result.append([item_key, item_value])
+    return result
diff --git a/async_substrate_interface/utils/decoding_attempt.py b/async_substrate_interface/utils/decoding_attempt.py
deleted file mode 100644
index 259b4cc..0000000
--- a/async_substrate_interface/utils/decoding_attempt.py
+++ /dev/null
@@ -1,104 +0,0 @@
-from scalecodec import ss58_encode
-
-from async_substrate_interface.utils import hex_to_bytes
-from bt_decode import decode as decode_by_type_string, PortableRegistry
-from bittensor_wallet.utils import SS58_FORMAT
-
-
-class ScaleObj:
-    def __init__(self, value):
-        self.value = value
-
-
-def _decode_scale_with_runtime(
-    type_string: str,
-    scale_bytes: bytes,
-    runtime_registry: "Runtime",
-    return_scale_obj: bool = False,
-):
-    if scale_bytes == b"":
-        return None
-    if type_string == "scale_info::0":  # Is an AccountId
-        # Decode AccountId bytes to SS58 address
-        return ss58_encode(scale_bytes, SS58_FORMAT)
-    else:
-        obj = decode_by_type_string(type_string, runtime_registry, scale_bytes)
-        if return_scale_obj:
-            return ScaleObj(obj)
-        else:
-            return obj
-
-
-def decode_query_map(
-    result_group_changes,
-    prefix,
-    runtime_registry,
-    param_types,
-    params,
-    value_type,
-    key_hashers,
-    ignore_decoding_errors,
-):
-    def concat_hash_len(key_hasher: str) -> int:
-        """
-        Helper function to avoid if statements
-        """
-        if key_hasher == "Blake2_128Concat":
-            return 16
-        elif key_hasher == "Twox64Concat":
-            return 8
-        elif key_hasher == "Identity":
-            return 0
-        else:
-            raise ValueError("Unsupported hash type")
-
-    hex_to_bytes_ = hex_to_bytes
-    runtime_registry = PortableRegistry.from_json(runtime_registry)
-
-    result = []
-    for item in result_group_changes:
-        try:
-            # Determine type string
-            key_type_string = []
-            for n in range(len(params), len(param_types)):
-                key_type_string.append(f"[u8; {concat_hash_len(key_hashers[n])}]")
-                key_type_string.append(param_types[n])
-
-            item_key_obj = _decode_scale_with_runtime(
-                f"({', '.join(key_type_string)})",
-                bytes.fromhex(item[0][len(prefix) :]),
-                runtime_registry,
-                False,
-            )
-
-            # strip key_hashers to use as item key
-            if len(param_types) - len(params) == 1:
-                item_key = item_key_obj[1]
-            else:
-                item_key = tuple(
-                    item_key_obj[key + 1]
-                    for key in range(len(params), len(param_types) + 1, 2)
-                )
-
-        except Exception as _:
-            if not ignore_decoding_errors:
-                raise
-            item_key = None
-
-        try:
-            item_bytes = hex_to_bytes_(item[1])
-
-            item_value = _decode_scale_with_runtime(
-                value_type, item_bytes, runtime_registry, True
-            )
-
-        except Exception as _:
-            if not ignore_decoding_errors:
-                raise
-            item_value = None
-        result.append([item_key, item_value])
-    return result
-
-
-if __name__ == "__main__":
-    pass
diff --git a/pyproject.toml b/pyproject.toml
index 3fff69a..d9be720 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -10,7 +10,7 @@ dependencies = [
     "wheel",
    "asyncstdlib~=3.13.0",
     "bittensor-wallet>=2.1.3",
-    "bt-decode==v0.5.0",
+    "bt-decode==v0.6.0",
     "scalecodec~=1.2.11",
     "websockets>=14.1",
     "xxhash"

From 1a5d18a8570b20c9f663cff3285ca2559d3a39cc Mon Sep 17 00:00:00 2001
From: Benjamin Himes
Date: Mon, 31 Mar 2025 22:54:42 +0200
Subject: [PATCH 4/5] Apply logic to sync_substrate as well

---
 async_substrate_interface/sync_substrate.py | 69 ++++-----------------
 1 file changed, 12 insertions(+), 57 deletions(-)

diff --git a/async_substrate_interface/sync_substrate.py b/async_substrate_interface/sync_substrate.py
index 9ff45a9..dc3dc75 100644
--- a/async_substrate_interface/sync_substrate.py
+++ b/async_substrate_interface/sync_substrate.py
@@ -34,6 +34,7 @@
 from async_substrate_interface.utils.decoding import (
     _determine_if_old_runtime_call,
     _bt_decode_to_dict_or_list,
+    decode_query_map,
 )
 from async_substrate_interface.utils.storage import StorageKey
 from async_substrate_interface.type_registry import _TYPE_REGISTRY
@@ -2600,7 +2601,7 @@ def query_map(
         block_hash = self._get_current_block_hash(block_hash, reuse_block_hash)
         if block_hash:
             self.last_block_hash = block_hash
-        self.init_runtime(block_hash=block_hash)
+        runtime = self.init_runtime(block_hash=block_hash)

         metadata_pallet = self.runtime.metadata.get_metadata_pallet(module)
         if not metadata_pallet:
@@ -2656,19 +2657,6 @@ def query_map(
         result = []
         last_key = None

-        def concat_hash_len(key_hasher: str) -> int:
-            """
-            Helper function to avoid if statements
-            """
-            if key_hasher == "Blake2_128Concat":
-                return 16
-            elif key_hasher == "Twox64Concat":
-                return 8
-            elif key_hasher == "Identity":
-                return 0
-            else:
-                raise ValueError("Unsupported hash type")
-
         if len(result_keys) > 0:
             last_key = result_keys[-1]

@@ -2681,49 +2669,16 @@ def concat_hash_len(key_hasher: str) -> int:
                 raise SubstrateRequestException(response["error"]["message"])

             for result_group in response["result"]:
-                for item in result_group["changes"]:
-                    try:
-                        # Determine type string
-                        key_type_string = []
-                        for n in range(len(params), len(param_types)):
-                            key_type_string.append(
-                                f"[u8; {concat_hash_len(key_hashers[n])}]"
-                            )
-                            key_type_string.append(param_types[n])
-
-                        item_key_obj = self.decode_scale(
-                            type_string=f"({', '.join(key_type_string)})",
-                            scale_bytes=bytes.fromhex(item[0][len(prefix) :]),
-                            return_scale_obj=True,
-                        )
-
-                        # strip key_hashers to use as item key
-                        if len(param_types) - len(params) == 1:
-                            item_key = item_key_obj[1]
-                        else:
-                            item_key = tuple(
-                                item_key_obj[key + 1]
-                                for key in range(len(params), len(param_types) + 1, 2)
-                            )
-
-                    except Exception as _:
-                        if not ignore_decoding_errors:
-                            raise
-                        item_key = None
-
-                    try:
-                        item_bytes = hex_to_bytes_(item[1])
-
-                        item_value = self.decode_scale(
-                            type_string=value_type,
-                            scale_bytes=item_bytes,
-                            return_scale_obj=True,
-                        )
-                    except Exception as _:
-                        if not ignore_decoding_errors:
-                            raise
-                        item_value = None
-                    result.append([item_key, item_value])
+                result = decode_query_map(
+                    result_group["changes"],
+                    prefix,
+                    runtime,
+                    param_types,
+                    params,
+                    value_type,
+                    key_hashers,
+                    ignore_decoding_errors,
+                )
         return QueryMapResult(
             records=result,
             page_size=page_size,

From 9871dfa9c4afcc89a361adf0986aa13ee8780d5d Mon Sep 17 00:00:00 2001
From: Benjamin Himes
Date: Mon, 31 Mar 2025 23:07:16 +0200
Subject: [PATCH 5/5] Updated naming, removed irrelevant parts

---
 async_substrate_interface/utils/decoding.py | 12 +++---------
 1 file changed, 3 insertions(+), 9 deletions(-)

diff --git a/async_substrate_interface/utils/decoding.py b/async_substrate_interface/utils/decoding.py
index db05b39..6dc7f21 100644
--- a/async_substrate_interface/utils/decoding.py
+++ b/async_substrate_interface/utils/decoding.py
@@ -55,18 +55,12 @@ def _bt_decode_to_dict_or_list(obj) -> Union[dict, list[dict]]:


 def _decode_scale_list_with_runtime(
-    type_string: list[str],
-    scale_bytes: list[bytes],
+    type_strings: list[str],
+    scale_bytes_list: list[bytes],
     runtime_registry,
     return_scale_obj: bool = False,
 ):
-    if scale_bytes == b"":
-        return None
-    if type_string == "scale_info::0":  # Is an AccountId
-        # Decode AccountId bytes to SS58 address
-        return ss58_encode(scale_bytes, SS58_FORMAT)
-    else:
-        obj = decode_list(type_string, runtime_registry, scale_bytes)
+    obj = decode_list(type_strings, runtime_registry, scale_bytes_list)
     if return_scale_obj:
         return [ScaleObj(x) for x in obj]
     else:
         return obj
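
A note on the PATCH 1 approach: everything handed to a ProcessPoolExecutor worker has to be picklable, which is presumably why query_map passes runtime.registry.registry (apparently the registry's JSON representation) across the process boundary, and why _decode_query_map rebuilds the handle with PortableRegistry.from_json on every call rather than shipping the Rust-backed registry object itself. A minimal sketch of that hand-off, with decode_worker and decode_in_pool as hypothetical stand-ins for the patch's _decode_query_map and its call site:

    import asyncio
    from concurrent.futures import ProcessPoolExecutor

    from bt_decode import PortableRegistry


    def decode_worker(registry_json: str, changes: list) -> list:
        # Runs in the worker process: re-hydrate the registry from JSON,
        # since the live PortableRegistry object presumably cannot cross
        # the pickle boundary. This repeats on every submitted call.
        registry = PortableRegistry.from_json(registry_json)
        ...  # decode `changes` against `registry`, as _decode_query_map does


    async def decode_in_pool(
        executor: ProcessPoolExecutor, registry_json: str, changes: list
    ):
        # run_in_executor pickles the arguments, ships them to a worker,
        # and awaits the result without blocking the event loop.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            executor, decode_worker, registry_json, changes
        )

The per-call JSON round-trip, plus pickling each result group in and out of the pool, is overhead the event loop never recovers, which may be part of why PATCH 3 scraps the pool in favor of batched decoding.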
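The decoding path that replaced it (added in PATCH 3, trimmed in PATCH 5) decodes a whole result group in one bt-decode call instead of one call per entry. decode_query_map builds a single tuple type string for the keys, pairing each parameter type with a [u8; N] placeholder for its hasher's digest prefix (16 bytes for Blake2_128Concat, 8 for Twox64Concat, 0 for Identity), repeats the key and value type strings once per change, and hands decode_list one concatenated list: keys in the first half, values in the second. A condensed sketch of the same pattern, where batch_decode_changes is a hypothetical wrapper (it skips the patch's ScaleObj wrapping and error handling) and decode_list is the bt-decode 0.6.0 API that PATCH 3 pins in pyproject.toml:

    from bt_decode import decode_list

    from async_substrate_interface.utils import hex_to_bytes


    def batch_decode_changes(changes, prefix, key_type_string, value_type, registry):
        # Strip the common storage prefix from each key; keys and values
        # arrive as hex strings from state_queryStorageAt.
        raw_keys = [bytes.fromhex(key[len(prefix):]) for key, _ in changes]
        raw_values = [hex_to_bytes(value) for _, value in changes]
        # One call decodes every key and value in the group.
        decoded = decode_list(
            [key_type_string] * len(changes) + [value_type] * len(changes),
            registry,
            raw_keys + raw_values,
        )
        middle = len(decoded) // 2  # keys occupy the first half
        return list(zip(decoded[:middle], decoded[middle:]))

Trading one decode round-trip per storage entry for one per result group is presumably where the speed-up over the original per-item loop comes from.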