Skip to content

Commit

Permalink
feat: support configure compression level (#240)
Browse files Browse the repository at this point in the history
Signed-off-by: tison <wander4096@gmail.com>
  • Loading branch information
tisonkun authored Nov 23, 2022
1 parent de59974 commit eb0416e
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 121 deletions.
13 changes: 5 additions & 8 deletions examples/batching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@
#[macro_use]
extern crate serde;
use futures::{future::join_all, TryStreamExt};
use pulsar::{
message::proto, message::proto::command_subscribe::SubType, message::Payload, producer,
Consumer, DeserializeMessage, Error as PulsarError, Pulsar, SerializeMessage, TokioExecutor,
};
use pulsar::{message::proto::command_subscribe::SubType, message::Payload, producer, Consumer, DeserializeMessage, Error as PulsarError, Pulsar, SerializeMessage, TokioExecutor, compression};

#[derive(Debug, Serialize, Deserialize)]
struct TestData {
Expand Down Expand Up @@ -42,10 +39,10 @@ async fn main() -> Result<(), pulsar::Error> {
.with_name("my-producer2".to_string())
.with_options(producer::ProducerOptions {
batch_size: Some(4),
//compression: Some(proto::CompressionType::Lz4),
//compression: Some(proto::CompressionType::Zlib),
//compression: Some(proto::CompressionType::Zstd),
compression: Some(proto::CompressionType::Snappy),
// compression: Some(compression::Compression::Lz4(compression::CompressionLz4::default())),
// compression: Some(compression::Compression::Zlib(compression::CompressionZlib::default())),
// compression: Some(compression::Compression::Zstd(compression::CompressionZstd::default())),
compression: Some(compression::Compression::Snappy(compression::CompressionSnappy::default())),
..Default::default()
})
.build()
Expand Down
82 changes: 82 additions & 0 deletions src/compression.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
//! Compression strategy configs
/// Wrapper of supported compression algorithms
#[derive(Default, Clone, Debug)]
pub enum Compression {
#[default]
None,
#[cfg(feature = "lz4")]
Lz4(CompressionLz4),
#[cfg(feature = "flate2")]
Zlib(CompressionZlib),
#[cfg(feature = "zstd")]
Zstd(CompressionZstd),
#[cfg(feature = "snap")]
Snappy(CompressionSnappy),
}

/// Options of the [lz4](https://lz4.github.io/lz4/) algorithm
#[cfg(feature = "lz4")]
#[derive(Debug)]
pub struct CompressionLz4 {
/// compression mode of lz4 to be used
pub mode: lz4::block::CompressionMode,
}

// FIXME: can be omitted if https://github.com/10XGenomics/lz4-rs/pull/31 released
#[cfg(feature = "lz4")]
impl Default for CompressionLz4 {
fn default() -> Self {
CompressionLz4 {
mode: lz4::block::CompressionMode::DEFAULT,
}
}
}

// FIXME: can be omitted if https://github.com/10XGenomics/lz4-rs/pull/30 released
#[cfg(feature = "lz4")]
impl Clone for CompressionLz4 {
fn clone(&self) -> Self {
use lz4::block::CompressionMode;

CompressionLz4 {
mode: match self.mode {
CompressionMode::HIGHCOMPRESSION(i) => CompressionMode::HIGHCOMPRESSION(i),
CompressionMode::FAST(i) => CompressionMode::FAST(i),
CompressionMode::DEFAULT => CompressionMode::DEFAULT,
}
}
}
}

/// Options of the [zlib](https://www.zlib.net/) algorithm
#[cfg(feature = "flate2")]
#[derive(Default, Clone, Copy, Debug)]
pub struct CompressionZlib {
/// compression level of zlib to be used (0-9)
pub level: flate2::Compression,
}

/// Options of the [zstd](http://facebook.github.io/zstd/zstd_manual.html) algorithm
#[cfg(feature = "zstd")]
#[derive(Clone, Copy, Debug)]
pub struct CompressionZstd {
/// compression level of zstd to be used ([`zstd::compression_level_range()`])
pub level: i32,
}

#[cfg(feature = "zstd")]
impl Default for CompressionZstd {
fn default() -> Self {
CompressionZstd {
level: zstd::DEFAULT_COMPRESSION_LEVEL,
}
}
}

/// Options of the [snappy](http://google.github.io/snappy/) algorithm
#[cfg(feature = "snap")]
#[derive(Default, Clone, Copy, Debug)]
pub struct CompressionSnappy {
// empty for extensions
}
35 changes: 18 additions & 17 deletions src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1115,12 +1115,23 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
message: CommandMessage,
mut payload: Payload,
) -> Result<(), Error> {
let compression = payload.metadata.compression;
let compression = match payload.metadata.compression {
None => proto::CompressionType::None,
Some(compression) => {
proto::CompressionType::from_i32(compression)
.ok_or_else(|| {
error!("unknown compression type: {}", compression);
Error::Consumer(ConsumerError::Io(std::io::Error::new(
std::io::ErrorKind::Other,
format!("unknown compression type: {}", compression),
)))
})?
}
};

let payload = match compression {
None | Some(0) => payload,
// LZ4
Some(1) => {
proto::CompressionType::None => payload,
proto::CompressionType::Lz4 => {
#[cfg(not(feature = "lz4"))]
{
return Err(Error::Consumer(ConsumerError::Io(std::io::Error::new(
Expand All @@ -1142,8 +1153,7 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
payload
}
}
// zlib
Some(2) => {
proto::CompressionType::Zlib => {
#[cfg(not(feature = "flate2"))]
{
return Err(Error::Consumer(ConsumerError::Io(std::io::Error::new(
Expand All @@ -1166,8 +1176,7 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
payload
}
}
// zstd
Some(3) => {
proto::CompressionType::Zstd => {
#[cfg(not(feature = "zstd"))]
{
return Err(Error::Consumer(ConsumerError::Io(std::io::Error::new(
Expand All @@ -1186,8 +1195,7 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
payload
}
}
// Snappy
Some(4) => {
proto::CompressionType::Snappy => {
#[cfg(not(feature = "snap"))]
{
return Err(Error::Consumer(ConsumerError::Io(std::io::Error::new(
Expand All @@ -1209,13 +1217,6 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
payload
}
}
Some(i) => {
error!("unknown compression type: {}", i);
return Err(Error::Consumer(ConsumerError::Io(std::io::Error::new(
std::io::ErrorKind::Other,
format!("unknown compression type: {}", i),
))));
}
};

match payload.metadata.num_messages_in_batch {
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ pub use producer::{MultiTopicProducer, Producer, ProducerOptions};

pub mod authentication;
mod client;
pub mod compression;
mod connection;
mod connection_manager;
pub mod consumer;
Expand Down
153 changes: 57 additions & 96 deletions src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ use crate::client::SerializeMessage;
use crate::connection::{Connection, SerialId};
use crate::error::{ConnectionError, ProducerError};
use crate::executor::Executor;
use crate::message::proto::{self, CommandSendReceipt, CompressionType, EncryptionKeys, Schema};
use crate::message::proto::{self, CommandSendReceipt, EncryptionKeys, Schema};
use crate::message::BatchedMessage;
use crate::{Error, Pulsar};
use futures::task::{Context, Poll};
use futures::Future;
use crate::compression::{Compression};

type ProducerId = u64;
type ProducerName = String;
Expand Down Expand Up @@ -127,7 +128,7 @@ pub struct ProducerOptions {
/// batch message size
pub batch_size: Option<u32>,
/// algorithm used to compress the messages
pub compression: Option<proto::CompressionType>,
pub compression: Option<Compression>,
/// producer access mode: shared = 0, exclusive = 1, waitforexclusive =2, exclusivewithoutfencing =3
pub access_mode: Option<i32>,
}
Expand Down Expand Up @@ -402,7 +403,7 @@ struct TopicProducer<Exe: Executor> {
//putting it in a mutex because we must send multiple messages at once
// while we might be pushing more messages from elsewhere
batch: Option<Mutex<Batch>>,
compression: Option<proto::CompressionType>,
compression: Option<Compression>,
drop_signal: oneshot::Sender<()>,
options: ProducerOptions,
}
Expand All @@ -422,27 +423,7 @@ impl<Exe: Executor> TopicProducer<Exe> {

let topic = topic.clone();
let batch_size = options.batch_size;
let compression = options.compression;

match compression {
None | Some(CompressionType::None) => {}
Some(CompressionType::Lz4) => {
#[cfg(not(feature = "lz4"))]
return Err(Error::Custom("cannot create a producer with LZ4 compression because the 'lz4' cargo feature is not active".to_string()));
}
Some(CompressionType::Zlib) => {
#[cfg(not(feature = "flate2"))]
return Err(Error::Custom("cannot create a producer with zlib compression because the 'flate2' cargo feature is not active".to_string()));
}
Some(CompressionType::Zstd) => {
#[cfg(not(feature = "zstd"))]
return Err(Error::Custom("cannot create a producer with zstd compression because the 'zstd' cargo feature is not active".to_string()));
}
Some(CompressionType::Snappy) => {
#[cfg(not(feature = "snap"))]
return Err(Error::Custom("cannot create a producer with Snappy compression because the 'snap' cargo feature is not active".to_string()));
} //Some() => unimplemented!(),
};
let compression = options.compression.clone();

let producer_name: ProducerName;
let mut current_retries = 0u32;
Expand Down Expand Up @@ -725,83 +706,63 @@ impl<Exe: Executor> TopicProducer<Exe> {
&mut self,
mut message: ProducerMessage,
) -> Result<proto::CommandSendReceipt, Error> {
let compressed_message = match self.compression {
None | Some(CompressionType::None) => message,
Some(CompressionType::Lz4) => {
#[cfg(not(feature = "lz4"))]
return unimplemented!();

#[cfg(feature = "lz4")]
{
let compressed_payload: Vec<u8> =
lz4::block::compress(&message.payload[..], None, false)
.map_err(ProducerError::Io)?;

message.uncompressed_size = Some(message.payload.len() as u32);
message.payload = compressed_payload;
message.compression = Some(1);
message
}
}
Some(CompressionType::Zlib) => {
#[cfg(not(feature = "flate2"))]
return unimplemented!();

#[cfg(feature = "flate2")]
{
let mut e =
flate2::write::ZlibEncoder::new(Vec::new(), flate2::Compression::default());
e.write_all(&message.payload[..])
let compressed_message = match self.compression.clone() {
None | Some(Compression::None) => message,
#[cfg(feature = "lz4")]
Some(Compression::Lz4(compression)) => {
let compressed_payload: Vec<u8> =
lz4::block::compress(&message.payload[..], Some(compression.mode), false)
.map_err(ProducerError::Io)?;
let compressed_payload = e.finish().map_err(ProducerError::Io)?;

message.uncompressed_size = Some(message.payload.len() as u32);
message.payload = compressed_payload;
message.compression = Some(2);
message
}
message.uncompressed_size = Some(message.payload.len() as u32);
message.payload = compressed_payload;
message.compression = Some(proto::CompressionType::Lz4.into());
message
}
Some(CompressionType::Zstd) => {
#[cfg(not(feature = "zstd"))]
return unimplemented!();

#[cfg(feature = "zstd")]
{
let compressed_payload =
zstd::encode_all(&message.payload[..], 0).map_err(ProducerError::Io)?;
message.uncompressed_size = Some(message.payload.len() as u32);
message.payload = compressed_payload;
message.compression = Some(3);
message
}
#[cfg(feature = "flate2")]
Some(Compression::Zlib(compression)) => {
let mut e =
flate2::write::ZlibEncoder::new(Vec::new(), compression.level);
e.write_all(&message.payload[..])
.map_err(ProducerError::Io)?;
let compressed_payload = e.finish().map_err(ProducerError::Io)?;

message.uncompressed_size = Some(message.payload.len() as u32);
message.payload = compressed_payload;
message.compression = Some(proto::CompressionType::Zlib.into());
message
}
Some(CompressionType::Snappy) => {
#[cfg(not(feature = "snap"))]
return unimplemented!();

#[cfg(feature = "snap")]
{
let compressed_payload: Vec<u8> = Vec::new();
let mut encoder = snap::write::FrameEncoder::new(compressed_payload);
encoder
.write(&message.payload[..])
.map_err(ProducerError::Io)?;
let compressed_payload = encoder
.into_inner()
//FIXME
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Snappy compression error: {:?}", e),
)
})
.map_err(ProducerError::Io)?;
#[cfg(feature = "zstd")]
Some(Compression::Zstd(compression)) => {
let compressed_payload =
zstd::encode_all(&message.payload[..], compression.level).map_err(ProducerError::Io)?;
message.uncompressed_size = Some(message.payload.len() as u32);
message.payload = compressed_payload;
message.compression = Some(proto::CompressionType::Zstd.into());
message
}
#[cfg(feature = "snap")]
Some(Compression::Snappy(..)) => {
let compressed_payload: Vec<u8> = Vec::new();
let mut encoder = snap::write::FrameEncoder::new(compressed_payload);
encoder
.write(&message.payload[..])
.map_err(ProducerError::Io)?;
let compressed_payload = encoder
.into_inner()
//FIXME
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Snappy compression error: {:?}", e),
)
})
.map_err(ProducerError::Io)?;

message.uncompressed_size = Some(message.payload.len() as u32);
message.payload = compressed_payload;
message.compression = Some(4);
message
}
message.uncompressed_size = Some(message.payload.len() as u32);
message.payload = compressed_payload;
message.compression = Some(proto::CompressionType::Snappy.into());
message
}
};

Expand Down

0 comments on commit eb0416e

Please sign in to comment.