diff --git a/resp-decoder/src/main/java/org/infinispan/server/resp/NettyChannelState.java b/resp-decoder/src/main/java/org/infinispan/server/resp/NettyChannelState.java index fdc80f9..735f235 100644 --- a/resp-decoder/src/main/java/org/infinispan/server/resp/NettyChannelState.java +++ b/resp-decoder/src/main/java/org/infinispan/server/resp/NettyChannelState.java @@ -13,8 +13,7 @@ public class NettyChannelState { public EmbeddedChannel channel; public enum DECODER { - CURRENT, - NEW + CURRENT } @Param @@ -27,14 +26,13 @@ public void initializeState() { channel.config().setAllocator(PooledByteBufAllocator.DEFAULT); switch (decoder) { - case NEW: - channel.pipeline() - .addLast(new NewDecoder(new NewRespHandler())); - break; case CURRENT: channel.pipeline() .addLast(new RespDecoder(new OurRespHandler())); break; } + + // Ensure the ByteBufPool is initialized + channel.pipeline().fireChannelRegistered(); } } diff --git a/resp-decoder/src/main/java/org/infinispan/server/resp/NewDecoder.java b/resp-decoder/src/main/java/org/infinispan/server/resp/NewDecoder.java deleted file mode 100644 index 9238d5e..0000000 --- a/resp-decoder/src/main/java/org/infinispan/server/resp/NewDecoder.java +++ /dev/null @@ -1,147 +0,0 @@ -package org.infinispan.server.resp; - -import java.util.List; -import java.util.concurrent.CompletionStage; - -import org.infinispan.commons.util.Util; -import org.infinispan.util.concurrent.CompletionStages; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.ChannelHandlerContext; -import io.netty.util.concurrent.FastThreadLocal; - -public class NewDecoder extends RespDecoder { - public NewDecoder(NewRespHandler initialHandler) { - super(initialHandler); - initialHandler.writer = this::flushBuffer; - initialHandler.allocator = this::retrieveBuffer; - } - - protected static final int THREAD_LOCAL_CAPACITY = 1024; - - private ChannelHandlerContext ctx; - - private boolean isReading; - private ByteBuf pendingBuffer; - - private static final FastThreadLocal BYTE_BUF_FAST_THREAD_LOCAL = new FastThreadLocal<>() { - @Override - protected ByteBuf initialValue() { - return Unpooled.directBuffer(THREAD_LOCAL_CAPACITY, THREAD_LOCAL_CAPACITY); - } - - @Override - protected void onRemoval(ByteBuf value) { - value.release(); - } - }; - - protected void flushBuffer(ByteBuf buffer) { - assert buffer == pendingBuffer : "Buffer mismatch expected " + pendingBuffer + " but was " + buffer; - // Only flush the buffer it is done outside of the read loop, as we handle that ourselves - if (!isReading) { - ctx.writeAndFlush(pendingBuffer, ctx.voidPromise()); - pendingBuffer = null; - } - } - - private void flushPendingBuffer() { - if (pendingBuffer != null) { - ctx.writeAndFlush(pendingBuffer, ctx.voidPromise()); - pendingBuffer = null; - } - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - flushPendingBuffer(); - super.channelInactive(ctx); - } - - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - this.ctx = ctx; - super.handlerAdded(ctx); - } - - protected ByteBuf retrieveBuffer(int requiredBytes) { - if (pendingBuffer != null) { - if (requiredBytes < pendingBuffer.writableBytes()) { - return pendingBuffer; - } - ctx.write(pendingBuffer, ctx.voidPromise()); - pendingBuffer = null; - } - ByteBuf buf = BYTE_BUF_FAST_THREAD_LOCAL.get(); - if (requiredBytes < buf.writableBytes()) { - // This will reserve the buffer for our usage only, other channels in same event loop may try to reserve - if (buf.refCnt() == 1) { - buf.retain(); - buf.clear(); - pendingBuffer = buf; - return buf; - } - } - int reserveSize = Math.max(requiredBytes, 4096); - pendingBuffer = ctx.alloc().buffer(reserveSize, reserveSize); - return pendingBuffer; - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - isReading = false; - flushPendingBuffer(); - super.channelReadComplete(ctx); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - isReading = true; - super.channelRead(ctx, msg); - } - - @Override - protected boolean handleCommandAndArguments(ChannelHandlerContext ctx, String command, List arguments) { - boolean canContinue = handleCommandAndArgumentsOverride(ctx, command, arguments); - if (canContinue) { - if (ctx.channel().bytesBeforeUnwritable() < pendingBuffer.readableBytes()) { - ctx.writeAndFlush(pendingBuffer); - pendingBuffer = null; - // TODO: we should probably check for writeability still and block reading if possible until is cleared - } - } else { - assert !ctx.channel().config().isAutoRead(); - } - return canContinue; - } - - protected boolean handleCommandAndArgumentsOverride(ChannelHandlerContext ctx, String command, List arguments) { - if (log.isTraceEnabled()) { - log.tracef("Received command: %s with arguments %s for %s", command, Util.toStr(arguments), ctx.channel()); - } - - CompletionStage stage = requestHandler.handleRequest(ctx, command, arguments); - if (CompletionStages.isCompletedSuccessfully(stage)) { - requestHandler = CompletionStages.join(stage); - return true; - } - log.tracef("Disabling auto read for channel %s until previous command is complete", ctx.channel()); - // Disable reading any more from socket - until command is complete - ctx.channel().config().setAutoRead(false); - stage.whenComplete((handler, t) -> { - assert ctx.channel().eventLoop().inEventLoop(); - log.tracef("Re-enabling auto read for channel %s as previous command is complete", ctx.channel()); - ctx.channel().config().setAutoRead(true); - if (t != null) { - exceptionCaught(ctx, t); - } else { - // Instate the new handler if there was no exception - requestHandler = handler; - } - - ctx.read(); - }); - return false; - } -} diff --git a/resp-decoder/src/main/java/org/infinispan/server/resp/NewRespHandler.java b/resp-decoder/src/main/java/org/infinispan/server/resp/NewRespHandler.java deleted file mode 100644 index 74ee1b1..0000000 --- a/resp-decoder/src/main/java/org/infinispan/server/resp/NewRespHandler.java +++ /dev/null @@ -1,71 +0,0 @@ -package org.infinispan.server.resp; - -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.function.Consumer; -import java.util.function.IntFunction; - -import org.infinispan.commons.util.concurrent.CompletableFutures; -import org.infinispan.util.function.TriConsumer; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.util.AttributeKey; - -public class NewRespHandler extends RespRequestHandler { - private static final CompletableFuture GET_FUTURE = CompletableFuture.completedFuture(new byte[] { 0x1, 0x12}); - public static byte[] OK = "+OK\r\n".getBytes(StandardCharsets.US_ASCII); - - public IntFunction allocator; - public Consumer writer; - - @Override - public CompletionStage handleRequest(ChannelHandlerContext ctx, String type, List arguments) { - switch (type) { - case "GET": - return stageToReturn(GET_FUTURE, ctx, GET_TRICONSUMER); - case "SET": - return stageToReturn(CompletableFutures.completedNull(), ctx, SET_TRICONSUMER); - } - return super.handleRequest(ctx, type, arguments); - } - - private final TriConsumer SET_TRICONSUMER = (ignore, innerCtx, t) -> { - if (t != null) { - throw new AssertionError(t); - } else { - ByteBuf buf = allocator.apply(OK.length); - writer.accept(buf.writeBytes(OK)); - } - }; - - private final TriConsumer GET_TRICONSUMER = (innerValueBytes, innerCtx, t) -> { - if (t != null) { - throw new AssertionError(t); - } else if (innerValueBytes != null) { - ByteBuf buf = bytesToResult(innerValueBytes, allocator); - writer.accept(buf); - } else { - innerCtx.writeAndFlush(RespRequestHandler.stringToByteBuf("$-1\r\n", innerCtx.alloc()), innerCtx.voidPromise()); - } - }; - - protected static ByteBuf bytesToResult(byte[] result, IntFunction allocator) { - int length = result.length; - int stringLength = stringSize(length); - - // Need 5 extra for $ and 2 sets of /r/n - int exactSize = stringLength + length + 5; - ByteBuf buffer = allocator.apply(exactSize); - buffer.writeByte('$'); - // This method is anywhere from 10-100% faster than ByteBufUtil.writeAscii and avoids allocations - setIntChars(length, stringLength, buffer); - buffer.writeByte('\r').writeByte('\n'); - buffer.writeBytes(result); - buffer.writeByte('\r').writeByte('\n'); - - return buffer; - } -} diff --git a/resp-decoder/src/main/java/org/infinispan/server/resp/OurRespHandler.java b/resp-decoder/src/main/java/org/infinispan/server/resp/OurRespHandler.java index 11601c8..d908208 100644 --- a/resp-decoder/src/main/java/org/infinispan/server/resp/OurRespHandler.java +++ b/resp-decoder/src/main/java/org/infinispan/server/resp/OurRespHandler.java @@ -1,53 +1,43 @@ package org.infinispan.server.resp; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.IntFunction; import org.infinispan.commons.util.concurrent.CompletableFutures; import org.infinispan.util.function.TriConsumer; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; -import io.netty.util.CharsetUtil; +import io.netty.util.AttributeKey; public class OurRespHandler extends RespRequestHandler { private static final CompletableFuture GET_FUTURE = CompletableFuture.completedFuture(new byte[] { 0x1, 0x12}); - public static ByteBuf OK = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("+OK\r\n", CharsetUtil.US_ASCII)); - - // Returns a cached OK status that is retained for multiple uses - static ByteBuf statusOK() { - return OK.duplicate(); - } + public static byte[] OK = "+OK\r\n".getBytes(StandardCharsets.US_ASCII); @Override - public CompletionStage handleRequest(ChannelHandlerContext ctx, String type, List arguments) { + public CompletionStage actualHandleRequest(ChannelHandlerContext ctx, String type, List arguments) { switch (type) { case "GET": return stageToReturn(GET_FUTURE, ctx, GET_TRICONSUMER); case "SET": - return stageToReturn(CompletableFutures.completedNull(), ctx, SET_TRICONSUMER); + return stageToReturn(CompletableFutures.completedNull(), ctx, OK_BICONSUMER); } return super.handleRequest(ctx, type, arguments); } - private static final TriConsumer SET_TRICONSUMER = (ignore, innerCtx, t) -> { - if (t != null) { - throw new AssertionError(t); - } else { - innerCtx.writeAndFlush(statusOK(), innerCtx.voidPromise()); - } - }; + protected static final BiConsumer OK_BICONSUMER = (ignore, alloc) -> + alloc.acquire(OK.length).writeBytes(OK); - private static final TriConsumer GET_TRICONSUMER = (innerValueBytes, innerCtx, t) -> { - if (t != null) { - throw new AssertionError(t); - } else if (innerValueBytes != null) { - ByteBuf buf = bytesToResult(innerValueBytes, innerCtx.alloc()); - innerCtx.writeAndFlush(buf, innerCtx.voidPromise()); + protected static final BiConsumer GET_TRICONSUMER = (innerValueBytes, alloc) -> { + if (innerValueBytes != null) { + bytesToResult(innerValueBytes, alloc); } else { - innerCtx.writeAndFlush(RespRequestHandler.stringToByteBuf("$-1\r\n", innerCtx.alloc()), innerCtx.voidPromise()); + stringToByteBuf("$-1\r\n", alloc); } }; }