Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add change ip functionality, update monitors logic #1035

Merged
merged 14 commits into from
Jan 18, 2024
Merged
121 changes: 93 additions & 28 deletions core/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,41 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import json
import time
import os
import socket
import psutil
import hashlib
import logging
import os
import platform
import hashlib

import requests

import psutil
import socket
import time
from enum import Enum
from typing import Dict
from typing import Dict, List, Optional, TypedDict

try:
from sh import lsmod
except ImportError:
logging.warning('Could not import lsmod from sh package')
import requests

from skale import Skale
from skale.schain_config.generator import get_nodes_for_schain
from skale.transactions.exceptions import TransactionLogicError
from skale.utils.exceptions import InvalidNodeIdError
from skale.utils.helper import ip_from_bytes
from skale.utils.web3_utils import public_key_to_address, to_checksum_address

from core.filebeat import update_filebeat_service

from tools.configs import CHECK_REPORT_PATH, META_FILEPATH, WATCHDOG_PORT
from tools.configs import WATCHDOG_PORT, CHANGE_IP_DELAY, CHECK_REPORT_PATH, META_FILEPATH
from tools.helper import read_json
from tools.str_formatters import arguments_list_string
from tools.wallet_utils import check_required_balance

logger = logging.getLogger(__name__)

try:
from sh import lsmod
except ImportError:
logging.warning('Could not import lsmod from sh package')

Check warning on line 51 in core/node.py

View check run for this annotation

Codecov / codecov/patch

core/node.py#L50-L51

Added lines #L50 - L51 were not covered by tests


logger = logging.getLogger(__name__)


class NodeStatus(Enum):
"""This class contains possible node statuses"""
Expand Down Expand Up @@ -345,6 +348,81 @@
}


def get_check_report(report_path: str = CHECK_REPORT_PATH) -> Dict:
    """
    Loads the check report stored as JSON at report_path.

    Returns an empty dict if report_path is not an existing file.
    """
    if os.path.isfile(report_path):
        with open(report_path) as fp:
            return json.load(fp)
    return {}

Check warning on line 355 in core/node.py

View check run for this annotation

Codecov / codecov/patch

core/node.py#L352-L355

Added lines #L352 - L355 were not covered by tests


def get_abi_hash(file_path):
    """Returns the hex-encoded SHA-256 digest of the raw bytes of the file at file_path."""
    with open(file_path, 'rb') as abi_file:
        content = abi_file.read()
    return hashlib.sha256(content).hexdigest()

Check warning on line 361 in core/node.py

View check run for this annotation

Codecov / codecov/patch

core/node.py#L359-L361

Added lines #L359 - L361 were not covered by tests


class ManagerNodeInfo(TypedDict):
    """Node record of an sChain group, as produced by get_nodes_for_schain."""
    # 'id' is read from every record by get_current_nodes, so it belongs to the schema
    id: int
    name: str
    ip: str
    publicIP: str
    port: int
    start_block: int
    last_reward_date: int
    finish_time: int
    status: int
    validator_id: int
    publicKey: str
    domain_name: str


class ExtendedManagerNodeInfo(ManagerNodeInfo):
    """ManagerNodeInfo extended with the timestamp of the node's last IP change."""
    # filled in get_current_nodes from skale.nodes.get_last_change_ip_time
    ip_change_ts: int


def get_current_nodes(skale: Skale, name: str) -> List[ExtendedManagerNodeInfo]:
    """
    Returns node records of the sChain group extended with the last IP change timestamp.

    Every record gets 'ip_change_ts' from skale.nodes.get_last_change_ip_time and
    has its 'ip' and 'publicIP' values converted with ip_from_bytes.
    Returns an empty list if the sChain does not exist.
    """
    if not skale.schains_internal.is_schain_exist(name):
        return []
    # The result is iterated as a list of records; the records are updated in place
    current_nodes: List[ManagerNodeInfo] = get_nodes_for_schain(skale, name)
    for node in current_nodes:
        node['ip_change_ts'] = skale.nodes.get_last_change_ip_time(node['id'])
        node['ip'] = ip_from_bytes(node['ip'])
        node['publicIP'] = ip_from_bytes(node['publicIP'])
    return current_nodes


def get_current_ips(current_nodes: List[ExtendedManagerNodeInfo]) -> list[str]:
    """Collects the 'ip' value of every node record, keeping the input order."""
    ips = []
    for record in current_nodes:
        ips.append(record['ip'])
    return ips


