Skip to content

Commit 6a753c4

Browse files
committed
Release GIL for listing
1 parent 6465408 commit 6a753c4

File tree

2 files changed

+14
-12
lines changed

2 files changed

+14
-12
lines changed

python/src/lib.rs

+6-5
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ impl PyFileStatus {
7171

7272
#[pyclass(name = "FileStatusIter")]
7373
struct PyFileStatusIter {
74-
inner: ListStatusIterator,
74+
inner: Arc<ListStatusIterator>,
7575
rt: Arc<Runtime>,
7676
}
7777

@@ -81,10 +81,11 @@ impl PyFileStatusIter {
8181
slf
8282
}
8383

84-
fn __next__(mut slf: PyRefMut<'_, Self>) -> PyHdfsResult<Option<PyFileStatus>> {
85-
// This is dumb, figure out how to get around the double borrow here
84+
fn __next__(slf: PyRefMut<'_, Self>) -> PyHdfsResult<Option<PyFileStatus>> {
85+
// Kinda dumb, but lets us release the GIL while getting the next value
86+
let inner = Arc::clone(&slf.inner);
8687
let rt = Arc::clone(&slf.rt);
87-
if let Some(result) = rt.block_on(slf.inner.next()) {
88+
if let Some(result) = slf.py().allow_threads(|| rt.block_on(inner.next())) {
8889
Ok(Some(PyFileStatus::from(result?)))
8990
} else {
9091
Ok(None)
@@ -305,7 +306,7 @@ impl RawClient {
305306
pub fn list_status(&self, path: &str, recursive: bool) -> PyFileStatusIter {
306307
let inner = self.inner.list_status_iter(path, recursive);
307308
PyFileStatusIter {
308-
inner,
309+
inner: Arc::new(inner),
309310
rt: Arc::clone(&self.rt),
310311
}
311312
}

rust/src/client.rs

+8-7
Original file line numberDiff line numberDiff line change
@@ -588,7 +588,7 @@ impl DirListingIterator {
588588
pub struct ListStatusIterator {
589589
mount_table: Arc<MountTable>,
590590
recursive: bool,
591-
iters: Vec<DirListingIterator>,
591+
iters: Arc<tokio::sync::Mutex<Vec<DirListingIterator>>>,
592592
}
593593

594594
impl ListStatusIterator {
@@ -598,20 +598,21 @@ impl ListStatusIterator {
598598
ListStatusIterator {
599599
mount_table,
600600
recursive,
601-
iters: vec![initial],
601+
iters: Arc::new(tokio::sync::Mutex::new(vec![initial])),
602602
}
603603
}
604604

605-
pub async fn next(&mut self) -> Option<Result<FileStatus>> {
605+
pub async fn next(&self) -> Option<Result<FileStatus>> {
606606
let mut next_file: Option<Result<FileStatus>> = None;
607+
let mut iters = self.iters.lock().await;
607608
while next_file.is_none() {
608-
if let Some(iter) = self.iters.last_mut() {
609+
if let Some(iter) = iters.last_mut() {
609610
if let Some(file_result) = iter.next().await {
610611
if let Ok(file) = file_result {
611612
// Return the directory as the next result, but start traversing into that directory
612613
// next if we're doing a recursive listing
613614
if file.isdir && self.recursive {
614-
self.iters.push(DirListingIterator::new(
615+
iters.push(DirListingIterator::new(
615616
file.path.clone(),
616617
&self.mount_table,
617618
false,
@@ -624,7 +625,7 @@ impl ListStatusIterator {
624625
}
625626
} else {
626627
// We've exhausted this directory
627-
self.iters.pop();
628+
iters.pop();
628629
}
629630
} else {
630631
// There's nothing left, just return None
@@ -636,7 +637,7 @@ impl ListStatusIterator {
636637
}
637638

638639
pub fn into_stream(self) -> BoxStream<'static, Result<FileStatus>> {
639-
let listing = stream::unfold(self, |mut state| async move {
640+
let listing = stream::unfold(self, |state| async move {
640641
let next = state.next().await;
641642
next.map(|n| (n, state))
642643
});

0 commit comments

Comments
 (0)