Skip to content

Commit 30f3042

Browse files
authored
feat: ClusterState does not cache session contexts (#1226)
* make scheduler session context stateless ... ... as session context was not cleaned up. * rename create_session to create_or_update session * update proto removing update session and adding create or update * remove unused code * add test to cover session events * expose operation_id and make session_id required * minor generated code update * fix standalone context creation * core extension cleanup
1 parent 64ed787 commit 30f3042

File tree

16 files changed

+213
-386
lines changed

16 files changed

+213
-386
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ballista/client/src/extension.rs

Lines changed: 18 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,10 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use ballista_core::extension::SessionConfigHelperExt;
1918
pub use ballista_core::extension::{SessionConfigExt, SessionStateExt};
20-
use ballista_core::{
21-
serde::protobuf::{scheduler_grpc_client::SchedulerGrpcClient, CreateSessionParams},
22-
utils::create_grpc_client_connection,
23-
};
19+
use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient;
2420
use datafusion::{
25-
error::DataFusionError,
26-
execution::SessionState,
27-
prelude::{SessionConfig, SessionContext},
21+
error::DataFusionError, execution::SessionState, prelude::SessionContext,
2822
};
2923
use url::Url;
3024

@@ -100,40 +94,34 @@ impl SessionContextExt for SessionContext {
10094
url: &str,
10195
state: SessionState,
10296
) -> datafusion::error::Result<SessionContext> {
103-
let config = state.config();
104-
10597
let scheduler_url = Extension::parse_url(url)?;
10698
log::info!(
10799
"Connecting to Ballista scheduler at {}",
108100
scheduler_url.clone()
109101
);
110-
let remote_session_id =
111-
Extension::setup_remote(config, scheduler_url.clone()).await?;
102+
103+
let session_state = state.upgrade_for_ballista(scheduler_url)?;
104+
112105
log::info!(
113106
"Server side SessionContext created with session id: {}",
114-
remote_session_id
107+
session_state.session_id()
115108
);
116-
let session_state =
117-
state.upgrade_for_ballista(scheduler_url, remote_session_id)?;
118109

119110
Ok(SessionContext::new_with_state(session_state))
120111
}
121112

122113
async fn remote(url: &str) -> datafusion::error::Result<SessionContext> {
123-
let config = SessionConfig::new_with_ballista();
124114
let scheduler_url = Extension::parse_url(url)?;
125115
log::info!(
126116
"Connecting to Ballista scheduler at: {}",
127117
scheduler_url.clone()
128118
);
129-
let remote_session_id =
130-
Extension::setup_remote(&config, scheduler_url.clone()).await?;
119+
120+
let session_state = SessionState::new_ballista_state(scheduler_url)?;
131121
log::info!(
132122
"Server side SessionContext created with session id: {}",
133-
remote_session_id
123+
session_state.session_id()
134124
);
135-
let session_state =
136-
SessionState::new_ballista_state(scheduler_url, remote_session_id)?;
137125

138126
Ok(SessionContext::new_with_state(session_state))
139127
}
@@ -142,15 +130,13 @@ impl SessionContextExt for SessionContext {
142130
async fn standalone_with_state(
143131
state: SessionState,
144132
) -> datafusion::error::Result<SessionContext> {
145-
let (remote_session_id, scheduler_url) =
146-
Extension::setup_standalone(Some(&state)).await?;
133+
let scheduler_url = Extension::setup_standalone(Some(&state)).await?;
147134

148-
let session_state =
149-
state.upgrade_for_ballista(scheduler_url, remote_session_id.clone())?;
135+
let session_state = state.upgrade_for_ballista(scheduler_url)?;
150136

151137
log::info!(
152138
"Server side SessionContext created with session id: {}",
153-
remote_session_id
139+
session_state.session_id()
154140
);
155141

156142
Ok(SessionContext::new_with_state(session_state))
@@ -160,15 +146,13 @@ impl SessionContextExt for SessionContext {
160146
async fn standalone() -> datafusion::error::Result<Self> {
161147
log::info!("Running in local mode. Scheduler will be run in-proc");
162148

163-
let (remote_session_id, scheduler_url) =
164-
Extension::setup_standalone(None).await?;
149+
let scheduler_url = Extension::setup_standalone(None).await?;
165150

166-
let session_state =
167-
SessionState::new_ballista_state(scheduler_url, remote_session_id.clone())?;
151+
let session_state = SessionState::new_ballista_state(scheduler_url)?;
168152

169153
log::info!(
170154
"Server side SessionContext created with session id: {}",
171-
remote_session_id
155+
session_state.session_id()
172156
);
173157

174158
Ok(SessionContext::new_with_state(session_state))
@@ -193,7 +177,7 @@ impl Extension {
193177
#[cfg(feature = "standalone")]
194178
async fn setup_standalone(
195179
session_state: Option<&SessionState>,
196-
) -> datafusion::error::Result<(String, String)> {
180+
) -> datafusion::error::Result<String> {
197181
use ballista_core::{serde::BallistaCodec, utils::default_config_producer};
198182

199183
let addr = match session_state {
@@ -214,7 +198,7 @@ impl Extension {
214198

215199
let scheduler_url = format!("http://localhost:{}", addr.port());
216200

217-
let mut scheduler = loop {
201+
let scheduler = loop {
218202
match SchedulerGrpcClient::connect(scheduler_url.clone()).await {
219203
Err(_) => {
220204
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
@@ -224,15 +208,6 @@ impl Extension {
224208
}
225209
};
226210

227-
let remote_session_id = scheduler
228-
.create_session(CreateSessionParams {
229-
settings: config.to_key_value_pairs(),
230-
})
231-
.await
232-
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?
233-
.into_inner()
234-
.session_id;
235-
236211
let concurrent_tasks = config.ballista_standalone_parallelism();
237212

238213
match session_state {
@@ -256,31 +231,6 @@ impl Extension {
256231
}
257232
}
258233

259-
Ok((remote_session_id, scheduler_url))
260-
}
261-
262-
async fn setup_remote(
263-
config: &SessionConfig,
264-
scheduler_url: String,
265-
) -> datafusion::error::Result<String> {
266-
let connection = create_grpc_client_connection(scheduler_url.clone())
267-
.await
268-
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
269-
270-
let limit = config.ballista_grpc_client_max_message_size();
271-
let mut scheduler = SchedulerGrpcClient::new(connection)
272-
.max_encoding_message_size(limit)
273-
.max_decoding_message_size(limit);
274-
275-
let remote_session_id = scheduler
276-
.create_session(CreateSessionParams {
277-
settings: config.to_key_value_pairs(),
278-
})
279-
.await
280-
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?
281-
.into_inner()
282-
.session_id;
283-
284-
Ok(remote_session_id)
234+
Ok(scheduler_url)
285235
}
286236
}

ballista/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ tokio = { workspace = true }
6464
tokio-stream = { workspace = true, features = ["net"] }
6565
tonic = { workspace = true }
6666
url = { workspace = true }
67+
uuid = { workspace = true }
6768

6869
[dev-dependencies]
6970
tempfile = { workspace = true }

ballista/core/proto/ballista.proto

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -524,15 +524,20 @@ message ExecuteQueryParams {
524524
string sql = 2 [deprecated=true]; // I'd suggest to remove this, if SQL needed use `flight-sql`
525525
}
526526

527-
optional string session_id = 3;
527+
string session_id = 3;
528528
repeated KeyValuePair settings = 4;
529+
// operation_id is unique number for each request
530+
// client makes. it helps mapping requests between
531+
// client and scheduler
532+
string operation_id = 5;
529533
}
530534

531-
message CreateSessionParams {
535+
message CreateUpdateSessionParams {
536+
string session_id = 2;
532537
repeated KeyValuePair settings = 1;
533538
}
534539

535-
message CreateSessionResult {
540+
message CreateUpdateSessionResult {
536541
string session_id = 1;
537542
}
538543

@@ -562,6 +567,7 @@ message ExecuteQueryResult {
562567
ExecuteQuerySuccessResult success = 1;
563568
ExecuteQueryFailureResult failure = 2;
564569
}
570+
string operation_id = 3;
565571
}
566572

567573
message ExecuteQuerySuccessResult {
@@ -697,9 +703,7 @@ service SchedulerGrpc {
697703

698704
rpc UpdateTaskStatus (UpdateTaskStatusParams) returns (UpdateTaskStatusResult) {}
699705

700-
rpc CreateSession (CreateSessionParams) returns (CreateSessionResult) {}
701-
702-
rpc UpdateSession (UpdateSessionParams) returns (UpdateSessionResult) {}
706+
rpc CreateUpdateSession (CreateUpdateSessionParams) returns (CreateUpdateSessionResult) {}
703707

704708
rpc RemoveSession (RemoveSessionParams) returns (RemoveSessionResult) {}
705709

ballista/core/src/execution_plans/distributed_query.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use datafusion_proto::logical_plan::{
4343
AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
4444
};
4545
use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt};
46-
use log::{error, info};
46+
use log::{debug, error, info};
4747
use std::any::Any;
4848
use std::fmt::Debug;
4949
use std::marker::PhantomData;
@@ -214,11 +214,16 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
214214
},
215215
)
216216
.collect();
217-
217+
let operation_id = uuid::Uuid::now_v7().to_string();
218+
debug!(
219+
"Distributed query with session_id: {}, execution operation_id: {}",
220+
self.session_id, operation_id
221+
);
218222
let query = ExecuteQueryParams {
219223
query: Some(Query::LogicalPlan(buf)),
220224
settings,
221-
session_id: Some(self.session_id.clone()),
225+
session_id: self.session_id.clone(),
226+
operation_id,
222227
};
223228

224229
let metric_row_count = MetricBuilder::new(&self.metrics).output_rows(partition);

ballista/core/src/extension.rs

Lines changed: 35 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,13 @@ pub trait SessionStateExt {
3838
/// State will be created with appropriate [SessionConfig] configured
3939
fn new_ballista_state(
4040
scheduler_url: String,
41-
session_id: String,
4241
) -> datafusion::error::Result<SessionState>;
4342
/// Upgrades [SessionState] for ballista usage
4443
///
4544
/// State will be upgraded to appropriate [SessionConfig]
4645
fn upgrade_for_ballista(
4746
self,
4847
scheduler_url: String,
49-
session_id: String,
5048
) -> datafusion::error::Result<SessionState>;
5149
}
5250

@@ -57,6 +55,13 @@ pub trait SessionConfigExt {
5755
/// ballista configuration initialized
5856
fn new_with_ballista() -> SessionConfig;
5957

58+
/// update [SessionConfig] with Ballista specific settings
59+
fn upgrade_for_ballista(self) -> SessionConfig;
60+
61+
/// return ballista specific configuration or
62+
/// creates one if does not exist
63+
fn ballista_config(&self) -> BallistaConfig;
64+
6065
/// Overrides ballista's [LogicalExtensionCodec]
6166
fn with_ballista_logical_extension_codec(
6267
self,
@@ -131,7 +136,6 @@ pub trait SessionConfigHelperExt {
131136
impl SessionStateExt for SessionState {
132137
fn new_ballista_state(
133138
scheduler_url: String,
134-
session_id: String,
135139
) -> datafusion::error::Result<SessionState> {
136140
let session_config = SessionConfig::new_with_ballista();
137141
let planner = BallistaQueryPlanner::<LogicalPlanNode>::new(
@@ -145,7 +149,6 @@ impl SessionStateExt for SessionState {
145149
.with_config(session_config)
146150
.with_runtime_env(Arc::new(runtime_env))
147151
.with_query_planner(Arc::new(planner))
148-
.with_session_id(session_id)
149152
.build();
150153

151154
Ok(session_state)
@@ -154,35 +157,23 @@ impl SessionStateExt for SessionState {
154157
fn upgrade_for_ballista(
155158
self,
156159
scheduler_url: String,
157-
session_id: String,
158160
) -> datafusion::error::Result<SessionState> {
159161
let codec_logical = self.config().ballista_logical_extension_codec();
160162
let planner_override = self.config().ballista_query_planner();
161163

162-
let new_config = self
163-
.config()
164-
.options()
165-
.extensions
166-
.get::<BallistaConfig>()
167-
.cloned()
168-
.unwrap_or_else(BallistaConfig::default);
164+
let session_config = self.config().clone().upgrade_for_ballista();
169165

170-
let session_config = self
171-
.config()
172-
.clone()
173-
.with_option_extension(new_config.clone())
174-
.ballista_restricted_configuration();
166+
let ballista_config = session_config.ballista_config();
175167

176-
let builder = SessionStateBuilder::new_from_existing(self)
177-
.with_config(session_config)
178-
.with_session_id(session_id);
168+
let builder =
169+
SessionStateBuilder::new_from_existing(self).with_config(session_config);
179170

180171
let builder = match planner_override {
181172
Some(planner) => builder.with_query_planner(planner),
182173
None => {
183174
let planner = BallistaQueryPlanner::<LogicalPlanNode>::with_extension(
184175
scheduler_url,
185-
new_config,
176+
ballista_config,
186177
codec_logical,
187178
);
188179
builder.with_query_planner(Arc::new(planner))
@@ -201,6 +192,26 @@ impl SessionConfigExt for SessionConfig {
201192
.with_target_partitions(16)
202193
.ballista_restricted_configuration()
203194
}
195+
196+
fn upgrade_for_ballista(self) -> SessionConfig {
197+
// if ballista config is not provided
198+
// one is created and session state is updated
199+
let ballista_config = self.ballista_config();
200+
201+
// session config has ballista config extension and
202+
// default datafusion configuration is altered
203+
// to fit ballista execution
204+
self.with_option_extension(ballista_config)
205+
.ballista_restricted_configuration()
206+
}
207+
208+
fn ballista_config(&self) -> BallistaConfig {
209+
self.options()
210+
.extensions
211+
.get::<BallistaConfig>()
212+
.cloned()
213+
.unwrap_or_else(BallistaConfig::default)
214+
}
204215
fn with_ballista_logical_extension_codec(
205216
self,
206217
codec: Arc<dyn LogicalExtensionCodec>,
@@ -452,19 +463,16 @@ mod test {
452463
// Ballista disables round robin repatriations
453464
#[tokio::test]
454465
async fn should_disable_round_robin_repartition() {
455-
let state = SessionState::new_ballista_state(
456-
"scheduler_url".to_string(),
457-
"session_id".to_string(),
458-
)
459-
.unwrap();
466+
let state =
467+
SessionState::new_ballista_state("scheduler_url".to_string()).unwrap();
460468

461469
assert!(!state.config().round_robin_repartition());
462470

463471
let state = SessionStateBuilder::new().build();
464472

465473
assert!(state.config().round_robin_repartition());
466474
let state = state
467-
.upgrade_for_ballista("scheduler_url".to_string(), "session_id".to_string())
475+
.upgrade_for_ballista("scheduler_url".to_string())
468476
.unwrap();
469477

470478
assert!(!state.config().round_robin_repartition());

0 commit comments

Comments
 (0)