def get_max_ip_change_ts(current_nodes: List[ExtendedManagerNodeInfo]) -> Optional[int]:
    """
    Returns the largest 'ip_change_ts' among the given node records.

    Returns None if the list is empty or if the largest value is 0.
    """
    # default=0 keeps max() from raising ValueError on an empty node list
    # (get_current_nodes returns [] when the sChain does not exist)
    max_ip_change_ts = max((node['ip_change_ts'] for node in current_nodes), default=0)
    return None if max_ip_change_ts == 0 else max_ip_change_ts


def calc_reload_ts(
    current_nodes: List[ExtendedManagerNodeInfo],
    node_index: int
) -> Optional[int]:
    """
    Returns the max IP change timestamp of the nodes plus the delay for node_index.

    Returns None if get_max_ip_change_ts reports no timestamp for current_nodes.
    """
    max_ip_change_ts = get_max_ip_change_ts(current_nodes)
    if max_ip_change_ts is None:
        # Explicit None to match the Optional return type
        return None
    return max_ip_change_ts + get_node_delay(node_index)


def get_node_delay(node_index: int) -> int:
    """
    Calculates the delay for the node in seconds.

    The delay is CHANGE_IP_DELAY multiplied by (node_index + 1), e.g.
    node_index 3 with CHANGE_IP_DELAY of 300 gives 1200 seconds.
    """
    position = node_index + 1
    return CHANGE_IP_DELAY * position


def get_node_index_in_group(skale: Skale, schain_name: str, node_id: int) -> Optional[int]:
    """
    Looks up the position of node_id among the node ids of the sChain group.

    Returns None if a ValueError is raised during the lookup,
    which is the case when node_id is not in the group.
    """
    try:
        group_ids = skale.schains_internal.get_node_ids_for_schain(schain_name)
        position = group_ids.index(node_id)
    except ValueError:
        return None
    return position


