Skip to content

Commit cee1510

Browse files
committed
core extension cleanup
1 parent 37dbf2a commit cee1510

File tree

2 files changed

+31
-20
lines changed

2 files changed

+31
-20
lines changed

ballista/core/src/extension.rs

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,13 @@ pub trait SessionConfigExt {
5555
/// ballista configuration initialized
5656
fn new_with_ballista() -> SessionConfig;
5757

58+
/// update [SessionConfig] with Ballista specific settings
59+
fn upgrade_for_ballista(self) -> SessionConfig;
60+
61+
/// return ballista specific configuration or
62+
/// creates one if does not exist
63+
fn ballista_config(&self) -> BallistaConfig;
64+
5865
/// Overrides ballista's [LogicalExtensionCodec]
5966
fn with_ballista_logical_extension_codec(
6067
self,
@@ -154,19 +161,9 @@ impl SessionStateExt for SessionState {
154161
let codec_logical = self.config().ballista_logical_extension_codec();
155162
let planner_override = self.config().ballista_query_planner();
156163

157-
let new_config = self
158-
.config()
159-
.options()
160-
.extensions
161-
.get::<BallistaConfig>()
162-
.cloned()
163-
.unwrap_or_else(BallistaConfig::default);
164+
let session_config = self.config().clone().upgrade_for_ballista();
164165

165-
let session_config = self
166-
.config()
167-
.clone()
168-
.with_option_extension(new_config.clone())
169-
.ballista_restricted_configuration();
166+
let ballista_config = session_config.ballista_config();
170167

171168
let builder =
172169
SessionStateBuilder::new_from_existing(self).with_config(session_config);
@@ -176,7 +173,7 @@ impl SessionStateExt for SessionState {
176173
None => {
177174
let planner = BallistaQueryPlanner::<LogicalPlanNode>::with_extension(
178175
scheduler_url,
179-
new_config,
176+
ballista_config,
180177
codec_logical,
181178
);
182179
builder.with_query_planner(Arc::new(planner))
@@ -195,6 +192,26 @@ impl SessionConfigExt for SessionConfig {
195192
.with_target_partitions(16)
196193
.ballista_restricted_configuration()
197194
}
195+
196+
fn upgrade_for_ballista(self) -> SessionConfig {
197+
// if ballista config is not provided
198+
// one is created and session state is updated
199+
let ballista_config = self.ballista_config();
200+
201+
// session config has ballista config extension and
202+
// default datafusion configuration is altered
203+
// to fit ballista execution
204+
self.with_option_extension(ballista_config)
205+
.ballista_restricted_configuration()
206+
}
207+
208+
fn ballista_config(&self) -> BallistaConfig {
209+
self.options()
210+
.extensions
211+
.get::<BallistaConfig>()
212+
.cloned()
213+
.unwrap_or_else(BallistaConfig::default)
214+
}
198215
fn with_ballista_logical_extension_codec(
199216
self,
200217
codec: Arc<dyn LogicalExtensionCodec>,

ballista/executor/src/standalone.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
use crate::metrics::LoggingMetricsCollector;
1919
use crate::{execution_loop, executor::Executor, flight_service::BallistaFlightService};
2020
use arrow_flight::flight_service_server::FlightServiceServer;
21-
use ballista_core::config::BallistaConfig;
2221
use ballista_core::extension::SessionConfigExt;
2322
use ballista_core::registry::BallistaFunctionRegistry;
2423
use ballista_core::utils::default_config_producer;
@@ -57,12 +56,7 @@ pub async fn new_standalone_executor_from_state(
5756
datafusion_proto::protobuf::PhysicalPlanNode,
5857
> = BallistaCodec::new(logical, physical);
5958

60-
let config = session_state
61-
.config()
62-
.clone()
63-
.with_option_extension(BallistaConfig::default()) // TODO: do we need this statement
64-
;
65-
59+
let config = session_state.config().clone().upgrade_for_ballista();
6660
let runtime = session_state.runtime_env().clone();
6761

6862
let config_producer: ConfigProducer = Arc::new(move || config.clone());

0 commit comments

Comments
 (0)