Skip to content

Commit 9c46675

Browse files
committed
Release GIL when blocking on inner calls
1 parent f348e95 commit 9c46675

File tree

2 files changed

+82
-45
lines changed

2 files changed

+82
-45
lines changed

python/src/lib.rs

+61-45
Original file line numberDiff line numberDiff line change
@@ -150,21 +150,21 @@ impl RawFileReader {
150150
self.inner.tell()
151151
}
152152

153-
pub fn read(&mut self, len: i64) -> PyHdfsResult<Cow<[u8]>> {
153+
pub fn read(&mut self, len: i64, py: Python) -> PyHdfsResult<Cow<[u8]>> {
154154
let read_len = if len < 0 {
155155
self.inner.remaining()
156156
} else {
157157
len as usize
158158
};
159159
Ok(Cow::from(
160-
self.rt.block_on(self.inner.read(read_len))?.to_vec(),
160+
py.allow_threads(|| self.rt.block_on(self.inner.read(read_len)))?
161+
.to_vec(),
161162
))
162163
}
163164

164-
pub fn read_range(&self, offset: usize, len: usize) -> PyHdfsResult<Cow<[u8]>> {
165+
pub fn read_range(&self, offset: usize, len: usize, py: Python) -> PyHdfsResult<Cow<[u8]>> {
165166
Ok(Cow::from(
166-
self.rt
167-
.block_on(self.inner.read_range(offset, len))?
167+
py.allow_threads(|| self.rt.block_on(self.inner.read_range(offset, len)))?
168168
.to_vec(),
169169
))
170170
}
@@ -254,12 +254,12 @@ struct RawFileWriter {
254254

255255
#[pymethods]
256256
impl RawFileWriter {
257-
pub fn write(&mut self, buf: Vec<u8>) -> PyHdfsResult<usize> {
258-
Ok(self.rt.block_on(self.inner.write(Bytes::from(buf)))?)
257+
pub fn write(&mut self, buf: Vec<u8>, py: Python) -> PyHdfsResult<usize> {
258+
Ok(py.allow_threads(|| self.rt.block_on(self.inner.write(Bytes::from(buf))))?)
259259
}
260260

261-
pub fn close(&mut self) -> PyHdfsResult<()> {
262-
Ok(self.rt.block_on(self.inner.close())?)
261+
pub fn close(&mut self, py: Python) -> PyHdfsResult<()> {
262+
Ok(py.allow_threads(|| self.rt.block_on(self.inner.close()))?)
263263
}
264264
}
265265

@@ -294,11 +294,12 @@ impl RawClient {
294294
})
295295
}
296296

297-
pub fn get_file_info(&self, path: &str) -> PyHdfsResult<PyFileStatus> {
298-
Ok(self
299-
.rt
300-
.block_on(self.inner.get_file_info(path))
301-
.map(PyFileStatus::from)?)
297+
pub fn get_file_info(&self, path: &str, py: Python) -> PyHdfsResult<PyFileStatus> {
298+
Ok(py.allow_threads(|| {
299+
self.rt
300+
.block_on(self.inner.get_file_info(path))
301+
.map(PyFileStatus::from)
302+
})?)
302303
}
303304

304305
pub fn list_status(&self, path: &str, recursive: bool) -> PyFileStatusIter {
@@ -309,78 +310,93 @@ impl RawClient {
309310
}
310311
}
311312

312-
pub fn read(&self, path: &str) -> PyHdfsResult<RawFileReader> {
313-
let file_reader = self.rt.block_on(self.inner.read(path))?;
313+
pub fn read(&self, path: &str, py: Python) -> PyHdfsResult<RawFileReader> {
314+
let file_reader = py.allow_threads(|| self.rt.block_on(self.inner.read(path)))?;
314315

315316
Ok(RawFileReader {
316317
inner: file_reader,
317318
rt: Arc::clone(&self.rt),
318319
})
319320
}
320321

321-
pub fn create(&self, src: &str, write_options: PyWriteOptions) -> PyHdfsResult<RawFileWriter> {
322-
let file_writer = self
323-
.rt
324-
.block_on(self.inner.create(src, WriteOptions::from(write_options)))?;
322+
pub fn create(
323+
&self,
324+
src: &str,
325+
write_options: PyWriteOptions,
326+
py: Python,
327+
) -> PyHdfsResult<RawFileWriter> {
328+
let file_writer = py.allow_threads(|| {
329+
self.rt
330+
.block_on(self.inner.create(src, WriteOptions::from(write_options)))
331+
})?;
325332

326333
Ok(RawFileWriter {
327334
inner: file_writer,
328335
rt: Arc::clone(&self.rt),
329336
})
330337
}
331338

332-
pub fn append(&self, src: &str) -> PyHdfsResult<RawFileWriter> {
333-
let file_writer = self.rt.block_on(self.inner.append(src))?;
339+
pub fn append(&self, src: &str, py: Python) -> PyHdfsResult<RawFileWriter> {
340+
let file_writer = py.allow_threads(|| self.rt.block_on(self.inner.append(src)))?;
334341

335342
Ok(RawFileWriter {
336343
inner: file_writer,
337344
rt: Arc::clone(&self.rt),
338345
})
339346
}
340347

341-
pub fn mkdirs(&self, path: &str, permission: u32, create_parent: bool) -> PyHdfsResult<()> {
342-
Ok(self
343-
.rt
344-
.block_on(self.inner.mkdirs(path, permission, create_parent))?)
348+
pub fn mkdirs(
349+
&self,
350+
path: &str,
351+
permission: u32,
352+
create_parent: bool,
353+
py: Python,
354+
) -> PyHdfsResult<()> {
355+
Ok(py.allow_threads(|| {
356+
self.rt
357+
.block_on(self.inner.mkdirs(path, permission, create_parent))
358+
})?)
345359
}
346360

347-
pub fn rename(&self, src: &str, dst: &str, overwrite: bool) -> PyHdfsResult<()> {
348-
Ok(self.rt.block_on(self.inner.rename(src, dst, overwrite))?)
361+
pub fn rename(&self, src: &str, dst: &str, overwrite: bool, py: Python) -> PyHdfsResult<()> {
362+
Ok(py.allow_threads(|| self.rt.block_on(self.inner.rename(src, dst, overwrite)))?)
349363
}
350364

351-
pub fn delete(&self, path: &str, recursive: bool) -> PyHdfsResult<bool> {
352-
Ok(self.rt.block_on(self.inner.delete(path, recursive))?)
365+
pub fn delete(&self, path: &str, recursive: bool, py: Python) -> PyHdfsResult<bool> {
366+
Ok(py.allow_threads(|| self.rt.block_on(self.inner.delete(path, recursive)))?)
353367
}
354368

355-
pub fn set_times(&self, path: &str, mtime: u64, atime: u64) -> PyHdfsResult<()> {
356-
Ok(self.rt.block_on(self.inner.set_times(path, mtime, atime))?)
369+
pub fn set_times(&self, path: &str, mtime: u64, atime: u64, py: Python) -> PyHdfsResult<()> {
370+
Ok(py.allow_threads(|| self.rt.block_on(self.inner.set_times(path, mtime, atime)))?)
357371
}
358372

359373
pub fn set_owner(
360374
&self,
361375
path: &str,
362376
owner: Option<&str>,
363377
group: Option<&str>,
378+
py: Python,
364379
) -> PyHdfsResult<()> {
365-
Ok(self.rt.block_on(self.inner.set_owner(path, owner, group))?)
380+
Ok(py.allow_threads(|| self.rt.block_on(self.inner.set_owner(path, owner, group)))?)
366381
}
367382

368-
pub fn set_permission(&self, path: &str, permission: u32) -> PyHdfsResult<()> {
369-
Ok(self
370-
.rt
371-
.block_on(self.inner.set_permission(path, permission))?)
383+
pub fn set_permission(&self, path: &str, permission: u32, py: Python) -> PyHdfsResult<()> {
384+
Ok(py.allow_threads(|| {
385+
self.rt
386+
.block_on(self.inner.set_permission(path, permission))
387+
})?)
372388
}
373389

374-
pub fn set_replication(&self, path: &str, replication: u32) -> PyHdfsResult<bool> {
375-
Ok(self
376-
.rt
377-
.block_on(self.inner.set_replication(path, replication))?)
390+
pub fn set_replication(&self, path: &str, replication: u32, py: Python) -> PyHdfsResult<bool> {
391+
Ok(py.allow_threads(|| {
392+
self.rt
393+
.block_on(self.inner.set_replication(path, replication))
394+
})?)
378395
}
379396

380-
pub fn get_content_summary(&self, path: &str) -> PyHdfsResult<PyContentSummary> {
381-
Ok(self
382-
.rt
383-
.block_on(self.inner.get_content_summary(path))?
397+
pub fn get_content_summary(&self, path: &str, py: Python) -> PyHdfsResult<PyContentSummary> {
398+
Ok(py
399+
.allow_threads(|| self.rt.block_on(self.inner.get_content_summary(path)))?
384400
.into())
385401
}
386402
}

python/tests/test_benchmark.py

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from concurrent.futures import ThreadPoolExecutor
2+
3+
from pytest_benchmark.fixture import BenchmarkFixture
4+
5+
from hdfs_native import Client
6+
7+
8+
def do_work(client: Client):
9+
def delete(path: str):
10+
client.delete(path)
11+
12+
with ThreadPoolExecutor(100) as executor:
13+
for i in range(100):
14+
executor.submit(delete, f"/bench{i}")
15+
16+
17+
def test_threading(client: Client, benchmark: BenchmarkFixture):
18+
for i in range(100):
19+
client.create(f"/bench{i}").close()
20+
21+
benchmark(do_work, client)

0 commit comments

Comments
 (0)