Skip to content

Commit 295d66e

Browse files
authored
Evaluator starts to read/write cache if enabled (#12)
* Add a timestamp to cache entry.
* Move `Fingerprinter` to `utils`.
* Add `with()` and `write()` APIs to the `Fingerprinter` for convenience.
* Create a `Fingerprint` type and directly use it as cache key.
* Read/write cache in evaluator.
1 parent 4469db5 commit 295d66e

File tree

9 files changed, with 230 additions and 104 deletions.

src/builder/analyzer.rs

Lines changed: 14 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,7 @@ use crate::setup::{
88
self, DesiredMode, FlowSetupMetadata, FlowSetupState, ResourceIdentifier, SourceSetupState,
99
TargetSetupState, TargetSetupStateCommon,
1010
};
11+
use crate::utils::fingerprint::Fingerprinter;
1112
use crate::{
1213
api_bail, api_error,
1314
base::{
@@ -698,24 +699,32 @@ impl<'a> AnalyzerContext<'a> {
698699
let output = scope
699700
.data
700701
.add_field(reactive_op.name.clone(), &output_type)?;
701-
let op_name = reactive_op.name.clone();
702+
let reactive_op = reactive_op.clone();
702703
async move {
703704
let executor = executor.await.with_context(|| {
704-
format!("Failed to build executor for transform op: {op_name}")
705+
format!("Failed to build executor for transform op: {}", reactive_op.name)
705706
})?;
707+
let behavior_version = executor.behavior_version();
706708
let function_exec_info = AnalyzedFunctionExecInfo {
707709
enable_caching: executor.enable_caching(),
708-
behavior_version: executor.behavior_version(),
710+
behavior_version,
711+
fingerprinter: Fingerprinter::default()
712+
.with(&reactive_op.name)?
713+
.with(&reactive_op.spec)?
714+
.with(&behavior_version)?
715+
.with(&output_type.without_attrs())?,
716+
output_type: output_type.typ.clone(),
709717
};
710718
if function_exec_info.enable_caching
711719
&& function_exec_info.behavior_version.is_some()
712720
{
713721
api_bail!(
714-
"When caching is enabled, behavior version must be specified for transform op: {op_name}",
722+
"When caching is enabled, behavior version must be specified for transform op: {}",
723+
reactive_op.name
715724
);
716725
}
717726
Ok(AnalyzedReactiveOp::Transform(AnalyzedTransformOp {
718-
name: op_name,
727+
name: reactive_op.name,
719728
inputs: input_value_mappings,
720729
function_exec_info,
721730
executor,

src/builder/plan.rs

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@ use crate::base::schema::ValueType;
66
use crate::base::value;
77
use crate::execution::db_tracking_setup;
88
use crate::ops::interface::*;
9+
use crate::utils::fingerprint::Fingerprinter;
910

1011
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
1112
pub struct AnalyzedLocalFieldReference {
@@ -65,6 +66,10 @@ pub struct AnalyzedSourceOp {
6566
pub struct AnalyzedFunctionExecInfo {
6667
pub enable_caching: bool,
6768
pub behavior_version: Option<u32>,
69+
70+
/// Fingerprinter of the function's behavior.
71+
pub fingerprinter: Fingerprinter,
72+
pub output_type: ValueType,
6873
}
6974

7075
pub struct AnalyzedTransformOp {

src/execution/db_tracking.rs

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,9 @@
11
use super::{db_tracking_setup::TrackingTableSetupState, memoization::MemoizationInfo};
2-
use crate::utils::db::WriteAction;
2+
use crate::utils::{db::WriteAction, fingerprint::Fingerprint};
33
use anyhow::Result;
44
use sqlx::PgPool;
55

6-
pub type ValueFingerprint = String;
7-
pub type TrackedTargetKey = (serde_json::Value, i64, Option<ValueFingerprint>);
6+
pub type TrackedTargetKey = (serde_json::Value, i64, Option<Fingerprint>);
87
pub type TrackedTargetKeyForSource = Vec<(i32, Vec<TrackedTargetKey>)>;
98

109
#[derive(sqlx::FromRow, Debug)]

src/execution/evaluator.rs

Lines changed: 23 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -304,7 +304,29 @@ async fn evaluate_op_scope(
304304
match reactive_op {
305305
AnalyzedReactiveOp::Transform(op) => {
306306
let input_values = assemble_input_values(&op.inputs, scoped_entries);
307-
let output_value = op.executor.evaluate(input_values).await?;
307+
let output_value = if let Some(cache) = op
308+
.function_exec_info
309+
.enable_caching
310+
.then_some(cache)
311+
.flatten()
312+
{
313+
let key = op
314+
.function_exec_info
315+
.fingerprinter
316+
.clone()
317+
.with(&input_values)?
318+
.to_fingerprint();
319+
cache
320+
.evaluate(
321+
key,
322+
&op.function_exec_info.output_type,
323+
/*ttl=*/ None,
324+
move || op.executor.evaluate(input_values),
325+
)
326+
.await?
327+
} else {
328+
op.executor.evaluate(input_values).await?
329+
};
308330
head_scope.define_field(&op.output, output_value)?;
309331
}
310332

src/execution/indexer.rs

Lines changed: 11 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -6,18 +6,16 @@ use log::error;
66
use serde::Serialize;
77
use sqlx::PgPool;
88

9-
use super::db_tracking::{self, read_source_tracking_info};
9+
use super::db_tracking::{self, read_source_tracking_info, TrackedTargetKey};
1010
use super::db_tracking_setup;
11-
use super::fingerprint::Fingerprinter;
1211
use super::memoization::{EvaluationCache, MemoizationInfo};
1312
use crate::base::schema;
1413
use crate::base::spec::FlowInstanceSpec;
1514
use crate::base::value::{self, FieldValues, KeyValue};
1615
use crate::builder::plan::*;
1716
use crate::ops::interface::{ExportTargetMutation, ExportTargetUpsertEntry};
1817
use crate::utils::db::WriteAction;
19-
20-
use self::db_tracking::{TrackedTargetKey, ValueFingerprint};
18+
use crate::utils::fingerprint::{Fingerprint, Fingerprinter};
2119

2220
use super::evaluator::{evaluate_source_entry, ScopeValueBuilder};
2321

@@ -96,8 +94,8 @@ struct TrackingInfoForTarget<'a> {
9694
// Existing keys info. Keyed by target key.
9795
// Will be removed after new rows for the same key are added into `new_staging_keys_info` and `mutation.upserts`,
9896
// hence all remaining ones are to be deleted.
99-
existing_staging_keys_info: HashMap<serde_json::Value, Vec<(i64, Option<ValueFingerprint>)>>,
100-
existing_keys_info: HashMap<serde_json::Value, Vec<(i64, Option<ValueFingerprint>)>>,
97+
existing_staging_keys_info: HashMap<serde_json::Value, Vec<(i64, Option<Fingerprint>)>>,
98+
existing_keys_info: HashMap<serde_json::Value, Vec<(i64, Option<Fingerprint>)>>,
10199

102100
// New keys info for staging.
103101
new_staging_keys_info: Vec<TrackedTargetKey>,
@@ -215,9 +213,11 @@ async fn precommit_source_tracking_info(
215213
.fields
216214
.push(value.fields[*field as usize].clone());
217215
}
218-
let mut fingerprinter = Fingerprinter::default();
219-
field_values.serialize(&mut fingerprinter)?;
220-
let curr_fp = Some(fingerprinter.to_base64());
216+
let curr_fp = Some(
217+
Fingerprinter::default()
218+
.with(&field_values)?
219+
.to_fingerprint(),
220+
);
221221

222222
let existing_target_keys = target_info.existing_keys_info.remove(&primary_key_json);
223223
let existing_staging_target_keys = target_info
@@ -439,9 +439,8 @@ pub async fn update_source_entry<'a>(
439439
.map(|info| info.memoization_info.map(|info| info.0))
440440
.flatten()
441441
.flatten();
442-
let evaluation_cache = memoization_info
443-
.map(|info| EvaluationCache::from_stored(info.cache))
444-
.unwrap_or_default();
442+
let evaluation_cache =
443+
EvaluationCache::new(process_timestamp, memoization_info.map(|info| info.cache));
445444
let value_builder =
446445
evaluate_source_entry(plan, source_op_idx, schema, key, Some(&evaluation_cache)).await?;
447446

src/execution/memoization.rs

Lines changed: 105 additions & 56 deletions
Original file line number | Diff line number | Diff line change
@@ -1,42 +1,25 @@
11
use anyhow::Result;
22
use serde::{Deserialize, Serialize};
3-
4-
use base64::prelude::*;
53
use std::{
64
collections::HashMap,
5+
future::Future,
76
sync::{Arc, Mutex},
87
};
98

10-
use crate::base::{schema, value};
11-
12-
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
13-
pub struct CacheKey(Vec<u8>);
14-
15-
impl Serialize for CacheKey {
16-
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
17-
where
18-
S: serde::Serializer,
19-
{
20-
serializer.serialize_str(&BASE64_STANDARD.encode(&self.0))
21-
}
22-
}
9+
use crate::{
10+
base::{schema, value},
11+
service::error::{SharedError, SharedResultExt},
12+
utils::fingerprint::Fingerprint,
13+
};
2314

24-
impl<'de> Deserialize<'de> for CacheKey {
25-
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
26-
where
27-
D: serde::Deserializer<'de>,
28-
{
29-
let s = String::deserialize(deserializer)?;
30-
let bytes = BASE64_STANDARD
31-
.decode(s)
32-
.map_err(serde::de::Error::custom)?;
33-
Ok(CacheKey(bytes))
34-
}
15+
#[derive(Debug, Clone, Serialize, Deserialize)]
16+
pub struct CacheEntry {
17+
time_sec: i64,
18+
value: serde_json::Value,
3519
}
36-
3720
#[derive(Debug, Clone, Serialize, Deserialize)]
3821
pub struct MemoizationInfo {
39-
pub cache: HashMap<CacheKey, serde_json::Value>,
22+
pub cache: HashMap<Fingerprint, CacheEntry>,
4023
}
4124

4225
impl Default for MemoizationInfo {
@@ -47,66 +30,132 @@ impl Default for MemoizationInfo {
4730
}
4831
}
4932

50-
enum EvaluationCacheEntry {
33+
struct EvaluationCacheEntry {
34+
time: chrono::DateTime<chrono::Utc>,
35+
data: EvaluationCacheData,
36+
}
37+
38+
enum EvaluationCacheData {
5139
/// Existing entry in previous runs, but not in current run yet.
5240
Previous(serde_json::Value),
5341
/// Value appeared in current run.
54-
Current(Arc<async_lock::OnceCell<value::Value>>),
42+
Current(Arc<async_lock::OnceCell<Result<value::Value, SharedError>>>),
5543
}
5644

57-
#[derive(Default)]
5845
pub struct EvaluationCache {
59-
cache: Mutex<HashMap<CacheKey, EvaluationCacheEntry>>,
46+
current_time: chrono::DateTime<chrono::Utc>,
47+
cache: Mutex<HashMap<Fingerprint, EvaluationCacheEntry>>,
6048
}
6149

6250
impl EvaluationCache {
63-
pub fn from_stored(cache: HashMap<CacheKey, serde_json::Value>) -> Self {
51+
pub fn new(
52+
current_time: chrono::DateTime<chrono::Utc>,
53+
existing_cache: Option<HashMap<Fingerprint, CacheEntry>>,
54+
) -> Self {
6455
Self {
56+
current_time,
6557
cache: Mutex::new(
66-
cache
58+
existing_cache
6759
.into_iter()
68-
.map(|(k, v)| (k, EvaluationCacheEntry::Previous(v)))
60+
.map(|e| e.into_iter())
61+
.flatten()
62+
.map(|(k, e)| {
63+
(
64+
k,
65+
EvaluationCacheEntry {
66+
time: chrono::DateTime::from_timestamp(e.time_sec, 0)
67+
.unwrap_or(chrono::DateTime::<chrono::Utc>::MIN_UTC),
68+
data: EvaluationCacheData::Previous(e.value),
69+
},
70+
)
71+
})
6972
.collect(),
7073
),
7174
}
7275
}
7376

74-
pub fn into_stored(self) -> Result<HashMap<CacheKey, serde_json::Value>> {
77+
pub fn into_stored(self) -> Result<HashMap<Fingerprint, CacheEntry>> {
7578
Ok(self
7679
.cache
7780
.into_inner()?
7881
.into_iter()
79-
.filter_map(|(k, v)| match v {
80-
EvaluationCacheEntry::Previous(_) => None,
81-
EvaluationCacheEntry::Current(entry) => {
82-
entry.get().map(|v| Ok((k, serde_json::to_value(v)?)))
83-
}
82+
.filter_map(|(k, e)| match e.data {
83+
EvaluationCacheData::Previous(_) => None,
84+
EvaluationCacheData::Current(entry) => match entry.get() {
85+
Some(Ok(v)) => Some(serde_json::to_value(v).map(|value| {
86+
(
87+
k,
88+
CacheEntry {
89+
time_sec: e.time.timestamp(),
90+
value,
91+
},
92+
)
93+
})),
94+
_ => None,
95+
},
8496
})
85-
.collect::<Result<_>>()?)
97+
.collect::<Result<_, _>>()?)
8698
}
8799

88100
pub fn get(
89101
&self,
90-
key: CacheKey,
102+
key: Fingerprint,
91103
typ: &schema::ValueType,
92-
) -> Result<Arc<async_lock::OnceCell<value::Value>>> {
104+
ttl: Option<chrono::Duration>,
105+
) -> Result<Arc<async_lock::OnceCell<Result<value::Value, SharedError>>>> {
93106
let mut cache = self.cache.lock().unwrap();
94-
let result = match cache.entry(key) {
95-
std::collections::hash_map::Entry::Occupied(mut entry) => match &mut entry.get_mut() {
96-
EvaluationCacheEntry::Previous(value) => {
97-
let value = value::Value::from_json(std::mem::take(value), typ)?;
98-
let cell = Arc::new(async_lock::OnceCell::from(value));
99-
entry.insert(EvaluationCacheEntry::Current(cell.clone()));
107+
let result = {
108+
match cache.entry(key) {
109+
std::collections::hash_map::Entry::Occupied(mut entry)
110+
if !ttl
111+
.map(|ttl| entry.get().time + ttl < self.current_time)
112+
.unwrap_or(false) =>
113+
{
114+
let entry_mut = &mut entry.get_mut();
115+
match &mut entry_mut.data {
116+
EvaluationCacheData::Previous(value) => {
117+
let value = value::Value::from_json(std::mem::take(value), typ)?;
118+
let cell = Arc::new(async_lock::OnceCell::from(Ok(value)));
119+
let time = entry_mut.time;
120+
entry.insert(EvaluationCacheEntry {
121+
time,
122+
data: EvaluationCacheData::Current(cell.clone()),
123+
});
124+
cell
125+
}
126+
EvaluationCacheData::Current(cell) => cell.clone(),
127+
}
128+
}
129+
entry => {
130+
let cell = Arc::new(async_lock::OnceCell::new());
131+
entry.insert_entry(EvaluationCacheEntry {
132+
time: self.current_time,
133+
data: EvaluationCacheData::Current(cell.clone()),
134+
});
100135
cell
101136
}
102-
EvaluationCacheEntry::Current(cell) => cell.clone(),
103-
},
104-
std::collections::hash_map::Entry::Vacant(entry) => {
105-
let cell = Arc::new(async_lock::OnceCell::new());
106-
entry.insert(EvaluationCacheEntry::Current(cell.clone()));
107-
cell
108137
}
109138
};
110139
Ok(result)
111140
}
141+
142+
pub async fn evaluate<Fut>(
143+
&self,
144+
key: Fingerprint,
145+
typ: &schema::ValueType,
146+
ttl: Option<chrono::Duration>,
147+
compute: impl FnOnce() -> Fut,
148+
) -> Result<value::Value>
149+
where
150+
Fut: Future<Output = Result<value::Value>>,
151+
{
152+
let cell = self.get(key, typ, ttl)?;
153+
let result = cell
154+
.get_or_init(|| {
155+
let fut = compute();
156+
async move { fut.await.map_err(SharedError::new) }
157+
})
158+
.await;
159+
Ok(result.clone().std_result()?)
160+
}
112161
}

src/execution/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -5,5 +5,4 @@ pub mod query;
55
mod db_tracking;
66
pub mod db_tracking_setup;
77

8-
mod fingerprint;
98
mod memoization;

0 commit comments

Comments
 (0)