Skip to content

Commit 55a55e2

Browse files
committed
feat: make source get API return ordinal optionally
1 parent 4423133 commit 55a55e2

File tree

7 files changed

+117
-41
lines changed

7 files changed

+117
-41
lines changed

src/execution/dumper.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ impl<'a> Dumper<'a> {
167167
async fn evaluate_and_dump_for_source(&self, import_op: &AnalyzedImportOp) -> Result<()> {
168168
let mut keys_by_filename_prefix: IndexMap<String, Vec<value::KeyValue>> = IndexMap::new();
169169

170-
let mut rows_stream = import_op.executor.list(SourceExecutorListOptions {
170+
let mut rows_stream = import_op.executor.list(&SourceExecutorListOptions {
171171
include_ordinal: false,
172172
});
173173
while let Some(rows) = rows_stream.next().await {

src/execution/row_indexer.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ use super::stats;
1313
use crate::base::schema;
1414
use crate::base::value::{self, FieldValues, KeyValue};
1515
use crate::builder::plan::*;
16-
use crate::ops::interface::{ExportTargetMutation, ExportTargetUpsertEntry, Ordinal};
16+
use crate::ops::interface::{
17+
ExportTargetMutation, ExportTargetUpsertEntry, Ordinal, SourceExecutorGetOptions,
18+
};
1719
use crate::utils::db::WriteAction;
1820
use crate::utils::fingerprint::{Fingerprint, Fingerprinter};
1921

@@ -460,7 +462,18 @@ pub async fn evaluate_source_entry_with_memory(
460462
None
461463
};
462464
let memory = EvaluationMemory::new(chrono::Utc::now(), stored_info, options);
463-
let source_value = match import_op.executor.get_value(key).await? {
465+
let source_value = match import_op
466+
.executor
467+
.get_value(
468+
key,
469+
&SourceExecutorGetOptions {
470+
include_value: true,
471+
include_ordinal: false,
472+
},
473+
)
474+
.await?
475+
.value
476+
{
464477
Some(d) => d,
465478
None => return Ok(None),
466479
};

src/execution/source_indexer.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,17 @@ impl SourceIndexingContext {
101101
// also happens for update cases and there's no way to keep them always in sync for many sources.
102102
//
103103
// We only need source version <= actual version for value.
104-
import_op.executor.get_value(&key).await?
104+
import_op
105+
.executor
106+
.get_value(
107+
&key,
108+
&interface::SourceExecutorGetOptions {
109+
include_value: true,
110+
include_ordinal: false,
111+
},
112+
)
113+
.await?
114+
.value
105115
};
106116
let schema = &self.flow.data_schema;
107117
let result = row_indexer::update_source_row(
@@ -203,7 +213,7 @@ impl SourceIndexingContext {
203213
let import_op = &plan.import_ops[self.source_idx];
204214
let mut rows_stream = import_op
205215
.executor
206-
.list(interface::SourceExecutorListOptions {
216+
.list(&interface::SourceExecutorListOptions {
207217
include_ordinal: true,
208218
});
209219
let mut join_set = JoinSet::new();

src/ops/interface.rs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,16 +66,34 @@ pub struct SourceExecutorListOptions {
6666
pub include_ordinal: bool,
6767
}
6868

69+
#[derive(Debug, Default)]
70+
pub struct SourceExecutorGetOptions {
71+
pub include_ordinal: bool,
72+
pub include_value: bool,
73+
}
74+
75+
#[derive(Debug, Default)]
76+
pub struct SourceValue {
77+
// None if the row does not exist, or if the value was not requested in the options.
78+
pub value: Option<FieldValues>,
79+
// None if the ordinal is unavailable, or if it was not requested in the options.
80+
pub ordinal: Option<Ordinal>,
81+
}
82+
6983
#[async_trait]
7084
pub trait SourceExecutor: Send + Sync {
7185
/// Get the list of keys for the source.
72-
fn list(
73-
&self,
74-
options: SourceExecutorListOptions,
75-
) -> BoxStream<'_, Result<Vec<SourceRowMetadata>>>;
86+
fn list<'a>(
87+
&'a self,
88+
options: &'a SourceExecutorListOptions,
89+
) -> BoxStream<'a, Result<Vec<SourceRowMetadata>>>;
7690

7791
// Get the value and/or ordinal for the given key, as requested by `options`.
78-
async fn get_value(&self, key: &KeyValue) -> Result<Option<FieldValues>>;
92+
async fn get_value(
93+
&self,
94+
key: &KeyValue,
95+
options: &SourceExecutorGetOptions,
96+
) -> Result<SourceValue>;
7997

8098
async fn change_stream(&self) -> Result<Option<BoxStream<'async_trait, SourceChange>>> {
8199
Ok(None)

src/ops/sources/google_drive.rs

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -292,21 +292,25 @@ impl<T> ResultExt<T> for google_drive3::Result<T> {
292292
}
293293
}
294294

295+
fn optional_modified_time(include_ordinal: bool) -> &'static str {
296+
if include_ordinal {
297+
",modifiedTime"
298+
} else {
299+
""
300+
}
301+
}
302+
295303
#[async_trait]
296304
impl SourceExecutor for Executor {
297-
fn list(
298-
&self,
299-
options: SourceExecutorListOptions,
300-
) -> BoxStream<'_, Result<Vec<SourceRowMetadata>>> {
305+
fn list<'a>(
306+
&'a self,
307+
options: &'a SourceExecutorListOptions,
308+
) -> BoxStream<'a, Result<Vec<SourceRowMetadata>>> {
301309
let mut seen_ids = HashSet::new();
302310
let mut folder_ids = self.root_folder_ids.clone();
303311
let fields = format!(
304312
"files(id,name,mimeType,trashed{})",
305-
if options.include_ordinal {
306-
",modifiedTime"
307-
} else {
308-
""
309-
}
313+
optional_modified_time(options.include_ordinal)
310314
);
311315
let mut new_folder_ids = Vec::new();
312316
try_stream! {
@@ -333,20 +337,35 @@ impl SourceExecutor for Executor {
333337
.boxed()
334338
}
335339

336-
async fn get_value(&self, key: &KeyValue) -> Result<Option<FieldValues>> {
340+
async fn get_value(
341+
&self,
342+
key: &KeyValue,
343+
options: &SourceExecutorGetOptions,
344+
) -> Result<SourceValue> {
337345
let file_id = key.str_value()?;
346+
let fields = format!(
347+
"id,name,mimeType,trashed{}",
348+
optional_modified_time(options.include_ordinal)
349+
);
338350
let resp = self
339351
.drive_hub
340352
.files()
341353
.get(file_id)
342354
.add_scope(Scope::Readonly)
343-
.param("fields", "id,name,mimeType,trashed")
355+
.param("fields", &fields)
344356
.doit()
345357
.await
346358
.or_not_found()?;
347359
let file = match resp {
348360
Some((_, file)) if file.trashed != Some(true) => file,
349-
_ => return Ok(None),
361+
_ => {
362+
return Ok(SourceValue::default());
363+
}
364+
};
365+
let ordinal = if options.include_ordinal {
366+
file.modified_time.map(|t| t.try_into()).transpose()?
367+
} else {
368+
None
350369
};
351370
let type_n_body = if let Some(export_mime_type) = file
352371
.mime_type
@@ -396,7 +415,7 @@ impl SourceExecutor for Executor {
396415
}
397416
None => None,
398417
};
399-
Ok(value)
418+
Ok(SourceValue { value, ordinal })
400419
}
401420

402421
async fn change_stream(&self) -> Result<Option<BoxStream<'async_trait, SourceChange>>> {

src/ops/sources/local_file.rs

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,10 @@ impl Executor {
4040

4141
#[async_trait]
4242
impl SourceExecutor for Executor {
43-
fn list(
44-
&self,
45-
options: SourceExecutorListOptions,
46-
) -> BoxStream<'_, Result<Vec<SourceRowMetadata>>> {
43+
fn list<'a>(
44+
&'a self,
45+
options: &'a SourceExecutorListOptions,
46+
) -> BoxStream<'a, Result<Vec<SourceRowMetadata>>> {
4747
let root_component_size = self.root_path.components().count();
4848
let mut dirs = Vec::new();
4949
dirs.push(Cow::Borrowed(&self.root_path));
@@ -84,24 +84,40 @@ impl SourceExecutor for Executor {
8484
.boxed()
8585
}
8686

87-
async fn get_value(&self, key: &KeyValue) -> Result<Option<FieldValues>> {
87+
async fn get_value(
88+
&self,
89+
key: &KeyValue,
90+
options: &SourceExecutorGetOptions,
91+
) -> Result<SourceValue> {
8892
if !self.is_file_included(key.str_value()?.as_ref()) {
89-
return Ok(None);
93+
return Ok(SourceValue {
94+
value: None,
95+
ordinal: None,
96+
});
9097
}
9198
let path = self.root_path.join(key.str_value()?.as_ref());
92-
let value = match std::fs::read(path) {
93-
Ok(content) => {
94-
let content = if self.binary {
95-
fields_value!(content)
96-
} else {
97-
fields_value!(String::from_utf8_lossy(&content).to_string())
98-
};
99-
Some(content)
99+
let ordinal = if options.include_ordinal {
100+
Some(path.metadata()?.modified()?.try_into()?)
101+
} else {
102+
None
103+
};
104+
let value = if options.include_value {
105+
match std::fs::read(path) {
106+
Ok(content) => {
107+
let content = if self.binary {
108+
fields_value!(content)
109+
} else {
110+
fields_value!(String::from_utf8_lossy(&content).to_string())
111+
};
112+
Some(content)
113+
}
114+
Err(e) if e.kind() == std::io::ErrorKind::NotFound => None,
115+
Err(e) => Err(e)?,
100116
}
101-
Err(e) if e.kind() == std::io::ErrorKind::NotFound => None,
102-
Err(e) => Err(e)?,
117+
} else {
118+
None
103119
};
104-
Ok(value)
120+
Ok(SourceValue { value, ordinal })
105121
}
106122
}
107123

src/service/flows.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ pub async fn get_keys(
8989
)
9090
})?;
9191

92-
let mut rows_stream = import_op.executor.list(SourceExecutorListOptions {
92+
let mut rows_stream = import_op.executor.list(&SourceExecutorListOptions {
9393
include_ordinal: false,
9494
});
9595
let mut keys = Vec::new();

0 commit comments

Comments
 (0)