Skip to content

Commit 7846462

Browse files
authored
Merge pull request #1340 from Lorak-mmk/caching_session_skip_metadata
CachingSession: Allow skipping result metadata
2 parents 27945b9 + 38adab1 commit 7846462

File tree

4 files changed

+153
-6
lines changed

4 files changed

+153
-6
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
.vscode
2+
.zed
23
target/
34
Cargo.lock
45
/book/book

scylla/src/client/caching_session.rs

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,13 @@ pub struct CachingSession<S = RandomState>
3737
where
3838
S: Clone + BuildHasher,
3939
{
40-
session: Session,
40+
session: Arc<Session>,
4141
/// The prepared statement cache size
4242
/// If a prepared statement is added while the limit is reached, the oldest prepared statement
4343
/// is removed from the cache
4444
max_capacity: usize,
4545
cache: DashMap<String, RawPreparedStatementData, S>,
46+
use_cached_metadata: bool,
4647
}
4748

4849
impl<S> fmt::Debug for CachingSession<S>
@@ -64,9 +65,10 @@ where
6465
{
6566
pub fn from(session: Session, cache_size: usize) -> Self {
6667
Self {
67-
session,
68+
session: Arc::new(session),
6869
max_capacity: cache_size,
6970
cache: Default::default(),
71+
use_cached_metadata: false,
7072
}
7173
}
7274
}
@@ -79,9 +81,10 @@ where
7981
/// and a [`BuildHasher`], using a customer hasher.
8082
pub fn with_hasher(session: Session, cache_size: usize, hasher: S) -> Self {
8183
Self {
82-
session,
84+
session: Arc::new(session),
8385
max_capacity: cache_size,
8486
cache: DashMap::with_hasher(hasher),
87+
use_cached_metadata: false,
8588
}
8689
}
8790
}
@@ -207,10 +210,15 @@ where
207210
query.config,
208211
);
209212
stmt.set_partitioner_name(raw.partitioner_name.clone());
213+
stmt.set_use_cached_result_metadata(self.use_cached_metadata);
210214
Ok(stmt)
211215
} else {
212216
let query_contents = query.contents.clone();
213-
let prepared = self.session.prepare(query).await?;
217+
let prepared = {
218+
let mut stmt = self.session.prepare(query).await?;
219+
stmt.set_use_cached_result_metadata(self.use_cached_metadata);
220+
stmt
221+
};
214222

215223
if self.max_capacity == self.cache.len() {
216224
// Cache is full, remove the first entry
@@ -280,20 +288,26 @@ pub struct CachingSessionBuilder<S = RandomState>
280288
where
281289
S: Clone + BuildHasher,
282290
{
283-
session: Session,
291+
session: Arc<Session>,
284292
max_capacity: usize,
285293
hasher: S,
294+
use_cached_metadata: bool,
286295
}
287296

288297
impl CachingSessionBuilder<RandomState> {
289298
/// Wraps a [Session] and creates a new [CachingSessionBuilder] instance,
290299
/// which can be used to create a new [CachingSession].
291300
///
292301
pub fn new(session: Session) -> Self {
302+
Self::new_shared(Arc::new(session))
303+
}
304+
305+
pub fn new_shared(session: Arc<Session>) -> Self {
293306
Self {
294307
session,
295308
max_capacity: DEFAULT_MAX_CAPACITY,
296309
hasher: RandomState::default(),
310+
use_cached_metadata: false,
297311
}
298312
}
299313
}
@@ -308,9 +322,33 @@ where
308322
self
309323
}
310324

325+
/// Make use of cached metadata to decode results
326+
/// of the statement's execution.
327+
///
328+
/// If true, the driver will request the server not to
329+
/// attach the result metadata in response to the statement execution.
330+
///
331+
/// The driver will cache the result metadata received from the server
332+
/// after statement preparation and will use it
333+
/// to deserialize the results of statement execution.
334+
///
335+
/// See documentation of [`PreparedStatement`] for more details on limitations
336+
/// of this functionality.
337+
///
338+
/// This option is false by default.
339+
pub fn use_cached_result_metadata(mut self, use_cached_metadata: bool) -> Self {
340+
self.use_cached_metadata = use_cached_metadata;
341+
self
342+
}
343+
311344
/// Finishes configuration of [CachingSession].
312345
pub fn build(self) -> CachingSession<S> {
313-
CachingSession::with_hasher(self.session, self.max_capacity, self.hasher)
346+
CachingSession {
347+
session: self.session,
348+
max_capacity: self.max_capacity,
349+
cache: DashMap::with_hasher(self.hasher),
350+
use_cached_metadata: self.use_cached_metadata,
351+
}
314352
}
315353
}
316354

