Skip to content

Commit 3a4b66f

Browse files
authored
Support reading content for gdrive files. (#155)
1 parent b2d20a2 commit 3a4b66f

File tree

2 files changed

+51
-7
lines changed

2 files changed

+51
-7
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,3 +84,4 @@ hyper-util = "0.1.10"
8484
hyper-rustls = { version = "0.27.5", features = ["ring"] }
8585
yup-oauth2 = "12.1.0"
8686
rustls = { version = "0.23.25", features = ["ring"] }
87+
http-body-util = "0.1.3"

src/ops/sources/google_drive.rs

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
use std::sync::Arc;
22

3+
use futures::future::try_join;
34
use google_drive3::{
45
api::Scope,
56
yup_oauth2::{read_service_account_key, ServiceAccountAuthenticator},
67
DriveHub,
78
};
9+
use http_body_util::BodyExt;
810
use hyper_rustls::HttpsConnector;
911
use hyper_util::client::legacy::connect::HttpConnector;
12+
use log::debug;
1013

1114
use crate::ops::sdk::*;
1215

@@ -37,7 +40,7 @@ impl Executor {
3740
hyper_rustls::HttpsConnectorBuilder::new()
3841
.with_provider_and_native_roots(rustls::crypto::ring::default_provider())?
3942
.https_only()
40-
.enable_http1()
43+
.enable_http2()
4144
.build(),
4245
);
4346
let drive_hub = DriveHub::new(client, auth);
@@ -72,16 +75,17 @@ impl SourceExecutor for Executor {
7275
.drive_hub
7376
.files()
7477
.list()
75-
.q(&query)
76-
.add_scope(Scope::Readonly);
78+
.add_scope(Scope::Readonly)
79+
.q(&query);
7780
if let Some(next_page_token) = &next_page_token {
7881
list_call = list_call.page_token(next_page_token);
7982
}
80-
let (resp, files) = list_call.doit().await?;
83+
let (_, files) = list_call.doit().await?;
8184
if let Some(files) = files.files {
8285
for file in files {
83-
if let Some(name) = file.name {
84-
result.push(KeyValue::Str(Arc::from(name)));
86+
debug!("file: {:?}", file);
87+
if let Some(id) = file.id {
88+
result.push(KeyValue::Str(Arc::from(id)));
8589
}
8690
}
8791
}
@@ -94,7 +98,45 @@ impl SourceExecutor for Executor {
9498
}
9599

96100
async fn get_value(&self, key: &KeyValue) -> Result<Option<FieldValues>> {
97-
unimplemented!()
101+
let file_id = key.str_value()?;
102+
103+
let filename = async {
104+
let (_, file) = self
105+
.drive_hub
106+
.files()
107+
.get(file_id)
108+
.add_scope(Scope::Readonly)
109+
.doit()
110+
.await?;
111+
anyhow::Ok(file.name.unwrap_or_default())
112+
};
113+
let body = async {
114+
let (resp, _) = self
115+
.drive_hub
116+
.files()
117+
.get(file_id)
118+
.add_scope(Scope::Readonly)
119+
.param("alt", "media")
120+
.doit()
121+
.await?;
122+
let content = resp.into_body().collect().await?;
123+
anyhow::Ok(content)
124+
};
125+
let (filename, content) = try_join(filename, body).await?;
126+
127+
let mut fields = Vec::with_capacity(2);
128+
fields.push(filename.into());
129+
if self.binary {
130+
fields.push(content.to_bytes().to_vec().into());
131+
} else {
132+
fields.push(
133+
String::from_utf8_lossy(&content.to_bytes())
134+
.to_string()
135+
.into(),
136+
);
137+
}
138+
139+
Ok(Some(FieldValues { fields }))
98140
}
99141
}
100142

@@ -116,6 +158,7 @@ impl SourceFactoryBase for Factory {
116158
Ok(make_output_type(CollectionSchema::new(
117159
CollectionKind::Table,
118160
vec![
161+
FieldSchema::new("file_id", make_output_type(BasicValueType::Str)),
119162
FieldSchema::new("filename", make_output_type(BasicValueType::Str)),
120163
FieldSchema::new(
121164
"content",

0 commit comments

Comments
 (0)