diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 2cdfe5dfa0..8cffa9632d 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -45,7 +45,7 @@ jobs:
fi
- name: Upload artifact
- uses: actions/upload-artifact@v3
+ uses: actions/upload-artifact@v4
with:
name: dist
path: dist/
@@ -60,7 +60,7 @@ jobs:
steps:
- name: Download artifact
- uses: actions/download-artifact@v3
+ uses: actions/download-artifact@v4
with:
name: dist
path: dist/
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0976792d33..7041322233 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,64 @@
# Changelog
+
+## 8.5.1rc11 /2025-02-03
+## What's Changed
+* Updates `dynamic_info` to add `volume`.
+* Updates `metagraph_info` to add `volume`.
+* Renames metagraph methods in async to `get_metagraph_info` and `get_all_metagraphs_info`.
+
+## 8.5.1rc10 /2025-01-29
+## What's Changed
+* Adds `get_metagraph`, `get_all_metagraphs`: a new structure for metagraph data.
+* Updates other methods, adds new ones like move and transfer stake.
+* Bug fixes and improvements.
+
+## 8.5.1rc9 /2025-01-22
+## What's Changed
+* Updates to the sdk to support methods used in Rao
+
+## 8.5.1rc8 /2025-01-15
+
+## What's Changed
+* Updates `get_stake_for_coldkey` to return only non-zero stakes.
+* Bumps bittensor-cli
+
+## 8.5.1rc7 /2025-01-14
+
+## What's Changed
+* Adds `get_subnets_info`, `get_subnet_info`, `get_stake_for_coldkey_and_hotkey`, `get_stake_for_coldkey`.
+* Updates `add_stake`, `add_stake_multiple`, `unstake`, `unstake_multiple`
+
+## 8.5.1rc6 /2025-01-11
+
+## What's Changed
+* Updates bittensor-cli to 8.2.0rc10
+
+## 8.5.1rc5 /2025-01-09
+
+## What's Changed
+* Updates bittensor-cli to 8.2.0rc9
+
+## 8.5.1rc4 /2025-01-09
+
+## What's Changed
+* Updates bittensor-cli to 8.2.0rc8
+
+## 8.5.1rc3 /2025-01-09
+
+## What's Changed
+* Updates bittensor-cli to 8.2.0rc7
+
+## 8.5.1rc2 /2025-01-09
+
+## What's Changed
+* Fixed units variable name
+
+## 8.5.1rc1 /2025-01-09
+
+## What's Changed
+* Development release of RAO network
+
## 8.5.1 /2024-12-16
## What's Changed
diff --git a/README.md b/README.md
index 109a321030..1573f4e6a0 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,7 @@
# **Bittensor SDK**
+### Rao Development Version
[](https://discord.gg/bittensor)
[](https://badge.fury.io/py/bittensor)
[](https://opensource.org/licenses/MIT)
diff --git a/VERSION b/VERSION
index e0741a834a..74ee28dac1 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-8.5.1
\ No newline at end of file
+8.5.1rc11
\ No newline at end of file
diff --git a/bittensor/core/async_subtensor.py b/bittensor/core/async_subtensor.py
index 2141055cca..33829a83b8 100644
--- a/bittensor/core/async_subtensor.py
+++ b/bittensor/core/async_subtensor.py
@@ -1,28 +1,40 @@
import asyncio
import ssl
+import warnings
from typing import Optional, Any, Union, TypedDict, Iterable
import aiohttp
import numpy as np
import scalecodec
+from async_substrate_interface.errors import SubstrateRequestException
+from async_substrate_interface.substrate_interface import (
+ AsyncSubstrateInterface,
+ QueryMapResult,
+)
from bittensor_wallet import Wallet
from bittensor_wallet.utils import SS58_FORMAT
from numpy.typing import NDArray
from scalecodec import GenericCall
from scalecodec.base import RuntimeConfiguration
from scalecodec.type_registry import load_type_registry_preset
-from substrateinterface.exceptions import SubstrateRequestException
+from scalecodec.types import ScaleType
from bittensor.core.chain_data import (
DelegateInfo,
custom_rpc_type_registry,
StakeInfo,
+ MetagraphInfo,
NeuronInfoLite,
NeuronInfo,
SubnetHyperparameters,
decode_account_id,
+ DynamicInfo,
+)
+from bittensor.core.errors import StakeError
+from bittensor.core.extrinsics.async_registration import (
+ register_extrinsic,
+ burned_register_extrinsic,
)
-from bittensor.core.extrinsics.async_registration import register_extrinsic
from bittensor.core.extrinsics.async_root import (
set_root_weights_extrinsic,
root_register_extrinsic,
@@ -32,6 +44,7 @@
commit_weights_extrinsic,
set_weights_extrinsic,
)
+from bittensor.core.metagraph import Metagraph
from bittensor.core.settings import (
TYPE_REGISTRY,
DEFAULTS,
@@ -48,14 +61,11 @@
validate_chain_endpoint,
hex_to_bytes,
)
-from bittensor.utils.async_substrate_interface import (
- AsyncSubstrateInterface,
- TimeoutException,
-)
-from bittensor.utils.balance import Balance
+from bittensor.utils.balance import Balance, FixedPoint, fixed_to_float
from bittensor.utils.btlogging import logging
from bittensor.utils.delegates_details import DelegatesDetails
from bittensor.utils.weight_utils import generate_weight_hash
+from bittensor.core.subtensor import Subtensor
class ParamWithTypes(TypedDict):
@@ -141,7 +151,7 @@ def __init__(self, network: str = DEFAULT_NETWORK):
self.network = DEFAULTS.subtensor.network
self.substrate = AsyncSubstrateInterface(
- chain_endpoint=self.chain_endpoint,
+ url=self.chain_endpoint,
ss58_format=SS58_FORMAT,
type_registry=TYPE_REGISTRY,
chain_name="Bittensor",
@@ -157,7 +167,7 @@ async def __aenter__(self):
try:
async with self.substrate:
return self
- except TimeoutException:
+ except TimeoutError:
logging.error(
f"[red]Error[/red]: Timeout occurred connecting to substrate. Verify your chain and network settings: {self}"
)
@@ -225,7 +235,151 @@ async def get_hyperparameter(
return result
+ async def determine_block_hash(
+ self,
+ block: Optional[int],
+ block_hash: Optional[str] = None,
+ reuse_block: bool = False,
+ ) -> Optional[str]:
+ # Ensure that only one of the parameters is specified.
+ if sum(bool(x) for x in [block, block_hash, reuse_block]) > 1:
+ raise ValueError(
+ "Only one of `block`, `block_hash`, or `reuse_block` can be specified."
+ )
+
+ # Return the appropriate value.
+ if block_hash:
+ return block_hash
+ if block:
+ return await self.get_block_hash(block)
+ return None
+
+ # Chain calls methods ==============================================================================================
+ async def query_subtensor(
+ self,
+ name: str,
+ block: Optional[int] = None,
+ block_hash: Optional[str] = None,
+ reuse_block: bool = False,
+ params: Optional[list] = None,
+ ) -> "ScaleType":
+ """
+ Queries named storage from the Subtensor module on the Bittensor blockchain. This function is used to retrieve
+ specific data or parameters from the blockchain, such as stake, rank, or other neuron-specific attributes.
+
+ Args:
+ name: The name of the storage function to query.
+ block: The blockchain block number at which to perform the query.
+ block_hash: The hash of the block to retrieve the parameter from. Do not specify if using block or
+ reuse_block
+ reuse_block: Whether to use the last-used block. Do not set if using block_hash or block.
+ params: A list of parameters to pass to the query function.
+
+ Returns:
+ query_response (scalecodec.ScaleType): An object containing the requested data.
+
+ This query function is essential for accessing detailed information about the network and its neurons, providing
+ valuable insights into the state and dynamics of the Bittensor ecosystem.
+ """
+ block_hash = await self.determine_block_hash(block, block_hash, reuse_block)
+ return await self.substrate.query(
+ module="SubtensorModule",
+ storage_function=name,
+ params=params,
+ block_hash=block_hash,
+ reuse_block_hash=reuse_block,
+ )
+
+ async def query_map_subtensor(
+ self,
+ name: str,
+ block: Optional[int] = None,
+ block_hash: Optional[str] = None,
+ reuse_block: bool = False,
+ params: Optional[list] = None,
+ ) -> "QueryMapResult":
+ """
+ Queries map storage from the Subtensor module on the Bittensor blockchain. This function is designed to retrieve
+ a map-like data structure, which can include various neuron-specific details or network-wide attributes.
+
+ Args:
+ name: The name of the map storage function to query.
+ block: The blockchain block number at which to perform the query.
+ block_hash: The hash of the block to retrieve the parameter from. Do not specify if using block or
+ reuse_block
+ reuse_block: Whether to use the last-used block. Do not set if using block_hash or block.
+ params: A list of parameters to pass to the query function.
+
+ Returns:
+ An object containing the map-like data structure, or `None` if not found.
+
+ This function is particularly useful for analyzing and understanding complex network structures and
+ relationships within the Bittensor ecosystem, such as interneuronal connections and stake distributions.
+ """
+ block_hash = await self.determine_block_hash(block, block_hash, reuse_block)
+ return await self.substrate.query_map(
+ module="SubtensorModule",
+ storage_function=name,
+ params=params,
+ block_hash=block_hash,
+ reuse_block_hash=reuse_block,
+ )
+
# Common subtensor methods =========================================================================================
+ async def metagraph(
+ self, netuid: int, lite: bool = True, block: Optional[int] = None
+ ) -> "Metagraph": # type: ignore
+ """
+ Returns a synced metagraph for a specified subnet within the Bittensor network. The metagraph represents the network's structure, including neuron connections and interactions.
+
+ Args:
+ netuid (int): The network UID of the subnet to query.
+ lite (bool): If true, returns a metagraph using a lightweight sync (no weights, no bonds). Default is ``True``.
+ block (Optional[int]): Block number for synchronization, or ``None`` for the latest block.
+
+ Returns:
+ bittensor.core.metagraph.Metagraph: The metagraph representing the subnet's structure and neuron relationships.
+
+ The metagraph is an essential tool for understanding the topology and dynamics of the Bittensor network's decentralized architecture, particularly in relation to neuron interconnectivity and consensus processes.
+ """
+ metagraph = Metagraph(
+ network=self.chain_endpoint,
+ netuid=netuid,
+ lite=lite,
+ sync=False,
+ subtensor=self,
+ )
+ meta_sub = Subtensor(network=self.network)
+ metagraph.sync(block=block, lite=lite, subtensor=meta_sub)
+
+ return metagraph
+
+ async def get_metagraph_info(
+ self, netuid: int, block: Optional[int] = None
+ ) -> Optional[MetagraphInfo]:
+ block_hash = await self.get_block_hash(block)
+
+ query = await self.substrate.runtime_call(
+ "SubnetInfoRuntimeApi",
+ "get_metagraph",
+ params=[netuid],
+ block_hash=block_hash,
+ )
+ metagraph_bytes = bytes.fromhex(query.decode()[2:])
+ return MetagraphInfo.from_vec_u8(metagraph_bytes)
+
+ async def get_all_metagraphs_info(
+ self, block: Optional[int] = None
+ ) -> list[MetagraphInfo]:
+ block_hash = await self.get_block_hash(block)
+
+ query = await self.substrate.runtime_call(
+ "SubnetInfoRuntimeApi",
+ "get_all_metagraphs",
+ block_hash=block_hash,
+ )
+ metagraphs_bytes = bytes.fromhex(query.decode()[2:])
+ return MetagraphInfo.list_from_vec_u8(metagraphs_bytes)
async def get_current_block(self) -> int:
"""
@@ -255,6 +409,403 @@ async def get_block_hash(self, block_id: Optional[int] = None):
else:
return await self.substrate.get_chain_head()
+ async def wait_for_block(self, block: Optional[int] = None):
+ async def _w(_):
+ return True
+
+ if block is None:
+ block = (await self.get_current_block()) + 1
+
+ await self.substrate.wait_for_block(block, _w, False)
+
+ async def get_stake_for_coldkey(
+ self, coldkey_ss58: str, block: Optional[int] = None
+ ) -> Optional[list["StakeInfo"]]:
+ """
+ Retrieves the stake information for a given coldkey.
+
+ Args:
+ coldkey_ss58 (str): The SS58 address of the coldkey.
+ block (Optional[int]): The block number at which to query the stake information.
+
+ Returns:
+ Optional[list[StakeInfo]]: A list of StakeInfo objects, or ``None`` if no stake information is found.
+ """
+ encoded_coldkey = ss58_to_vec_u8(coldkey_ss58)
+ block_hash = await self.get_block_hash(block)
+ hex_bytes_result = await self.query_runtime_api(
+ runtime_api="StakeInfoRuntimeApi",
+ method="get_stake_info_for_coldkey",
+ params=[encoded_coldkey],
+ block_hash=block_hash,
+ )
+
+ if hex_bytes_result is None:
+ return []
+ try:
+ bytes_result = bytes.fromhex(hex_bytes_result[2:])
+ except ValueError:
+ bytes_result = bytes.fromhex(hex_bytes_result)
+
+ stakes = StakeInfo.list_from_vec_u8(bytes_result)
+ return [stake for stake in stakes if stake.stake > 0]
+
+ async def unstake(
+ self,
+ wallet: Wallet,
+ hotkey: str,
+ netuid: int,
+ amount: Union[float, Balance, int],
+ wait_for_inclusion: bool = False,
+ wait_for_finalization: bool = False,
+ ):
+ """
+ Removes a specified amount of stake from a hotkey and coldkey pair.
+
+ Args:
+ wallet (bittensor_wallet.Wallet): The wallet to be used for unstaking.
+ hotkey (str): The ``SS58`` address of the hotkey associated with the neuron.
+ netuid (int): The subnet ID to filter by. If provided, only returns stake for this specific subnet.
+ amount (Union[float, Balance, int]): The amount of TAO to unstake.
+ wait_for_inclusion (bool): Waits for the transaction to be included in a block.
+ wait_for_finalization (bool): Waits for the transaction to be finalized on the blockchain.
+
+ Returns:
+ bool: ``True`` if the unstaking is successful, False otherwise.
+ """
+ if isinstance(amount, (float, int)):
+ amount = Balance(amount)
+
+ call = await self.substrate.compose_call(
+ call_module="SubtensorModule",
+ call_function="remove_stake",
+ call_params={
+ "hotkey": hotkey,
+ "amount_unstaked": amount.rao,
+ "netuid": netuid,
+ },
+ )
+ next_nonce = await self.substrate.get_account_next_index(
+ wallet.coldkeypub.ss58_address
+ )
+ extrinsic = await self.substrate.create_signed_extrinsic(
+ call=call, keypair=wallet.coldkey, nonce=next_nonce
+ )
+ response = await self.substrate.submit_extrinsic(
+ extrinsic,
+ wait_for_inclusion=wait_for_inclusion,
+ wait_for_finalization=wait_for_finalization,
+ )
+ # We only wait here if we expect finalization.
+ if not wait_for_finalization and not wait_for_inclusion:
+ return True
+
+ if await response.is_success:
+ return True
+ else:
+ raise StakeError(format_error_message(await response.error_message))
+
+ remove_stake = unstake
+
+ async def add_stake(
+ self,
+ wallet: "Wallet",
+ hotkey: str,
+ netuid: int,
+ tao_amount: Union[int, float, "Balance"],
+ wait_for_inclusion: bool = False,
+ wait_for_finalization: bool = False,
+ ):
+ if isinstance(tao_amount, (float, int)):
+ tao_amount = Balance.from_tao(tao_amount)
+
+ call = await self.substrate.compose_call(
+ call_module="SubtensorModule",
+ call_function="add_stake",
+ call_params={
+ "hotkey": hotkey,
+ "amount_staked": tao_amount.rao,
+ "netuid": netuid,
+ },
+ )
+ next_nonce = await self.substrate.get_account_next_index(
+ wallet.coldkeypub.ss58_address
+ )
+
+ extrinsic = await self.substrate.create_signed_extrinsic(
+ call=call, keypair=wallet.coldkey, nonce=next_nonce
+ )
+ response = await self.substrate.submit_extrinsic(
+ extrinsic,
+ wait_for_inclusion=wait_for_inclusion,
+ wait_for_finalization=wait_for_finalization,
+ )
+ # We only wait here if we expect finalization.
+ if not wait_for_finalization and not wait_for_inclusion:
+ return True
+
+ if await response.is_success:
+ return True
+ else:
+ raise StakeError(format_error_message(await response.error_message))
+
+ stake = add_stake
+
+ async def transfer_stake(
+ self,
+ wallet: "Wallet",
+ destination_coldkey_ss58: str,
+ hotkey_ss58: str,
+ origin_netuid: int,
+ destination_netuid: int,
+ amount: Union["Balance", float, int],
+ wait_for_inclusion: bool = True,
+ wait_for_finalization: bool = False,
+ ) -> bool:
+ """
+ Transfers stake from one subnet to another. Keeps the same hotkey but destination coldkey is different.
+ Allows moving stake to a different coldkey's control while also having the option to change the subnet.
+
+ Hotkey is the same. Coldkeys are different.
+
+ Args:
+ wallet (bittensor.wallet): The wallet to transfer stake from.
+ destination_coldkey_ss58 (str): The destination coldkey SS58 address. Different from the origin coldkey.
+ hotkey_ss58 (str): The hotkey SS58 address associated with the stake. This is owned by the origin coldkey.
+ origin_netuid (int): The source subnet UID.
+ destination_netuid (int): The destination subnet UID.
+ amount (Union[Balance, float]): Amount to transfer.
+ wait_for_inclusion (bool): Waits for the transaction to be included in a block.
+ wait_for_finalization (bool): Waits for the transaction to be finalized on the blockchain.
+
+ Returns:
+ success (bool): True if the extrinsic was included in a block.
+
+ Raises:
+ StakeError: If the transfer fails due to insufficient stake or other reasons.
+ """
+ if isinstance(amount, (float, int)):
+ amount = Balance.from_tao(amount)
+
+ hotkey_owner = await self.get_hotkey_owner(hotkey_ss58)
+ if hotkey_owner != wallet.coldkeypub.ss58_address:
+ logging.error(
+ f":cross_mark: [red]Failed[/red]: Hotkey: {hotkey_ss58} does not belong to the origin coldkey owner: {wallet.coldkeypub.ss58_address}"
+ )
+ return False
+
+ stake_in_origin = await self.get_stake(
+ hotkey_ss58=hotkey_ss58,
+ coldkey_ss58=wallet.coldkeypub.ss58_address,
+ netuid=origin_netuid,
+ )
+ if stake_in_origin < amount:
+ logging.error(
+ f":cross_mark: [red]Failed[/red]: Insufficient stake in origin hotkey: {hotkey_ss58}. Stake: {stake_in_origin}, amount: {amount}"
+ )
+ return False
+
+ call = await self.substrate.compose_call(
+ call_module="SubtensorModule",
+ call_function="transfer_stake",
+ call_params={
+ "destination_coldkey": destination_coldkey_ss58,
+ "hotkey": hotkey_ss58,
+ "origin_netuid": origin_netuid,
+ "destination_netuid": destination_netuid,
+ "alpha_amount": amount.rao,
+ },
+ )
+ next_nonce = await self.substrate.get_account_next_index(
+ wallet.coldkeypub.ss58_address
+ )
+ extrinsic = await self.substrate.create_signed_extrinsic(
+ call=call, keypair=wallet.coldkey, nonce=next_nonce
+ )
+ response = await self.substrate.submit_extrinsic(
+ extrinsic,
+ wait_for_inclusion=wait_for_inclusion,
+ wait_for_finalization=wait_for_finalization,
+ )
+ if not wait_for_finalization and not wait_for_inclusion:
+ return True
+
+ if await response.is_success:
+ return True
+ else:
+ logging.error(
+ f":cross_mark: [red]Failed[/red]: {format_error_message(await response.error_message)}"
+ )
+ return False
+
+ async def swap_stake(
+ self,
+ wallet: "Wallet",
+ hotkey_ss58: str,
+ origin_netuid: int,
+ destination_netuid: int,
+ amount: Union["Balance", float, int],
+ wait_for_inclusion: bool = True,
+ wait_for_finalization: bool = False,
+ ) -> bool:
+ """
+ Moves stake between subnets while keeping the same coldkey-hotkey pair ownership.
+ Like subnet hopping - same owner, same hotkey, just changing which subnet the stake is in.
+
+ Both hotkey and coldkey are the same.
+
+ Args:
+ wallet (bittensor.wallet): The wallet to transfer stake from.
+ hotkey_ss58 (str): The SS58 address of the hotkey whose stake is being swapped.
+ origin_netuid (int): The netuid from which stake is removed.
+ destination_netuid (int): The netuid to which stake is added.
+ amount (Union[Balance, float, int]): The amount to swap.
+ wait_for_inclusion (bool): Waits for the transaction to be included in a block.
+ wait_for_finalization (bool): Waits for the transaction to be finalized on the blockchain.
+
+ Returns:
+ success (bool): True if the extrinsic was successful.
+ """
+ if isinstance(amount, (float, int)):
+ amount = Balance.from_tao(amount)
+
+ hotkey_owner = await self.get_hotkey_owner(hotkey_ss58)
+ if hotkey_owner != wallet.coldkeypub.ss58_address:
+ logging.error(
+ f":cross_mark: [red]Failed[/red]: Hotkey: {hotkey_ss58} does not belong to the origin coldkey owner: {wallet.coldkeypub.ss58_address}"
+ )
+ return False
+
+ stake_in_origin = await self.get_stake(
+ hotkey_ss58=hotkey_ss58,
+ coldkey_ss58=wallet.coldkeypub.ss58_address,
+ netuid=origin_netuid,
+ )
+ if stake_in_origin < amount:
+ logging.error(
+ f":cross_mark: [red]Failed[/red]: Insufficient stake in origin hotkey: {hotkey_ss58}. Stake: {stake_in_origin}, amount: {amount}"
+ )
+ return False
+
+ call = await self.substrate.compose_call(
+ call_module="SubtensorModule",
+ call_function="swap_stake",
+ call_params={
+ "hotkey": hotkey_ss58,
+ "origin_netuid": origin_netuid,
+ "destination_netuid": destination_netuid,
+ "alpha_amount": amount.rao,
+ },
+ )
+ next_nonce = await self.substrate.get_account_next_index(
+ wallet.coldkeypub.ss58_address
+ )
+ extrinsic = await self.substrate.create_signed_extrinsic(
+ call=call,
+ keypair=wallet.coldkey,
+ nonce=next_nonce,
+ )
+ response = await self.substrate.submit_extrinsic(
+ extrinsic,
+ wait_for_inclusion=wait_for_inclusion,
+ wait_for_finalization=wait_for_finalization,
+ )
+ if not wait_for_finalization and not wait_for_inclusion:
+ return True
+
+ if await response.is_success:
+ return True
+ else:
+ logging.error(
+ f":cross_mark: [red]Failed[/red]: {format_error_message(await response.error_message)}"
+ )
+ return False
+
+ async def move_stake(
+ self,
+ wallet: "Wallet",
+ origin_hotkey: str,
+ origin_netuid: int,
+ destination_hotkey: str,
+ destination_netuid: int,
+ amount: Union["Balance", float, int],
+ wait_for_inclusion: bool = True,
+ wait_for_finalization: bool = False,
+ ) -> bool:
+ """
+ Moves stake to a different hotkey and/or subnet while keeping the same coldkey owner.
+ Flexible movement allowing changes to both hotkey and subnet under the same coldkey's control.
+
+ Coldkey is the same. Hotkeys can be different.
+
+ Args:
+ wallet (bittensor.wallet): The wallet to transfer stake from.
+ origin_hotkey (str): The SS58 address of the source hotkey.
+ origin_netuid (int): The netuid of the source subnet.
+ destination_hotkey (str): The SS58 address of the destination hotkey.
+ destination_netuid (int): The netuid of the destination subnet.
+ amount (Union[Balance, float, int]): Amount of stake to move.
+ wait_for_inclusion (bool): Waits for the transaction to be included in a block. Default is True.
+ wait_for_finalization (bool): Waits for the transaction to be finalized on the blockchain. Default is False.
+
+ Returns:
+ bool: True if the stake movement was successful, False otherwise.
+
+ Raises:
+ StakeError: If the movement fails due to insufficient stake or other reasons.
+ """
+ if isinstance(amount, (float, int)):
+ amount = Balance.from_tao(amount)
+
+ stake_in_origin = await self.get_stake(
+ hotkey_ss58=origin_hotkey,
+ coldkey_ss58=wallet.coldkeypub.ss58_address,
+ netuid=origin_netuid,
+ )
+ if stake_in_origin < amount:
+ logging.error(
+ f":cross_mark: [red]Failed[/red]: Insufficient stake in origin hotkey: {origin_hotkey}. Stake: {stake_in_origin}, amount: {amount}"
+ )
+ return False
+
+ call = await self.substrate.compose_call(
+ call_module="SubtensorModule",
+ call_function="move_stake",
+ call_params={
+ "origin_hotkey": origin_hotkey,
+ "origin_netuid": origin_netuid,
+ "destination_hotkey": destination_hotkey,
+ "destination_netuid": destination_netuid,
+ "alpha_amount": amount.rao,
+ },
+ )
+
+ next_nonce = await self.substrate.get_account_next_index(
+ wallet.coldkeypub.ss58_address
+ )
+ extrinsic = await self.substrate.create_signed_extrinsic(
+ call=call,
+ keypair=wallet.coldkey,
+ nonce=next_nonce,
+ )
+
+ response = await self.substrate.submit_extrinsic(
+ extrinsic,
+ wait_for_inclusion=wait_for_inclusion,
+ wait_for_finalization=wait_for_finalization,
+ )
+
+ if not wait_for_finalization and not wait_for_inclusion:
+ return True
+
+ if await response.is_success:
+ return True
+ else:
+ logging.error(
+ f":cross_mark: [red]Failed[/red]: {format_error_message(await response.error_message)}"
+ )
+ return False
+
async def is_hotkey_registered_any(
self,
hotkey_ss58: str,
@@ -328,33 +879,91 @@ async def get_total_subnets(
)
return result
- async def get_subnets(
- self, block_hash: Optional[str] = None, reuse_block: bool = False
+ async def get_netuids(
+ self, block: Optional[int] = None, block_hash: Optional[str] = None
) -> list[int]:
"""
- Retrieves the list of all subnet unique identifiers (netuids) currently present in the Bittensor network.
+ Retrieves a list of all subnets currently active within the Bittensor network. This function provides an overview of the various subnets and their identifiers.
Args:
- block_hash (Optional[str]): The hash of the block to retrieve the subnet unique identifiers from.
- reuse_block (bool): Whether to reuse the last-used block hash.
+ block (Optional[int]): The blockchain block number for the query.
+ block_hash (Optional[str]): The hash of the blockchain block number for the query.
Returns:
- A list of subnet netuids.
+ list[int]: A list of network UIDs representing each active subnet.
- This function provides a comprehensive view of the subnets within the Bittensor network,
- offering insights into its diversity and scale.
+ This function is valuable for understanding the network's structure and the diversity of subnets available for neuron participation and collaboration.
"""
- result = await self.substrate.query_map(
- module="SubtensorModule",
- storage_function="NetworksAdded",
+ block_hash = await self.determine_block_hash(block, block_hash)
+ result = await self.query_map_subtensor("NetworksAdded", block_hash=block_hash)
+ return (
+ [network[0] for network in result.records if network[1]]
+ if result and hasattr(result, "records")
+ else []
+ )
+
+ async def all_subnets(
+ self, block_number: int = None
+ ) -> Optional[list["DynamicInfo"]]:
+ """
+ Retrieves the subnet information for all subnets in the Bittensor network.
+
+ Args:
+ block_number (Optional[int]): The block number to get the subnets at.
+
+ Returns:
+ Optional[DynamicInfo]: A list of DynamicInfo objects, each containing detailed information about a subnet.
+
+ """
+ if block_number is not None:
+ block_hash = await self.get_block_hash(block_number)
+ else:
+ block_hash = None
+ query = await self.substrate.runtime_call(
+ "SubnetInfoRuntimeApi",
+ "get_all_dynamic_info",
block_hash=block_hash,
- reuse_block_hash=reuse_block,
)
- return (
- []
- if result is None or not hasattr(result, "records")
- else [netuid async for netuid, exists in result if exists]
+ return DynamicInfo.list_from_vec_u8(bytes.fromhex(query.decode()[2:]))
+
+ get_subnets_info = all_subnets
+ get_all_subnets = all_subnets
+
+ async def subnet(
+ self, netuid: int, block_number: int = None
+ ) -> Optional[DynamicInfo]:
+ """
+ Retrieves the subnet information for a single subnet in the Bittensor network.
+
+ Args:
+ netuid (int): The unique identifier of the subnet.
+ block_number (Optional[int]): The block number to get the subnets at.
+
+ Returns:
+ Optional[DynamicInfo]: A DynamicInfo object, containing detailed information about a subnet.
+
+ This function can be called in two ways:
+ 1. As a context manager:
+ async with sub:
+ subnet = await sub.subnet(1)
+ 2. Directly:
+ subnet = await sub.subnet(1)
+ """
+ if block_number is not None:
+ block_hash = await self.get_block_hash(block_number)
+ else:
+ block_hash = None
+ query = await self.substrate.runtime_call(
+ "SubnetInfoRuntimeApi",
+ "get_dynamic_info",
+ params=[netuid],
+ block_hash=block_hash,
)
+ subnet = DynamicInfo.from_vec_u8(bytes.fromhex(query.decode()[2:]))
+ return subnet
+
+ get_subnet_info = subnet
+ get_subnet = subnet
async def is_hotkey_delegate(
self,
@@ -443,29 +1052,52 @@ async def get_stake_for_coldkey_and_hotkey(
self,
hotkey_ss58: str,
coldkey_ss58: str,
- block_hash: Optional[str] = None,
+ netuid: int,
+ block: Optional[int] = None,
reuse_block: bool = False,
) -> Balance:
"""
- Retrieves stake information associated with a specific coldkey and hotkey.
+ Returns the stake under a coldkey - hotkey pairing.
Args:
- hotkey_ss58 (str): the hotkey SS58 address to query
- coldkey_ss58 (str): the coldkey SS58 address to query
- block_hash (Optional[str]): the hash of the blockchain block number for the query.
- reuse_block (Optional[bool]): whether to reuse the last-used block hash.
-
+ hotkey_ss58 (str): The SS58 address of the hotkey.
+ coldkey_ss58 (str): The SS58 address of the coldkey.
+ netuid (Optional[int]): The subnet ID to filter by. If provided, only returns stake for this specific subnet.
+ block (Optional[int]): The block number at which to query the stake information.
+ reuse_block (bool): Whether to reuse the last-used block hash.
Returns:
- Stake Balance for the given coldkey and hotkey
+ Balance: The stake under the coldkey - hotkey pairing.
"""
- _result = await self.substrate.query(
- module="SubtensorModule",
- storage_function="Stake",
- params=[hotkey_ss58, coldkey_ss58],
- block_hash=block_hash,
- reuse_block_hash=reuse_block,
+ alpha_shares: FixedPoint = await self.query_subtensor(
+ name="Alpha",
+ block=block,
+ reuse_block=reuse_block,
+ params=[hotkey_ss58, coldkey_ss58, netuid],
+ )
+ hotkey_alpha: int = await self.query_subtensor(
+ name="TotalHotkeyAlpha",
+ block=block,
+ reuse_block=reuse_block,
+ params=[hotkey_ss58, netuid],
)
- return Balance.from_rao(_result or 0)
+ hotkey_shares: FixedPoint = await self.query_subtensor(
+ name="TotalHotkeyShares",
+ block=block,
+ reuse_block=reuse_block,
+ params=[hotkey_ss58, netuid],
+ )
+
+ alpha_shares_as_float = fixed_to_float(alpha_shares)
+ hotkey_shares_as_float = fixed_to_float(hotkey_shares)
+
+ if hotkey_shares_as_float == 0:
+ return Balance.from_rao(0)
+
+ stake = alpha_shares_as_float / hotkey_shares_as_float * hotkey_alpha.value
+
+ return Balance.from_rao(int(stake)).set_unit(netuid=netuid)
+
+ get_stake = get_stake_for_coldkey_and_hotkey
async def query_runtime_api(
self,
@@ -560,6 +1192,8 @@ async def get_balance(
results.update({item[0].params[0]: Balance(value["data"]["free"])})
return results
+ balance = get_balance
+
async def get_transfer_fee(
self, wallet: "Wallet", dest: str, value: Union["Balance", float, int]
) -> "Balance":
@@ -626,26 +1260,12 @@ async def get_total_stake_for_coldkey(
Returns:
Dict in view {address: Balance objects}.
"""
- if reuse_block:
- block_hash = self.substrate.last_block_hash
- elif not block_hash:
- block_hash = await self.get_block_hash()
- calls = [
- (
- await self.substrate.create_storage_key(
- "SubtensorModule",
- "TotalColdkeyStake",
- [address],
- block_hash=block_hash,
- )
- )
- for address in ss58_addresses
- ]
- batch_call = await self.substrate.query_multi(calls, block_hash=block_hash)
- results = {}
- for item in batch_call:
- results.update({item[0].params[0]: Balance.from_rao(item[1] or 0)})
- return results
+ warnings.simplefilter("default", DeprecationWarning)
+ warnings.warn(
+ "get_total_stake_for_coldkey is not available in the Rao network at the moment. Please use get_stake_for_coldkey instead.",
+ category=DeprecationWarning,
+ stacklevel=2,
+ )
async def get_total_stake_for_hotkey(
self,
@@ -876,6 +1496,33 @@ async def neurons_lite(
return NeuronInfoLite.list_from_vec_u8(hex_to_bytes(hex_bytes_result))
+ async def burned_register(
+ self,
+ wallet: "Wallet",
+ netuid: int,
+ wait_for_inclusion: bool = False,
+ wait_for_finalization: bool = True,
+ ) -> bool:
+ """
+ Registers a neuron on the Bittensor network by recycling TAO. This method of registration involves recycling TAO tokens, allowing them to be re-mined by performing work on the network.
+
+ Args:
+ wallet (bittensor_wallet.Wallet): The wallet associated with the neuron to be registered.
+ netuid (int): The unique identifier of the subnet.
+ wait_for_inclusion (bool, optional): Waits for the transaction to be included in a block. Defaults to `False`.
+ wait_for_finalization (bool, optional): Waits for the transaction to be finalized on the blockchain. Defaults to `True`.
+
+ Returns:
+ bool: ``True`` if the registration is successful, False otherwise.
+ """
+ return await burned_register_extrinsic(
+ subtensor=self,
+ wallet=wallet,
+ netuid=netuid,
+ wait_for_inclusion=wait_for_inclusion,
+ wait_for_finalization=wait_for_finalization,
+ )
+
async def get_neuron_for_pubkey_and_subnet(
self,
hotkey_ss58: str,
@@ -1413,6 +2060,22 @@ async def weights_rate_limit(
)
return None if call is None else int(call)
+ async def recycle(self, netuid: int) -> Optional["Balance"]:
+ """
+ Retrieves the 'Burn' hyperparameter for a specified subnet. The 'Burn' parameter represents the amount of Tao that is effectively recycled within the Bittensor network.
+
+ Args:
+ netuid (int): The unique identifier of the subnet.
+ block (Optional[int]): The blockchain block number for the query.
+
+ Returns:
+ Optional[Balance]: The value of the 'Burn' hyperparameter if the subnet exists, None otherwise.
+
+ Understanding the 'Burn' rate is essential for analyzing the network registration usage, particularly how it is correlated with user activity and the overall cost of participation in a given subnet.
+ """
+ call = await self.get_hyperparameter(param_name="Burn", netuid=netuid)
+ return None if call is None else Balance.from_rao(int(call.value))
+
async def blocks_since_last_update(self, netuid: int, uid: int) -> Optional[int]:
"""
Returns the number of blocks since the last update for a specific UID in the subnetwork.
diff --git a/bittensor/core/chain_data/__init__.py b/bittensor/core/chain_data/__init__.py
index 760eaa3354..20ed1fa684 100644
--- a/bittensor/core/chain_data/__init__.py
+++ b/bittensor/core/chain_data/__init__.py
@@ -9,15 +9,43 @@
from .delegate_info import DelegateInfo
from .delegate_info_lite import DelegateInfoLite
from .ip_info import IPInfo
+from .metagraph_info import MetagraphInfo
from .neuron_info import NeuronInfo
from .neuron_info_lite import NeuronInfoLite
from .neuron_certificate import NeuronCertificate
from .prometheus_info import PrometheusInfo
from .proposal_vote_data import ProposalVoteData
from .scheduled_coldkey_swap_info import ScheduledColdkeySwapInfo
+from .subnet_state import SubnetState
from .stake_info import StakeInfo
from .subnet_hyperparameters import SubnetHyperparameters
from .subnet_info import SubnetInfo
+from .dynamic_info import DynamicInfo
+from .subnet_identity import SubnetIdentity
from .utils import custom_rpc_type_registry, decode_account_id, process_stake_data
ProposalCallData = GenericCall
+
+__all__ = [
+ AxonInfo,
+ DelegateInfo,
+ DelegateInfoLite,
+ IPInfo,
+ MetagraphInfo,
+ NeuronInfo,
+ NeuronInfoLite,
+ NeuronCertificate,
+ PrometheusInfo,
+ ProposalVoteData,
+ ScheduledColdkeySwapInfo,
+ SubnetState,
+ StakeInfo,
+ SubnetHyperparameters,
+ SubnetInfo,
+ DynamicInfo,
+ SubnetIdentity,
+ custom_rpc_type_registry,
+ decode_account_id,
+ process_stake_data,
+ ProposalCallData,
+]
diff --git a/bittensor/core/chain_data/chain_identity.py b/bittensor/core/chain_data/chain_identity.py
new file mode 100644
index 0000000000..f66de75410
--- /dev/null
+++ b/bittensor/core/chain_data/chain_identity.py
@@ -0,0 +1,15 @@
+from dataclasses import dataclass
+
+
+@dataclass
+class ChainIdentity:
+ """Dataclass for chain identity information."""
+
+ # In `bittensor.core.chain_data.utils.custom_rpc_type_registry` represents as `ChainIdentityOf` structure.
+
+ name: str
+ url: str
+ image: str
+ discord: str
+ description: str
+ additional: str
diff --git a/bittensor/core/chain_data/dynamic_info.py b/bittensor/core/chain_data/dynamic_info.py
new file mode 100644
index 0000000000..79232bddcc
--- /dev/null
+++ b/bittensor/core/chain_data/dynamic_info.py
@@ -0,0 +1,249 @@
+"""
+This module defines the `DynamicInfo` data class and associated methods for handling and decoding
+dynamic information in the Bittensor network.
+"""
+
+from dataclasses import dataclass
+from typing import Optional, Union
+
+from scalecodec.utils.ss58 import ss58_encode
+
+from bittensor.core.chain_data.utils import (
+ ChainDataType,
+ from_scale_encoding,
+ SS58_FORMAT,
+)
+from bittensor.core.chain_data.subnet_identity import SubnetIdentity
+from bittensor.utils.balance import Balance
+
+
+@dataclass
+class DynamicInfo:
+ netuid: int
+ owner_hotkey: str
+ owner_coldkey: str
+ subnet_name: str
+ symbol: str
+ tempo: int
+ last_step: int
+ blocks_since_last_step: int
+ emission: Balance
+ alpha_in: Balance
+ alpha_out: Balance
+ tao_in: Balance
+ price: Balance
+ k: float
+ is_dynamic: bool
+ alpha_out_emission: Balance
+ alpha_in_emission: Balance
+ tao_in_emission: Balance
+ pending_alpha_emission: Balance
+ pending_root_emission: Balance
+ network_registered_at: int
+ subnet_volume: Balance
+ subnet_identity: Optional[SubnetIdentity]
+
+ @classmethod
+ def from_vec_u8(cls, vec_u8: list[int]) -> Optional["DynamicInfo"]:
+ if len(vec_u8) == 0:
+ return None
+ decoded = from_scale_encoding(vec_u8, ChainDataType.DynamicInfo)
+ if decoded is None:
+ return None
+ return DynamicInfo.fix_decoded_values(decoded)
+
+ @classmethod
+ def list_from_vec_u8(cls, vec_u8: Union[list[int], bytes]) -> list["DynamicInfo"]:
+ decoded = from_scale_encoding(
+ vec_u8, ChainDataType.DynamicInfo, is_vec=True, is_option=True
+ )
+ if decoded is None:
+ return []
+ decoded = [DynamicInfo.fix_decoded_values(d) for d in decoded]
+ return decoded
+
+ @classmethod
+ def fix_decoded_values(cls, decoded: dict) -> "DynamicInfo":
+ """Returns a DynamicInfo object from a decoded DynamicInfo dictionary."""
+
+ netuid = int(decoded["netuid"])
+ symbol = bytes([int(b) for b in decoded["token_symbol"]]).decode()
+ subnet_name = bytes([int(b) for b in decoded["subnet_name"]]).decode()
+
+ is_dynamic = (
+ True if int(decoded["netuid"]) > 0 else False
+ ) # Root is not dynamic
+
+ owner_hotkey = ss58_encode(decoded["owner_hotkey"], SS58_FORMAT)
+ owner_coldkey = ss58_encode(decoded["owner_coldkey"], SS58_FORMAT)
+
+ emission = Balance.from_rao(decoded["emission"]).set_unit(0)
+ alpha_in = Balance.from_rao(decoded["alpha_in"]).set_unit(netuid)
+ alpha_out = Balance.from_rao(decoded["alpha_out"]).set_unit(netuid)
+ tao_in = Balance.from_rao(decoded["tao_in"]).set_unit(0)
+ alpha_out_emission = Balance.from_rao(decoded["alpha_out_emission"]).set_unit(
+ netuid
+ )
+ alpha_in_emission = Balance.from_rao(decoded["alpha_in_emission"]).set_unit(
+ netuid
+ )
+ tao_in_emission = Balance.from_rao(decoded["tao_in_emission"]).set_unit(0)
+ pending_alpha_emission = Balance.from_rao(
+ decoded["pending_alpha_emission"]
+ ).set_unit(netuid)
+ pending_root_emission = Balance.from_rao(
+ decoded["pending_root_emission"]
+ ).set_unit(0)
+ subnet_volume = Balance.from_rao(decoded["subnet_volume"]).set_unit(netuid)
+
+ price = (
+ Balance.from_tao(1.0)
+ if netuid == 0
+ else Balance.from_tao(tao_in.tao / alpha_in.tao)
+ if alpha_in.tao > 0
+ else Balance.from_tao(1)
+ ) # Root always has 1-1 price
+
+ if decoded.get("subnet_identity"):
+ subnet_identity = SubnetIdentity(
+ subnet_name=decoded["subnet_identity"]["subnet_name"],
+ github_repo=decoded["subnet_identity"]["github_repo"],
+ subnet_contact=decoded["subnet_identity"]["subnet_contact"],
+ )
+ else:
+ subnet_identity = None
+
+ return cls(
+ netuid=netuid,
+ owner_hotkey=owner_hotkey,
+ owner_coldkey=owner_coldkey,
+ subnet_name=subnet_name,
+ symbol=symbol,
+ tempo=int(decoded["tempo"]),
+ last_step=int(decoded["last_step"]),
+ blocks_since_last_step=int(decoded["blocks_since_last_step"]),
+ emission=emission,
+ alpha_in=alpha_in,
+ alpha_out=alpha_out,
+ tao_in=tao_in,
+ k=tao_in.rao * alpha_in.rao,
+ is_dynamic=is_dynamic,
+ price=price,
+ alpha_out_emission=alpha_out_emission,
+ alpha_in_emission=alpha_in_emission,
+ tao_in_emission=tao_in_emission,
+ pending_alpha_emission=pending_alpha_emission,
+ pending_root_emission=pending_root_emission,
+ network_registered_at=int(decoded["network_registered_at"]),
+ subnet_identity=subnet_identity,
+ subnet_volume=subnet_volume,
+ )
+
+ def tao_to_alpha(self, tao: Union[Balance, float, int]) -> Balance:
+ if isinstance(tao, (float, int)):
+ tao = Balance.from_tao(tao)
+ if self.price.tao != 0:
+ return Balance.from_tao(tao.tao / self.price.tao).set_unit(self.netuid)
+ else:
+ return Balance.from_tao(0)
+
+ def alpha_to_tao(self, alpha: Union[Balance, float, int]) -> Balance:
+ if isinstance(alpha, (float, int)):
+ alpha = Balance.from_tao(alpha)
+ return Balance.from_tao(alpha.tao * self.price.tao)
+
+ def tao_to_alpha_with_slippage(
+ self, tao: Union[Balance, float, int], percentage: bool = False
+ ) -> Union[tuple[Balance, Balance], float]:
+ """
+ Returns an estimate of how much Alpha would a staker receive if they stake their tao using the current pool state.
+ Args:
+ tao: Amount of TAO to stake.
+ Returns:
+ If percentage is False, a tuple of balances where the first part is the amount of Alpha received, and the
+ second part (slippage) is the difference between the estimated amount and ideal
+ amount as if there was no slippage. If percentage is True, a float representing the slippage percentage.
+ """
+ if isinstance(tao, (float, int)):
+ tao = Balance.from_tao(tao)
+
+ if self.is_dynamic:
+ new_tao_in = self.tao_in + tao
+ if new_tao_in == 0:
+ return tao, Balance.from_rao(0)
+ new_alpha_in = self.k / new_tao_in
+
+ # Amount of alpha given to the staker
+ alpha_returned = Balance.from_rao(
+ self.alpha_in.rao - new_alpha_in.rao
+ ).set_unit(self.netuid)
+
+ # Ideal conversion as if there is no slippage, just price
+ alpha_ideal = self.tao_to_alpha(tao)
+
+ if alpha_ideal.tao > alpha_returned.tao:
+ slippage = Balance.from_tao(
+ alpha_ideal.tao - alpha_returned.tao
+ ).set_unit(self.netuid)
+ else:
+ slippage = Balance.from_tao(0)
+ else:
+ alpha_returned = tao.set_unit(self.netuid)
+ slippage = Balance.from_tao(0)
+
+ if percentage:
+ slippage_pct_float = (
+ 100 * float(slippage) / float(slippage + alpha_returned)
+ if slippage + alpha_returned != 0
+ else 0
+ )
+ return slippage_pct_float
+ else:
+ return alpha_returned, slippage
+
+ slippage = tao_to_alpha_with_slippage
+ tao_slippage = tao_to_alpha_with_slippage
+
+ def alpha_to_tao_with_slippage(
+ self, alpha: Union[Balance, float, int], percentage: bool = False
+ ) -> Union[tuple[Balance, Balance], float]:
+ """
+ Returns an estimate of how much TAO would a staker receive if they unstake their alpha using the current pool state.
+ Args:
+ alpha: Amount of Alpha to stake.
+ Returns:
+ If percentage is False, a tuple of balances where the first part is the amount of TAO received, and the
+ second part (slippage) is the difference between the estimated amount and ideal
+ amount as if there was no slippage. If percentage is True, a float representing the slippage percentage.
+ """
+ if isinstance(alpha, (float, int)):
+ alpha = Balance.from_tao(alpha)
+
+ if self.is_dynamic:
+ new_alpha_in = self.alpha_in + alpha
+ new_tao_reserve = self.k / new_alpha_in
+ # Amount of TAO given to the unstaker
+ tao_returned = Balance.from_rao(self.tao_in.rao - new_tao_reserve.rao)
+
+ # Ideal conversion as if there is no slippage, just price
+ tao_ideal = self.alpha_to_tao(alpha)
+
+ if tao_ideal > tao_returned:
+ slippage = Balance.from_tao(tao_ideal.tao - tao_returned.tao)
+ else:
+ slippage = Balance.from_tao(0)
+ else:
+ tao_returned = alpha.set_unit(0)
+ slippage = Balance.from_tao(0)
+
+ if percentage:
+ slippage_pct_float = (
+ 100 * float(slippage) / float(slippage + tao_returned)
+ if slippage + tao_returned != 0
+ else 0
+ )
+ return slippage_pct_float
+ else:
+ return tao_returned, slippage
+
+ alpha_slippage = alpha_to_tao_with_slippage
diff --git a/bittensor/core/chain_data/metagraph_info.py b/bittensor/core/chain_data/metagraph_info.py
new file mode 100644
index 0000000000..fb53e04599
--- /dev/null
+++ b/bittensor/core/chain_data/metagraph_info.py
@@ -0,0 +1,229 @@
+from dataclasses import dataclass
+from typing import Optional
+
+from bittensor.core.chain_data.axon_info import AxonInfo
+from bittensor.core.chain_data.chain_identity import ChainIdentity
+from bittensor.core.chain_data.subnet_identity import SubnetIdentity
+from bittensor.core.chain_data.utils import (
+ ChainDataType,
+ from_scale_encoding,
+)
+from bittensor.utils import u64_normalized_float as u64tf, u16_normalized_float as u16tf
+from bittensor.utils.balance import Balance
+from scalecodec.utils.ss58 import ss58_encode
+
+
+# to balance with unit (just shortcut)
+def _tbwu(val: int, netuid: Optional[int] = 0) -> Balance:
+ """Returns a Balance object from a value and unit."""
+ return Balance.from_tao(val, netuid)
+
+
+@dataclass
+class MetagraphInfo:
+ # Subnet index
+ netuid: int
+
+ # Name and symbol
+ name: str
+ symbol: str
+ identity: Optional[SubnetIdentity]
+ network_registered_at: int
+
+ # Keys for owner.
+ owner_hotkey: str # hotkey
+ owner_coldkey: str # coldkey
+
+ # Tempo terms.
+ block: int # block at call.
+ tempo: int # epoch tempo
+ last_step: int
+ blocks_since_last_step: int
+
+ # Subnet emission terms
+ subnet_emission: Balance # subnet emission via tao
+ alpha_in: Balance # amount of alpha in reserve
+ alpha_out: Balance # amount of alpha outstanding
+ tao_in: Balance # amount of tao injected per block
+ alpha_out_emission: Balance # amount injected in alpha reserves per block
+ alpha_in_emission: Balance # amount injected outstanding per block
+ tao_in_emission: Balance # amount of tao injected per block
+ pending_alpha_emission: Balance # pending alpha to be distributed
+ pending_root_emission: Balance # pending tao for root divs to be distributed
+ subnet_volume: Balance # volume of the subnet
+
+ # Hparams for epoch
+ rho: int # subnet rho param
+ kappa: float # subnet kappa param
+
+ # Validator params
+ min_allowed_weights: float # min allowed weights per val
+ max_weights_limit: float # max allowed weights per val
+ weights_version: int # allowed weights version
+ weights_rate_limit: int # rate limit on weights.
+ activity_cutoff: int # validator weights cut off period in blocks
+ max_validators: int # max allowed validators.
+
+ # Registration
+ num_uids: int
+ max_uids: int
+ burn: Balance # current burn cost.
+ difficulty: float # current difficulty.
+ registration_allowed: bool # allows registrations.
+ pow_registration_allowed: bool # pow registration enabled.
+ immunity_period: int # subnet miner immunity period
+ min_difficulty: float # min pow difficulty
+ max_difficulty: float # max pow difficulty
+ min_burn: Balance # min tao burn
+ max_burn: Balance # max tao burn
+ adjustment_alpha: float # adjustment speed for registration params.
+ adjustment_interval: int # pow and burn adjustment interval
+ target_regs_per_interval: int # target registrations per interval
+ max_regs_per_block: int # max registrations per block.
+ serving_rate_limit: int # axon serving rate limit
+
+ # CR
+ commit_reveal_weights_enabled: bool # Is CR enabled.
+ commit_reveal_period: int # Commit reveal interval
+
+ # Bonds
+ liquid_alpha_enabled: bool # Bonds liquid enabled.
+ alpha_high: float # Alpha param high
+ alpha_low: float # Alpha param low
+ bonds_moving_avg: float # Bonds moving avg
+
+ # Metagraph info.
+ hotkeys: list[str] # hotkey per UID
+ coldkeys: list[str] # coldkey per UID
+ identities: list[Optional[ChainIdentity]] # coldkeys identities
+ axons: list[AxonInfo] # UID axons.
+ active: list[bool] # Active per UID
+ validator_permit: list[bool] # Val permit per UID
+ pruning_score: list[float] # Pruning per UID
+ last_update: list[int] # Last update per UID
+ emission: list[Balance] # Emission per UID
+ dividends: list[float] # Dividends per UID
+ incentives: list[float] # Mining incentives per UID
+ consensus: list[float] # Consensus per UID
+ trust: list[float] # Trust per UID
+ rank: list[float] # Rank per UID
+ block_at_registration: list[int] # Reg block per UID
+ alpha_stake: list[Balance] # Alpha staked per UID
+ tao_stake: list[Balance] # TAO staked per UID
+ total_stake: list[Balance] # Total stake per UID
+
+ # Dividend break down.
+ tao_dividends_per_hotkey: list[
+ tuple[str, Balance]
+ ] # List of dividend payouts in tao via root.
+ alpha_dividends_per_hotkey: list[
+ tuple[str, Balance]
+ ] # List of dividend payout in alpha via subnet.
+
+ @classmethod
+ def from_vec_u8(cls, vec_u8: bytes) -> Optional["MetagraphInfo"]:
+ """Returns a Metagraph object from encoded MetagraphInfo vector."""
+ if len(vec_u8) == 0:
+ return None
+ decoded = from_scale_encoding(vec_u8, ChainDataType.MetagraphInfo)
+ if decoded is None:
+ return None
+
+ return MetagraphInfo.fix_decoded_values(decoded)
+
+ @classmethod
+ def list_from_vec_u8(cls, vec_u8: bytes) -> list["MetagraphInfo"]:
+ """Returns a list of Metagraph objects from a list of encoded MetagraphInfo vectors."""
+ decoded = from_scale_encoding(
+ vec_u8, ChainDataType.MetagraphInfo, is_vec=True, is_option=True
+ )
+ if decoded is None:
+ return []
+
+ decoded = [
+ MetagraphInfo.fix_decoded_values(meta)
+ for meta in decoded
+ if meta is not None
+ ]
+ return decoded
+
+ @classmethod
+ def fix_decoded_values(cls, decoded: dict) -> "MetagraphInfo":
+ """Returns a Metagraph object from a decoded MetagraphInfo dictionary."""
+ # Subnet index
+ _netuid = decoded["netuid"]
+
+ # Name and symbol
+ decoded.update({"name": bytes(decoded.get("name")).decode()})
+ decoded.update({"symbol": bytes(decoded.get("symbol")).decode()})
+ decoded.update({"identity": decoded.get("identity", {})})
+
+ # Keys for owner.
+ decoded["owner_hotkey"] = ss58_encode(decoded["owner_hotkey"])
+ decoded["owner_coldkey"] = ss58_encode(decoded["owner_coldkey"])
+
+ # Subnet emission terms
+ decoded["subnet_emission"] = _tbwu(decoded["subnet_emission"])
+ decoded["alpha_in"] = _tbwu(decoded["alpha_in"], _netuid)
+ decoded["alpha_out"] = _tbwu(decoded["alpha_out"], _netuid)
+ decoded["tao_in"] = _tbwu(decoded["tao_in"])
+ decoded["alpha_out_emission"] = _tbwu(decoded["alpha_out_emission"], _netuid)
+ decoded["alpha_in_emission"] = _tbwu(decoded["alpha_in_emission"], _netuid)
+ decoded["tao_in_emission"] = _tbwu(decoded["tao_in_emission"])
+ decoded["pending_alpha_emission"] = _tbwu(
+ decoded["pending_alpha_emission"], _netuid
+ )
+ decoded["pending_root_emission"] = _tbwu(decoded["pending_root_emission"])
+ decoded["subnet_volume"] = Balance.from_rao(decoded["subnet_volume"]).set_unit(
+ _netuid
+ )
+
+ # Hparams for epoch
+ decoded["kappa"] = u16tf(decoded["kappa"])
+
+ # Validator params
+ decoded["min_allowed_weights"] = u16tf(decoded["min_allowed_weights"])
+ decoded["max_weights_limit"] = u16tf(decoded["max_weights_limit"])
+
+ # Registration
+ decoded["burn"] = _tbwu(decoded["burn"])
+ decoded["difficulty"] = u64tf(decoded["difficulty"])
+ decoded["min_difficulty"] = u64tf(decoded["min_difficulty"])
+ decoded["max_difficulty"] = u64tf(decoded["max_difficulty"])
+ decoded["min_burn"] = _tbwu(decoded["min_burn"])
+ decoded["max_burn"] = _tbwu(decoded["max_burn"])
+ decoded["adjustment_alpha"] = u64tf(decoded["adjustment_alpha"])
+
+ # Bonds
+ decoded["alpha_high"] = u16tf(decoded["alpha_high"])
+ decoded["alpha_low"] = u16tf(decoded["alpha_low"])
+ decoded["bonds_moving_avg"] = u64tf(decoded["bonds_moving_avg"])
+
+ # Metagraph info.
+ decoded["hotkeys"] = [ss58_encode(ck) for ck in decoded.get("hotkeys", [])]
+ decoded["coldkeys"] = [ss58_encode(hk) for hk in decoded.get("coldkeys", [])]
+ decoded["axons"] = decoded.get("axons", [])
+ decoded["pruning_score"] = [
+ u16tf(ps) for ps in decoded.get("pruning_score", [])
+ ]
+ decoded["emission"] = [_tbwu(em, _netuid) for em in decoded.get("emission", [])]
+ decoded["dividends"] = [u16tf(dv) for dv in decoded.get("dividends", [])]
+ decoded["incentives"] = [u16tf(ic) for ic in decoded.get("incentives", [])]
+ decoded["consensus"] = [u16tf(cs) for cs in decoded.get("consensus", [])]
+ decoded["trust"] = [u16tf(tr) for tr in decoded.get("trust", [])]
+ decoded["rank"] = [u16tf(rk) for rk in decoded.get("trust", [])]
+ decoded["alpha_stake"] = [_tbwu(ast, _netuid) for ast in decoded["alpha_stake"]]
+ decoded["tao_stake"] = [_tbwu(ts) for ts in decoded["tao_stake"]]
+ decoded["total_stake"] = [_tbwu(ts, _netuid) for ts in decoded["total_stake"]]
+
+ # Dividend break down
+ decoded["tao_dividends_per_hotkey"] = [
+ (ss58_encode(alpha[0]), _tbwu(alpha[1]))
+ for alpha in decoded["tao_dividends_per_hotkey"]
+ ]
+ decoded["alpha_dividends_per_hotkey"] = [
+ (ss58_encode(adphk[0]), _tbwu(adphk[1], _netuid))
+ for adphk in decoded["alpha_dividends_per_hotkey"]
+ ]
+
+ return MetagraphInfo(**decoded)
diff --git a/bittensor/core/chain_data/stake_info.py b/bittensor/core/chain_data/stake_info.py
index 8d3b5020fb..1b57797b62 100644
--- a/bittensor/core/chain_data/stake_info.py
+++ b/bittensor/core/chain_data/stake_info.py
@@ -25,7 +25,12 @@ class StakeInfo:
hotkey_ss58: str # Hotkey address
coldkey_ss58: str # Coldkey address
+ netuid: int # Network UID
stake: Balance # Stake for the hotkey-coldkey pair
+ locked: Balance # Stake which is locked.
+ emission: Balance # Emission for the hotkey-coldkey pair
+ drain: int
+ is_registered: bool
@classmethod
def fix_decoded_values(cls, decoded: Any) -> "StakeInfo":
@@ -33,7 +38,12 @@ def fix_decoded_values(cls, decoded: Any) -> "StakeInfo":
return cls(
hotkey_ss58=ss58_encode(decoded["hotkey"], SS58_FORMAT),
coldkey_ss58=ss58_encode(decoded["coldkey"], SS58_FORMAT),
- stake=Balance.from_rao(decoded["stake"]),
+ netuid=int(decoded["netuid"]),
+ stake=Balance.from_rao(decoded["stake"]).set_unit(decoded["netuid"]),
+ locked=Balance.from_rao(decoded["locked"]).set_unit(decoded["netuid"]),
+ emission=Balance.from_rao(decoded["emission"]).set_unit(decoded["netuid"]),
+ drain=int(decoded["drain"]),
+ is_registered=bool(decoded["is_registered"]),
)
@classmethod
diff --git a/bittensor/core/chain_data/subnet_identity.py b/bittensor/core/chain_data/subnet_identity.py
new file mode 100644
index 0000000000..e011dde31c
--- /dev/null
+++ b/bittensor/core/chain_data/subnet_identity.py
@@ -0,0 +1,12 @@
+from dataclasses import dataclass
+
+
+@dataclass
+class SubnetIdentity:
+ """Dataclass for subnet identity information."""
+
+ subnet_name: str
+ github_repo: str
+ subnet_contact: str
+
+ # TODO: Add other methods when fetching from chain
diff --git a/bittensor/core/chain_data/subnet_state.py b/bittensor/core/chain_data/subnet_state.py
new file mode 100644
index 0000000000..631b5b106b
--- /dev/null
+++ b/bittensor/core/chain_data/subnet_state.py
@@ -0,0 +1,92 @@
+"""
+This module defines the `SubnetState` data class and associated methods for handling and decoding
+subnetwork states in the Bittensor network.
+"""
+
+from dataclasses import dataclass
+from typing import Optional
+
+from scalecodec.utils.ss58 import ss58_encode
+
+from bittensor.core.chain_data.utils import (
+ ChainDataType,
+ from_scale_encoding,
+ SS58_FORMAT,
+)
+from bittensor.utils import u16_normalized_float
+from bittensor.utils.balance import Balance
+
+
+@dataclass
+class SubnetState:
+ netuid: int
+ hotkeys: list[str]
+ coldkeys: list[str]
+ active: list[bool]
+ validator_permit: list[bool]
+ pruning_score: list[float]
+ last_update: list[int]
+ emission: list["Balance"]
+ dividends: list[float]
+ incentives: list[float]
+ consensus: list[float]
+ trust: list[float]
+ rank: list[float]
+ block_at_registration: list[int]
+ alpha_stake: list["Balance"]
+ tao_stake: list["Balance"]
+ total_stake: list["Balance"]
+ emission_history: list[list[int]]
+
+ @classmethod
+ def from_vec_u8(cls, vec_u8: list[int]) -> Optional["SubnetState"]:
+ if len(vec_u8) == 0:
+ return None
+ decoded = from_scale_encoding(vec_u8, ChainDataType.SubnetState)
+ if decoded is None:
+ return None
+ return SubnetState.fix_decoded_values(decoded)
+
+ @classmethod
+ def list_from_vec_u8(cls, vec_u8: list[int]) -> list["SubnetState"]:
+ decoded = from_scale_encoding(
+ vec_u8, ChainDataType.SubnetState, is_vec=True, is_option=True
+ )
+ if decoded is None:
+ return []
+ decoded = [SubnetState.fix_decoded_values(d) for d in decoded]
+ return decoded
+
+ @classmethod
+ def fix_decoded_values(cls, decoded: dict) -> "SubnetState":
+ netuid = decoded["netuid"]
+ return SubnetState(
+ netuid=netuid,
+ hotkeys=[ss58_encode(val, SS58_FORMAT) for val in decoded["hotkeys"]],
+ coldkeys=[ss58_encode(val, SS58_FORMAT) for val in decoded["coldkeys"]],
+ active=decoded["active"],
+ validator_permit=decoded["validator_permit"],
+ pruning_score=[
+ u16_normalized_float(val) for val in decoded["pruning_score"]
+ ],
+ last_update=decoded["last_update"],
+ emission=[
+ Balance.from_rao(val).set_unit(netuid) for val in decoded["emission"]
+ ],
+ dividends=[u16_normalized_float(val) for val in decoded["dividends"]],
+ incentives=[u16_normalized_float(val) for val in decoded["incentives"]],
+ consensus=[u16_normalized_float(val) for val in decoded["consensus"]],
+ trust=[u16_normalized_float(val) for val in decoded["trust"]],
+ rank=[u16_normalized_float(val) for val in decoded["rank"]],
+ block_at_registration=decoded["block_at_registration"],
+ alpha_stake=[
+ Balance.from_rao(val).set_unit(netuid) for val in decoded["alpha_stake"]
+ ],
+ tao_stake=[
+ Balance.from_rao(val).set_unit(0) for val in decoded["tao_stake"]
+ ],
+ total_stake=[
+ Balance.from_rao(val).set_unit(netuid) for val in decoded["total_stake"]
+ ],
+ emission_history=decoded["emission_history"],
+ )
diff --git a/bittensor/core/chain_data/utils.py b/bittensor/core/chain_data/utils.py
index 1218b9ea56..e3a599128b 100644
--- a/bittensor/core/chain_data/utils.py
+++ b/bittensor/core/chain_data/utils.py
@@ -23,6 +23,12 @@ class ChainDataType(Enum):
ScheduledColdkeySwapInfo = 9
AccountId = 10
NeuronCertificate = 11
+ SubnetState = 12
+ DynamicInfo = 13
+ SubnetIdentity = 14
+ MetagraphInfo = 15
+ ChainIdentity = 16
+ AxonInfo = 17
def from_scale_encoding(
@@ -215,12 +221,40 @@ def from_scale_encoding_using_type_string(
["ip_type_and_protocol", "Compact
"],
],
},
+ "SubnetState": {
+ "type": "struct",
+ "type_mapping": [
+ ["netuid", "Compact"],
+ ["hotkeys", "Vec"],
+ ["coldkeys", "Vec"],
+ ["active", "Vec"],
+ ["validator_permit", "Vec"],
+ ["pruning_score", "Vec>"],
+ ["last_update", "Vec>"],
+ ["emission", "Vec>"],
+ ["dividends", "Vec>"],
+ ["incentives", "Vec>"],
+ ["consensus", "Vec>"],
+ ["trust", "Vec>"],
+ ["rank", "Vec>"],
+ ["block_at_registration", "Vec>"],
+ ["alpha_stake", "Vec>"],
+ ["tao_stake", "Vec>"],
+ ["total_stake", "Vec>"],
+ ["emission_history", "Vec>>"],
+ ],
+ },
"StakeInfo": {
"type": "struct",
"type_mapping": [
["hotkey", "AccountId"],
["coldkey", "AccountId"],
+ ["netuid", "Compact"],
["stake", "Compact"],
+ ["locked", "Compact"],
+ ["emission", "Compact"],
+ ["drain", "Compact"],
+ ["is_registered", "bool"],
],
},
"SubnetHyperparameters": {
@@ -263,6 +297,139 @@ def from_scale_encoding_using_type_string(
["arbitration_block", "Compact"],
],
},
+ "SubnetIdentity": {
+ "type": "struct",
+ "type_mapping": [
+ ["subnet_name", "Vec"],
+ ["github_repo", "Vec"],
+ ["subnet_contact", "Vec"],
+ ],
+ },
+ "DynamicInfo": {
+ "type": "struct",
+ "type_mapping": [
+ ["netuid", "Compact"],
+ ["owner_hotkey", "AccountId"],
+ ["owner_coldkey", "AccountId"],
+ ["subnet_name", "Vec>"],
+ ["token_symbol", "Vec>"],
+ ["tempo", "Compact"],
+ ["last_step", "Compact"],
+ ["blocks_since_last_step", "Compact"],
+ ["emission", "Compact"],
+ ["alpha_in", "Compact"],
+ ["alpha_out", "Compact"],
+ ["tao_in", "Compact"],
+ ["alpha_out_emission", "Compact"],
+ ["alpha_in_emission", "Compact"],
+ ["tao_in_emission", "Compact"],
+ ["pending_alpha_emission", "Compact"],
+ ["pending_root_emission", "Compact"],
+ ["subnet_volume", "Compact"],
+ ["network_registered_at", "Compact"],
+ ["subnet_identity", "Option"],
+ ],
+ },
+ "MetagraphInfo": {
+ "type": "struct",
+ "type_mapping": [
+ ["netuid", "Compact"],
+ ["name", "Vec>"],
+ ["symbol", "Vec>"],
+ ["identity", "Option"],
+ ["network_registered_at", "Compact"],
+ ["owner_hotkey", "T::AccountId"],
+ ["owner_coldkey", "T::AccountId"],
+ ["block", "Compact"],
+ ["tempo", "Compact"],
+ ["last_step", "Compact"],
+ ["blocks_since_last_step", "Compact"],
+ ["subnet_emission", "Compact"],
+ ["alpha_in", "Compact"],
+ ["alpha_out", "Compact"],
+ ["tao_in", "Compact"],
+ ["alpha_out_emission", "Compact"],
+ ["alpha_in_emission", "Compact"],
+ ["tao_in_emission", "Compact"],
+ ["pending_alpha_emission", "Compact"],
+ ["pending_root_emission", "Compact"],
+ ["subnet_volume", "Compact"],
+ ["rho", "Compact"],
+ ["kappa", "Compact"],
+ ["min_allowed_weights", "Compact"],
+ ["max_weights_limit", "Compact"],
+ ["weights_version", "Compact"],
+ ["weights_rate_limit", "Compact"],
+ ["activity_cutoff", "Compact"],
+ ["max_validators", "Compact"],
+ ["num_uids", "Compact"],
+ ["max_uids", "Compact"],
+ ["burn", "Compact"],
+ ["difficulty", "Compact"],
+ ["registration_allowed", "bool"],
+ ["pow_registration_allowed", "bool"],
+ ["immunity_period", "Compact"],
+ ["min_difficulty", "Compact"],
+ ["max_difficulty", "Compact"],
+ ["min_burn", "Compact"],
+ ["max_burn", "Compact"],
+ ["adjustment_alpha", "Compact"],
+ ["adjustment_interval", "Compact"],
+ ["target_regs_per_interval", "Compact"],
+ ["max_regs_per_block", "Compact"],
+ ["serving_rate_limit", "Compact"],
+ ["commit_reveal_weights_enabled", "bool"],
+ ["commit_reveal_period", "Compact"],
+ ["liquid_alpha_enabled", "bool"],
+ ["alpha_high", "Compact"],
+ ["alpha_low", "Compact"],
+ ["bonds_moving_avg", "Compact"],
+ ["hotkeys", "Vec"],
+ ["coldkeys", "Vec"],
+ ["identities", "Vec