Skip to content

Commit cc6c9ac

Browse files
authored
Reduce Usage of Compression Format Detection (#267)
We use a KL-divergence-based predictor to select between the LZ4 and BG-LZ4 compression schemes. This currently runs for every chunk and appears to consume about 20% of CPU time. Instead, we now run the predictor only for the first Xorb, lock in the scheme it selects, and reuse that scheme from then on.
- Removes the compression scheme from RemoteClient construction; it is now passed as an argument to put (xorb) instead.
- FileUploadSession is used to track the compression scheme.
- Other changes demanded by clippy.
1 parent 7028446 commit cc6c9ac

File tree

8 files changed

+78
-58
lines changed

8 files changed

+78
-58
lines changed

cas_client/src/interface.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::path::PathBuf;
55
use std::sync::Arc;
66

77
use async_trait::async_trait;
8+
use cas_object::CompressionScheme;
89
use cas_types::{FileRange, QueryReconstructionResponse};
910
use mdb_shard::shard_file_reconstructor::FileReconstructor;
1011
use merklehash::MerkleHash;
@@ -35,6 +36,7 @@ pub trait UploadClient {
3536
hash: &MerkleHash,
3637
data: Vec<u8>,
3738
chunk_and_boundaries: Vec<(MerkleHash, u32)>,
39+
compression: Option<CompressionScheme>,
3840
) -> Result<usize>;
3941

4042
/// Check if a XORB already exists.

cas_client/src/local_client.rs

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::sync::Arc;
55

66
use anyhow::anyhow;
77
use async_trait::async_trait;
8-
use cas_object::CasObject;
8+
use cas_object::{CasObject, CompressionScheme};
99
use cas_types::{FileRange, Key};
1010
use file_utils::SafeFileCreator;
1111
use heed::types::*;
@@ -230,6 +230,7 @@ impl UploadClient for LocalClient {
230230
hash: &MerkleHash,
231231
data: Vec<u8>,
232232
chunk_and_boundaries: Vec<(MerkleHash, u32)>,
233+
compression: Option<CompressionScheme>,
233234
) -> Result<usize> {
234235
// no empty writes
235236
if chunk_and_boundaries.is_empty() || data.is_empty() {
@@ -252,13 +253,7 @@ impl UploadClient for LocalClient {
252253
info!("Writing XORB {hash:?} to local path {file_path:?}");
253254

254255
let mut file = SafeFileCreator::new(&file_path)?;
255-
let (_, bytes_written) = CasObject::serialize(
256-
&mut file,
257-
hash,
258-
&data,
259-
&chunk_and_boundaries,
260-
Some(cas_object::CompressionScheme::None),
261-
)?;
256+
let (_, bytes_written) = CasObject::serialize(&mut file, hash, &data, &chunk_and_boundaries, compression)?;
262257
file.close()?;
263258

264259
// attempt to set to readonly on unix.
@@ -451,7 +446,10 @@ mod tests {
451446

452447
// Act & Assert
453448
let client = LocalClient::temporary().unwrap();
454-
assert!(client.put("key", &hash, data, vec![(hash, chunk_boundaries)]).await.is_ok());
449+
assert!(client
450+
.put("key", &hash, data, vec![(hash, chunk_boundaries)], None)
451+
.await
452+
.is_ok());
455453

456454
let returned_data = client.get(&hash).unwrap();
457455
assert_eq!(data_again, returned_data);
@@ -465,7 +463,7 @@ mod tests {
465463

466464
// Act & Assert
467465
let client = LocalClient::temporary().unwrap();
468-
assert!(client.put("", &c.info.cashash, data, chunk_boundaries).await.is_ok());
466+
assert!(client.put("", &c.info.cashash, data, chunk_boundaries, None).await.is_ok());
469467

470468
let returned_data = client.get(&c.info.cashash).unwrap();
471469
assert_eq!(data_again, returned_data);
@@ -479,14 +477,14 @@ mod tests {
479477
// Act & Assert
480478
let client = LocalClient::temporary().unwrap();
481479
assert!(client
482-
.put("", &c.info.cashash, data.clone(), chunk_and_boundaries.clone())
480+
.put("", &c.info.cashash, data.clone(), chunk_and_boundaries.clone(), None)
483481
.await
484482
.is_ok());
485483

486484
let ranges: Vec<(u32, u32)> = vec![(0, 1), (2, 3)];
487485
let returned_ranges = client.get_object_range(&c.info.cashash, ranges).unwrap();
488486

489-
let expected = vec![
487+
let expected = [
490488
data[0..chunk_and_boundaries[0].1 as usize].to_vec(),
491489
data[chunk_and_boundaries[1].1 as usize..chunk_and_boundaries[2].1 as usize].to_vec(),
492490
];
@@ -504,7 +502,7 @@ mod tests {
504502

505503
// Act
506504
let client = LocalClient::temporary().unwrap();
507-
assert!(client.put("", &c.info.cashash, data, chunk_boundaries).await.is_ok());
505+
assert!(client.put("", &c.info.cashash, data, chunk_boundaries, None).await.is_ok());
508506
let len = client.get_length(&c.info.cashash).unwrap();
509507

510508
// Assert
@@ -530,13 +528,13 @@ mod tests {
530528
// write "hello world"
531529
let client = LocalClient::temporary().unwrap();
532530
client
533-
.put("default", &hello_hash, hello.clone(), vec![(hello_hash, hello.len() as u32)])
531+
.put("default", &hello_hash, hello.clone(), vec![(hello_hash, hello.len() as u32)], None)
534532
.await
535533
.unwrap();
536534

537535
// put the same value a second time. This should be ok.
538536
client
539-
.put("default", &hello_hash, hello.clone(), vec![(hello_hash, hello.len() as u32)])
537+
.put("default", &hello_hash, hello.clone(), vec![(hello_hash, hello.len() as u32)], None)
540538
.await
541539
.unwrap();
542540

@@ -555,7 +553,13 @@ mod tests {
555553
assert_eq!(
556554
CasClientError::InvalidArguments,
557555
client
558-
.put("hellp2", &hello_hash, "hellp wod".as_bytes().to_vec(), vec![(hello_hash, hello.len() as u32)],)
556+
.put(
557+
"hellp2",
558+
&hello_hash,
559+
"hellp wod".as_bytes().to_vec(),
560+
vec![(hello_hash, hello.len() as u32)],
561+
None
562+
)
559563
.await
560564
.unwrap_err()
561565
);
@@ -569,6 +573,7 @@ mod tests {
569573
&hello_hash,
570574
"hello world again".as_bytes().to_vec(),
571575
vec![(hello_hash, hello.len() as u32)],
576+
None
572577
)
573578
.await
574579
.unwrap_err()
@@ -577,7 +582,7 @@ mod tests {
577582
// empty writes should fail
578583
assert_eq!(
579584
CasClientError::InvalidArguments,
580-
client.put("key", &hello_hash, vec![], vec![],).await.unwrap_err()
585+
client.put("key", &hello_hash, vec![], vec![], None).await.unwrap_err()
581586
);
582587

583588
// compute a hash of something we do not have in the store
@@ -621,7 +626,7 @@ mod tests {
621626
// insert should succeed
622627
let client = LocalClient::temporary().unwrap();
623628
client
624-
.put("key", &final_hash, "helloworld".as_bytes().to_vec(), vec![(hello_hash, 5), (world_hash, 10)])
629+
.put("key", &final_hash, "helloworld".as_bytes().to_vec(), vec![(hello_hash, 5), (world_hash, 10)], None)
625630
.await
626631
.unwrap();
627632
}

cas_client/src/remote_client.rs

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ utils::configurable_bool_constants! {
5959

6060
pub struct RemoteClient {
6161
endpoint: String,
62-
compression: Option<CompressionScheme>,
6362
dry_run: bool,
6463
http_client: Arc<ClientWithMiddleware>,
6564
authenticated_http_client: Arc<ClientWithMiddleware>,
@@ -74,7 +73,6 @@ impl RemoteClient {
7473
pub fn new(
7574
threadpool: Arc<ThreadPool>,
7675
endpoint: &str,
77-
compression: Option<CompressionScheme>,
7876
auth: &Option<AuthConfig>,
7977
cache_config: &Option<CacheConfig>,
8078
shard_cache_directory: PathBuf,
@@ -101,7 +99,6 @@ impl RemoteClient {
10199

102100
Self {
103101
endpoint: endpoint.to_string(),
104-
compression,
105102
dry_run,
106103
authenticated_http_client: Arc::new(
107104
http_client::build_auth_http_client(auth, RetryConfig::default()).unwrap(),
@@ -126,13 +123,14 @@ impl UploadClient for RemoteClient {
126123
hash: &MerkleHash,
127124
data: Vec<u8>,
128125
chunk_and_boundaries: Vec<(MerkleHash, u32)>,
126+
compression: Option<CompressionScheme>,
129127
) -> Result<usize> {
130128
let key = Key {
131129
prefix: prefix.to_string(),
132130
hash: *hash,
133131
};
134132

135-
let (was_uploaded, nbytes_trans) = self.upload(&key, data, chunk_and_boundaries).await?;
133+
let (was_uploaded, nbytes_trans) = self.upload(&key, data, chunk_and_boundaries, compression).await?;
136134

137135
if !was_uploaded {
138136
debug!("{key:?} not inserted into CAS.");
@@ -281,13 +279,14 @@ impl RemoteClient {
281279
key: &Key,
282280
contents: Vec<u8>,
283281
chunk_and_boundaries: Vec<(MerkleHash, u32)>,
282+
compression: Option<CompressionScheme>,
284283
) -> Result<(bool, usize)> {
285284
let url = Url::parse(&format!("{}/xorb/{key}", self.endpoint))?;
286285

287286
let mut writer = Cursor::new(Vec::new());
288287

289288
let (_, nbytes_trans) =
290-
CasObject::serialize(&mut writer, &key.hash, &contents, &chunk_and_boundaries, self.compression)?;
289+
CasObject::serialize(&mut writer, &key.hash, &contents, &chunk_and_boundaries, compression)?;
291290
// free memory before the "slow" network transfer below
292291
drop(contents);
293292

@@ -754,18 +753,14 @@ mod tests {
754753
let (c, _, data, chunk_boundaries) = build_cas_object(3, ChunkSize::Random(512, 10248), CompressionScheme::LZ4);
755754

756755
let threadpool = Arc::new(ThreadPool::new().unwrap());
757-
let client = RemoteClient::new(
758-
threadpool.clone(),
759-
CAS_ENDPOINT,
760-
Some(CompressionScheme::LZ4),
761-
&None,
762-
&None,
763-
"".into(),
764-
false,
765-
);
756+
let client = RemoteClient::new(threadpool.clone(), CAS_ENDPOINT, &None, &None, "".into(), false);
766757
// Act
767758
let result = threadpool
768-
.external_run_async_task(async move { client.put(prefix, &c.info.cashash, data, chunk_boundaries).await })
759+
.external_run_async_task(async move {
760+
client
761+
.put(prefix, &c.info.cashash, data, chunk_boundaries, Some(CompressionScheme::LZ4))
762+
.await
763+
})
769764
.unwrap();
770765

771766
// Assert
@@ -1093,7 +1088,7 @@ mod tests {
10931088
file_range: FileRange::new(SKIP_BYTES, FILE_SIZE - SKIP_BYTES),
10941089
expected_data: [
10951090
&raw_data[SKIP_BYTES as usize..(5 * CHUNK_SIZE) as usize],
1096-
&raw_data[(6 * CHUNK_SIZE) as usize as usize..(NUM_CHUNKS * CHUNK_SIZE) as usize - SKIP_BYTES as usize],
1091+
&raw_data[(6 * CHUNK_SIZE) as usize..(NUM_CHUNKS * CHUNK_SIZE) as usize - SKIP_BYTES as usize],
10971092
]
10981093
.concat(),
10991094
expect_error: false,
@@ -1132,7 +1127,7 @@ mod tests {
11321127

11331128
// test reconstruct and sequential write
11341129
let test = test_case.clone();
1135-
let client = RemoteClient::new(threadpool.clone(), endpoint, None, &None, &None, "".into(), false);
1130+
let client = RemoteClient::new(threadpool.clone(), endpoint, &None, &None, "".into(), false);
11361131
let provider = BufferProvider::default();
11371132
let buf = provider.buf.clone();
11381133
let writer = OutputProvider::Buffer(provider);
@@ -1150,7 +1145,7 @@ mod tests {
11501145

11511146
// test reconstruct and parallel write
11521147
let test = test_case;
1153-
let client = RemoteClient::new(threadpool.clone(), endpoint, None, &None, &None, "".into(), false);
1148+
let client = RemoteClient::new(threadpool.clone(), endpoint, &None, &None, "".into(), false);
11541149
let provider = BufferProvider::default();
11551150
let buf = provider.buf.clone();
11561151
let writer = OutputProvider::Buffer(provider);

cas_object/src/cas_chunk_format.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,6 @@ mod tests {
213213
use std::io::Cursor;
214214

215215
use rand::Rng;
216-
use CompressionScheme;
217216

218217
use super::*;
219218

cas_object/src/cas_object_format.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1856,7 +1856,7 @@ mod tests {
18561856
+ size_of::<CasObjectIdent>()
18571857
+ size_of::<u8>();
18581858

1859-
let chunks = xorb_bytes[start_pos..].chunks(10).map(|c| Ok(c)).collect::<Vec<_>>();
1859+
let chunks = xorb_bytes[start_pos..].chunks(10).map(Ok).collect::<Vec<_>>();
18601860
let mut xorb_footer_async_reader = futures::stream::iter(chunks).into_async_read();
18611861
let cas_object_result =
18621862
CasObject::deserialize_async(&mut xorb_footer_async_reader, CAS_OBJECT_FORMAT_VERSION).await;

cas_object/src/compression_scheme.rs

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub enum CompressionScheme {
2323
LZ4 = 1,
2424
ByteGrouping4LZ4 = 2, // 4 byte groups
2525
}
26+
pub const NUM_COMPRESSION_SCHEMES: usize = 3;
2627

2728
impl Display for CompressionScheme {
2829
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -288,47 +289,39 @@ mod tests {
288289
let random_u8s: Vec<_> = (0..n).map(|_| rng.gen_range(0..255)).collect();
289290
let random_f32s_ng1_1: Vec<_> = (0..n / size_of::<f32>())
290291
.map(|_| rng.gen_range(-1.0f32..=1.0))
291-
.map(|f| f.to_le_bytes())
292-
.flatten()
292+
.flat_map(|f| f.to_le_bytes())
293293
.collect();
294294
let random_f32s_0_2: Vec<_> = (0..n / size_of::<f32>())
295295
.map(|_| rng.gen_range(0f32..=2.0))
296-
.map(|f| f.to_le_bytes())
297-
.flatten()
296+
.flat_map(|f| f.to_le_bytes())
298297
.collect();
299298
let random_f64s_ng1_1: Vec<_> = (0..n / size_of::<f64>())
300299
.map(|_| rng.gen_range(-1.0f64..=1.0))
301-
.map(|f| f.to_le_bytes())
302-
.flatten()
300+
.flat_map(|f| f.to_le_bytes())
303301
.collect();
304302
let random_f64s_0_2: Vec<_> = (0..n / size_of::<f64>())
305303
.map(|_| rng.gen_range(0f64..=2.0))
306-
.map(|f| f.to_le_bytes())
307-
.flatten()
304+
.flat_map(|f| f.to_le_bytes())
308305
.collect();
309306

310307
// f16, a.k.a binary16 format: sign (1 bit), exponent (5 bit), mantissa (10 bit)
311308
let random_f16s_ng1_1: Vec<_> = (0..n / size_of::<f16>())
312309
.map(|_| f16::from_f32(rng.gen_range(-1.0f32..=1.0)))
313-
.map(|f| f.to_le_bytes())
314-
.flatten()
310+
.flat_map(|f| f.to_le_bytes())
315311
.collect();
316312
let random_f16s_0_2: Vec<_> = (0..n / size_of::<f16>())
317313
.map(|_| f16::from_f32(rng.gen_range(0f32..=2.0)))
318-
.map(|f| f.to_le_bytes())
319-
.flatten()
314+
.flat_map(|f| f.to_le_bytes())
320315
.collect();
321316

322317
// bf16 format: sign (1 bit), exponent (8 bit), mantissa (7 bit)
323318
let random_bf16s_ng1_1: Vec<_> = (0..n / size_of::<bf16>())
324319
.map(|_| bf16::from_f32(rng.gen_range(-1.0f32..=1.0)))
325-
.map(|f| f.to_le_bytes())
326-
.flatten()
320+
.flat_map(|f| f.to_le_bytes())
327321
.collect();
328322
let random_bf16s_0_2: Vec<_> = (0..n / size_of::<bf16>())
329323
.map(|_| bf16::from_f32(rng.gen_range(0f32..=2.0)))
330-
.map(|f| f.to_le_bytes())
331-
.flatten()
324+
.flat_map(|f| f.to_le_bytes())
332325
.collect();
333326

334327
let dataset = [

0 commit comments

Comments
 (0)