Skip to content

Commit

Permalink
conditional compilation
Browse files Browse the repository at this point in the history
Signed-off-by: tison <wander4096@gmail.com>
  • Loading branch information
tisonkun committed Nov 20, 2022
1 parent 24375c7 commit c889a08
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 113 deletions.
67 changes: 40 additions & 27 deletions src/compression.rs
Original file line number Diff line number Diff line change
@@ -1,53 +1,71 @@
//! Compression strategy configs
use lz4::block::CompressionMode;

/// Wrapper of supported compression algorithms
#[derive(Default, Clone, Debug)]
pub enum Compression {
#[default]
None,
#[cfg(feature = "lz4")]
Lz4(CompressionLz4),
#[cfg(feature = "flate2")]
Zlib(CompressionZlib),
#[cfg(feature = "zstd")]
Zstd(CompressionZstd),
#[cfg(feature = "snap")]
Snappy(CompressionSnappy),
}

/// Options of the [lz4](https://lz4.github.io/lz4/) algorithm
#[cfg(feature = "lz4")]
#[derive(Debug)]
pub struct CompressionLz4 {
/// compression mode of lz4 to be used
pub mode: CompressionMode,
pub mode: lz4::block::CompressionMode,
}

// FIXME: can be omitted if https://github.com/10XGenomics/lz4-rs/pull/31 released
#[cfg(feature = "lz4")]
impl Default for CompressionLz4 {
fn default() -> Self {
CompressionLz4 {
mode: lz4::block::CompressionMode::DEFAULT,
}
}
}

// FIXME: can be omitted if https://github.com/10XGenomics/lz4-rs/pull/30 released
#[cfg(feature = "lz4")]
impl Clone for CompressionLz4 {
fn clone(&self) -> Self {
use lz4::block::CompressionMode;

CompressionLz4 {
mode: match self.mode {
CompressionMode::HIGHCOMPRESSION(i) => CompressionMode::HIGHCOMPRESSION(i),
CompressionMode::FAST(i) => CompressionMode::FAST(i),
CompressionMode::DEFAULT => CompressionMode::DEFAULT,
}
}
}
}

/// Options of the [zlib](https://www.zlib.net/) algorithm
#[cfg(feature = "flate2")]
#[derive(Default, Clone, Copy, Debug)]
pub struct CompressionZlib {
/// compression level of zlib to be used (0-9)
pub level: flate2::Compression,
}

/// Options of the [zstd](http://facebook.github.io/zstd/zstd_manual.html) algorithm
#[cfg(feature = "zstd")]
#[derive(Clone, Copy, Debug)]
pub struct CompressionZstd {
/// compression level of zstd to be used ([`zstd::compression_level_range()`])
pub level: i32,
}

/// Options of the [snappy](http://google.github.io/snappy/) algorithm
#[derive(Default, Clone, Copy, Debug)]
pub struct CompressionSnappy {
// empty for extensions
}

impl Default for CompressionLz4 {
fn default() -> Self {
CompressionLz4 {
mode: CompressionMode::DEFAULT,
}
}
}

