Skip to content

Commit d7165f2

Browse files
committed
expanded rpc api to add MempoolSizeChanged
1 parent 47abc36 commit d7165f2

File tree

20 files changed

+252
-19
lines changed

20 files changed

+252
-19
lines changed

consensus/notify/src/notification.rs

+14
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ pub enum Notification {
4242

4343
#[display(fmt = "NewBlockTemplate notification")]
4444
NewBlockTemplate(NewBlockTemplateNotification),
45+
46+
#[display(fmt = "MempoolSizeChanged notification: size {}", "_0.network_mempool_size")]
47+
MempoolSizeChanged(MempoolSizeChangedNotification),
4548
}
4649
}
4750

@@ -181,3 +184,14 @@ pub struct PruningPointUtxoSetOverrideNotification {}
181184

182185
#[derive(Debug, Clone)]
183186
pub struct NewBlockTemplateNotification {}
187+
188+
#[derive(Debug, Clone)]
189+
pub struct MempoolSizeChangedNotification {
190+
pub network_mempool_size: u64,
191+
}
192+
193+
impl MempoolSizeChangedNotification {
194+
pub fn new(network_mempool_size: u64) -> Self {
195+
Self { network_mempool_size }
196+
}
197+
}

notify/src/events.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,11 @@ event_type_enum! {
5151
VirtualDaaScoreChanged,
5252
PruningPointUtxoSetOverride,
5353
NewBlockTemplate,
54+
MempoolSizeChanged,
5455
}
5556
}
5657

57-
pub const EVENT_COUNT: usize = 9;
58+
pub const EVENT_COUNT: usize = 10;
5859

5960
impl FromStr for EventType {
6061
type Err = Error;
@@ -70,6 +71,7 @@ impl FromStr for EventType {
7071
"virtual-daa-score-changed" => Ok(EventType::VirtualDaaScoreChanged),
7172
"pruning-point-utxo-set-override" => Ok(EventType::PruningPointUtxoSetOverride),
7273
"new-block-template" => Ok(EventType::NewBlockTemplate),
74+
"mempool-size-changed" => Ok(EventType::MempoolSizeChanged),
7375
_ => Err(Error::InvalidEventType(s.to_string())),
7476
}
7577
}

notify/src/scope.rs

+18
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ pub enum Scope {
4545
VirtualDaaScoreChanged,
4646
PruningPointUtxoSetOverride,
4747
NewBlockTemplate,
48+
MempoolSizeChanged,
4849
}
4950
}
5051

@@ -266,3 +267,20 @@ impl Deserializer for NewBlockTemplateScope {
266267
Ok(Self {})
267268
}
268269
}
270+
271+
#[derive(Clone, Display, Debug, Default, PartialEq, Eq, Serialize, Deserialize, BorshSerialize, BorshDeserialize)]
272+
pub struct MempoolSizeChangedScope {}
273+
274+
impl Serializer for MempoolSizeChangedScope {
275+
fn serialize<W: std::io::Write>(&self, writer: &mut W) -> std::io::Result<()> {
276+
store!(u16, &1, writer)?;
277+
Ok(())
278+
}
279+
}
280+
281+
impl Deserializer for MempoolSizeChangedScope {
282+
fn deserialize<R: std::io::Read>(reader: &mut R) -> std::io::Result<Self> {
283+
let _version = load!(u16, reader)?;
284+
Ok(Self {})
285+
}
286+
}

protocol/flows/src/flow_context.rs

