@@ -114,7 +114,7 @@ impl NameServiceProxy {
114
114
async fn msync_if_needed ( & self , write : bool ) -> Result < ( ) > {
115
115
if !self . msycned . fetch_or ( true , Ordering :: SeqCst ) && !write {
116
116
let msync_msg = hdfs:: MsyncRequestProto :: default ( ) ;
117
- self . call_inner ( "msync" , msync_msg. encode_length_delimited_to_vec ( ) , false )
117
+ self . call_inner ( "msync" , msync_msg. encode_length_delimited_to_vec ( ) , true )
118
118
. await
119
119
. map ( |_| ( ) )
120
120
. or_else ( |err| match err {
@@ -196,17 +196,23 @@ impl NameServiceProxy {
196
196
}
197
197
Err ( _) if attempts >= self . proxy_connections . len ( ) - 1 => return result,
198
198
// Retriable error, do nothing and try the next connection
199
- Err ( HdfsError :: RPCError ( exception, _) ) => match exception. as_ref ( ) {
200
- OBSERVER_RETRY_EXCEPTION => {
201
- self . current_observers . lock ( ) . unwrap ( ) . insert ( proxy_index) ;
202
- }
203
- STANDBY_EXCEPTION => {
204
- self . current_observers . lock ( ) . unwrap ( ) . remove ( & proxy_index) ;
199
+ Err ( HdfsError :: RPCError ( exception, _) )
200
+ | Err ( HdfsError :: FatalRPCError ( exception, _) )
201
+ if Self :: is_retriable ( & exception) =>
202
+ {
203
+ match exception. as_ref ( ) {
204
+ OBSERVER_RETRY_EXCEPTION => {
205
+ self . current_observers . lock ( ) . unwrap ( ) . insert ( proxy_index) ;
206
+ }
207
+ STANDBY_EXCEPTION => {
208
+ self . current_observers . lock ( ) . unwrap ( ) . remove ( & proxy_index) ;
209
+ }
210
+ _ => ( ) ,
205
211
}
206
- _ => ( ) ,
207
- } ,
212
+ }
208
213
Err ( e) => {
209
214
// Some other error, we will retry but log the error
215
+ self . current_observers . lock ( ) . unwrap ( ) . remove ( & proxy_index) ;
210
216
warn ! ( "{:?}" , e) ;
211
217
}
212
218
}
0 commit comments