
Commit e9f60c9

Refactor
1 parent 1fa7519 commit e9f60c9

5 files changed: +24 -51 lines changed


src/components/block_source/delta/mod.rs

Lines changed: 12 additions & 2 deletions
@@ -25,6 +25,7 @@ pub struct DeltaClient {
     ctx: SessionContext,
     start_block: u64,
     query_step: u64,
+    block_per_file: u32,
 }

 impl DeltaClient {
@@ -57,6 +58,7 @@ impl DeltaClient {
             ctx,
             start_block,
             query_step: cfg.query_step,
+            block_per_file: cfg.block_per_file,
         })
     }

@@ -82,13 +84,12 @@
         sender: AsyncSender<Vec<BlockDataMessage>>,
         valve: Valve,
     ) -> Result<(), SourceError> {
-        let mut start_block = self.start_block;
+        let mut start_block = self.start_block - (self.start_block % self.block_per_file as u64);
         info!(DeltaClient, "source start collecting data");

         loop {
             while !valve.should_continue() {
                 let sleep_tine = valve.get_wait();
-                info!(DeltaClient, "source sleeping"; sleep_time => sleep_tine);
                 tokio::time::sleep(Duration::from_secs(sleep_tine)).await;
             }

@@ -135,6 +136,14 @@ mod test {
     use super::*;
     use crate::config::ValveConfig;

+    #[test]
+    fn test_adjust_start_block() {
+        let actual_start_block = 10_124_125;
+        let block_per_file = 2000;
+        let adjusted_start_block = actual_start_block - (actual_start_block % block_per_file);
+        assert_eq!(adjusted_start_block, 10_124_000);
+    }
+
     #[tokio::test]
     async fn test_delta() {
         env_logger::try_init().unwrap_or_default();
@@ -143,6 +152,7 @@
             table_path: "s3://ethereum/blocks_01/".to_owned(),
             query_step: 10,
             version: None,
+            block_per_file: 2,
         };

         let client = DeltaClient::new(cfg, 10_000_000).await.unwrap();
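Note on the change above: collect() now floors start_block down to the nearest block_per_file boundary before reading, so a query always starts at the beginning of a Delta file. A minimal, self-contained sketch of that arithmetic (the helper name is illustrative, not part of the crate):

/// Floor `start_block` to the nearest multiple of `block_per_file`,
/// mirroring the expression used in `DeltaClient::collect`.
fn align_to_file_boundary(start_block: u64, block_per_file: u64) -> u64 {
    start_block - (start_block % block_per_file)
}

fn main() {
    // Same values as the new `test_adjust_start_block` test above.
    assert_eq!(align_to_file_boundary(10_124_125, 2000), 10_124_000);
    // Values already on a boundary are left untouched.
    assert_eq!(align_to_file_boundary(10_124_000, 2000), 10_124_000);
}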

src/components/subgraph/mod.rs

Lines changed: 1 addition & 37 deletions
@@ -2,12 +2,10 @@ mod datasource_wasm_instance;
 mod metrics;

 use super::ManifestLoader;
-use super::Valve;
 use crate::chain::ethereum::block::EthereumBlockData;
 use crate::common::HandlerTypes;
 use crate::config::Config;
 use crate::database::DatabaseAgent;
-use crate::error;
 use crate::errors::SubgraphError;
 use crate::info;
 use crate::messages::EthereumFilteredEvent;
@@ -18,11 +16,9 @@ use datasource_wasm_instance::DatasourceWasmInstance;
 use metrics::SubgraphMetrics;
 use prometheus::Registry;
 use std::collections::HashMap;
-use std::time::Instant;

 pub struct Subgraph {
     // NOTE: using IPFS might lead to subgraph-id using a hex/hash
-    pub id: String,
     pub name: String,
     sources: HashMap<String, DatasourceWasmInstance>,
     metrics: SubgraphMetrics,
@@ -33,7 +29,6 @@ impl Subgraph {
         Self {
             sources: HashMap::new(),
             name: config.subgraph_name.clone(),
-            id: config.get_subgraph_id(),
             metrics: SubgraphMetrics::new(registry),
         }
     }
@@ -109,17 +104,9 @@ impl Subgraph {
         }
     }

-    pub async fn process(
-        &mut self,
-        msg: FilteredDataMessage,
-        db_agent: &DatabaseAgent,
-        rpc_agent: &RpcAgent,
-        valve: &Valve,
-    ) -> Result<(), SubgraphError> {
+    pub async fn process(&mut self, msg: FilteredDataMessage) -> Result<(), SubgraphError> {
         let block_ptr = msg.get_block_ptr();

-        rpc_agent.set_block_ptr(block_ptr.clone()).await;
-
         self.metrics
             .current_block_number
             .set(block_ptr.number as i64);
@@ -136,29 +123,6 @@ impl Subgraph {
                 block_number => block_ptr.number,
                 block_hash => block_ptr.hash
             );
-            valve.set_finished(block_ptr.number);
-        }
-
-        if block_ptr.number % 2000 == 0 {
-            info!(Subgraph, "commiting data to DB"; block_number => block_ptr.number);
-            let time = Instant::now();
-            db_agent.migrate(block_ptr.clone()).await.map_err(|e| {
-                error!(
-                    Subgraph, "Failed to commit data to DB";
-                    error => e.to_string(),
-                    block_number => block_ptr.number,
-                    block_hash => block_ptr.hash
-                );
-                SubgraphError::MigrateDbError
-            })?;
-
-            info!(Subgraph, "data committed to database"; execution_time => format!("{:?}", time.elapsed()));
-
-            db_agent
-                .clear_in_memory()
-                .await
-                .map_err(|_| SubgraphError::MigrateDbError)?;
-            info!(Subgraph, "flushed entity cache");
         }

         Ok(())

src/config.rs

Lines changed: 1 addition & 8 deletions
@@ -10,6 +10,7 @@ pub struct DeltaConfig {
     pub table_path: String,
     pub query_step: u64,
     pub version: Option<u64>,
+    pub block_per_file: u32,
 }

 #[derive(Deserialize, Clone, Debug)]
@@ -36,7 +37,6 @@ pub struct Config {
     pub chain: Chain,
     pub source: SourceTypes,
     pub subgraph_name: String,
-    pub subgraph_id: Option<String>,
     pub subgraph_dir: String,
     pub database: DatabaseConfig,
     pub reorg_threshold: u16,
@@ -54,13 +54,6 @@ impl Config {
             .extract()
             .expect("Load config failed")
     }
-
-    pub fn get_subgraph_id(&self) -> String {
-        self.subgraph_id
-            .clone()
-            .unwrap_or(self.subgraph_name.to_owned())
-            .to_owned()
-    }
 }

 #[cfg(test)]
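One practical consequence of the DeltaConfig change: block_per_file is a required (non-Option) field, so config files that do not set it will fail to load. A rough sketch of that behaviour using a stand-in struct and serde_json (the mirror type and the JSON snippets are illustrative, not the crate's actual config format):

use serde::Deserialize;

// Stand-in mirroring the DeltaConfig fields shown in the diff above.
#[derive(Deserialize, Debug)]
#[allow(dead_code)]
struct DeltaConfigMirror {
    table_path: String,
    query_step: u64,
    version: Option<u64>,
    block_per_file: u32,
}

fn main() {
    // With the new field present, deserialization succeeds.
    let ok: Result<DeltaConfigMirror, _> = serde_json::from_str(
        r#"{ "table_path": "s3://ethereum/blocks_01/", "query_step": 10, "version": null, "block_per_file": 2000 }"#,
    );
    assert!(ok.is_ok());

    // Without it, loading fails, because the field is not optional.
    let missing: Result<DeltaConfigMirror, _> = serde_json::from_str(
        r#"{ "table_path": "s3://ethereum/blocks_01/", "query_step": 10, "version": null }"#,
    );
    assert!(missing.is_err());
}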

src/errors.rs

Lines changed: 0 additions & 2 deletions
@@ -103,8 +103,6 @@ pub enum SubgraphError {
     InvalidSourceID(String),
     #[error("Invalid handler_name: {0}")]
     InvalidHandlerName(String),
-    #[error("Migrate memory to db error")]
-    MigrateDbError,
     #[error("Create source failed: `{0}`")]
     CreateSourceFail(String),
 }

src/main.rs

Lines changed: 10 additions & 2 deletions
@@ -76,6 +76,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         let time = std::time::Instant::now();
         let sorted_blocks = filter.filter_multi(blocks)?;
         let count_blocks = sorted_blocks.len();
+        let last_block = sorted_blocks.last().map(|b| b.get_block_ptr());

         info!(
             MainFlow,
@@ -88,6 +89,7 @@

         for block in sorted_blocks {
             let block_ptr = block.get_block_ptr();
+            rpc.set_block_ptr(&block_ptr).await;

             match inspector.check_block(block_ptr.clone()) {
                 BlockInspectionResult::UnexpectedBlock
@@ -105,15 +107,21 @@
             };

             subgraph.create_sources(&manifest, &db, &rpc).await?;
-            subgraph.process(block, &db, &rpc, &valve).await?;
+            subgraph.process(block).await?;
+        }
+
+        if let Some(block_ptr) = last_block {
+            db.commit_data(block_ptr.clone()).await?;
+            db.flush_cache().await?;
+            valve.set_finished(block_ptr.number);
         }

         info!(
             MainFlow,
             "block batch processed done";
             exec_time => format!("{:?}", time.elapsed()),
             number_of_blocks => count_blocks,
-            avg_speed => format!("~{:?} blocks/sec", (count_blocks as u64 / time.elapsed().as_secs()) as u64)
+            avg_speed => format!("~{:?} blocks/sec", { count_blocks as u64 / time.elapsed().as_secs() })
         );
     }
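Putting the subgraph and main-loop changes together: per-block processing no longer touches the database, and the commit/flush plus valve.set_finished now run once per filtered batch against the last block pointer. A condensed sketch of that control flow, using simplified synchronous stand-in types rather than the crate's async agents:

// Stand-ins for the real Subgraph, DatabaseAgent, and Valve; the actual
// calls in main.rs are async and return Results.
struct BlockPtr { number: u64 }
struct Block { ptr_number: u64 }
impl Block {
    fn get_block_ptr(&self) -> BlockPtr { BlockPtr { number: self.ptr_number } }
}

struct Subgraph;
impl Subgraph {
    // The slimmed-down contract: no db/rpc/valve arguments.
    fn process(&mut self, _block: Block) { /* run handlers, update metrics */ }
}

struct Db;
impl Db {
    fn commit_data(&self, ptr: &BlockPtr) { println!("commit at #{}", ptr.number); }
    fn flush_cache(&self) { println!("flush entity cache"); }
}

struct Valve;
impl Valve {
    fn set_finished(&self, number: u64) { println!("finished #{number}"); }
}

fn main() {
    let sorted_blocks = vec![Block { ptr_number: 1 }, Block { ptr_number: 2 }];
    let mut subgraph = Subgraph;
    let (db, valve) = (Db, Valve);

    // Remember the last block pointer before the batch is consumed.
    let last_block = sorted_blocks.last().map(|b| b.get_block_ptr());

    for block in sorted_blocks {
        subgraph.process(block); // per-block work only
    }

    // Commit once per batch, against the last processed block.
    if let Some(block_ptr) = last_block {
        db.commit_data(&block_ptr);
        db.flush_cache();
        valve.set_finished(block_ptr.number);
    }
}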
