Skip to content

Commit ffd719a

Browse files
authored
Release GIL (#153)
1 parent f348e95 commit ffd719a

File tree

4 files changed

+113
-57
lines changed

4 files changed

+113
-57
lines changed

python/pyproject.toml

+2
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@ devel = [
2323
"mypy~=1.13.0",
2424
"ruff~=0.7.2",
2525
"pytest~=8.3",
26+
"pytest-benchmark<5"
2627
]
2728

2829
[project.urls]
@@ -51,6 +52,7 @@ testpaths = [
5152
"tests",
5253
"hdfs_native",
5354
]
55+
addopts = "-m 'not benchmark'"
5456

5557
[tool.ruff.lint]
5658
extend-select = ["I"]

python/src/lib.rs

+67 -50
Original file line number | Diff line number | Diff line change
@@ -71,7 +71,7 @@ impl PyFileStatus {
7171

7272
#[pyclass(name = "FileStatusIter")]
7373
struct PyFileStatusIter {
74-
inner: ListStatusIterator,
74+
inner: Arc<ListStatusIterator>,
7575
rt: Arc<Runtime>,
7676
}
7777

@@ -81,10 +81,11 @@ impl PyFileStatusIter {
8181
slf
8282
}
8383

84-
fn __next__(mut slf: PyRefMut<'_, Self>) -> PyHdfsResult<Option<PyFileStatus>> {
85-
// This is dumb, figure out how to get around the double borrow here
84+
fn __next__(slf: PyRefMut<'_, Self>) -> PyHdfsResult<Option<PyFileStatus>> {
85+
// Kinda dumb, but lets us release the GIL while getting the next value
86+
let inner = Arc::clone(&slf.inner);
8687
let rt = Arc::clone(&slf.rt);
87-
if let Some(result) = rt.block_on(slf.inner.next()) {
88+
if let Some(result) = slf.py().allow_threads(|| rt.block_on(inner.next())) {
8889
Ok(Some(PyFileStatus::from(result?)))
8990
} else {
9091
Ok(None)
@@ -150,21 +151,21 @@ impl RawFileReader {
150151
self.inner.tell()
151152
}
152153

153-
pub fn read(&mut self, len: i64) -> PyHdfsResult<Cow<[u8]>> {
154+
pub fn read(&mut self, len: i64, py: Python) -> PyHdfsResult<Cow<[u8]>> {
154155
let read_len = if len < 0 {
155156
self.inner.remaining()
156157
} else {
157158
len as usize
158159
};
159160
Ok(Cow::from(
160-
self.rt.block_on(self.inner.read(read_len))?.to_vec(),
161+
py.allow_threads(|| self.rt.block_on(self.inner.read(read_len)))?
162+
.to_vec(),
161163
))
162164
}
163165

164-
pub fn read_range(&self, offset: usize, len: usize) -> PyHdfsResult<Cow<[u8]>> {
166+
pub fn read_range(&self, offset: usize, len: usize, py: Python) -> PyHdfsResult<Cow<[u8]>> {
165167
Ok(Cow::from(
166-
self.rt
167-
.block_on(self.inner.read_range(offset, len))?
168+
py.allow_threads(|| self.rt.block_on(self.inner.read_range(offset, len)))?
168169
.to_vec(),
169170
))
170171
}
@@ -254,12 +255,12 @@ struct RawFileWriter {
254255

255256
#[pymethods]
256257
impl RawFileWriter {
257-
pub fn write(&mut self, buf: Vec<u8>) -> PyHdfsResult<usize> {
258-
Ok(self.rt.block_on(self.inner.write(Bytes::from(buf)))?)
258+
pub fn write(&mut self, buf: Vec<u8>, py: Python) -> PyHdfsResult<usize> {
259+
Ok(py.allow_threads(|| self.rt.block_on(self.inner.write(Bytes::from(buf))))?)
259260
}
260261

261-
pub fn close(&mut self) -> PyHdfsResult<()> {
262-
Ok(self.rt.block_on(self.inner.close())?)
262+
pub fn close(&mut self, py: Python) -> PyHdfsResult<()> {
263+
Ok(py.allow_threads(|| self.rt.block_on(self.inner.close()))?)
263264
}
264265
}
265266

@@ -294,93 +295,109 @@ impl RawClient {
294295
})
295296
}
296297

297-
pub fn get_file_info(&self, path: &str) -> PyHdfsResult<PyFileStatus> {
298-
Ok(self
299-
.rt
300-
.block_on(self.inner.get_file_info(path))
301-
.map(PyFileStatus::from)?)
298+
pub fn get_file_info(&self, path: &str, py: Python) -> PyHdfsResult<PyFileStatus> {
299+
Ok(py.allow_threads(|| {
300+
self.rt
301+
.block_on(self.inner.get_file_info(path))
302+
.map(PyFileStatus::from)
303+
})?)
302304
}
303305

304306
pub fn list_status(&self, path: &str, recursive: bool) -> PyFileStatusIter {
305307
let inner = self.inner.list_status_iter(path, recursive);
306308
PyFileStatusIter {
307-
inner,
309+
inner: Arc::new(inner),
308310
rt: Arc::clone(&self.rt),
309311
}
310312
}
311313

312-
pub fn read(&self, path: &str) -> PyHdfsResult<RawFileReader> {
313-
let file_reader = self.rt.block_on(self.inner.read(path))?;
314+
pub fn read(&self, path: &str, py: Python) -> PyHdfsResult<RawFileReader> {
315+
let file_reader = py.allow_threads(|| self.rt.block_on(self.inner.read(path)))?;
314316

315317
Ok(RawFileReader {
316318
inner: file_reader,
317319
rt: Arc::clone(&self.rt),
318320
})
319321
}
320322

321-
pub fn create(&self, src: &str, write_options: PyWriteOptions) -> PyHdfsResult<RawFileWriter> {
322-
let file_writer = self
323-
.rt
324-
.block_on(self.inner.create(src, WriteOptions::from(write_options)))?;
323+
pub fn create(
324+
&self,
325+
src: &str,
326+
write_options: PyWriteOptions,
327+
py: Python,
328+
) -> PyHdfsResult<RawFileWriter> {
329+
let file_writer = py.allow_threads(|| {
330+
self.rt
331+
.block_on(self.inner.create(src, WriteOptions::from(write_options)))
332+
})?;
325333

326334
Ok(RawFileWriter {
327335
inner: file_writer,
328336
rt: Arc::clone(&self.rt),
329337
})
330338
}
331339

332-
pub fn append(&self, src: &str) -> PyHdfsResult<RawFileWriter> {
333-
let file_writer = self.rt.block_on(self.inner.append(src))?;
340+
pub fn append(&self, src: &str, py: Python) -> PyHdfsResult<RawFileWriter> {
341+
let file_writer = py.allow_threads(|| self.rt.block_on(self.inner.append(src)))?;
334342

335343
Ok(RawFileWriter {
336344
inner: file_writer,
337345
rt: Arc::clone(&self.rt),
338346
})
339347
}
340348

341-
pub fn mkdirs(&self, path: &str, permission: u32, create_parent: bool) -> PyHdfsResult<()> {
342-
Ok(self
343-
.rt
344-
.block_on(self.inner.mkdirs(path, permission, create_parent))?)
349+
pub fn mkdirs(
350+
&self,
351+
path: &str,
352+
permission: u32,
353+
create_parent: bool,
354+
py: Python,
355+
) -> PyHdfsResult<()> {
356+
Ok(py.allow_threads(|| {
357+
self.rt
358+
.block_on(self.inner.mkdirs(path, permission, create_parent))
359+
})?)
345360
}
346361

347-
pub fn rename(&self, src: &str, dst: &str, overwrite: bool) -> PyHdfsResult<()> {
348-
Ok(self.rt.block_on(self.inner.rename(src, dst, overwrite))?)
362+
pub fn rename(&self, src: &str, dst: &str, overwrite: bool, py: Python) -> PyHdfsResult<()> {
363+
Ok(py.allow_threads(|| self.rt.block_on(self.inner.rename(src, dst, overwrite)))?)
349364
}
350365

351-
pub fn delete(&self, path: &str, recursive: bool) -> PyHdfsResult<bool> {
352-
Ok(self.rt.block_on(self.inner.delete(path, recursive))?)
366+
pub fn delete(&self, path: &str, recursive: bool, py: Python) -> PyHdfsResult<bool> {
367+
Ok(py.allow_threads(|| self.rt.block_on(self.inner.delete(path, recursive)))?)
353368
}
354369

355-
pub fn set_times(&self, path: &str, mtime: u64, atime: u64) -> PyHdfsResult<()> {
356-
Ok(self.rt.block_on(self.inner.set_times(path, mtime, atime))?)
370+
pub fn set_times(&self, path: &str, mtime: u64, atime: u64, py: Python) -> PyHdfsResult<()> {
371+
Ok(py.allow_threads(|| self.rt.block_on(self.inner.set_times(path, mtime, atime)))?)
357372
}
358373

359374
pub fn set_owner(
360375
&self,
361376
path: &str,
362377
owner: Option<&str>,
363378
group: Option<&str>,
379+
py: Python,
364380
) -> PyHdfsResult<()> {
365-
Ok(self.rt.block_on(self.inner.set_owner(path, owner, group))?)
381+
Ok(py.allow_threads(|| self.rt.block_on(self.inner.set_owner(path, owner, group)))?)
366382
}
367383

368-
pub fn set_permission(&self, path: &str, permission: u32) -> PyHdfsResult<()> {
369-
Ok(self
370-
.rt
371-
.block_on(self.inner.set_permission(path, permission))?)
384+
pub fn set_permission(&self, path: &str, permission: u32, py: Python) -> PyHdfsResult<()> {
385+
Ok(py.allow_threads(|| {
386+
self.rt
387+
.block_on(self.inner.set_permission(path, permission))
388+
})?)
372389
}
373390

374-
pub fn set_replication(&self, path: &str, replication: u32) -> PyHdfsResult<bool> {
375-
Ok(self
376-
.rt
377-
.block_on(self.inner.set_replication(path, replication))?)
391+
pub fn set_replication(&self, path: &str, replication: u32, py: Python) -> PyHdfsResult<bool> {
392+
Ok(py.allow_threads(|| {
393+
self.rt
394+
.block_on(self.inner.set_replication(path, replication))
395+
})?)
378396
}
379397

380-
pub fn get_content_summary(&self, path: &str) -> PyHdfsResult<PyContentSummary> {
381-
Ok(self
382-
.rt
383-
.block_on(self.inner.get_content_summary(path))?
398+
pub fn get_content_summary(&self, path: &str, py: Python) -> PyHdfsResult<PyContentSummary> {
399+
Ok(py
400+
.allow_threads(|| self.rt.block_on(self.inner.get_content_summary(path)))?
384401
.into())
385402
}
386403
}

python/tests/test_benchmark.py

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from concurrent.futures import ThreadPoolExecutor, as_completed
2+
3+
import pytest
4+
from pytest_benchmark.fixture import BenchmarkFixture
5+
6+
from hdfs_native import Client
7+
8+
9+
@pytest.mark.benchmark
10+
def test_benchmark_threading(client: Client, benchmark: BenchmarkFixture):
11+
def do_work():
12+
def func(path: str):
13+
client.create(path).close()
14+
return client.delete(path)
15+
16+
with ThreadPoolExecutor(100) as executor:
17+
futures = []
18+
for i in range(1000):
19+
futures.append(executor.submit(func, f"/bench{i}"))
20+
21+
for future in as_completed(futures):
22+
assert future.result()
23+
24+
benchmark(do_work)
25+
26+
27+
@pytest.mark.benchmark
28+
def test_benchmark_listing(client: Client, benchmark: BenchmarkFixture):
29+
for i in range(1000):
30+
client.create(f"/bench{i}").close()
31+
32+
def do_work():
33+
for _ in client.list_status("/"):
34+
pass
35+
36+
benchmark(do_work)

rust/src/client.rs

+8 -7
Original file line number | Diff line number | Diff line change
@@ -588,7 +588,7 @@ impl DirListingIterator {
588588
pub struct ListStatusIterator {
589589
mount_table: Arc<MountTable>,
590590
recursive: bool,
591-
iters: Vec<DirListingIterator>,
591+
iters: Arc<tokio::sync::Mutex<Vec<DirListingIterator>>>,
592592
}
593593

594594
impl ListStatusIterator {
@@ -598,20 +598,21 @@ impl ListStatusIterator {
598598
ListStatusIterator {
599599
mount_table,
600600
recursive,
601-
iters: vec![initial],
601+
iters: Arc::new(tokio::sync::Mutex::new(vec![initial])),
602602
}
603603
}
604604

605-
pub async fn next(&mut self) -> Option<Result<FileStatus>> {
605+
pub async fn next(&self) -> Option<Result<FileStatus>> {
606606
let mut next_file: Option<Result<FileStatus>> = None;
607+
let mut iters = self.iters.lock().await;
607608
while next_file.is_none() {
608-
if let Some(iter) = self.iters.last_mut() {
609+
if let Some(iter) = iters.last_mut() {
609610
if let Some(file_result) = iter.next().await {
610611
if let Ok(file) = file_result {
611612
// Return the directory as the next result, but start traversing into that directory
612613
// next if we're doing a recursive listing
613614
if file.isdir && self.recursive {
614-
self.iters.push(DirListingIterator::new(
615+
iters.push(DirListingIterator::new(
615616
file.path.clone(),
616617
&self.mount_table,
617618
false,
@@ -624,7 +625,7 @@ impl ListStatusIterator {
624625
}
625626
} else {
626627
// We've exhausted this directory
627-
self.iters.pop();
628+
iters.pop();
628629
}
629630
} else {
630631
// There's nothing left, just return None
@@ -636,7 +637,7 @@ impl ListStatusIterator {
636637
}
637638

638639
pub fn into_stream(self) -> BoxStream<'static, Result<FileStatus>> {
639-
let listing = stream::unfold(self, |mut state| async move {
640+
let listing = stream::unfold(self, |state| async move {
640641
let next = state.next().await;
641642
next.map(|n| (n, state))
642643
});

0 commit comments

Comments
 (0)