Skip to content

Commit 2fe54ea

Browse files
authored
Revert "Reduce Usage of Compression Format Detection" (#275)
Reverts #267
1 parent cc6c9ac commit 2fe54ea

File tree

8 files changed

+58
-78
lines changed

8 files changed

+58
-78
lines changed

cas_client/src/interface.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use std::path::PathBuf;
55
use std::sync::Arc;
66

77
use async_trait::async_trait;
8-
use cas_object::CompressionScheme;
98
use cas_types::{FileRange, QueryReconstructionResponse};
109
use mdb_shard::shard_file_reconstructor::FileReconstructor;
1110
use merklehash::MerkleHash;
@@ -36,7 +35,6 @@ pub trait UploadClient {
3635
hash: &MerkleHash,
3736
data: Vec<u8>,
3837
chunk_and_boundaries: Vec<(MerkleHash, u32)>,
39-
compression: Option<CompressionScheme>,
4038
) -> Result<usize>;
4139

4240
/// Check if a XORB already exists.

cas_client/src/local_client.rs

Lines changed: 18 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::sync::Arc;
55

66
use anyhow::anyhow;
77
use async_trait::async_trait;
8-
use cas_object::{CasObject, CompressionScheme};
8+
use cas_object::CasObject;
99
use cas_types::{FileRange, Key};
1010
use file_utils::SafeFileCreator;
1111
use heed::types::*;
@@ -230,7 +230,6 @@ impl UploadClient for LocalClient {
230230
hash: &MerkleHash,
231231
data: Vec<u8>,
232232
chunk_and_boundaries: Vec<(MerkleHash, u32)>,
233-
compression: Option<CompressionScheme>,
234233
) -> Result<usize> {
235234
// no empty writes
236235
if chunk_and_boundaries.is_empty() || data.is_empty() {
@@ -253,7 +252,13 @@ impl UploadClient for LocalClient {
253252
info!("Writing XORB {hash:?} to local path {file_path:?}");
254253

255254
let mut file = SafeFileCreator::new(&file_path)?;
256-
let (_, bytes_written) = CasObject::serialize(&mut file, hash, &data, &chunk_and_boundaries, compression)?;
255+
let (_, bytes_written) = CasObject::serialize(
256+
&mut file,
257+
hash,
258+
&data,
259+
&chunk_and_boundaries,
260+
Some(cas_object::CompressionScheme::None),
261+
)?;
257262
file.close()?;
258263

259264
// attempt to set to readonly on unix.
@@ -446,10 +451,7 @@ mod tests {
446451

447452
// Act & Assert
448453
let client = LocalClient::temporary().unwrap();
449-
assert!(client
450-
.put("key", &hash, data, vec![(hash, chunk_boundaries)], None)
451-
.await
452-
.is_ok());
454+
assert!(client.put("key", &hash, data, vec![(hash, chunk_boundaries)]).await.is_ok());
453455

454456
let returned_data = client.get(&hash).unwrap();
455457
assert_eq!(data_again, returned_data);
@@ -463,7 +465,7 @@ mod tests {
463465

464466
// Act & Assert
465467
let client = LocalClient::temporary().unwrap();
466-
assert!(client.put("", &c.info.cashash, data, chunk_boundaries, None).await.is_ok());
468+
assert!(client.put("", &c.info.cashash, data, chunk_boundaries).await.is_ok());
467469

468470
let returned_data = client.get(&c.info.cashash).unwrap();
469471
assert_eq!(data_again, returned_data);
@@ -477,14 +479,14 @@ mod tests {
477479
// Act & Assert
478480
let client = LocalClient::temporary().unwrap();
479481
assert!(client
480-
.put("", &c.info.cashash, data.clone(), chunk_and_boundaries.clone(), None)
482+
.put("", &c.info.cashash, data.clone(), chunk_and_boundaries.clone())
481483
.await
482484
.is_ok());
483485

484486
let ranges: Vec<(u32, u32)> = vec![(0, 1), (2, 3)];
485487
let returned_ranges = client.get_object_range(&c.info.cashash, ranges).unwrap();
486488

487-
let expected = [
489+
let expected = vec![
488490
data[0..chunk_and_boundaries[0].1 as usize].to_vec(),
489491
data[chunk_and_boundaries[1].1 as usize..chunk_and_boundaries[2].1 as usize].to_vec(),
490492
];
@@ -502,7 +504,7 @@ mod tests {
502504

503505
// Act
504506
let client = LocalClient::temporary().unwrap();
505-
assert!(client.put("", &c.info.cashash, data, chunk_boundaries, None).await.is_ok());
507+
assert!(client.put("", &c.info.cashash, data, chunk_boundaries).await.is_ok());
506508
let len = client.get_length(&c.info.cashash).unwrap();
507509

508510
// Assert
@@ -528,13 +530,13 @@ mod tests {
528530
// write "hello world"
529531
let client = LocalClient::temporary().unwrap();
530532
client
531-
.put("default", &hello_hash, hello.clone(), vec![(hello_hash, hello.len() as u32)], None)
533+
.put("default", &hello_hash, hello.clone(), vec![(hello_hash, hello.len() as u32)])
532534
.await
533535
.unwrap();
534536

535537
// put the same value a second time. This should be ok.
536538
client
537-
.put("default", &hello_hash, hello.clone(), vec![(hello_hash, hello.len() as u32)], None)
539+
.put("default", &hello_hash, hello.clone(), vec![(hello_hash, hello.len() as u32)])
538540
.await
539541
.unwrap();
540542

@@ -553,13 +555,7 @@ mod tests {
553555
assert_eq!(
554556
CasClientError::InvalidArguments,
555557
client
556-
.put(
557-
"hellp2",
558-
&hello_hash,
559-
"hellp wod".as_bytes().to_vec(),
560-
vec![(hello_hash, hello.len() as u32)],
561-
None
562-
)
558+
.put("hellp2", &hello_hash, "hellp wod".as_bytes().to_vec(), vec![(hello_hash, hello.len() as u32)],)
563559
.await
564560
.unwrap_err()
565561
);
@@ -573,7 +569,6 @@ mod tests {
573569
&hello_hash,
574570
"hello world again".as_bytes().to_vec(),
575571
vec![(hello_hash, hello.len() as u32)],
576-
None
577572
)
578573
.await
579574
.unwrap_err()
@@ -582,7 +577,7 @@ mod tests {
582577
// empty writes should fail
583578
assert_eq!(
584579
CasClientError::InvalidArguments,
585-
client.put("key", &hello_hash, vec![], vec![], None).await.unwrap_err()
580+
client.put("key", &hello_hash, vec![], vec![],).await.unwrap_err()
586581
);
587582

588583
// compute a hash of something we do not have in the store
@@ -626,7 +621,7 @@ mod tests {
626621
// insert should succeed
627622
let client = LocalClient::temporary().unwrap();
628623
client
629-
.put("key", &final_hash, "helloworld".as_bytes().to_vec(), vec![(hello_hash, 5), (world_hash, 10)], None)
624+
.put("key", &final_hash, "helloworld".as_bytes().to_vec(), vec![(hello_hash, 5), (world_hash, 10)])
630625
.await
631626
.unwrap();
632627
}

cas_client/src/remote_client.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ utils::configurable_bool_constants! {
5959

6060
pub struct RemoteClient {
6161
endpoint: String,
62+
compression: Option<CompressionScheme>,
6263
dry_run: bool,
6364
http_client: Arc<ClientWithMiddleware>,
6465
authenticated_http_client: Arc<ClientWithMiddleware>,
@@ -73,6 +74,7 @@ impl RemoteClient {
7374
pub fn new(
7475
threadpool: Arc<ThreadPool>,
7576
endpoint: &str,
77+
compression: Option<CompressionScheme>,
7678
auth: &Option<AuthConfig>,
7779
cache_config: &Option<CacheConfig>,
7880
shard_cache_directory: PathBuf,
@@ -99,6 +101,7 @@ impl RemoteClient {
99101

100102
Self {
101103
endpoint: endpoint.to_string(),
104+
compression,
102105
dry_run,
103106
authenticated_http_client: Arc::new(
104107
http_client::build_auth_http_client(auth, RetryConfig::default()).unwrap(),
@@ -123,14 +126,13 @@ impl UploadClient for RemoteClient {
123126
hash: &MerkleHash,
124127
data: Vec<u8>,
125128
chunk_and_boundaries: Vec<(MerkleHash, u32)>,
126-
compression: Option<CompressionScheme>,
127129
) -> Result<usize> {
128130
let key = Key {
129131
prefix: prefix.to_string(),
130132
hash: *hash,
131133
};
132134

133-
let (was_uploaded, nbytes_trans) = self.upload(&key, data, chunk_and_boundaries, compression).await?;
135+
let (was_uploaded, nbytes_trans) = self.upload(&key, data, chunk_and_boundaries).await?;
134136

135137
if !was_uploaded {
136138
debug!("{key:?} not inserted into CAS.");
@@ -279,14 +281,13 @@ impl RemoteClient {
279281
key: &Key,
280282
contents: Vec<u8>,
281283
chunk_and_boundaries: Vec<(MerkleHash, u32)>,
282-
compression: Option<CompressionScheme>,
283284
) -> Result<(bool, usize)> {
284285
let url = Url::parse(&format!("{}/xorb/{key}", self.endpoint))?;
285286

286287
let mut writer = Cursor::new(Vec::new());
287288

288289
let (_, nbytes_trans) =
289-
CasObject::serialize(&mut writer, &key.hash, &contents, &chunk_and_boundaries, compression)?;
290+
CasObject::serialize(&mut writer, &key.hash, &contents, &chunk_and_boundaries, self.compression)?;
290291
// free memory before the "slow" network transfer below
291292
drop(contents);
292293

@@ -753,14 +754,18 @@ mod tests {
753754
let (c, _, data, chunk_boundaries) = build_cas_object(3, ChunkSize::Random(512, 10248), CompressionScheme::LZ4);
754755

755756
let threadpool = Arc::new(ThreadPool::new().unwrap());
756-
let client = RemoteClient::new(threadpool.clone(), CAS_ENDPOINT, &None, &None, "".into(), false);
757+
let client = RemoteClient::new(
758+
threadpool.clone(),
759+
CAS_ENDPOINT,
760+
Some(CompressionScheme::LZ4),
761+
&None,
762+
&None,
763+
"".into(),
764+
false,
765+
);
757766
// Act
758767
let result = threadpool
759-
.external_run_async_task(async move {
760-
client
761-
.put(prefix, &c.info.cashash, data, chunk_boundaries, Some(CompressionScheme::LZ4))
762-
.await
763-
})
768+
.external_run_async_task(async move { client.put(prefix, &c.info.cashash, data, chunk_boundaries).await })
764769
.unwrap();
765770

766771
// Assert
@@ -1088,7 +1093,7 @@ mod tests {
10881093
file_range: FileRange::new(SKIP_BYTES, FILE_SIZE - SKIP_BYTES),
10891094
expected_data: [
10901095
&raw_data[SKIP_BYTES as usize..(5 * CHUNK_SIZE) as usize],
1091-
&raw_data[(6 * CHUNK_SIZE) as usize..(NUM_CHUNKS * CHUNK_SIZE) as usize - SKIP_BYTES as usize],
1096+
&raw_data[(6 * CHUNK_SIZE) as usize as usize..(NUM_CHUNKS * CHUNK_SIZE) as usize - SKIP_BYTES as usize],
10921097
]
10931098
.concat(),
10941099
expect_error: false,
@@ -1127,7 +1132,7 @@ mod tests {
11271132

11281133
// test reconstruct and sequential write
11291134
let test = test_case.clone();
1130-
let client = RemoteClient::new(threadpool.clone(), endpoint, &None, &None, "".into(), false);
1135+
let client = RemoteClient::new(threadpool.clone(), endpoint, None, &None, &None, "".into(), false);
11311136
let provider = BufferProvider::default();
11321137
let buf = provider.buf.clone();
11331138
let writer = OutputProvider::Buffer(provider);
@@ -1145,7 +1150,7 @@ mod tests {
11451150

11461151
// test reconstruct and parallel write
11471152
let test = test_case;
1148-
let client = RemoteClient::new(threadpool.clone(), endpoint, &None, &None, "".into(), false);
1153+
let client = RemoteClient::new(threadpool.clone(), endpoint, None, &None, &None, "".into(), false);
11491154
let provider = BufferProvider::default();
11501155
let buf = provider.buf.clone();
11511156
let writer = OutputProvider::Buffer(provider);

cas_object/src/cas_chunk_format.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ mod tests {
213213
use std::io::Cursor;
214214

215215
use rand::Rng;
216+
use CompressionScheme;
216217

217218
use super::*;
218219

cas_object/src/cas_object_format.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1856,7 +1856,7 @@ mod tests {
18561856
+ size_of::<CasObjectIdent>()
18571857
+ size_of::<u8>();
18581858

1859-
let chunks = xorb_bytes[start_pos..].chunks(10).map(Ok).collect::<Vec<_>>();
1859+
let chunks = xorb_bytes[start_pos..].chunks(10).map(|c| Ok(c)).collect::<Vec<_>>();
18601860
let mut xorb_footer_async_reader = futures::stream::iter(chunks).into_async_read();
18611861
let cas_object_result =
18621862
CasObject::deserialize_async(&mut xorb_footer_async_reader, CAS_OBJECT_FORMAT_VERSION).await;

cas_object/src/compression_scheme.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ pub enum CompressionScheme {
2323
LZ4 = 1,
2424
ByteGrouping4LZ4 = 2, // 4 byte groups
2525
}
26-
pub const NUM_COMPRESSION_SCHEMES: usize = 3;
2726

2827
impl Display for CompressionScheme {
2928
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -289,39 +288,47 @@ mod tests {
289288
let random_u8s: Vec<_> = (0..n).map(|_| rng.gen_range(0..255)).collect();
290289
let random_f32s_ng1_1: Vec<_> = (0..n / size_of::<f32>())
291290
.map(|_| rng.gen_range(-1.0f32..=1.0))
292-
.flat_map(|f| f.to_le_bytes())
291+
.map(|f| f.to_le_bytes())
292+
.flatten()
293293
.collect();
294294
let random_f32s_0_2: Vec<_> = (0..n / size_of::<f32>())
295295
.map(|_| rng.gen_range(0f32..=2.0))
296-
.flat_map(|f| f.to_le_bytes())
296+
.map(|f| f.to_le_bytes())
297+
.flatten()
297298
.collect();
298299
let random_f64s_ng1_1: Vec<_> = (0..n / size_of::<f64>())
299300
.map(|_| rng.gen_range(-1.0f64..=1.0))
300-
.flat_map(|f| f.to_le_bytes())
301+
.map(|f| f.to_le_bytes())
302+
.flatten()
301303
.collect();
302304
let random_f64s_0_2: Vec<_> = (0..n / size_of::<f64>())
303305
.map(|_| rng.gen_range(0f64..=2.0))
304-
.flat_map(|f| f.to_le_bytes())
306+
.map(|f| f.to_le_bytes())
307+
.flatten()
305308
.collect();
306309

307310
// f16, a.k.a binary16 format: sign (1 bit), exponent (5 bit), mantissa (10 bit)
308311
let random_f16s_ng1_1: Vec<_> = (0..n / size_of::<f16>())
309312
.map(|_| f16::from_f32(rng.gen_range(-1.0f32..=1.0)))
310-
.flat_map(|f| f.to_le_bytes())
313+
.map(|f| f.to_le_bytes())
314+
.flatten()
311315
.collect();
312316
let random_f16s_0_2: Vec<_> = (0..n / size_of::<f16>())
313317
.map(|_| f16::from_f32(rng.gen_range(0f32..=2.0)))
314-
.flat_map(|f| f.to_le_bytes())
318+
.map(|f| f.to_le_bytes())
319+
.flatten()
315320
.collect();
316321

317322
// bf16 format: sign (1 bit), exponent (8 bit), mantissa (7 bit)
318323
let random_bf16s_ng1_1: Vec<_> = (0..n / size_of::<bf16>())
319324
.map(|_| bf16::from_f32(rng.gen_range(-1.0f32..=1.0)))
320-
.flat_map(|f| f.to_le_bytes())
325+
.map(|f| f.to_le_bytes())
326+
.flatten()
321327
.collect();
322328
let random_bf16s_0_2: Vec<_> = (0..n / size_of::<bf16>())
323329
.map(|_| bf16::from_f32(rng.gen_range(0f32..=2.0)))
324-
.flat_map(|f| f.to_le_bytes())
330+
.map(|f| f.to_le_bytes())
331+
.flatten()
325332
.collect();
326333

327334
let dataset = [

0 commit comments

Comments
 (0)