#[cfg(feature = "zstd")]
impl Default for CompressionZstd {
fn default() -> Self {
CompressionZstd {
Expand All @@ -56,14 +74,9 @@ impl Default for CompressionZstd {
}
}

impl Clone for CompressionLz4 {
fn clone(&self) -> Self {
CompressionLz4 {
mode: match self.mode {
CompressionMode::HIGHCOMPRESSION(i) => CompressionMode::HIGHCOMPRESSION(i),
CompressionMode::FAST(i) => CompressionMode::FAST(i),
CompressionMode::DEFAULT => CompressionMode::DEFAULT,
}
}
}
/// Options of the [snappy](http://google.github.io/snappy/) algorithm
#[cfg(feature = "snap")]
#[derive(Default, Clone, Copy, Debug)]
pub struct CompressionSnappy {
// empty for extensions
}
132 changes: 46 additions & 86 deletions src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,26 +425,6 @@ impl<Exe: Executor> TopicProducer<Exe> {
let batch_size = options.batch_size;
let compression = options.compression.clone();

match compression {
None | Some(Compression::None) => {}
Some(Compression::Lz4(..)) => {
#[cfg(not(feature = "lz4"))]
return Err(Error::Custom("cannot create a producer with LZ4 compression because the 'lz4' cargo feature is not active".to_string()));
}
Some(Compression::Zlib(..)) => {
#[cfg(not(feature = "flate2"))]
return Err(Error::Custom("cannot create a producer with zlib compression because the 'flate2' cargo feature is not active".to_string()));
}
Some(Compression::Zstd(..)) => {
#[cfg(not(feature = "zstd"))]
return Err(Error::Custom("cannot create a producer with zstd compression because the 'zstd' cargo feature is not active".to_string()));
}
Some(Compression::Snappy(..)) => {
#[cfg(not(feature = "snap"))]
return Err(Error::Custom("cannot create a producer with Snappy compression because the 'snap' cargo feature is not active".to_string()));
}
};

let producer_name: ProducerName;
let mut current_retries = 0u32;
let start = std::time::Instant::now();
Expand Down Expand Up @@ -728,81 +708,61 @@ impl<Exe: Executor> TopicProducer<Exe> {
) -> Result<proto::CommandSendReceipt, Error> {
let compressed_message = match self.compression.clone() {
None | Some(Compression::None) => message,
#[cfg(feature = "lz4")]
Some(Compression::Lz4(compression)) => {
#[cfg(not(feature = "lz4"))]
return unimplemented!();
let compressed_payload: Vec<u8> =
lz4::block::compress(&message.payload[..], Some(compression.mode), false)
.map_err(ProducerError::Io)?;

#[cfg(feature = "lz4")]
{
let compressed_payload: Vec<u8> =
lz4::block::compress(&message.payload[..], Some(compression.mode), false)
.map_err(ProducerError::Io)?;

message.uncompressed_size = Some(message.payload.len() as u32);
message.payload = compressed_payload;
message.compression = Some(1);
message
}
message.uncompressed_size = Some(message.payload.len() as u32);
message.payload = compressed_payload;
message.compression = Some(1);
message
}
#[cfg(feature = "flate2")]
Some(Compression::Zlib(compression)) => {
#[cfg(not(feature = "flate2"))]
return unimplemented!();

#[cfg(feature = "flate2")]
{
let mut e =
flate2::write::ZlibEncoder::new(Vec::new(), compression.level);
e.write_all(&message.payload[..])
.map_err(ProducerError::Io)?;
let compressed_payload = e.finish().map_err(ProducerError::Io)?;

message.uncompressed_size = Some(message.payload.len() as u32);
message.payload = compressed_payload;
message.compression = Some(2);
message
}
let mut e =
flate2::write::ZlibEncoder::new(Vec::new(), compression.level);
e.write_all(&message.payload[..])
.map_err(ProducerError::Io)?;
let compressed_payload = e.finish().map_err(ProducerError::Io)?;

message.uncompressed_size = Some(message.payload.len() as u32);
message.payload = compressed_payload;
message.compression = Some(2);
message
}
#[cfg(feature = "zstd")]
Some(Compression::Zstd(compression)) => {
#[cfg(not(feature = "zstd"))]
return unimplemented!();

#[cfg(feature = "zstd")]
{
let compressed_payload =
zstd::encode_all(&message.payload[..], compression.level).map_err(ProducerError::Io)?;
message.uncompressed_size = Some(message.payload.len() as u32);
message.payload = compressed_payload;
message.compression = Some(3);
message
}
let compressed_payload =
zstd::encode_all(&message.payload[..], compression.level).map_err(ProducerError::Io)?;
message.uncompressed_size = Some(message.payload.len() as u32);
message.payload = compressed_payload;
message.compression = Some(3);
message
}
#[cfg(feature = "snap")]
Some(Compression::Snappy(..)) => {
#[cfg(not(feature = "snap"))]
return unimplemented!();

#[cfg(feature = "snap")]
{
let compressed_payload: Vec<u8> = Vec::new();
let mut encoder = snap::write::FrameEncoder::new(compressed_payload);
encoder
.write(&message.payload[..])
.map_err(ProducerError::Io)?;
let compressed_payload = encoder
.into_inner()
//FIXME
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Snappy compression error: {:?}", e),
)
})
.map_err(ProducerError::Io)?;
let compressed_payload: Vec<u8> = Vec::new();
let mut encoder = snap::write::FrameEncoder::new(compressed_payload);
encoder
.write(&message.payload[..])
.map_err(ProducerError::Io)?;
let compressed_payload = encoder
.into_inner()
//FIXME
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Snappy compression error: {:?}", e),
)
})
.map_err(ProducerError::Io)?;

message.uncompressed_size = Some(message.payload.len() as u32);
message.payload = compressed_payload;
message.compression = Some(4);
message
}
message.uncompressed_size = Some(message.payload.len() as u32);
message.payload = compressed_payload;
message.compression = Some(4);
message
}
};

Expand Down

0 comments on commit c889a08

Please sign in to comment.