+13-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use kaspa_consensus_core::config::Config;
1414
use kaspa_consensus_core::errors::block::RuleError;
1515
use kaspa_consensus_core::tx::{Transaction, TransactionId};
1616
use kaspa_consensus_notify::{
17-
notification::{Notification, PruningPointUtxoSetOverrideNotification},
17+
notification::{MempoolSizeChangedNotification, Notification, PruningPointUtxoSetOverrideNotification},
1818
root::ConsensusNotificationRoot,
1919
};
2020
use kaspa_consensusmanager::{BlockProcessingBatch, ConsensusInstance, ConsensusManager, ConsensusProxy, ConsensusSessionOwned};
@@ -26,6 +26,7 @@ use kaspa_core::{
2626
use kaspa_core::{time::unix_now, warn};
2727
use kaspa_hashes::Hash;
2828
use kaspa_mining::mempool::tx::{Orphan, Priority};
29+
use kaspa_mining::model::tx_query::TransactionQuery;
2930
use kaspa_mining::{manager::MiningManagerProxy, mempool::tx::RbfPolicy};
3031
use kaspa_notify::notifier::Notify;
3132
use kaspa_p2p_lib::{
@@ -624,6 +625,7 @@ impl FlowContext {
624625
.await;
625626
}
626627
}
628+
context.on_transaction_added_to_mempool().await;
627629
context.mempool_scanning_is_done().await;
628630
debug!("<> Mempool scanning task is done");
629631
});
@@ -649,7 +651,10 @@ impl FlowContext {
649651

650652
/// Notifies that a transaction has been added to the mempool.
651653
pub async fn on_transaction_added_to_mempool(&self) {
652-
// TODO: call a handler function or a predefined registered service
654+
let network_mempool_size = self.mining_manager().clone().transaction_count(TransactionQuery::TransactionsOnly).await as u64;
655+
656+
let _ =
657+
self.notification_root.notify(Notification::MempoolSizeChanged(MempoolSizeChangedNotification::new(network_mempool_size)));
653658
}
654659

655660
/// Adds the rpc-submitted transaction to the mempool and propagates it to peers.
@@ -673,6 +678,9 @@ impl FlowContext {
673678
false, // RPC transactions are considered high priority, so we don't want to throttle them
674679
)
675680
.await;
681+
682+
self.on_transaction_added_to_mempool().await;
683+
676684
Ok(())
677685
}
678686

@@ -698,6 +706,9 @@ impl FlowContext {
698706
false, // RPC transactions are considered high priority, so we don't want to throttle them
699707
)
700708
.await;
709+
710+
self.on_transaction_added_to_mempool().await;
711+
701712
// The combination of args above of Orphan::Forbidden and RbfPolicy::Mandatory should always result
702713
// in a removed transaction returned, however we prefer failing gracefully in case of future internal mempool changes
703714
transaction_insertion.removed.ok_or(ProtocolError::Other(

rpc/core/src/api/notifications.rs

+12
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ pub enum Notification {
4848

4949
#[display(fmt = "NewBlockTemplate notification")]
5050
NewBlockTemplate(NewBlockTemplateNotification),
51+
52+
#[display(fmt = "MempoolSizeChanged notification: size {}", "_0.network_mempool_size")]
53+
MempoolSizeChanged(MempoolSizeChangedNotification),
5154
}
5255
}
5356

@@ -64,6 +67,7 @@ impl Notification {
6467
Notification::VirtualDaaScoreChanged(v) => to_value(&v),
6568
Notification::SinkBlueScoreChanged(v) => to_value(&v),
6669
Notification::VirtualChainChanged(v) => to_value(&v),
70+
Notification::MempoolSizeChanged(v) => to_value(&v),
6771
}
6872
}
6973
}
@@ -157,6 +161,10 @@ impl Serializer for Notification {
157161
store!(u16, &8, writer)?;
158162
serialize!(NewBlockTemplateNotification, notification, writer)?;
159163
}
164+
Notification::MempoolSizeChanged(notification) => {
165+
store!(u16, &9, writer)?;
166+
serialize!(MempoolSizeChangedNotification, notification, writer)?;
167+
}
160168
}
161169
Ok(())
162170
}
@@ -202,6 +210,10 @@ impl Deserializer for Notification {
202210
let notification = deserialize!(NewBlockTemplateNotification, reader)?;
203211
Ok(Notification::NewBlockTemplate(notification))
204212
}
213+
9 => {
214+
let notification = deserialize!(MempoolSizeChangedNotification, reader)?;
215+
Ok(Notification::MempoolSizeChanged(notification))
216+
}
205217
_ => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid variant")),
206218
}
207219
}

