From babbda99236506a90577abd9913b4193ce7c7557 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Tue, 8 Apr 2025 16:12:06 +0200 Subject: [PATCH 1/6] Add total size bytes gauge for search after cache --- quickwit/quickwit-search/src/metrics.rs | 9 +++++++- .../quickwit-search/src/scroll_context.rs | 22 ++++++++++++++++--- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/quickwit/quickwit-search/src/metrics.rs b/quickwit/quickwit-search/src/metrics.rs index 06158bf9c29..a36a63c4898 100644 --- a/quickwit/quickwit-search/src/metrics.rs +++ b/quickwit/quickwit-search/src/metrics.rs @@ -17,7 +17,7 @@ use bytesize::ByteSize; use once_cell::sync::Lazy; use quickwit_common::metrics::{ - exponential_buckets, linear_buckets, new_counter, new_counter_vec, new_gauge_vec, + exponential_buckets, linear_buckets, new_counter, new_counter_vec, new_gauge, new_gauge_vec, new_histogram, new_histogram_vec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, }; @@ -34,6 +34,7 @@ pub struct SearchMetrics { pub leaf_search_single_split_tasks_pending: IntGauge, pub leaf_search_single_split_tasks_ongoing: IntGauge, pub leaf_search_single_split_warmup_num_bytes: Histogram, + pub search_after_cache_size_bytes: IntGauge, } impl Default for SearchMetrics { @@ -146,6 +147,12 @@ impl Default for SearchMetrics { &[], ["affinity"], ), + search_after_cache_size_bytes: new_gauge( + "search_after_cache_size_bytes", + "Total size of the search after (and scroll) cache in bytes.", + "search", + &[], + ), } } } diff --git a/quickwit/quickwit-search/src/scroll_context.rs b/quickwit/quickwit-search/src/scroll_context.rs index 05f6265e7b9..2ad0e0af94d 100644 --- a/quickwit/quickwit-search/src/scroll_context.rs +++ b/quickwit/quickwit-search/src/scroll_context.rs @@ -22,6 +22,7 @@ use std::time::Duration; use anyhow::Context; use base64::prelude::BASE64_STANDARD; use base64::Engine; +use quickwit_common::metrics::GaugeGuard; use quickwit_metastore::SplitMetadata; use 
quickwit_proto::search::{LeafSearchResponse, PartialHit, SearchRequest, SplitSearchError}; use quickwit_proto::types::IndexUid; @@ -120,9 +121,14 @@ impl ScrollContext { } } +struct TrackedValue { + value: Vec, + _total_size_metric_guard: GaugeGuard<'static>, +} + #[derive(Clone)] pub(crate) struct MiniKV { - ttl_with_cache: Arc, Vec>>>, + ttl_with_cache: Arc, TrackedValue>>>, } impl Default for MiniKV { @@ -135,14 +141,24 @@ impl Default for MiniKV { impl MiniKV { pub async fn put(&self, key: Vec, payload: Vec, ttl: Duration) { + let mut metric_guard = + GaugeGuard::from_gauge(&crate::SEARCH_METRICS.search_after_cache_size_bytes); + metric_guard.add(payload.len() as i64); let mut cache_lock = self.ttl_with_cache.write().await; - cache_lock.insert(key, payload, ttl); + cache_lock.insert( + key, + TrackedValue { + value: payload, + _total_size_metric_guard: metric_guard, + }, + ttl, + ); } pub async fn get(&self, key: &[u8]) -> Option> { let cache_lock = self.ttl_with_cache.read().await; let search_after_context_bytes = cache_lock.get(key)?; - Some(search_after_context_bytes.clone()) + Some(search_after_context_bytes.value.clone()) } } From dffb5bf3bc3480608cfaa9722dce2692d2efd37c Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 9 Apr 2025 11:42:02 +0200 Subject: [PATCH 2/6] Rename cache size const --- quickwit/quickwit-search/src/scroll_context.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/quickwit/quickwit-search/src/scroll_context.rs b/quickwit/quickwit-search/src/scroll_context.rs index 2ad0e0af94d..57059105e87 100644 --- a/quickwit/quickwit-search/src/scroll_context.rs +++ b/quickwit/quickwit-search/src/scroll_context.rs @@ -23,6 +23,7 @@ use anyhow::Context; use base64::prelude::BASE64_STANDARD; use base64::Engine; use quickwit_common::metrics::GaugeGuard; +use quickwit_common::shared_consts::SCROLL_BATCH_LEN; use quickwit_metastore::SplitMetadata; use quickwit_proto::search::{LeafSearchResponse, PartialHit, SearchRequest, 
SplitSearchError}; use quickwit_proto::types::IndexUid; @@ -35,14 +36,13 @@ use crate::root::IndexMetasForLeafSearch; use crate::service::SearcherContext; use crate::ClusterClient; -/// Maximum capacity of the search after cache. +/// Maximum number of contexts in the search after cache. /// -/// For the moment this value is hardcoded. /// TODO make configurable. /// /// Assuming a search context of 1MB, this can /// amount to up to 1GB. -const SCROLL_BATCH_LEN: usize = 1_000; +const SEARCH_AFTER_CACHE_SIZE: usize = 1_000; #[derive(Serialize, Deserialize)] pub(crate) struct ScrollContext { @@ -134,7 +134,7 @@ pub(crate) struct MiniKV { impl Default for MiniKV { fn default() -> MiniKV { MiniKV { - ttl_with_cache: Arc::new(RwLock::new(TtlCache::new(SCROLL_BATCH_LEN))), + ttl_with_cache: Arc::new(RwLock::new(TtlCache::new(SEARCH_AFTER_CACHE_SIZE))), } } } From 198042c033ed092360107ca369ab9c8087779ac7 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 9 Apr 2025 13:12:48 +0200 Subject: [PATCH 3/6] Clarify terminology --- quickwit/quickwit-search/src/cluster_client.rs | 2 +- quickwit/quickwit-search/src/metrics.rs | 9 +++++---- quickwit/quickwit-search/src/scroll_context.rs | 18 ++++++++++-------- quickwit/quickwit-search/src/service.rs | 8 ++++---- 4 files changed, 20 insertions(+), 17 deletions(-) diff --git a/quickwit/quickwit-search/src/cluster_client.rs b/quickwit/quickwit-search/src/cluster_client.rs index 08860e79009..785f7e5629c 100644 --- a/quickwit/quickwit-search/src/cluster_client.rs +++ b/quickwit/quickwit-search/src/cluster_client.rs @@ -191,7 +191,7 @@ impl ClusterClient { client.leaf_list_terms(request.clone()).await } - /// Attempts to store a given search context within the cluster. + /// Attempts to store a given key value pair within the cluster. /// /// This function may fail silently, if no clients was available. 
pub async fn put_kv(&self, key: &[u8], payload: &[u8], ttl: Duration) { diff --git a/quickwit/quickwit-search/src/metrics.rs b/quickwit/quickwit-search/src/metrics.rs index a36a63c4898..9fceaa2727c 100644 --- a/quickwit/quickwit-search/src/metrics.rs +++ b/quickwit/quickwit-search/src/metrics.rs @@ -34,7 +34,7 @@ pub struct SearchMetrics { pub leaf_search_single_split_tasks_pending: IntGauge, pub leaf_search_single_split_tasks_ongoing: IntGauge, pub leaf_search_single_split_warmup_num_bytes: Histogram, - pub search_after_cache_size_bytes: IntGauge, + pub searcher_local_kv_store_size_bytes: IntGauge, } impl Default for SearchMetrics { @@ -147,9 +147,10 @@ impl Default for SearchMetrics { &[], ["affinity"], ), - search_after_cache_size_bytes: new_gauge( - "search_after_cache_size_bytes", - "Total size of the search after (and scroll) cache in bytes.", + searcher_local_kv_store_size_bytes: new_gauge( + "searcher_local_kv_store_size_bytes", + "Size of the searcher kv store in bytes. This store is used to cache scroll \ + contexts.", "search", &[], ), diff --git a/quickwit/quickwit-search/src/scroll_context.rs b/quickwit/quickwit-search/src/scroll_context.rs index 57059105e87..480dbf08350 100644 --- a/quickwit/quickwit-search/src/scroll_context.rs +++ b/quickwit/quickwit-search/src/scroll_context.rs @@ -36,13 +36,15 @@ use crate::root::IndexMetasForLeafSearch; use crate::service::SearcherContext; use crate::ClusterClient; -/// Maximum number of contexts in the search after cache. +/// Maximum number of values in the local search KV store. +/// +/// Currently this store is only used for caching scroll contexts. /// /// TODO make configurable. /// /// Assuming a search context of 1MB, this can /// amount to up to 1GB. 
-const SEARCH_AFTER_CACHE_SIZE: usize = 1_000; +const LOCAL_KV_CACHE_SIZE: usize = 1_000; #[derive(Serialize, Deserialize)] pub(crate) struct ScrollContext { @@ -122,7 +124,7 @@ impl ScrollContext { } struct TrackedValue { - value: Vec, + content: Vec, _total_size_metric_guard: GaugeGuard<'static>, } @@ -134,7 +136,7 @@ pub(crate) struct MiniKV { impl Default for MiniKV { fn default() -> MiniKV { MiniKV { - ttl_with_cache: Arc::new(RwLock::new(TtlCache::new(SEARCH_AFTER_CACHE_SIZE))), + ttl_with_cache: Arc::new(RwLock::new(TtlCache::new(LOCAL_KV_CACHE_SIZE))), } } } @@ -142,13 +144,13 @@ impl Default for MiniKV { impl MiniKV { pub async fn put(&self, key: Vec, payload: Vec, ttl: Duration) { let mut metric_guard = - GaugeGuard::from_gauge(&crate::SEARCH_METRICS.search_after_cache_size_bytes); + GaugeGuard::from_gauge(&crate::SEARCH_METRICS.searcher_local_kv_store_size_bytes); metric_guard.add(payload.len() as i64); let mut cache_lock = self.ttl_with_cache.write().await; cache_lock.insert( key, TrackedValue { - value: payload, + content: payload, _total_size_metric_guard: metric_guard, }, ttl, @@ -157,8 +159,8 @@ impl MiniKV { pub async fn get(&self, key: &[u8]) -> Option> { let cache_lock = self.ttl_with_cache.read().await; - let search_after_context_bytes = cache_lock.get(key)?; - Some(search_after_context_bytes.value.clone()) + let tracked_value = cache_lock.get(key)?; + Some(tracked_value.content.clone()) } } diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index 717554b975d..a4f286c4c32 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -57,7 +57,7 @@ pub struct SearchServiceImpl { storage_resolver: StorageResolver, cluster_client: ClusterClient, searcher_context: Arc, - search_after_cache: MiniKV, + local_kv_store: MiniKV, } /// Trait representing a search service. 
@@ -165,7 +165,7 @@ impl SearchServiceImpl { storage_resolver, cluster_client, searcher_context, - search_after_cache: MiniKV::default(), + local_kv_store: MiniKV::default(), } } } @@ -322,13 +322,13 @@ impl SearchService for SearchServiceImpl { async fn put_kv(&self, put_request: PutKvRequest) { let ttl = Duration::from_secs(put_request.ttl_secs as u64); - self.search_after_cache + self.local_kv_store .put(put_request.key, put_request.payload, ttl) .await; } async fn get_kv(&self, get_request: GetKvRequest) -> Option> { - let payload: Vec = self.search_after_cache.get(&get_request.key).await?; + let payload: Vec = self.local_kv_store.get(&get_request.key).await?; Some(payload) } From 158f1648990ac0de66dcf1465a3406e9ec653583 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 9 Apr 2025 13:26:19 +0200 Subject: [PATCH 4/6] Also clarify docs --- docs/reference/es_compatible_api.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/docs/reference/es_compatible_api.md b/docs/reference/es_compatible_api.md index e771e57155a..ebd2237822e 100644 --- a/docs/reference/es_compatible_api.md +++ b/docs/reference/es_compatible_api.md @@ -134,7 +134,7 @@ If a parameter appears both as a query string parameter and in the JSON payload, | `q` | `String` | The search query. | (Optional) | | `size` | `Integer` | Number of hits to return. | 10 | | `sort` | `String` | Describes how documents should be ranked. See [Sort order](#sort-order) | (Optional) | -| `scroll` | `Duration` | Creates a scroll context for "time to live". See [Scroll](#_scroll--scroll-api). | (Optional) | +| `scroll` | `Duration` | Creates a scroll context for "time to live". See [Scroll](#_searchscroll--scroll-api). | (Optional) | | `allow_partial_search_results` | `Boolean` | Returns a partial response if some (but not all) of the split searches were unsuccessful. 
| `true` | #### Supported Request Body parameters @@ -279,6 +279,11 @@ First, the client needs to call the `search api` with a `scroll` query parameter Each subsequent call to the `_search/scroll` endpoint will return a new `scroll_id` pointing to the next page. +:::tip + +Using `_search` and then `_search/scroll` is somewhat similar to using `_search` with the `search_after` parameter, except that it creates a lightweight snapshot view of the dataset during the initial call to `_search`. Further calls to `_search/scroll` only return results from that view, thus ensuring more consistent results. + +::: ### `_cat`   Cat API From 6755c073575d53b50b2e4cfb2976f84107528540 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Thu, 10 Apr 2025 12:06:48 +0200 Subject: [PATCH 5/6] Clarify ClusterClient.put_kv contract --- quickwit/quickwit-search/src/cluster_client.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/quickwit/quickwit-search/src/cluster_client.rs b/quickwit/quickwit-search/src/cluster_client.rs index 785f7e5629c..e0ed2b1faca 100644 --- a/quickwit/quickwit-search/src/cluster_client.rs +++ b/quickwit/quickwit-search/src/cluster_client.rs @@ -193,7 +193,10 @@ impl ClusterClient { /// Attempts to store a given key value pair within the cluster. /// - /// This function may fail silently, if no clients was available. + /// Tries to replicate the pair to [`TARGET_NUM_REPLICATION`] nodes, but this function may fail + /// silently (e.g if no client was available). Even in case of success, this storage is not + /// persistent. For instance during a rolling upgrade, all replicas will be lost as there is no + /// mechanism to maintain the replication count. pub async fn put_kv(&self, key: &[u8], payload: &[u8], ttl: Duration) { let clients: Vec = self .search_job_placer @@ -216,8 +219,8 @@ impl ClusterClient { // course, this may still result in the replication over more nodes, but this is not // a problem. 
// // The requests are made in a concurrent manner, up to two at a time. As soon as 2 requests - // are successful, we stop. + // The requests are made in a concurrent manner, up to TARGET_NUM_REPLICATION at a time. As + // soon as TARGET_NUM_REPLICATION requests are successful, we stop. let put_kv_futs = clients .into_iter() .map(|client| replicate_kv_to_one_server(client, key, payload, ttl)); From aa08edd5abe2fb58a864a245ead4abece8401f62 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Thu, 10 Apr 2025 12:22:30 +0200 Subject: [PATCH 6/6] Document MiniKV limitations --- quickwit/quickwit-search/src/scroll_context.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/quickwit/quickwit-search/src/scroll_context.rs b/quickwit/quickwit-search/src/scroll_context.rs index 480dbf08350..6482fc647d3 100644 --- a/quickwit/quickwit-search/src/scroll_context.rs +++ b/quickwit/quickwit-search/src/scroll_context.rs @@ -38,8 +38,6 @@ use crate::ClusterClient; /// Maximum number of values in the local search KV store. /// -/// Currently this store is only used for caching scroll contexts. -/// /// TODO make configurable. /// /// Assuming a search context of 1MB, this can @@ -128,6 +126,13 @@ struct TrackedValue { _total_size_metric_guard: GaugeGuard<'static>, } +/// In-memory key-value store with TTL and limited size. +/// +/// Once the capacity [`LOCAL_KV_CACHE_SIZE`] is reached, the oldest entries are +/// removed. +/// +/// Currently this store is only used for caching scroll contexts. Using it for +/// other purposes is risky as use cases would compete for its capacity. #[derive(Clone)] pub(crate) struct MiniKV { ttl_with_cache: Arc<RwLock<TtlCache<Vec<u8>, TrackedValue>>>,