Skip to content

Commit

Permalink
Update to latest SNAPSHOT
Browse files Browse the repository at this point in the history
  • Loading branch information
wburns committed Mar 30, 2023
1 parent 24a6c1c commit 0161e00
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 248 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ public class NettyChannelState {
public EmbeddedChannel channel;

public enum DECODER {
CURRENT,
NEW
CURRENT
}

@Param
Expand All @@ -27,14 +26,13 @@ public void initializeState() {
channel.config().setAllocator(PooledByteBufAllocator.DEFAULT);

switch (decoder) {
case NEW:
channel.pipeline()
.addLast(new NewDecoder(new NewRespHandler()));
break;
case CURRENT:
channel.pipeline()
.addLast(new RespDecoder(new OurRespHandler()));
break;
}

// Ensure the ByteBufPool is initialized
channel.pipeline().fireChannelRegistered();
}
}
147 changes: 0 additions & 147 deletions resp-decoder/src/main/java/org/infinispan/server/resp/NewDecoder.java

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,53 +1,43 @@
package org.infinispan.server.resp;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.IntFunction;

import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.util.function.TriConsumer;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.CharsetUtil;
import io.netty.util.AttributeKey;

public class OurRespHandler extends RespRequestHandler {
private static final CompletableFuture<byte[]> GET_FUTURE = CompletableFuture.completedFuture(new byte[] { 0x1, 0x12});
public static ByteBuf OK = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("+OK\r\n", CharsetUtil.US_ASCII));

// Returns a cached OK status that is retained for multiple uses
static ByteBuf statusOK() {
return OK.duplicate();
}
public static byte[] OK = "+OK\r\n".getBytes(StandardCharsets.US_ASCII);

@Override
public CompletionStage<RespRequestHandler> handleRequest(ChannelHandlerContext ctx, String type, List<byte[]> arguments) {
public CompletionStage<RespRequestHandler> actualHandleRequest(ChannelHandlerContext ctx, String type, List<byte[]> arguments) {
switch (type) {
case "GET":
return stageToReturn(GET_FUTURE, ctx, GET_TRICONSUMER);
case "SET":
return stageToReturn(CompletableFutures.completedNull(), ctx, SET_TRICONSUMER);
return stageToReturn(CompletableFutures.completedNull(), ctx, OK_BICONSUMER);
}
return super.handleRequest(ctx, type, arguments);
}

private static final TriConsumer<byte[], ChannelHandlerContext, Throwable> SET_TRICONSUMER = (ignore, innerCtx, t) -> {
if (t != null) {
throw new AssertionError(t);
} else {
innerCtx.writeAndFlush(statusOK(), innerCtx.voidPromise());
}
};
protected static final BiConsumer<Object, ByteBufPool> OK_BICONSUMER = (ignore, alloc) ->
alloc.acquire(OK.length).writeBytes(OK);

private static final TriConsumer<byte[], ChannelHandlerContext, Throwable> GET_TRICONSUMER = (innerValueBytes, innerCtx, t) -> {
if (t != null) {
throw new AssertionError(t);
} else if (innerValueBytes != null) {
ByteBuf buf = bytesToResult(innerValueBytes, innerCtx.alloc());
innerCtx.writeAndFlush(buf, innerCtx.voidPromise());
protected static final BiConsumer<byte[], ByteBufPool> GET_TRICONSUMER = (innerValueBytes, alloc) -> {
if (innerValueBytes != null) {
bytesToResult(innerValueBytes, alloc);
} else {
innerCtx.writeAndFlush(RespRequestHandler.stringToByteBuf("$-1\r\n", innerCtx.alloc()), innerCtx.voidPromise());
stringToByteBuf("$-1\r\n", alloc);
}
};
}

0 comments on commit 0161e00

Please sign in to comment.