diff --git a/pool/payment/payment.py b/pool/payment/payment.py index 3ca6668d..b361025e 100644 --- a/pool/payment/payment.py +++ b/pool/payment/payment.py @@ -1,4 +1,5 @@ import asyncio +import aiohttp import logging import pathlib import traceback @@ -82,12 +83,16 @@ def __init__(self, config: Dict, constants: ConsensusConstants, pool_store: Opti self.create_payment_loop_task: Optional[asyncio.Task] = None self.submit_payment_loop_task: Optional[asyncio.Task] = None self.get_peak_loop_task: Optional[asyncio.Task] = None + self.heart_beat_loop_task: Optional[asyncio.Task] = None self.node_rpc_client: Optional[FullNodeRpcClient] = None self.node_rpc_port = pool_config["node_rpc_port"] self.wallet_rpc_client: Optional[WalletRpcClient] = None self.wallet_rpc_port = pool_config["wallet_rpc_port"] + self.heart_beat_monitor_url = pool_config["heart_beat_monitor_url_payment"] + self.heart_beat_interval = int(pool_config.get("heart_beat_interval_payment", "30")) + async def start(self): await self.store.connect() @@ -109,6 +114,7 @@ async def start(self): self.create_payment_loop_task = asyncio.create_task(self.create_payment_loop()) self.submit_payment_loop_task = asyncio.create_task(self.submit_payment_loop()) self.get_peak_loop_task = asyncio.create_task(self.get_peak_loop()) + self.heart_beat_loop_task = asyncio.create_task(self.heart_beat_loop()) self.pending_payments = asyncio.Queue() @@ -119,6 +125,8 @@ async def stop(self): self.submit_payment_loop_task.cancel() if self.get_peak_loop_task is not None: self.get_peak_loop_task.cancel() + if self.heart_beat_loop_task is not None: + self.heart_beat_loop_task.cancel() self.wallet_rpc_client.close() await self.wallet_rpc_client.await_closed() @@ -303,3 +311,19 @@ async def submit_payment_loop(self): error_stack = traceback.format_exc() self.log.error(f"Unexpected error in submit_payment_loop: {e} {error_stack}") await asyncio.sleep(300) + + async def heart_beat_loop(self): + """ + Periodically send heart beat signal to uptime bot + """ + while True: + try: + async with aiohttp.request('GET', self.heart_beat_monitor_url) as resp: + assert resp.status == 200 + await asyncio.sleep(self.heart_beat_interval) + except asyncio.CancelledError: + self.log.info("Cancelled heart_beat_loop, closing") + return + except Exception as e: + self.log.error(f"Unexpected error in heart_beat_loop: {e}") + await asyncio.sleep(self.heart_beat_interval) diff --git a/pool/pool.py b/pool/pool.py index 221628cf..5052db9c 100644 --- a/pool/pool.py +++ b/pool/pool.py @@ -1,4 +1,5 @@ import asyncio +import aiohttp import logging import pathlib import time @@ -179,11 +180,14 @@ def __init__( self.create_payment_loop_task: Optional[asyncio.Task] = None self.submit_payment_loop_task: Optional[asyncio.Task] = None self.get_peak_loop_task: Optional[asyncio.Task] = None + self.heart_beat_loop_task: Optional[asyncio.Task] = None self.node_rpc_client: Optional[FullNodeRpcClient] = None self.node_rpc_port = pool_config["node_rpc_port"] # self.wallet_rpc_client: Optional[WalletRpcClient] = None # self.wallet_rpc_port = pool_config["wallet_rpc_port"] + self.heart_beat_monitor_url = pool_config["heart_beat_monitor_url_pool"] + self.heart_beat_interval = int(pool_config.get("heart_beat_interval_pool", "30")) async def start(self): await self.store.connect() @@ -211,6 +215,7 @@ async def start(self): # self.create_payment_loop_task = asyncio.create_task(self.create_payment_loop()) # self.submit_payment_loop_task = asyncio.create_task(self.submit_payment_loop()) self.get_peak_loop_task = asyncio.create_task(self.get_peak_loop()) + self.heart_beat_loop_task = asyncio.create_task(self.heart_beat_loop()) self.pending_payments = asyncio.Queue() @@ -225,6 +230,8 @@ async def stop(self): self.submit_payment_loop_task.cancel() if self.get_peak_loop_task is not None: self.get_peak_loop_task.cancel() + if self.heart_beat_loop_task is not None: + self.heart_beat_loop_task.cancel() # self.wallet_rpc_client.close() # await self.wallet_rpc_client.await_closed() @@ -911,3 +918,19 @@ async def get_signage_point_or_eos(): current_difficulty = new_difficulty return PostPartialResponse(current_difficulty).to_json_dict() + + async def heart_beat_loop(self): + """ + Periodically send heart beat signal to uptime bot + """ + while True: + try: + async with aiohttp.request('GET', self.heart_beat_monitor_url) as resp: + assert resp.status == 200 + await asyncio.sleep(self.heart_beat_interval) + except asyncio.CancelledError: + self.log.info("Cancelled heart_beat_loop, closing") + return + except Exception as e: + self.log.error(f"Unexpected error in heart_beat_loop: {e}") + await asyncio.sleep(self.heart_beat_interval) diff --git a/pool/reward/reward_collector.py b/pool/reward/reward_collector.py index 70afa22d..eaeedeb1 100644 --- a/pool/reward/reward_collector.py +++ b/pool/reward/reward_collector.py @@ -1,4 +1,5 @@ import asyncio +import aiohttp import logging import pathlib import traceback @@ -71,12 +72,16 @@ def __init__(self, config: Dict, constants: ConsensusConstants, pool_store: Opti self.collect_pool_rewards_loop_task: Optional[asyncio.Task] = None self.get_peak_loop_task: Optional[asyncio.Task] = None + self.heart_beat_loop_task: Optional[asyncio.Task] = None self.node_rpc_client: Optional[FullNodeRpcClient] = None self.node_rpc_port = pool_config["node_rpc_port"] self.wallet_rpc_client: Optional[WalletRpcClient] = None self.wallet_rpc_port = pool_config["wallet_rpc_port"] + self.heart_beat_monitor_url = pool_config["heart_beat_monitor_url_reward"] + self.heart_beat_interval = int(pool_config.get("heart_beat_interval_reward", "30")) + async def start(self): await self.store.connect() @@ -99,12 +104,15 @@ async def start(self): self.collect_pool_rewards_loop_task = asyncio.create_task(self.collect_pool_rewards_loop()) self.get_peak_loop_task = asyncio.create_task(self.get_peak_loop()) + self.heart_beat_loop_task = asyncio.create_task(self.heart_beat_loop()) async def stop(self): if self.collect_pool_rewards_loop_task is not None: self.collect_pool_rewards_loop_task.cancel() if self.get_peak_loop_task is not None: self.get_peak_loop_task.cancel() + if self.heart_beat_loop_task is not None: + self.heart_beat_loop_task.cancel() self.wallet_rpc_client.close() await self.wallet_rpc_client.await_closed() @@ -247,3 +255,19 @@ async def collect_pool_rewards_loop(self): error_stack = traceback.format_exc() self.log.error(f"Unexpected error in collect_pool_rewards_loop: {e} {error_stack}") await asyncio.sleep(self.collect_pool_rewards_interval) + + async def heart_beat_loop(self): + """ + Periodically send heart beat signal to uptime bot + """ + while True: + try: + async with aiohttp.request('GET', self.heart_beat_monitor_url) as resp: + assert resp.status == 200 + await asyncio.sleep(self.heart_beat_interval) + except asyncio.CancelledError: + self.log.info("Cancelled heart_beat_loop, closing") + return + except Exception as e: + self.log.error(f"Unexpected error in heart_beat_loop: {e}") + await asyncio.sleep(self.heart_beat_interval) diff --git a/pool/snapshot/snapshot.py b/pool/snapshot/snapshot.py index a594dd13..bc9b8b9a 100644 --- a/pool/snapshot/snapshot.py +++ b/pool/snapshot/snapshot.py @@ -35,15 +35,22 @@ def __init__(self, config: Dict, constants: ConsensusConstants, pool_store: Opti self.snapshot_interval = pool_config["snapshot_interval"] self.create_payment_loop_task: Optional[asyncio.Task] = None + self.heart_beat_loop_task: Optional[asyncio.Task] = None + + self.heart_beat_monitor_url = pool_config["heart_beat_monitor_url_snapshot"] + self.heart_beat_interval = int(pool_config.get("heart_beat_interval_snapshot", "30")) async def start(self): await self.store.connect() self.create_snapshot_loop_task = asyncio.create_task(self.create_snapshot_loop()) + self.heart_beat_loop_task = asyncio.create_task(self.heart_beat_loop()) async def stop(self): if self.create_snapstho_loop_task is not None: self.create_snapshot_loop_task.cancel() + if self.heart_beat_loop_task is not None: + self.heart_beat_loop_task.cancel() await self.store.connection.close() @@ -67,3 +74,19 @@ async def create_snapshot_loop(self): error_stack = traceback.format_exc() self.log.error(f"Unexpected error in create_snapshot_loop: {e} {error_stack}") await asyncio.sleep(self.snapshot_interval) + + async def heart_beat_loop(self): + """ + Periodically send heart beat signal to uptime bot + """ + while True: + try: + async with aiohttp.request('GET', self.heart_beat_monitor_url) as resp: + assert resp.status == 200 + await asyncio.sleep(self.heart_beat_interval) + except asyncio.CancelledError: + self.log.info("Cancelled heart_beat_loop, closing") + return + except Exception as e: + self.log.error(f"Unexpected error in heart_beat_loop: {e}") + await asyncio.sleep(self.heart_beat_interval)