Skip to content

feat: allow lazily chunking unsorted iteration #55

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benches/my_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ fn main() {

// Open finished file
let all_items = if unsorted_read {
UnsortedShardReader::<T1>::open(tmp.path())?.collect::<Result<_, _>>()?
UnsortedShardReader::<T1>::open(tmp.path()).collect::<Result<_, _>>()?
} else {
let reader = ShardReader::<T1>::open(tmp.path())?;

Expand Down
16 changes: 16 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1853,10 +1853,20 @@ mod shard_tests {
}

// Check the unsorted read
assert_eq!(n_items, UnsortedShardReader::<T1>::len(&[tmp.path()])?);
let unsorted_reader = UnsortedShardReader::<T1>::open(tmp.path());
let all_items_res: Result<Vec<_>, Error> = unsorted_reader.collect();
let all_items = all_items_res?;
assert!(set_compare(&true_items, &all_items));

// Check that the skip_lazy feature produces the expected results.
let mut unsorted_reader_skip = UnsortedShardReader::<T1>::open(tmp.path());
let to_skip = (disk_chunk_size * 3) + 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe scramble this value somehow to get different boundaries, or try a few different carefully chosen skip values here? You could skip to a file boundary, a chunk boundary, or the middle of a chunk and ideally the tests would hit all 3 cases?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ultimately didn't merge the cellranger patch that made use of this feature, so I'm a bit on the fence about merging it at all at this point.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking into this again now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I enhanced the tests by writing two shard files of size n_items, to test the set reading logic. I expanded the skip tests to include these offsets:

            check_unsorted_skip(0)?;
            check_unsorted_skip(1)?;
            check_unsorted_skip(disk_chunk_size)?;
            check_unsorted_skip((disk_chunk_size * 3) + 1)?;
            check_unsorted_skip(n_items)?; // skip entire first file
            check_unsorted_skip(n_items + 1)?; // skip entire first file plus next item
            check_unsorted_skip(n_items * 2)?; // skip everything

let skipped = unsorted_reader_skip.skip_lazy(to_skip)?;
assert_eq!(to_skip, skipped);
let all_items_res_skip: Result<Vec<_>, Error> = unsorted_reader_skip.collect();
let all_items_skip = all_items_res_skip?;
assert_eq!(&all_items[to_skip..], &all_items_skip);
}
Ok(())
}
Expand Down Expand Up @@ -2117,5 +2127,11 @@ mod shard_tests {
let shard_files = Vec::<PathBuf>::new();
let reader = UnsortedShardReader::<u8>::open_set(&shard_files);
assert_eq!(reader.count(), 0);

// Test that skipping an empty set works correctly.
let mut reader = UnsortedShardReader::<u8>::open_set(&shard_files);
let skipped = reader.skip_lazy(10).unwrap();
assert_eq!(0, skipped);
assert_eq!(reader.count(), 0);
}
}
31 changes: 30 additions & 1 deletion src/unsorted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,28 @@ where
}
}

/// Compute the total number of elements in this set of shard files.
pub fn len<P: AsRef<Path>>(shard_files: &[P]) -> Result<usize, Error>
where
<S as SortKey<T>>::Key: 'static,
{
// Create a set reader, and consume all the files, just getting counts.
let files_reader = Self::open_set(shard_files);
let mut count = 0;
for file in files_reader.shard_reader_iter {
let file = file?;
count += file.len();
}
Ok(count)
}

/// Skip the first count items that would be returned by iteration.
/// This avoids reading anything into memory besides the indexes for each
/// file, and the first chunk that wouldn't be skipped.
///
/// Returns the number of items skipped. This will only ever be less than
/// count if we exhausted all shard files.
pub fn skip(&mut self, count: usize) -> Result<usize, Error> {
pub fn skip_lazy(&mut self, count: usize) -> Result<usize, Error> {
let mut skipped = 0;
loop {
let Some(mut file_reader) = self.take_active_reader().transpose()? else {
Expand All @@ -97,6 +112,7 @@ where
Ok(skipped)
}

/// Take the current active shard reader, or advance to the next.
fn take_active_reader(&mut self) -> Option<Result<UnsortedShardFileReader<T, S>, Error>> {
let reader = if let Some(reader) = self.active_shard_reader.take() {
reader
Expand Down Expand Up @@ -160,6 +176,7 @@ struct UnsortedShardFileReader<T, S = DefaultSort>
where
S: SortKey<T>,
{
count: usize,
file_index_iter: Box<dyn Iterator<Item = KeylessShardRecord>>,
shard_iter: Option<UnsortedShardIter<T>>,
phantom: PhantomData<S>,
Expand All @@ -178,6 +195,7 @@ where
<S as SortKey<T>>::Key: 'static,
{
let reader = ShardReaderSingle::<T, S>::open(path)?;
let count = reader.len();
let mut file_index_iter = reader.index.into_iter().map(|r| KeylessShardRecord {
offset: r.offset,
len_bytes: r.len_bytes,
Expand All @@ -188,12 +206,18 @@ where
return Ok(None);
};
Ok(Some(Self {
count,
shard_iter: Some(UnsortedShardIter::new(reader.file, first_index_entry)?),
file_index_iter: Box::new(file_index_iter),
phantom: Default::default(),
}))
}

/// Return the total number of items in this shard file.
pub fn len(&self) -> usize {
self.count
}

/// Get the next item out of this file.
pub fn next(&mut self) -> Result<Option<T>, Error> {
loop {
Expand Down Expand Up @@ -240,6 +264,11 @@ where
skipped,
exhausted: false,
});
} else {
// Advance to the next shard, if there is one.
if let Some(next_index_record) = self.file_index_iter.next() {
self.shard_iter = Some(shard_iter.reset(next_index_record)?);
}
}
}
}
Expand Down