def is_port_open(ip, port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(1)
Expand Down Expand Up @@ -378,16 +456,3 @@
except Exception as err:
return {'status': 1, 'errors': [err]}
return {'status': 0, 'data': res}


def get_check_report(report_path: str = CHECK_REPORT_PATH) -> Dict:
    """
    Loads the check report stored as JSON at report_path.

    Returns an empty dict if report_path is not an existing file.
    """
    # Use the report_path argument rather than the CHECK_REPORT_PATH constant,
    # otherwise a caller-supplied path is silently ignored
    if not os.path.isfile(report_path):
        return {}
    with open(report_path) as report_file:
        return json.load(report_file)


def get_abi_hash(file_path):
    """Computes the SHA-256 hex digest over the whole content of the file at file_path."""
    with open(file_path, 'rb') as source:
        raw = source.read()
    digest = hashlib.sha256(raw)
    return digest.hexdigest()
47 changes: 40 additions & 7 deletions core/schains/checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import time
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from core.node import ExtendedManagerNodeInfo, get_current_ips

from core.schains.config.directory import get_schain_check_filepath
from core.schains.config.file_manager import ConfigFileManager
Expand Down Expand Up @@ -141,6 +142,7 @@ def __init__(self,
schain_record: SChainRecord,
rotation_id: int,
stream_version: str,
current_nodes: list[ExtendedManagerNodeInfo],
estate: ExternalState,
econfig: Optional[ExternalConfig] = None
) -> None:
Expand All @@ -149,6 +151,7 @@ def __init__(self,
self.schain_record = schain_record
self.rotation_id = rotation_id
self.stream_version = stream_version
self.current_nodes = current_nodes
self.estate = estate
self.econfig = econfig or ExternalConfig(schain_name)
self.cfm: ConfigFileManager = ConfigFileManager(
Expand All @@ -173,17 +176,45 @@ def dkg(self) -> CheckRes:
)
return CheckRes(os.path.isfile(secret_key_share_filepath))

@property
def skaled_node_ips(self) -> CheckRes:
    """
    Checks that the set of node IPs in the skaled config equals
    the set of IPs of self.current_nodes.
    The check fails if the skaled config does not exist.
    """
    if not self.cfm.skaled_config_exists():
        return CheckRes(False)
    config_ips = get_node_ips_from_config(self.cfm.skaled_config)
    manager_ips = get_current_ips(self.current_nodes)
    # Compared as sets: order and duplicates are ignored
    return CheckRes(set(config_ips) == set(manager_ips))

@property
def upstream_config(self) -> CheckRes:
"""Checks that config exists for rotation id and stream"""
"""
Returns True if config exists for current rotation id,
node ip addresses and stream version are up to date
and config regeneration was not triggered manually.
Returns False otherwise.
"""
exists = self.cfm.upstream_exist_for_rotation_id(self.rotation_id)

logger.debug('Upstream configs status for %s: %s', self.name, exists)
return CheckRes(
exists and
self.schain_record.config_version == self.stream_version and
not self.schain_record.sync_config_run
stream_updated = self.schain_record.config_version == self.stream_version
node_ips_updated = True
triggered = self.schain_record.sync_config_run
if exists:
conf = self.cfm.latest_upstream_config
upstream_node_ips = get_node_ips_from_config(conf)
current_ips = get_current_ips(self.current_nodes)
node_ips_updated = set(upstream_node_ips) == set(current_ips)

logger.info(
'Upstream config status, rotation_id %s: exist: %s, ips: %s, stream: %s, triggered: %s',
self.rotation_id,
exists,
node_ips_updated,
stream_updated,
triggered
)
return CheckRes(exists and node_ips_updated and stream_updated and not triggered)

@property
def external_state(self) -> CheckRes:
Expand All @@ -203,7 +234,7 @@ def __init__(
rule_controller: IRuleController,
*,
econfig: Optional[ExternalConfig] = None,
dutils: DockerUtils = None
dutils: Optional[DockerUtils] = None
):
self.name = schain_name
self.schain_record = schain_record
Expand Down Expand Up @@ -361,6 +392,7 @@ def __init__(
rule_controller: IRuleController,
stream_version: str,
estate: ExternalState,
current_nodes: list[ExtendedManagerNodeInfo],
rotation_id: int = 0,
*,
econfig: Optional[ExternalConfig] = None,
Expand All @@ -373,6 +405,7 @@ def __init__(
schain_record=schain_record,
rotation_id=rotation_id,
stream_version=stream_version,
current_nodes=current_nodes,
estate=estate,
econfig=econfig
),
Expand Down
9 changes: 7 additions & 2 deletions core/schains/cleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from sgx import SgxClient

from core.node import get_skale_node_version
from core.node import get_current_nodes, get_skale_node_version
from core.schains.checks import SChainChecks
from core.schains.config.file_manager import ConfigFileManager
from core.schains.config.directory import schain_config_dir
Expand Down Expand Up @@ -207,11 +207,13 @@ def remove_schain(skale, node_id, schain_name, msg, dutils=None) -> None:
rotation_data = skale.node_rotation.get_rotation(schain_name)
rotation_id = rotation_data['rotation_id']
estate = ExternalConfig(name=schain_name).get()
current_nodes = get_current_nodes(skale, schain_name)
cleanup_schain(
node_id,
schain_name,
sync_agent_ranges,
rotation_id=rotation_id,
current_nodes=current_nodes,
estate=estate,
dutils=dutils
)
Expand All @@ -222,6 +224,7 @@ def cleanup_schain(
schain_name,
sync_agent_ranges,
rotation_id,
current_nodes,
estate,
dutils=None
) -> None:
Expand All @@ -239,8 +242,10 @@ def cleanup_schain(
rule_controller=rc,
stream_version=stream_version,
schain_record=schain_record,
current_nodes=current_nodes,
rotation_id=rotation_id,
estate=estate
estate=estate,
dutils=dutils
)
status = checks.get_all()
if status['skaled_container'] or is_exited(
Expand Down
12 changes: 9 additions & 3 deletions core/schains/external_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ class ExternalState:
chain_id: int
ranges: field(default_factory=list)
ima_linked: bool = False
reload_ts: Optional[int] = None

def to_dict(self):
return {
'chain_id': self.chain_id,
'ima_linked': self.ima_linked,
'ranges': list(map(list, self.ranges))
'ranges': list(map(list, self.ranges)),
'reload_ts': self.reload_ts
}


Expand All @@ -38,6 +40,10 @@ def ima_linked(self) -> bool:
def chain_id(self) -> Optional[int]:
return self.read().get('chain_id', None)

@property
def reload_ts(self) -> Optional[int]:
    """Value stored under 'reload_ts' in the data returned by self.read(), None if absent."""
    stored = self.read()
    return stored.get('reload_ts')

@property
def ranges(self) -> List[IpRange]:
plain_ranges = self.read().get('ranges', [])
Expand All @@ -49,8 +55,8 @@ def get(self) -> Optional[ExternalState]:
return ExternalState(
chain_id=plain['chain_id'],
ima_linked=plain['ima_linked'],
ranges=list(sorted(map(lambda r: IpRange(*r), plain['ranges'])))

ranges=list(sorted(map(lambda r: IpRange(*r), plain['ranges']))),
reload_ts=plain.get('reload_ts')
)
return None

Expand Down
Loading
Loading