Skip to content

Commit 4f7e001

Browse files
authored
Recursively traverse sub folders in Google Drive source. (#161)
1 parent c3a3206 commit 4f7e001

File tree

1 file changed

+30
-9
lines changed

1 file changed

+30
-9
lines changed

src/ops/sources/google_drive.rs

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,13 @@ use google_drive3::{
99
use http_body_util::BodyExt;
1010
use hyper_rustls::HttpsConnector;
1111
use hyper_util::client::legacy::connect::HttpConnector;
12+
use indexmap::IndexSet;
1213
use log::debug;
1314

1415
use crate::ops::sdk::*;
1516

17+
const FOLDER_MIME_TYPE: &'static str = "application/vnd.google-apps.folder";
18+
1619
#[derive(Debug, Deserialize)]
1720
pub struct Spec {
1821
service_account_credential_path: String,
@@ -28,7 +31,6 @@ struct Executor {
2831

2932
impl Executor {
3033
async fn new(spec: Spec) -> Result<Self> {
31-
// let user_secret = read_authorized_user_secret(spec.service_account_credential_path).await?;
3234
let service_account_key =
3335
read_service_account_key(spec.service_account_credential_path).await?;
3436
let auth = ServiceAccountAuthenticator::builder(service_account_key)
@@ -64,12 +66,18 @@ fn escape_string(s: &str) -> String {
6466
escaped
6567
}
6668

67-
#[async_trait]
68-
impl SourceExecutor for Executor {
69-
async fn list_keys(&self) -> Result<Vec<KeyValue>> {
70-
let query = format!("'{}' in parents", escape_string(&self.root_folder_id));
69+
impl Executor {
70+
async fn traverse_folder(
71+
&self,
72+
folder_id: &str,
73+
visited_folder_ids: &mut IndexSet<String>,
74+
result: &mut IndexSet<KeyValue>,
75+
) -> Result<()> {
76+
if !visited_folder_ids.insert(folder_id.to_string()) {
77+
return Ok(());
78+
}
79+
let query = format!("'{}' in parents", escape_string(folder_id));
7180
let mut next_page_token: Option<String> = None;
72-
let mut result = Vec::new();
7381
loop {
7482
let mut list_call = self
7583
.drive_hub
@@ -83,9 +91,12 @@ impl SourceExecutor for Executor {
8391
let (_, files) = list_call.doit().await?;
8492
if let Some(files) = files.files {
8593
for file in files {
86-
debug!("file: {:?}", file);
8794
if let Some(id) = file.id {
88-
result.push(KeyValue::Str(Arc::from(id)));
95+
if file.mime_type.as_ref() == Some(&FOLDER_MIME_TYPE.to_string()) {
96+
Box::pin(self.traverse_folder(&id, visited_folder_ids, result)).await?;
97+
} else {
98+
result.insert(KeyValue::Str(Arc::from(id)));
99+
}
89100
}
90101
}
91102
}
@@ -94,7 +105,17 @@ impl SourceExecutor for Executor {
94105
break;
95106
}
96107
}
97-
Ok(result)
108+
Ok(())
109+
}
110+
}
111+
112+
#[async_trait]
113+
impl SourceExecutor for Executor {
114+
async fn list_keys(&self) -> Result<Vec<KeyValue>> {
115+
let mut result = IndexSet::new();
116+
self.traverse_folder(&self.root_folder_id, &mut IndexSet::new(), &mut result)
117+
.await?;
118+
Ok(result.into_iter().collect())
98119
}
99120

100121
async fn get_value(&self, key: &KeyValue) -> Result<Option<FieldValues>> {

0 commit comments

Comments
 (0)