diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index 7262cfd11e069..403a804b605e7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -273,6 +273,7 @@ public OpSendMsg createOpSendMsg() throws IOException { // handle mgs size check as non-batched in `ProducerImpl.isMessageSizeExceeded` if (op.getMessageHeaderAndPayloadSize() > getMaxMessageSize()) { + cmd.release(); producer.semaphoreRelease(1); producer.client.getMemoryLimitController().releaseMemory( messages.get(0).getUncompressedSize() + batchAllocatedSizeBytes); @@ -286,6 +287,7 @@ public OpSendMsg createOpSendMsg() throws IOException { ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload()); updateAndReserveBatchAllocatedSize(encryptedPayload.capacity()); if (encryptedPayload.readableBytes() > getMaxMessageSize()) { + encryptedPayload.release(); producer.semaphoreRelease(messages.size()); messages.forEach(msg -> producer.client.getMemoryLimitController() .releaseMemory(msg.getUncompressedSize()));