rpc/core/src/api/ops.rs

+4
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ pub enum RpcApiOps {
4040
NotifyVirtualDaaScoreChanged = 16,
4141
NotifyVirtualChainChanged = 17,
4242
NotifySinkBlueScoreChanged = 18,
43+
NotifyMempoolSizeChanged = 19,
4344

4445
// Notification ops required by wRPC
4546

@@ -54,6 +55,7 @@ pub enum RpcApiOps {
5455
VirtualDaaScoreChangedNotification = 66,
5556
PruningPointUtxoSetOverrideNotification = 67,
5657
NewBlockTemplateNotification = 68,
58+
MempoolSizeChangedNotification = 69,
5759

5860
// RPC methods
5961
/// Ping the node to check if connection is alive
@@ -153,6 +155,7 @@ impl RpcApiOps {
153155
| RpcApiOps::NotifyFinalityConflictResolved
154156
| RpcApiOps::NotifySinkBlueScoreChanged
155157
| RpcApiOps::NotifyVirtualDaaScoreChanged
158+
| RpcApiOps::NotifyMempoolSizeChanged
156159
| RpcApiOps::Subscribe
157160
| RpcApiOps::Unsubscribe
158161
)
@@ -177,6 +180,7 @@ impl From<EventType> for RpcApiOps {
177180
EventType::UtxosChanged => RpcApiOps::UtxosChangedNotification,
178181
EventType::SinkBlueScoreChanged => RpcApiOps::SinkBlueScoreChangedNotification,
179182
EventType::VirtualDaaScoreChanged => RpcApiOps::VirtualDaaScoreChangedNotification,
183+
EventType::MempoolSizeChanged => RpcApiOps::MempoolSizeChangedNotification,
180184
EventType::PruningPointUtxoSetOverride => RpcApiOps::PruningPointUtxoSetOverrideNotification,
181185
EventType::NewBlockTemplate => RpcApiOps::NewBlockTemplateNotification,
182186
}

rpc/core/src/convert/notification.rs

+10-2
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22
33
use crate::{
44
convert::utxo::utxo_set_into_rpc, BlockAddedNotification, FinalityConflictNotification, FinalityConflictResolvedNotification,
5-
NewBlockTemplateNotification, Notification, PruningPointUtxoSetOverrideNotification, RpcAcceptedTransactionIds,
6-
SinkBlueScoreChangedNotification, UtxosChangedNotification, VirtualChainChangedNotification, VirtualDaaScoreChangedNotification,
5+
MempoolSizeChangedNotification, NewBlockTemplateNotification, Notification, PruningPointUtxoSetOverrideNotification,
6+
RpcAcceptedTransactionIds, SinkBlueScoreChangedNotification, UtxosChangedNotification, VirtualChainChangedNotification,
7+
VirtualDaaScoreChangedNotification,
78
};
89
use kaspa_consensus_notify::notification as consensus_notify;
910
use kaspa_index_core::notification as index_notify;
@@ -31,6 +32,7 @@ impl From<&consensus_notify::Notification> for Notification {
3132
consensus_notify::Notification::VirtualDaaScoreChanged(msg) => Notification::VirtualDaaScoreChanged(msg.into()),
3233
consensus_notify::Notification::PruningPointUtxoSetOverride(msg) => Notification::PruningPointUtxoSetOverride(msg.into()),
3334
consensus_notify::Notification::NewBlockTemplate(msg) => Notification::NewBlockTemplate(msg.into()),
35+
consensus_notify::Notification::MempoolSizeChanged(msg) => Notification::MempoolSizeChanged(msg.into()),
3436
}
3537
}
3638
}
@@ -112,6 +114,12 @@ impl From<&consensus_notify::NewBlockTemplateNotification> for NewBlockTemplateN
112114
}
113115
}
114116

