@@ -5,12 +5,11 @@ use crate::{
5
5
address_cache:: AddressCache ,
6
6
availability:: ApiAvailabilityHandle ,
7
7
https_client_with_sni:: { HttpsConnectorWithSni , HttpsConnectorWithSniHandle } ,
8
- proxy:: ApiConnectionMode ,
8
+ proxy:: ConnectionModeProvider ,
9
9
} ;
10
10
use futures:: {
11
11
channel:: { mpsc, oneshot} ,
12
12
stream:: StreamExt ,
13
- Stream ,
14
13
} ;
15
14
use hyper:: {
16
15
client:: { connect:: Connect , Client } ,
@@ -120,23 +119,22 @@ impl Error {
120
119
121
120
/// A service that executes HTTP requests, allowing for on-demand termination of all in-flight
122
121
/// requests
123
- pub ( crate ) struct RequestService < T : Stream < Item = ApiConnectionMode > > {
122
+ pub ( crate ) struct RequestService < T : ConnectionModeProvider > {
124
123
command_tx : Weak < mpsc:: UnboundedSender < RequestCommand > > ,
125
124
command_rx : mpsc:: UnboundedReceiver < RequestCommand > ,
126
125
connector_handle : HttpsConnectorWithSniHandle ,
127
126
client : hyper:: Client < HttpsConnectorWithSni , hyper:: Body > ,
128
- proxy_config_provider : T ,
127
+ connection_mode_provider : T ,
129
128
api_availability : ApiAvailabilityHandle ,
130
129
}
131
130
132
- impl < T : Stream < Item = ApiConnectionMode > + Unpin + Send + ' static > RequestService < T > {
131
+ impl < T : ConnectionModeProvider + ' static > RequestService < T > {
133
132
/// Constructs a new request service.
134
133
pub fn spawn (
135
134
sni_hostname : Option < String > ,
136
135
api_availability : ApiAvailabilityHandle ,
137
136
address_cache : AddressCache ,
138
- initial_connection_mode : ApiConnectionMode ,
139
- proxy_config_provider : T ,
137
+ connection_mode_provider : T ,
140
138
#[ cfg( target_os = "android" ) ] socket_bypass_tx : Option < mpsc:: Sender < SocketBypassRequest > > ,
141
139
) -> RequestServiceHandle {
142
140
let ( connector, connector_handle) = HttpsConnectorWithSni :: new (
@@ -146,7 +144,7 @@ impl<T: Stream<Item = ApiConnectionMode> + Unpin + Send + 'static> RequestServic
146
144
socket_bypass_tx. clone ( ) ,
147
145
) ;
148
146
149
- connector_handle. set_connection_mode ( initial_connection_mode ) ;
147
+ connector_handle. set_connection_mode ( connection_mode_provider . initial ( ) ) ;
150
148
151
149
let ( command_tx, command_rx) = mpsc:: unbounded ( ) ;
152
150
let client = Client :: builder ( ) . build ( connector) ;
@@ -158,14 +156,35 @@ impl<T: Stream<Item = ApiConnectionMode> + Unpin + Send + 'static> RequestServic
158
156
command_rx,
159
157
connector_handle,
160
158
client,
161
- proxy_config_provider ,
159
+ connection_mode_provider ,
162
160
api_availability,
163
161
} ;
164
162
let handle = RequestServiceHandle { tx : command_tx } ;
165
163
tokio:: spawn ( service. into_future ( ) ) ;
166
164
handle
167
165
}
168
166
167
+ async fn into_future ( mut self ) {
168
+ loop {
169
+ tokio:: select! {
170
+ new_mode = self . connection_mode_provider. receive( ) => {
171
+ let Some ( new_mode) = new_mode else {
172
+ break ;
173
+ } ;
174
+ self . connector_handle. set_connection_mode( new_mode) ;
175
+ }
176
+ command = self . command_rx. next( ) => {
177
+ let Some ( command) = command else {
178
+ break ;
179
+ } ;
180
+
181
+ self . process_command( command) . await ;
182
+ }
183
+ }
184
+ }
185
+ self . connector_handle . reset ( ) ;
186
+ }
187
+
169
188
async fn process_command ( & mut self , command : RequestCommand ) {
170
189
match command {
171
190
RequestCommand :: NewRequest ( request, completion_tx) => {
@@ -174,11 +193,8 @@ impl<T: Stream<Item = ApiConnectionMode> + Unpin + Send + 'static> RequestServic
174
193
RequestCommand :: Reset => {
175
194
self . connector_handle . reset ( ) ;
176
195
}
177
- RequestCommand :: NextApiConfig ( completion_tx) => {
178
- if let Some ( connection_mode) = self . proxy_config_provider . next ( ) . await {
179
- self . connector_handle . set_connection_mode ( connection_mode) ;
180
- }
181
- let _ = completion_tx. send ( Ok ( ( ) ) ) ;
196
+ RequestCommand :: NextApiConfig => {
197
+ self . connection_mode_provider . rotate ( ) . await ;
182
198
}
183
199
}
184
200
}
@@ -201,22 +217,14 @@ impl<T: Stream<Item = ApiConnectionMode> + Unpin + Send + 'static> RequestServic
201
217
if err. is_network_error ( ) && !api_availability. get_state ( ) . is_offline ( ) {
202
218
log:: error!( "{}" , err. display_chain_with_msg( "HTTP request failed" ) ) ;
203
219
if let Some ( tx) = tx {
204
- let ( completion_tx, _completion_rx) = oneshot:: channel ( ) ;
205
- let _ = tx. unbounded_send ( RequestCommand :: NextApiConfig ( completion_tx) ) ;
220
+ let _ = tx. unbounded_send ( RequestCommand :: NextApiConfig ) ;
206
221
}
207
222
}
208
223
}
209
224
210
225
let _ = completion_tx. send ( response) ;
211
226
} ) ;
212
227
}
213
-
214
- async fn into_future ( mut self ) {
215
- while let Some ( command) = self . command_rx . next ( ) . await {
216
- self . process_command ( command) . await ;
217
- }
218
- self . connector_handle . reset ( ) ;
219
- }
220
228
}
221
229
222
230
#[ derive( Clone ) ]
@@ -239,15 +247,6 @@ impl RequestServiceHandle {
239
247
. map_err ( |_| Error :: RestServiceDown ) ?;
240
248
completion_rx. await . map_err ( |_| Error :: RestServiceDown ) ?
241
249
}
242
-
243
- /// Forcibly update the connection mode.
244
- pub async fn next_api_endpoint ( & self ) -> Result < ( ) > {
245
- let ( completion_tx, completion_rx) = oneshot:: channel ( ) ;
246
- self . tx
247
- . unbounded_send ( RequestCommand :: NextApiConfig ( completion_tx) )
248
- . map_err ( |_| Error :: RestServiceDown ) ?;
249
- completion_rx. await . map_err ( |_| Error :: RestServiceDown ) ?
250
- }
251
250
}
252
251
253
252
#[ derive( Debug ) ]
@@ -257,7 +256,7 @@ pub(crate) enum RequestCommand {
257
256
oneshot:: Sender < std:: result:: Result < Response , Error > > ,
258
257
) ,
259
258
Reset ,
260
- NextApiConfig ( oneshot :: Sender < std :: result :: Result < ( ) , Error > > ) ,
259
+ NextApiConfig ,
261
260
}
262
261
263
262
/// A REST request that is sent to the RequestService to be executed.
0 commit comments