Soft threshold for the max batch size in bytes #269

Merged 4 commits on Apr 16, 2023
87 changes: 70 additions & 17 deletions src/producer.rs
@@ -142,6 +142,9 @@ pub struct ProducerOptions {
pub schema: Option<Schema>,
/// batch message size
pub batch_size: Option<u32>,
/// batch size in bytes threshold (only relevant when batch_size is active).
/// the batch is sent when the batch size in bytes is reached
pub batch_byte_size: Option<usize>,
/// algorithm used to compress the messages
pub compression: Option<Compression>,
/// producer access mode: shared = 0, exclusive = 1, waitforexclusive =2,
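
With this change a batching producer flushes when either bound is hit: the existing message-count limit or the new byte-size threshold. A minimal usage sketch, assuming the crate's `Pulsar` builder API at the time of this PR; the service URL, topic name, and threshold values are illustrative:

```rust
use pulsar::{producer::ProducerOptions, Pulsar, TokioExecutor};

#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
    let pulsar = Pulsar::builder("pulsar://127.0.0.1:6650", TokioExecutor)
        .build()
        .await?;

    let mut producer = pulsar
        .producer()
        .with_topic("persistent://public/default/my-topic")
        .with_options(ProducerOptions {
            // flush after 1000 queued messages ...
            batch_size: Some(1000),
            // ... or once their payloads total ~128 KiB, whichever comes first
            batch_byte_size: Some(128 * 1024),
            ..Default::default()
        })
        .build()
        .await?;

    producer.send("hello".to_string()).await?;
    Ok(())
}
```

Note that `batch_byte_size` is ignored when `batch_size` is `None`: as the constructor below shows, no `Batch` is created at all in that case.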
@@ -425,7 +428,7 @@ struct TopicProducer<Exe: Executor> {
name: ProducerName,
topic: String,
message_id: SerialId,
//putting it in a mutex because we must send multiple messages at once
// putting it in a mutex because we must send multiple messages at once
// while we might be pushing more messages from elsewhere
batch: Option<Mutex<Batch>>,
compression: Option<Compression>,
@@ -449,6 +452,7 @@ impl<Exe: Executor> TopicProducer<Exe> {

let topic = topic.clone();
let batch_size = options.batch_size;
let batch_byte_size = options.batch_byte_size;
let compression = options.compression.clone();
let mut connection = client.manager.get_connection(&addr).await?;

@@ … @@
)
.await?;

let batch = batch_size
.map(|batch_size| Batch::new(batch_size, batch_byte_size))
.map(Mutex::new);

Ok(TopicProducer {
client,
connection,
id: producer_id,
name: producer_name,
topic,
message_id: sequence_ids,
batch: batch_size.map(Batch::new).map(Mutex::new),
batch,
compression,
options,
})
@@ -722,7 +730,11 @@
)
.await?;

self.batch = self.options.batch_size.map(Batch::new).map(Mutex::new);
self.batch = self
.options
.batch_size
.map(|batch_size| Batch::new(batch_size, self.options.batch_byte_size))
.map(Mutex::new);

Ok(())
}
@@ -859,38 +871,79 @@ impl<Exe: Executor> ProducerBuilder<Exe> {
}
}

struct BatchStorage {
size: usize,
storage: VecDeque<(
oneshot::Sender<Result<CommandSendReceipt, Error>>,
BatchedMessage,
)>,
}

impl BatchStorage {
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub fn new(length: u32) -> BatchStorage {
BatchStorage {
size: 0,
storage: VecDeque::with_capacity(length as usize),
}
}

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub fn push_back(
&mut self,
tx: oneshot::Sender<Result<CommandSendReceipt, Error>>,
batched: BatchedMessage,
) {
self.size += batched.metadata.payload_size as usize;
self.storage.push_back((tx, batched))
}

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub fn get_messages(
&mut self,
) -> Vec<(
oneshot::Sender<Result<CommandSendReceipt, Error>>,
BatchedMessage,
)> {
self.size = 0;
self.storage.drain(..).collect()
}
}
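
`BatchStorage` pairs the queue with a running byte total, so the size check in `is_full` below stays O(1) instead of rescanning the whole `VecDeque`. A standalone sketch of that accounting, with a hypothetical `FakeMessage` standing in for `BatchedMessage`:

```rust
use std::collections::VecDeque;

// Hypothetical stand-in for BatchedMessage: only the payload size matters here.
struct FakeMessage {
    payload_size: usize,
}

struct Storage {
    size: usize, // running total of queued payload bytes
    storage: VecDeque<FakeMessage>,
}

impl Storage {
    fn push_back(&mut self, msg: FakeMessage) {
        self.size += msg.payload_size; // updated on every push, no rescan needed
        self.storage.push_back(msg);
    }

    fn get_messages(&mut self) -> Vec<FakeMessage> {
        self.size = 0; // taking the batch resets the byte counter
        self.storage.drain(..).collect()
    }
}

fn main() {
    let mut s = Storage { size: 0, storage: VecDeque::new() };
    s.push_back(FakeMessage { payload_size: 10 });
    s.push_back(FakeMessage { payload_size: 30 });
    assert_eq!(s.size, 40);
    assert_eq!(s.get_messages().len(), 2);
    assert_eq!(s.size, 0);
}
```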

struct Batch {
// max number of messages
pub length: u32,
// message bytes threshold
pub size: Option<usize>,
// put it in a mutex because the design of Producer requires an immutable TopicProducer,
// so we cannot have a mutable Batch in a send_raw(&mut self, ...)
#[allow(clippy::type_complexity)]
pub storage: Mutex<
VecDeque<(
oneshot::Sender<Result<proto::CommandSendReceipt, Error>>,
BatchedMessage,
)>,
>,
pub storage: Mutex<BatchStorage>,
}

impl Batch {
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub fn new(length: u32) -> Batch {
pub fn new(length: u32, size: Option<usize>) -> Batch {
Batch {
length,
storage: Mutex::new(VecDeque::with_capacity(length as usize)),
size,
storage: Mutex::new(BatchStorage::new(length)),
}
}

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub async fn is_full(&self) -> bool {
self.storage.lock().await.len() >= self.length as usize
let s = self.storage.lock().await;
match self.size {
None => s.storage.len() >= self.length as usize,
Some(size) => s.storage.len() >= self.length as usize || s.size >= size,
}
}
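// Illustrative arithmetic (editorial note, not part of the diff): with
// length = 1000 and size = Some(32 * 1024),
//   1000 tiny messages        -> len >= 1000          -> full (count trigger)
//   32 messages of 1 KiB each -> 32768 >= 32768 bytes -> full (byte trigger)
//   10 messages of 1 KiB each -> neither bound hit    -> not full
// With size = None the predicate reduces to the old count-only check.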

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub async fn push_back(
&self,
msg: (
oneshot::Sender<Result<proto::CommandSendReceipt, Error>>,
oneshot::Sender<Result<CommandSendReceipt, Error>>,
ProducerMessage,
),
) {
@@ … @@
},
payload: message.payload,
};
self.storage.lock().await.push_back((tx, batched))
self.storage.lock().await.push_back(tx, batched)
}

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub async fn get_messages(
&self,
) -> Vec<(
oneshot::Sender<Result<proto::CommandSendReceipt, Error>>,
oneshot::Sender<Result<CommandSendReceipt, Error>>,
BatchedMessage,
)> {
self.storage.lock().await.drain(..).collect()
self.storage.lock().await.get_messages()
}
}
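
One consequence of the design, and presumably why the PR title calls the threshold "soft": a message is pushed into the batch first and the bounds are only checked afterwards (in the `send_raw` path mentioned in the comment above), so a batch can overshoot `batch_byte_size` by up to one message's payload. A sketch of that caller pattern, with hypothetical variable names rather than a verbatim excerpt:

```rust
// `tx` is the receipt channel, `message` the payload to batch.
batch.push_back((tx, message)).await; // the message is queued unconditionally
if batch.is_full().await {            // bounds are only consulted afterwards, so the
                                      // last message may carry the batch past the limit
    let messages = batch.get_messages().await; // drains the queue, resets the byte count
    // ... compress the accumulated payloads and send them as one batched command
}
```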
