Skip to content

Commit b2d20a2

Browse files
authored
Partial implementation for Google Drive source - list (#154)
#108 * `cargo add google-drive3` * Partial implementation for Google Drive source - to start integration. * Add spec for `GoogleDrive` source.
1 parent 69c5d63 commit b2d20a2

File tree

5 files changed

+158
-0
lines changed

5 files changed

+158
-0
lines changed

Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,8 @@ tree-sitter-yaml = "0.7.0"
7979

8080
globset = "0.4.16"
8181
unicase = "2.8.1"
82+
google-drive3 = "6.0.0"
83+
hyper-util = "0.1.10"
84+
hyper-rustls = { version = "0.27.5", features = ["ring"] }
85+
yup-oauth2 = "12.1.0"
86+
rustls = { version = "0.23.25", features = ["ring"] }

python/cocoindex/sources.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,13 @@ class LocalFile(op.SourceSpec):
1616
# If provided, files matching these patterns will be excluded.
1717
# See https://docs.rs/globset/latest/globset/index.html#syntax for the syntax of the patterns.
1818
excluded_patterns: list[str] | None = None
19+
20+
21+
class GoogleDrive(op.SourceSpec):
22+
"""Import data from Google Drive."""
23+
24+
_op_category = op.OpCategory.SOURCE
25+
26+
service_account_credential_path: str
27+
root_folder_id: str
28+
binary: bool = False

src/ops/registration.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@ use std::sync::{Arc, LazyLock, RwLock, RwLockReadGuard};
77

88
fn register_executor_factories(registry: &mut ExecutorFactoryRegistry) -> Result<()> {
99
sources::local_file::Factory.register(registry)?;
10+
sources::google_drive::Factory.register(registry)?;
11+
1012
functions::split_recursively::Factory.register(registry)?;
1113
functions::extract_by_llm::Factory.register(registry)?;
14+
1215
Arc::new(storages::postgres::Factory::default()).register(registry)?;
1316

1417
Ok(())

src/ops/sources/google_drive.rs

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
use std::sync::Arc;
2+
3+
use google_drive3::{
4+
api::Scope,
5+
yup_oauth2::{read_service_account_key, ServiceAccountAuthenticator},
6+
DriveHub,
7+
};
8+
use hyper_rustls::HttpsConnector;
9+
use hyper_util::client::legacy::connect::HttpConnector;
10+
11+
use crate::ops::sdk::*;
12+
13+
#[derive(Debug, Deserialize)]
14+
pub struct Spec {
15+
service_account_credential_path: String,
16+
binary: bool,
17+
root_folder_id: String,
18+
}
19+
20+
struct Executor {
21+
drive_hub: DriveHub<HttpsConnector<HttpConnector>>,
22+
binary: bool,
23+
root_folder_id: String,
24+
}
25+
26+
impl Executor {
27+
async fn new(spec: Spec) -> Result<Self> {
28+
// let user_secret = read_authorized_user_secret(spec.service_account_credential_path).await?;
29+
let service_account_key =
30+
read_service_account_key(spec.service_account_credential_path).await?;
31+
let auth = ServiceAccountAuthenticator::builder(service_account_key)
32+
.build()
33+
.await?;
34+
let client =
35+
hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
36+
.build(
37+
hyper_rustls::HttpsConnectorBuilder::new()
38+
.with_provider_and_native_roots(rustls::crypto::ring::default_provider())?
39+
.https_only()
40+
.enable_http1()
41+
.build(),
42+
);
43+
let drive_hub = DriveHub::new(client, auth);
44+
Ok(Self {
45+
drive_hub,
46+
binary: spec.binary,
47+
root_folder_id: spec.root_folder_id,
48+
})
49+
}
50+
}
51+
52+
fn escape_string(s: &str) -> String {
53+
let mut escaped = String::with_capacity(s.len());
54+
for c in s.chars() {
55+
match c {
56+
'\'' | '\\' => escaped.push('\\'),
57+
_ => {}
58+
}
59+
escaped.push(c);
60+
}
61+
escaped
62+
}
63+
64+
#[async_trait]
65+
impl SourceExecutor for Executor {
66+
async fn list_keys(&self) -> Result<Vec<KeyValue>> {
67+
let query = format!("'{}' in parents", escape_string(&self.root_folder_id));
68+
let mut next_page_token: Option<String> = None;
69+
let mut result = Vec::new();
70+
loop {
71+
let mut list_call = self
72+
.drive_hub
73+
.files()
74+
.list()
75+
.q(&query)
76+
.add_scope(Scope::Readonly);
77+
if let Some(next_page_token) = &next_page_token {
78+
list_call = list_call.page_token(next_page_token);
79+
}
80+
let (resp, files) = list_call.doit().await?;
81+
if let Some(files) = files.files {
82+
for file in files {
83+
if let Some(name) = file.name {
84+
result.push(KeyValue::Str(Arc::from(name)));
85+
}
86+
}
87+
}
88+
next_page_token = files.next_page_token;
89+
if next_page_token.is_none() {
90+
break;
91+
}
92+
}
93+
Ok(result)
94+
}
95+
96+
async fn get_value(&self, key: &KeyValue) -> Result<Option<FieldValues>> {
97+
unimplemented!()
98+
}
99+
}
100+
101+
pub struct Factory;
102+
103+
#[async_trait]
104+
impl SourceFactoryBase for Factory {
105+
type Spec = Spec;
106+
107+
fn name(&self) -> &str {
108+
"GoogleDrive"
109+
}
110+
111+
fn get_output_schema(
112+
&self,
113+
spec: &Spec,
114+
_context: &FlowInstanceContext,
115+
) -> Result<EnrichedValueType> {
116+
Ok(make_output_type(CollectionSchema::new(
117+
CollectionKind::Table,
118+
vec![
119+
FieldSchema::new("filename", make_output_type(BasicValueType::Str)),
120+
FieldSchema::new(
121+
"content",
122+
make_output_type(if spec.binary {
123+
BasicValueType::Bytes
124+
} else {
125+
BasicValueType::Str
126+
}),
127+
),
128+
],
129+
)))
130+
}
131+
132+
async fn build_executor(
133+
self: Arc<Self>,
134+
spec: Spec,
135+
_context: Arc<FlowInstanceContext>,
136+
) -> Result<Box<dyn SourceExecutor>> {
137+
Ok(Box::new(Executor::new(spec).await?))
138+
}
139+
}

src/ops/sources/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
1+
pub mod google_drive;
12
pub mod local_file;

0 commit comments

Comments
 (0)