4
4
import posixpath
5
5
import logging
6
6
7
- from funcy import cached_property , retry
7
+ from funcy import cached_property , retry , compose , decorator
8
+ from funcy .py3 import cat
8
9
10
+ from dvc .remote .gdrive .utils import TrackFileReadProgress , FOLDER_MIME_TYPE
9
11
from dvc .scheme import Schemes
10
12
from dvc .path_info import CloudURLInfo
11
13
from dvc .remote .base import RemoteBASE
12
14
from dvc .config import Config
13
15
from dvc .exceptions import DvcException
14
- from dvc .remote .gdrive .pydrive import (
15
- RequestListFile ,
16
- RequestListFilePaginated ,
17
- RequestCreateFolder ,
18
- RequestUploadFile ,
19
- RequestDownloadFile ,
20
- )
21
- from dvc .remote .gdrive .utils import FOLDER_MIME_TYPE
22
16
23
17
logger = logging .getLogger (__name__ )
24
18
19
+
25
20
class GDriveRetriableError (DvcException ):
26
21
def __init__ (self , msg ):
27
22
super (GDriveRetriableError , self ).__init__ (msg )
28
23
24
+
25
+ @decorator
26
+ def _wrap_pydrive_retriable (call ):
27
+ try :
28
+ result = call ()
29
+ except Exception as exception :
30
+ retry_codes = ["403" , "500" , "502" , "503" , "504" ]
31
+ if any (
32
+ "HttpError {}" .format (code ) in str (exception )
33
+ for code in retry_codes
34
+ ):
35
+ raise GDriveRetriableError (msg = "Google API request failed" )
36
+ raise
37
+ return result
38
+
39
+
40
+ gdrive_retry = compose (
41
+ # 8 tries, start at 0.5s, multiply by golden ratio, cap at 10s
42
+ retry (
43
+ 8 , GDriveRetriableError , timeout = lambda a : min (0.5 * 1.618 ** a , 10 )
44
+ ),
45
+ _wrap_pydrive_retriable ,
46
+ )
47
+
48
+
29
49
class RemoteGDrive (RemoteBASE ):
30
50
scheme = Schemes .GDRIVE
31
51
path_cls = CloudURLInfo
@@ -62,28 +82,57 @@ def init_drive(self):
62
82
self .root_id = self .get_path_id (self .path_info , create = True )
63
83
self .cached_dirs , self .cached_ids = self .cache_root_dirs ()
64
84
65
- # 8 tries, start at 0.5s, multiply by golden ratio, cap at 10s
66
- @retry (8 ,
67
- errors = (GDriveRetriableError ),
68
- timeout = lambda a : min (0.5 * 1.618 ** a , 10 ))
69
- def execute_request (self , request ):
70
- from pydrive .files import ApiRequestError
71
- try :
72
- result = request .execute ()
73
- except Exception as exception :
74
- retry_codes = ["403" , "500" , "502" , "503" , "504" ]
75
- if any ("HttpError {}" .format (code ) in str (exception ) for code in retry_codes ):
76
- raise GDriveRetriableError ("Google API request failed" )
77
- raise
78
- return result
85
+ def request_list_file (self , query ):
86
+ return self .drive .ListFile ({"q" : query , "maxResults" : 1000 }).GetList ()
87
+
88
+ def request_create_folder (self , title , parent_id ):
89
+ item = self .drive .CreateFile (
90
+ {
91
+ "title" : title ,
92
+ "parents" : [{"id" : parent_id }],
93
+ "mimeType" : FOLDER_MIME_TYPE ,
94
+ }
95
+ )
96
+ item .Upload ()
97
+ return item
98
+
99
+ def request_upload_file (
100
+ self , args , no_progress_bar = True , from_file = "" , progress_name = ""
101
+ ):
102
+ item = self .drive .CreateFile (
103
+ {"title" : args ["title" ], "parents" : [{"id" : args ["parent_id" ]}]}
104
+ )
105
+ self .upload_file (item , no_progress_bar , from_file , progress_name )
106
+ return item
107
+
108
+ def upload_file (self , item , no_progress_bar , from_file , progress_name ):
109
+ with open (from_file , "rb" ) as opened_file :
110
+ if not no_progress_bar :
111
+ opened_file = TrackFileReadProgress (progress_name , opened_file )
112
+ if os .stat (from_file ).st_size :
113
+ item .content = opened_file
114
+ item .Upload ()
115
+
116
+ def request_download_file (
117
+ self , file_id , to_file , progress_name , no_progress_bar
118
+ ):
119
+ from dvc .progress import Tqdm
120
+
121
+ gdrive_file = self .drive .CreateFile ({"id" : file_id })
122
+ if not no_progress_bar :
123
+ tqdm = Tqdm (desc = progress_name , total = int (gdrive_file ["fileSize" ]))
124
+ gdrive_file .GetContentFile (to_file )
125
+ if not no_progress_bar :
126
+ tqdm .close ()
79
127
80
128
def list_drive_item (self , query ):
81
- list_request = RequestListFilePaginated (self .drive , query )
82
- page_list = self .execute_request (list_request )
83
- while page_list :
84
- for item in page_list :
85
- yield item
86
- page_list = self .execute_request (list_request )
129
+ file_list = self .drive .ListFile ({"q" : query , "maxResults" : 1000 })
130
+
131
+ # Isolate and decorate fetching of remote drive items in pages
132
+ get_list = gdrive_retry (lambda : next (file_list , None ))
133
+
134
+ # Fetch pages until None is received, lazily flatten the thing
135
+ return cat (iter (get_list , None ))
87
136
88
137
def cache_root_dirs (self ):
89
138
cached_dirs = {}
@@ -139,11 +188,9 @@ def drive(self):
139
188
return gdrive
140
189
141
190
def create_drive_item (self , parent_id , title ):
142
- upload_request = RequestCreateFolder (
143
- {"drive" : self .drive , "title" : title , "parent_id" : parent_id }
144
- )
145
- result = self .execute_request (upload_request )
146
- return result
191
+ return gdrive_retry (
192
+ lambda : self .request_create_folder (title , parent_id )
193
+ )()
147
194
148
195
def get_drive_item (self , name , parents_ids ):
149
196
if not parents_ids :
@@ -154,8 +201,7 @@ def get_drive_item(self, name, parents_ids):
154
201
155
202
query += " and trashed=false and title='{}'" .format (name )
156
203
157
- list_request = RequestListFile (self .drive , query )
158
- item_list = self .execute_request (list_request )
204
+ item_list = gdrive_retry (lambda : self .request_list_file (query ))()
159
205
return next (iter (item_list ), None )
160
206
161
207
def resolve_remote_file (self , parents_ids , path_parts , create ):
@@ -213,30 +259,22 @@ def _upload(self, from_file, to_info, name, no_progress_bar):
213
259
else :
214
260
parent_id = to_info .bucket
215
261
216
- upload_request = RequestUploadFile (
217
- {
218
- "drive" : self .drive ,
219
- "title" : to_info .name ,
220
- "parent_id" : parent_id ,
221
- },
222
- no_progress_bar ,
223
- from_file ,
224
- name ,
225
- )
226
- self .execute_request (upload_request )
262
+ gdrive_retry (
263
+ lambda : self .request_upload_file (
264
+ {"title" : to_info .name , "parent_id" : parent_id },
265
+ no_progress_bar ,
266
+ from_file ,
267
+ name ,
268
+ )
269
+ )()
227
270
228
271
def _download (self , from_info , to_file , name , no_progress_bar ):
229
272
file_id = self .get_path_id (from_info )
230
- download_request = RequestDownloadFile (
231
- {
232
- "drive" : self .drive ,
233
- "file_id" : file_id ,
234
- "to_file" : to_file ,
235
- "progress_name" : name ,
236
- "no_progress_bar" : no_progress_bar ,
237
- }
238
- )
239
- self .execute_request (download_request )
273
+ gdrive_retry (
274
+ lambda : self .request_download_file (
275
+ file_id , to_file , name , no_progress_bar
276
+ )
277
+ )()
240
278
241
279
def list_cache_paths (self ):
242
280
file_id = self .get_path_id (self .path_info )
0 commit comments