117+
impl From<&consensus_notify::MempoolSizeChangedNotification> for MempoolSizeChangedNotification {
118+
fn from(item: &consensus_notify::MempoolSizeChangedNotification) -> Self {
119+
Self { network_mempool_size: item.network_mempool_size }
120+
}
121+
}
122+
115123
// ----------------------------------------------------------------------------
116124
// index to rpc_core
117125
// ----------------------------------------------------------------------------

rpc/core/src/convert/scope.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
//! Conversion of Notification Scope related types
22
33
use crate::{
4-
NotifyBlockAddedRequest, NotifyFinalityConflictRequest, NotifyNewBlockTemplateRequest, NotifyPruningPointUtxoSetOverrideRequest,
5-
NotifySinkBlueScoreChangedRequest, NotifyUtxosChangedRequest, NotifyVirtualChainChangedRequest,
6-
NotifyVirtualDaaScoreChangedRequest,
4+
NotifyBlockAddedRequest, NotifyFinalityConflictRequest, NotifyMempoolSizeChangedRequest, NotifyNewBlockTemplateRequest,
5+
NotifyPruningPointUtxoSetOverrideRequest, NotifySinkBlueScoreChangedRequest, NotifyUtxosChangedRequest,
6+
NotifyVirtualChainChangedRequest, NotifyVirtualDaaScoreChangedRequest,
77
};
88
use kaspa_notify::scope::*;
99

@@ -62,3 +62,4 @@ from!(SinkBlueScoreChanged);
6262
from!(VirtualDaaScoreChanged);
6363
from!(PruningPointUtxoSetOverride);
6464
from!(NewBlockTemplate);
65+
from!(MempoolSizeChanged);

rpc/core/src/model/message.rs

+75
Original file line numberDiff line numberDiff line change
@@ -3321,6 +3321,81 @@ impl Deserializer for VirtualDaaScoreChangedNotification {
33213321
}
33223322
}
33233323

3324+
// NotifyMempoolSizeChangedRequest registers this connection for
3325+
// mempoolSizeChanged notifications.
3326+
//
3327+
// See: MempoolSizeChangedNotification
3328+
#[derive(Clone, Debug, Serialize, Deserialize)]
3329+
#[serde(rename_all = "camelCase")]
3330+
pub struct NotifyMempoolSizeChangedRequest {
3331+
pub command: Command,
3332+
}
3333+
3334+
impl NotifyMempoolSizeChangedRequest {
3335+
pub fn new(command: Command) -> Self {
3336+
Self { command }
3337+
}
3338+
}
3339+
3340+
impl Serializer for NotifyMempoolSizeChangedRequest {
3341+
fn serialize<W: std::io::Write>(&self, writer: &mut W) -> std::io::Result<()> {
3342+
store!(u16, &1, writer)?;
3343+
store!(Command, &self.command, writer)?;
3344+
Ok(())
3345+
}
3346+
}
3347+
3348+
impl Deserializer for NotifyMempoolSizeChangedRequest {
3349+
fn deserialize<R: std::io::Read>(reader: &mut R) -> std::io::Result<Self> {
3350+
let _version = load!(u16, reader)?;
3351+
let command = load!(Command, reader)?;
3352+
Ok(Self { command })
3353+
}
3354+
}
3355+
3356+
#[derive(Clone, Debug, Serialize, Deserialize)]
3357+
#[serde(rename_all = "camelCase")]
3358+
pub struct NotifyMempoolSizeChangedResponse {}
3359+
3360+
impl Serializer for NotifyMempoolSizeChangedResponse {
3361+
fn serialize<W: std::io::Write>(&self, writer: &mut W) -> std::io::Result<()> {
3362+
store!(u16, &1, writer)?;
3363+
Ok(())
3364+
}
3365+
}
3366+
3367+
impl Deserializer for NotifyMempoolSizeChangedResponse {
3368+
fn deserialize<R: std::io::Read>(reader: &mut R) -> std::io::Result<Self> {
3369+
let _version = load!(u16, reader)?;
3370+
Ok(Self {})
3371+
}
3372+
}
3373+
3374+
// MempoolSizeChangedNotification is sent whenever the mempool changes.
3375+
//
3376+
// See NotifyMempoolSizeChangedRequest
3377+
#[derive(Clone, Debug, Serialize, Deserialize)]
3378+
#[serde(rename_all = "camelCase")]
3379+
pub struct MempoolSizeChangedNotification {
3380+
pub network_mempool_size: u64,
3381+
}
3382+
3383+
impl Serializer for MempoolSizeChangedNotification {
3384+
fn serialize<W: std::io::Write>(&self, writer: &mut W) -> std::io::Result<()> {
3385+
store!(u16, &1, writer)?;
3386+
store!(u64, &self.network_mempool_size, writer)?;
3387+
Ok(())
3388+
}
3389+
}
3390+
3391+
impl Deserializer for MempoolSizeChangedNotification {
3392+
fn deserialize<R: std::io::Read>(reader: &mut R) -> std::io::Result<Self> {
3393+
let _version = load!(u16, reader)?;
3394+
let network_mempool_size = load!(u64, reader)?;
3395+
Ok(Self { network_mempool_size })
3396+
}
3397+
}
3398+
33243399
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
33253400
// PruningPointUtxoSetOverrideNotification
33263401

