1
+ #[ path = "../src/test_utils.rs" ]
2
+ mod test_utils;
1
3
use anyhow:: Result ;
2
4
use async_std:: {
3
5
net:: { TcpListener , TcpStream } ,
@@ -10,13 +12,13 @@ use hypercore::{
10
12
Hypercore , HypercoreBuilder , PartialKeypair , RequestBlock , RequestUpgrade , Storage ,
11
13
VerifyingKey ,
12
14
} ;
13
- use std:: { collections:: HashMap , convert:: TryInto , env, fmt:: Debug , sync :: OnceLock } ;
14
- use tracing:: { error, info} ;
15
+ use std:: { collections:: HashMap , convert:: TryInto , env, fmt:: Debug } ;
16
+ use tracing:: { error, info, instrument } ;
15
17
16
18
use hypercore_protocol:: { discovery_key, schema:: * , Channel , Event , Message , ProtocolBuilder } ;
17
19
18
20
fn main ( ) {
19
- log ( ) ;
21
+ test_utils :: log ( ) ;
20
22
if env:: args ( ) . count ( ) < 3 {
21
23
usage ( ) ;
22
24
}
@@ -62,12 +64,11 @@ fn main() {
62
64
hypercore_store. add ( hypercore_wrapper) ;
63
65
let hypercore_store = Arc :: new ( hypercore_store) ;
64
66
65
- let result = match mode. as_ref ( ) {
67
+ let _ = match mode. as_ref ( ) {
66
68
"server" => tcp_server ( address, onconnection, hypercore_store) . await ,
67
69
"client" => tcp_client ( address, onconnection, hypercore_store) . await ,
68
70
_ => panic ! ( "{:?}" , usage( ) ) ,
69
71
} ;
70
- log_if_error ( & result) ;
71
72
} ) ;
72
73
}
73
74
@@ -81,6 +82,7 @@ fn usage() {
81
82
// or once when connected (if client).
82
83
// Unfortunately, everything that touches the hypercore_store or a hypercore has to be generic
83
84
// at the moment.
85
+ #[ instrument( skip_all, ret) ]
84
86
async fn onconnection (
85
87
stream : TcpStream ,
86
88
is_initiator : bool ,
@@ -123,17 +125,17 @@ struct HypercoreStore {
123
125
hypercores : HashMap < String , Arc < HypercoreWrapper > > ,
124
126
}
125
127
impl HypercoreStore {
126
- pub fn new ( ) -> Self {
128
+ fn new ( ) -> Self {
127
129
let hypercores = HashMap :: new ( ) ;
128
130
Self { hypercores }
129
131
}
130
132
131
- pub fn add ( & mut self , hypercore : HypercoreWrapper ) {
133
+ fn add ( & mut self , hypercore : HypercoreWrapper ) {
132
134
let hdkey = hex:: encode ( hypercore. discovery_key ) ;
133
135
self . hypercores . insert ( hdkey, Arc :: new ( hypercore) ) ;
134
136
}
135
137
136
- pub fn get ( & self , discovery_key : & [ u8 ; 32 ] ) -> Option < & Arc < HypercoreWrapper > > {
138
+ fn get ( & self , discovery_key : & [ u8 ; 32 ] ) -> Option < & Arc < HypercoreWrapper > > {
137
139
let hdkey = hex:: encode ( discovery_key) ;
138
140
self . hypercores . get ( & hdkey)
139
141
}
@@ -148,7 +150,7 @@ struct HypercoreWrapper {
148
150
}
149
151
150
152
impl HypercoreWrapper {
151
- pub fn from_memory_hypercore ( hypercore : Hypercore ) -> Self {
153
+ fn from_memory_hypercore ( hypercore : Hypercore ) -> Self {
152
154
let key = hypercore. key_pair ( ) . public . to_bytes ( ) ;
153
155
HypercoreWrapper {
154
156
key,
@@ -157,11 +159,11 @@ impl HypercoreWrapper {
157
159
}
158
160
}
159
161
160
- pub fn key ( & self ) -> & [ u8 ; 32 ] {
162
+ fn key ( & self ) -> & [ u8 ; 32 ] {
161
163
& self . key
162
164
}
163
165
164
- pub fn onpeer ( & self , mut channel : Channel ) {
166
+ fn onpeer ( & self , mut channel : Channel ) {
165
167
let mut peer_state = PeerState :: default ( ) ;
166
168
let mut hypercore = self . hypercore . clone ( ) ;
167
169
task:: spawn ( async move {
@@ -415,32 +417,9 @@ async fn onmessage(
415
417
Ok ( ( ) )
416
418
}
417
419
418
- #[ allow( unused) ]
419
- pub fn log ( ) {
420
- use tracing_subscriber:: { fmt:: format:: FmtSpan , EnvFilter } ;
421
- static START_LOGS : OnceLock < ( ) > = OnceLock :: new ( ) ;
422
- START_LOGS . get_or_init ( || {
423
- tracing_subscriber:: fmt ( )
424
- . with_target ( true )
425
- . with_line_number ( true )
426
- // print when instrumented funtion enters
427
- . with_span_events ( FmtSpan :: ENTER | FmtSpan :: EXIT )
428
- . with_file ( true )
429
- . with_env_filter ( EnvFilter :: from_default_env ( ) ) // Reads `RUST_LOG` environment variable
430
- . without_time ( )
431
- . init ( ) ;
432
- } ) ;
433
- }
434
-
435
- /// Log a result if it's an error.
436
- pub fn log_if_error ( result : & Result < ( ) > ) {
437
- if let Err ( err) = result. as_ref ( ) {
438
- log:: error!( "error: {}" , err) ;
439
- }
440
- }
441
-
442
420
/// A simple async TCP server that calls an async function for each incoming connection.
443
- pub async fn tcp_server < F , C > (
421
+ #[ instrument( skip_all, ret) ]
422
+ async fn tcp_server < F , C > (
444
423
address : String ,
445
424
onconnection : impl Fn ( TcpStream , bool , C ) -> F + Send + Sync + Copy + ' static ,
446
425
context : C ,
@@ -450,22 +429,22 @@ where
450
429
C : Clone + Send + ' static ,
451
430
{
452
431
let listener = TcpListener :: bind ( & address) . await ?;
453
- log :: info!( "listening on {}" , listener. local_addr( ) ?) ;
432
+ tracing :: info!( "listening on {}" , listener. local_addr( ) ?) ;
454
433
let mut incoming = listener. incoming ( ) ;
455
434
while let Some ( Ok ( stream) ) = incoming. next ( ) . await {
456
435
let context = context. clone ( ) ;
457
436
let peer_addr = stream. peer_addr ( ) . unwrap ( ) ;
458
- log :: info!( "new connection from {}" , peer_addr) ;
437
+ tracing :: info!( "new connection from {}" , peer_addr) ;
459
438
task:: spawn ( async move {
460
- let result = onconnection ( stream, false , context) . await ;
461
- log_if_error ( & result) ;
462
- log:: info!( "connection closed from {}" , peer_addr) ;
439
+ let _ = onconnection ( stream, false , context) . await ;
440
+ tracing:: info!( "connection closed from {}" , peer_addr) ;
463
441
} ) ;
464
442
}
465
443
Ok ( ( ) )
466
444
}
467
445
468
446
/// A simple async TCP client that calls an async function when connected.
447
+ #[ instrument( skip_all, ret) ]
469
448
pub async fn tcp_client < F , C > (
470
449
address : String ,
471
450
onconnection : impl Fn ( TcpStream , bool , C ) -> F + Send + Sync + Copy + ' static ,
@@ -475,8 +454,8 @@ where
475
454
F : Future < Output = Result < ( ) > > + Send ,
476
455
C : Clone + Send + ' static ,
477
456
{
478
- log :: info!( "attempting connection to {address}" ) ;
457
+ tracing :: info!( "attempting connection to {address}" ) ;
479
458
let stream = TcpStream :: connect ( & address) . await ?;
480
- log :: info!( "connected to {address}" ) ;
459
+ tracing :: info!( "connected to {address}" ) ;
481
460
onconnection ( stream, true , context) . await
482
461
}
0 commit comments