Skip to content

Add basic integration test for entity cache invalidation #7150

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Apr 7, 2025
2 changes: 1 addition & 1 deletion apollo-router/src/axum_factory/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! axum factory is useful to create an [`AxumHttpServerFactory`] which implements [`crate::http_server_factory::HttpServerFactory`]
mod axum_http_server_factory;
pub(crate) mod axum_http_server_factory;
pub(crate) mod compression;
pub(crate) mod connection_handle;
mod listeners;
Expand Down
1 change: 0 additions & 1 deletion apollo-router/src/axum_factory/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ use tower::Service;
use tower::ServiceExt;
use tower::service_fn;

pub(crate) use super::axum_http_server_factory::make_axum_router;
use super::*;
use crate::ApolloRouterError;
use crate::Configuration;
Expand Down
14 changes: 10 additions & 4 deletions apollo-router/src/cache/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,10 +555,14 @@ impl RedisCacheStorage {
tracing::trace!("insert result {:?}", r);
}

pub(crate) async fn delete<K: KeyType>(&self, keys: Vec<RedisKey<K>>) -> Option<u32> {
let mut h: HashMap<u16, Vec<String>> = HashMap::new();
/// Delete keys *without* adding the `namepsace` prefix because `keys` is from
/// `scan_with_namespaced_results` and already includes it.
pub(crate) async fn delete_from_scan_result(
&self,
keys: Vec<fred::types::RedisKey>,
) -> Option<u32> {
let mut h: HashMap<u16, Vec<fred::types::RedisKey>> = HashMap::new();
for key in keys.into_iter() {
let key = self.make_key(key);
let hash = ClusterRouting::hash_key(key.as_bytes());
let entry = h.entry(hash).or_default();
entry.push(key);
Expand All @@ -581,11 +585,13 @@ impl RedisCacheStorage {
Some(total)
}

pub(crate) fn scan(
/// The keys returned in `ScanResult` do include the prefix from `namespace` configuration.
pub(crate) fn scan_with_namespaced_results(
&self,
pattern: String,
count: Option<u32>,
) -> Pin<Box<dyn Stream<Item = Result<ScanResult, RedisError>> + Send>> {
let pattern = self.make_key(RedisKey(pattern));
if self.is_cluster {
Box::pin(self.inner.next().scan_cluster(pattern, count, None))
} else {
Expand Down
18 changes: 8 additions & 10 deletions apollo-router/src/plugins/cache/invalidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use tracing::Instrument;

use super::entity::Storage as EntityStorage;
use crate::cache::redis::RedisCacheStorage;
use crate::cache::redis::RedisKey;
use crate::plugins::cache::entity::ENTITY_CACHE_VERSION;
use crate::plugins::cache::entity::hash_entity_key;

Expand Down Expand Up @@ -109,7 +108,8 @@ impl Invalidation {
key_prefix
);

let mut stream = redis_storage.scan(key_prefix.clone(), Some(self.scan_count));
let mut stream =
redis_storage.scan_with_namespaced_results(key_prefix.clone(), Some(self.scan_count));
let mut count = 0u64;
let mut error = None;

Expand All @@ -124,15 +124,13 @@ impl Invalidation {
error = Some(e);
break;
}
Ok(scan_res) => {
if let Some(keys) = scan_res.results() {
let keys = keys
.iter()
.filter_map(|k| k.as_str())
.map(|k| RedisKey(k.to_string()))
.collect::<Vec<_>>();
Ok(mut scan_res) => {
if let Some(keys) = scan_res.take_results() {
if !keys.is_empty() {
let deleted = redis_storage.delete(keys).await.unwrap_or(0) as u64;
let deleted = redis_storage
.delete_from_scan_result(keys)
.await
.unwrap_or(0) as u64;
count += deleted;
}
}
Expand Down
4 changes: 1 addition & 3 deletions apollo-router/src/plugins/mock_subgraphs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,7 @@ fn resolve_normal_field<'a>(
field_name: &'a str,
arguments: &'a JsonMap,
) -> Result<ResolvedValue<'a>, execution::resolver::ResolverError> {
if !arguments.is_empty() {
return Err("arguments not supported".into()); // TODO?
}
let _ignored = arguments; // TODO: find some way to vary response based on arguments?
let mock = mocks
.get(field_name)
.ok_or("field not found in mocked data")?;
Expand Down
4 changes: 2 additions & 2 deletions apollo-router/src/services/router/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ pub(crate) async fn into_bytes<B: HttpBody>(body: B) -> Result<Bytes, B::Error>
// and convert types

/// Create an empty RouterBody
pub(crate) fn empty() -> UnsyncBoxBody<Bytes, AxumError> {
pub(crate) fn empty() -> RouterBody {
Empty::<Bytes>::new()
.map_err(|never| match never {})
.boxed_unsync()
}

/// Create a Full RouterBody using the supplied chunk
pub(crate) fn from_bytes<T: Into<Bytes>>(chunk: T) -> UnsyncBoxBody<Bytes, AxumError> {
pub fn from_bytes<T: Into<Bytes>>(chunk: T) -> RouterBody {
Full::new(chunk.into())
.map_err(|never| match never {})
.boxed_unsync()
Expand Down
10 changes: 4 additions & 6 deletions apollo-router/src/test_harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use crate::services::router::service::RouterCreator;
use crate::services::subgraph;
use crate::services::supergraph;
use crate::spec::Schema;
#[cfg(test)]
use crate::uplink::license_enforcement::LicenseState;

/// Mocks for services the Apollo Router must integrate with.
Expand Down Expand Up @@ -340,10 +339,10 @@ impl<'a> TestHarness<'a> {
.boxed_clone())
}

#[cfg(test)]
pub(crate) async fn build_http_service(self) -> Result<HttpService, BoxError> {
/// Build the HTTP service
pub async fn build_http_service(self) -> Result<HttpService, BoxError> {
use crate::axum_factory::ListenAddrAndRouter;
use crate::axum_factory::tests::make_axum_router;
use crate::axum_factory::axum_http_server_factory::make_axum_router;
use crate::router_factory::RouterFactory;

let (config, supergraph_creator) = self.build_common().await?;
Expand All @@ -369,8 +368,7 @@ impl<'a> TestHarness<'a> {
}

/// An HTTP-level service, as would be given to Hyper’s server
#[cfg(test)]
pub(crate) type HttpService = tower::util::BoxService<
pub type HttpService = tower::util::BoxService<
http::Request<crate::services::router::Body>,
http::Response<axum::body::Body>,
std::convert::Infallible,
Expand Down
Loading