rpc/core/src/model/tests.rs

+24
Original file line numberDiff line numberDiff line change
@@ -1290,6 +1290,30 @@ mod mockery {
12901290
}
12911291
}
12921292

1293+
impl Mock for NotifyMempoolSizeChangedRequest {
1294+
fn mock() -> Self {
1295+
NotifyMempoolSizeChangedRequest { command: Command::Start }
1296+
}
1297+
}
1298+
1299+
test!(NotifyMempoolSizeChangedRequest);
1300+
1301+
impl Mock for NotifyMempoolSizeChangedResponse {
1302+
fn mock() -> Self {
1303+
NotifyMempoolSizeChangedResponse {}
1304+
}
1305+
}
1306+
1307+
test!(NotifyMempoolSizeChangedResponse);
1308+
1309+
impl Mock for MempoolSizeChangedNotification {
1310+
fn mock() -> Self {
1311+
MempoolSizeChangedNotification { network_mempool_size: mock() }
1312+
}
1313+
}
1314+
1315+
test!(MempoolSizeChangedNotification);
1316+
12931317
test!(SubscribeResponse);
12941318

12951319
impl Mock for UnsubscribeResponse {

rpc/grpc/core/proto/messages.proto

+3
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ message KaspadRequest {
6666
GetFeeEstimateExperimentalRequestMessage getFeeEstimateExperimentalRequest = 1108;
6767
GetCurrentBlockColorRequestMessage getCurrentBlockColorRequest = 1110;
6868
GetUtxoReturnAddressRequestMessage GetUtxoReturnAddressRequest = 1112;
69+
NotifyMempoolSizeChangedRequestMessage notifyMempoolSizeChangedRequest = 1114;
6970
}
7071
}
7172

@@ -132,6 +133,8 @@ message KaspadResponse {
132133
GetFeeEstimateExperimentalResponseMessage getFeeEstimateExperimentalResponse = 1109;
133134
GetCurrentBlockColorResponseMessage getCurrentBlockColorResponse = 1111;
134135
GetUtxoReturnAddressResponseMessage GetUtxoReturnAddressResponse = 1113;
136+
NotifyMempoolSizeChangedResponseMessage notifyMempoolSizeChangedResponse = 1115;
137+
MempoolSizeChangedNotificationMessage mempoolSizeChangedNotification = 1117;
135138
}
136139
}
137140

0 commit comments

Comments
 (0)