From 358112767b2ebd3902ec70d9e7c7bfd6103b74bb Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 13 Feb 2025 01:33:55 +0200 Subject: [PATCH] [fix][client] Fix memory leak when message size exceeds max message size and batching is enabled (#23967) --- .../apache/pulsar/client/impl/BatchMessageContainerImpl.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index 7262cfd11e069..403a804b605e7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -273,6 +273,7 @@ public OpSendMsg createOpSendMsg() throws IOException { // handle mgs size check as non-batched in `ProducerImpl.isMessageSizeExceeded` if (op.getMessageHeaderAndPayloadSize() > getMaxMessageSize()) { + cmd.release(); producer.semaphoreRelease(1); producer.client.getMemoryLimitController().releaseMemory( messages.get(0).getUncompressedSize() + batchAllocatedSizeBytes); @@ -286,6 +287,7 @@ public OpSendMsg createOpSendMsg() throws IOException { ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload()); updateAndReserveBatchAllocatedSize(encryptedPayload.capacity()); if (encryptedPayload.readableBytes() > getMaxMessageSize()) { + encryptedPayload.release(); producer.semaphoreRelease(messages.size()); messages.forEach(msg -> producer.client.getMemoryLimitController() .releaseMemory(msg.getUncompressedSize()));