Skip to content

Commit 9fcc709

Browse files
committed
expose operation_id and make session_id required
1 parent c9ea189 commit 9fcc709

File tree

6 files changed

+44
-36
lines changed

6 files changed

+44
-36
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ballista/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ tokio = { workspace = true }
6060
tokio-stream = { workspace = true, features = ["net"] }
6161
tonic = { workspace = true }
6262
url = { workspace = true }
63+
uuid = { workspace = true }
6364

6465
[dev-dependencies]
6566
tempfile = { workspace = true }

ballista/core/proto/ballista.proto

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -524,8 +524,12 @@ message ExecuteQueryParams {
524524
string sql = 2 [deprecated=true]; // I'd suggest removing this; if SQL is needed, use `flight-sql`
525525
}
526526

527-
optional string session_id = 3;
527+
string session_id = 3;
528528
repeated KeyValuePair settings = 4;
529+
// operation_id is a unique identifier for each request
530+
// the client makes. It helps map requests between
531+
// the client and the scheduler.
532+
string operation_id = 5;
529533
}
530534

531535
message CreateUpdateSessionParams {
@@ -563,6 +567,7 @@ message ExecuteQueryResult {
563567
ExecuteQuerySuccessResult success = 1;
564568
ExecuteQueryFailureResult failure = 2;
565569
}
570+
string operation_id = 3;
566571
}
567572

568573
message ExecuteQuerySuccessResult {

ballista/core/src/execution_plans/distributed_query.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use datafusion_proto::logical_plan::{
3939
AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
4040
};
4141
use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt};
42-
use log::{error, info};
42+
use log::{debug, error, info};
4343
use std::any::Any;
4444
use std::fmt::Debug;
4545
use std::marker::PhantomData;
@@ -202,11 +202,16 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
202202
},
203203
)
204204
.collect();
205-
205+
let operation_id = uuid::Uuid::now_v7().to_string();
206+
debug!(
207+
"Distributed query with session_id: {}, execution operation_id: {}",
208+
self.session_id, operation_id
209+
);
206210
let query = ExecuteQueryParams {
207211
query: Some(Query::LogicalPlan(buf)),
208212
settings,
209-
session_id: Some(self.session_id.clone()),
213+
session_id: self.session_id.clone(),
214+
operation_id,
210215
};
211216

212217
let stream = futures::stream::once(

ballista/core/src/serde/generated/ballista.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -786,10 +786,12 @@ pub struct UpdateTaskStatusResult {
786786
}
787787
#[derive(Clone, PartialEq, ::prost::Message)]
788788
pub struct ExecuteQueryParams {
789-
#[prost(string, optional, tag = "3")]
790-
pub session_id: ::core::option::Option<::prost::alloc::string::String>,
789+
#[prost(string, tag = "3")]
790+
pub session_id: ::prost::alloc::string::String,
791791
#[prost(message, repeated, tag = "4")]
792792
pub settings: ::prost::alloc::vec::Vec<KeyValuePair>,
793+
#[prost(string, tag = "5")]
794+
pub operation_id: ::prost::alloc::string::String,
793795
#[prost(oneof = "execute_query_params::Query", tags = "1, 2")]
794796
pub query: ::core::option::Option<execute_query_params::Query>,
795797
}
@@ -845,6 +847,8 @@ pub struct ExecuteSqlParams {
845847
}
846848
#[derive(Clone, PartialEq, ::prost::Message)]
847849
pub struct ExecuteQueryResult {
850+
#[prost(string, tag = "3")]
851+
pub operation_id: ::prost::alloc::string::String,
848852
#[prost(oneof = "execute_query_result::Result", tags = "1, 2")]
849853
pub result: ::core::option::Option<execute_query_result::Result>,
850854
}

ballista/scheduler/src/scheduler_server/grpc.rs

Lines changed: 22 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
313313
if let ExecuteQueryParams {
314314
query: Some(query),
315315
session_id,
316+
operation_id,
316317
settings,
317318
} = query_params
318319
{
@@ -322,35 +323,26 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
322323
.and_then(|s| s.value.clone())
323324
.unwrap_or_default();
324325

325-
let (session_id, session_ctx) = match session_id {
326-
Some(session_id) => {
327-
let session_config = self.state.session_manager.produce_config();
328-
let session_config =
329-
session_config.update_from_key_value_pair(&settings);
326+
let job_id = self.state.task_manager.generate_job_id();
330327

331-
let ctx = self
332-
.state
333-
.session_manager
334-
.create_or_update_session(&session_id, &session_config)
335-
.await
336-
.map_err(|e| {
337-
Status::internal(format!(
338-
"Failed to create SessionContext: {e:?}"
339-
))
340-
})?;
328+
info!("execution query - session_id: {}, operation_id: {}, job_name: {}, job_id: {}", session_id, operation_id, job_name, job_id);
341329

342-
(session_id, ctx)
343-
}
344-
_ => {
345-
error!("Client should set session_id");
346-
return Ok(Response::new(ExecuteQueryResult {
347-
result: Some(execute_query_result::Result::Failure(
348-
ExecuteQueryFailureResult {
349-
failure: Some(execute_query_failure_result::Failure::SessionNotFound("Client should set session_id".to_string())),
350-
},
351-
)),
352-
}));
353-
}
330+
let (session_id, session_ctx) = {
331+
let session_config = self.state.session_manager.produce_config();
332+
let session_config = session_config.update_from_key_value_pair(&settings);
333+
334+
let ctx = self
335+
.state
336+
.session_manager
337+
.create_or_update_session(&session_id, &session_config)
338+
.await
339+
.map_err(|e| {
340+
Status::internal(format!(
341+
"Failed to create SessionContext: {e:?}"
342+
))
343+
})?;
344+
345+
(session_id, ctx)
354346
};
355347

356348
let plan = match query {
@@ -367,6 +359,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
367359
format!("Could not parse logical plan protobuf: {e}");
368360
error!("{}", msg);
369361
return Ok(Response::new(ExecuteQueryResult {
362+
operation_id,
370363
result: Some(execute_query_result::Result::Failure(
371364
ExecuteQueryFailureResult {
372365
failure: Some(execute_query_failure_result::Failure::PlanParsingFailure(msg)),
@@ -387,6 +380,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
387380
let msg = format!("Error parsing SQL: {e}");
388381
error!("{}", msg);
389382
return Ok(Response::new(ExecuteQueryResult {
383+
operation_id,
390384
result: Some(execute_query_result::Result::Failure(
391385
ExecuteQueryFailureResult {
392386
failure: Some(execute_query_failure_result::Failure::PlanParsingFailure(msg)),
@@ -403,9 +397,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
403397
plan.display_indent()
404398
);
405399

406-
let job_id = self.state.task_manager.generate_job_id();
407-
408-
log::trace!("setting job name: {}", job_name);
409400
self.submit_job(&job_id, &job_name, session_ctx, &plan)
410401
.await
411402
.map_err(|e| {
@@ -417,6 +408,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
417408
})?;
418409

419410
Ok(Response::new(ExecuteQueryResult {
411+
operation_id,
420412
result: Some(execute_query_result::Result::Success(
421413
ExecuteQuerySuccessResult { job_id, session_id },
422414
)),

0 commit comments

Comments
 (0)