@@ -364,11 +402,13 @@ where
364402
session,
365403
max_capacity,
366404
hasher: _,
405+
use_cached_metadata,
367406
} = self;
368407
CachingSessionBuilder {
369408
session,
370409
max_capacity,
371410
hasher,
411+
use_cached_metadata,
372412
}
373413
}
374414
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
use std::sync::Arc;
2+
3+
use scylla::client::caching_session::{CachingSession, CachingSessionBuilder};
4+
use scylla::client::session_builder::SessionBuilder;
5+
use scylla_cql::frame::request::Execute;
6+
use scylla_cql::frame::request::Request;
7+
use scylla_proxy::Condition;
8+
use scylla_proxy::ProxyError;
9+
use scylla_proxy::Reaction;
10+
use scylla_proxy::RequestFrame;
11+
use scylla_proxy::RequestOpcode;
12+
use scylla_proxy::RequestReaction;
13+
use scylla_proxy::RequestRule;
14+
use scylla_proxy::WorkerError;
15+
use tokio::sync::mpsc;
16+
17+
use crate::utils::test_with_3_node_cluster;
18+
19+
#[tokio::test]
20+
#[cfg_attr(scylla_cloud_tests, ignore)]
21+
async fn test_caching_session_metadata_cache() {
22+
let res = test_with_3_node_cluster(
23+
scylla_proxy::ShardAwareness::QueryNode,
24+
|proxy_uris, translation_map, mut running_proxy| async move {
25+
let (feedback_tx, mut feedback_rx) = mpsc::unbounded_channel();
26+
let prepared_request_feedback_rule = RequestRule(
27+
Condition::and(
28+
Condition::not(Condition::ConnectionRegisteredAnyEvent),
29+
Condition::RequestOpcode(RequestOpcode::Execute),
30+
),
31+
RequestReaction::noop().with_feedback_when_performed(feedback_tx),
32+
);
33+
for node in running_proxy.running_nodes.iter_mut() {
34+
node.change_request_rules(Some(vec![prepared_request_feedback_rule.clone()]));
35+
}
36+
37+
async fn verify_statement_metadata(
38+
session: &CachingSession,
39+
statement: &str,
40+
should_have_metadata: bool,
41+
feedback: &mut mpsc::UnboundedReceiver<(RequestFrame, Option<u16>)>,
42+
) {
43+
let _result = session.execute_unpaged(statement, ()).await.unwrap();
44+
let (req_frame, _) = feedback.recv().await.unwrap();
45+
let _ = feedback.try_recv().unwrap_err(); // There should be only one frame.
46+
let request = req_frame.deserialize().unwrap();
47+
let Request::Execute(Execute { parameters, .. }) = request else {
48+
panic!("Unexpected request type");
49+
};
50+
let has_metadata = !parameters.skip_metadata;
51+
assert_eq!(has_metadata, should_have_metadata);
52+
}
53+
54+
const REQUEST: &str = "SELECT * FROM system.local WHERE key = 'local'";
55+
56+
let session = Arc::new(
57+
SessionBuilder::new()
58+
.known_node(proxy_uris[0].as_str())
59+
.address_translator(Arc::new(translation_map.clone()))
60+
.build()
61+
.await
62+
.unwrap(),
63+
);
64+
let caching_session: CachingSession =
65+
CachingSessionBuilder::new_shared(Arc::clone(&session))
66+
.use_cached_result_metadata(false) // Default, set just to be more explicit
67+
.build();
68+
69+
// Skipping metadata was not set, so metadata should be present
70+
verify_statement_metadata(&caching_session, REQUEST, true, &mut feedback_rx).await;
71+
72+
// It should also be present when executing statement already in cache
73+
verify_statement_metadata(&caching_session, REQUEST, true, &mut feedback_rx).await;
74+
75+
let caching_session: CachingSession =
76+
CachingSessionBuilder::new_shared(Arc::clone(&session))
77+
.use_cached_result_metadata(true)
78+
.build();
79+
80+
// Now we set skip_metadata to true, so metadata should not be present for a new query
81+
verify_statement_metadata(&caching_session, REQUEST, false, &mut feedback_rx).await;
82+
83+
// It should also not be present when executing statement already in cache
84+
verify_statement_metadata(&caching_session, REQUEST, false, &mut feedback_rx).await;
85+
86+
// Test also without setting it explicitly, to verify that it is false by default.
87+
let caching_session: CachingSession =
88+
CachingSessionBuilder::new_shared(Arc::clone(&session)).build();
89+
90+
// Skipping metadata was not set, so metadata should be present
91+
verify_statement_metadata(&caching_session, REQUEST, true, &mut feedback_rx).await;
92+
93+
// It should also be present when executing statement already in cache
94+
verify_statement_metadata(&caching_session, REQUEST, true, &mut feedback_rx).await;
95+
96+
running_proxy
97+
},
98+
)
99+
.await;
100+
match res {
101+
Ok(()) => (),
102+
Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (),
103+
Err(err) => panic!("{}", err),
104+
}
105+
}

scylla/tests/integration/session/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
mod authenticate;
2+
mod caching_session;
23
mod history;
34
mod new_session;
45
mod retries;

0 commit comments

Comments
 (0)