@@ -252,16 +252,19 @@ namespace NYql::NConnector {
252
252
promise.SetValue ({std::move (status), std::move (NApi::TDescribeTableResponse ())});
253
253
return promise.GetFuture ();
254
254
}
255
+
256
+ auto context = CreateClientContext ();
255
257
256
- auto callback = [promise](NYdbGrpc::TGrpcStatus&& status, NApi::TDescribeTableResponse&& resp) mutable {
258
+ auto callback = [context, promise](NYdbGrpc::TGrpcStatus&& status, NApi::TDescribeTableResponse&& resp) mutable {
257
259
promise.SetValue ({std::move (status), std::move (resp)});
258
260
};
259
261
260
262
GetConnection (kind)->DoRequest <NApi::TDescribeTableRequest, NApi::TDescribeTableResponse>(
261
263
std::move (request),
262
264
std::move (callback),
263
265
&NApi::Connector::Stub::AsyncDescribeTable,
264
- { .Timeout = timeout }
266
+ { .Timeout = timeout },
267
+ context.get ()
265
268
);
266
269
267
270
return promise.GetFuture ();
@@ -307,6 +310,16 @@ namespace NYql::NConnector {
307
310
}
308
311
309
312
private:
313
+ NYdbGrpc::IQueueClientContextPtr CreateClientContext () {
314
+ auto context = GrpcClient_->CreateContext ();
315
+
316
+ if (!context) {
317
+ throw yexception () << " Client is being shut down" ;
318
+ }
319
+
320
+ return context;
321
+ }
322
+
310
323
void Init (const TGenericGatewayConfig& config) {
311
324
// TODO: place in a config file ?
312
325
GrpcClient_ = std::make_shared<NYdbGrpc::TGRpcClientLow>(DEFAULT_CONNECTION_MANAGER_NUM_THREADS);
@@ -378,10 +391,11 @@ namespace NYql::NConnector {
378
391
typename TRpcCallback = typename NYdbGrpc::TStreamRequestReadProcessor<NApi::Connector::Stub, TRequest, TResponse>::TAsyncRequest
379
392
>
380
393
TIteratorAsyncResult<IStreamIterator<TResponse>> ServerSideStreamingCall (
381
- const NYql::EGenericDataSourceKind& kind, const TRequest& request, TRpcCallback rpc, TDuration timeout = {}) const {
394
+ const NYql::EGenericDataSourceKind& kind, const TRequest& request, TRpcCallback rpc, TDuration timeout = {}) {
382
395
auto promise = NThreading::NewPromise<TIteratorResult<IStreamIterator<TResponse>>>();
396
+ auto context = CreateClientContext ();
383
397
384
- auto callback = [promise](NYdbGrpc::TGrpcStatus&& status, NYdbGrpc::IStreamRequestReadProcessor<TResponse>::TPtr streamProcessor) mutable {
398
+ auto callback = [context, promise](NYdbGrpc::TGrpcStatus&& status, NYdbGrpc::IStreamRequestReadProcessor<TResponse>::TPtr streamProcessor) mutable {
385
399
if (!streamProcessor) {
386
400
promise.SetValue ({std::move (status), nullptr });
387
401
return ;
@@ -396,7 +410,8 @@ namespace NYql::NConnector {
396
410
std::move (request),
397
411
std::move (callback),
398
412
rpc,
399
- { .Timeout = timeout }
413
+ { .Timeout = timeout },
414
+ context.get ()
400
415
);
401
416
402
417
return promise.GetFuture ();
0 commit comments