Skip to content

Commit

Permalink
Change redis storage to use raw binary
Browse files Browse the repository at this point in the history
  • Loading branch information
zbx1425 committed Dec 26, 2024
1 parent fdaf6cd commit c5128cf
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import cn.zbx1425.worldcomment.data.network.ThumbImage;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import net.minecraft.core.BlockPos;
import net.minecraft.network.FriendlyByteBuf;
Expand All @@ -14,7 +15,6 @@
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Objects;
import java.util.UUID;

Expand Down Expand Up @@ -138,17 +138,15 @@ public JsonObject toJson() {
return json;
}

public String toBinaryString() {
FriendlyByteBuf dest = new FriendlyByteBuf(Unpooled.buffer());
public ByteBuf toBinaryBuffer() {
FriendlyByteBuf dest = new FriendlyByteBuf(Unpooled.buffer(256));
dest.writeResourceLocation(level);
writeBuffer(dest, false);
byte[] destArray = new byte[dest.writerIndex()];
dest.getBytes(0, destArray);
return Base64.getEncoder().encodeToString(destArray);
return dest;
}

public static CommentEntry fromBinaryString(String str) {
FriendlyByteBuf src = new FriendlyByteBuf(Unpooled.wrappedBuffer(Base64.getDecoder().decode(str)));
public static CommentEntry fromBinaryBuffer(ByteBuf buf) {
FriendlyByteBuf src = new FriendlyByteBuf(buf);
ResourceLocation level = src.readResourceLocation();
return new CommentEntry(level, src, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.google.common.hash.Hashing;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.commons.codec.binary.Base64;

import java.io.BufferedReader;
import java.io.InputStreamReader;
Expand All @@ -13,6 +12,7 @@
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Base64;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
Expand Down Expand Up @@ -50,7 +50,7 @@ public void sendBlocking() {
byte[] signatureBytes = Hashing
.hmacSha1(Main.SERVER_CONFIG.uplinkAuthKey.value.getBytes(StandardCharsets.UTF_8))
.hashBytes(postDataBytes).asBytes();
conn.setRequestProperty("Authorization", "NEX-HMAC-SHA1 Signature=" + Base64.encodeBase64String(signatureBytes));
conn.setRequestProperty("Authorization", "NEX-HMAC-SHA1 Signature=" + Base64.getEncoder().encodeToString(signatureBytes));
}

conn.setDoOutput(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,67 @@
import cn.zbx1425.worldcomment.data.CommentEntry;
import cn.zbx1425.worldcomment.data.ServerWorldData;
import io.lettuce.core.api.StatefulRedisConnection;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import net.minecraft.network.FriendlyByteBuf;

import java.io.IOException;

public class RedisMessage {

public static final String COMMAND_CHANNEL = "WORLD_COMMENT_COMMAND_CHANNEL";

private static final String INSTANCE_ID = Long.toHexString(ServerWorldData.SNOWFLAKE.nextId());
private static final long INSTANCE_ID = ServerWorldData.SNOWFLAKE.nextId();

public String initiator;
public String action;
public String content;
public long initiator;
public Action action;
public ByteBuf content;

public RedisMessage(String action, String content) {
public RedisMessage(Action action, ByteBuf content) {
this.initiator = INSTANCE_ID;
this.action = action;
this.content = content;
}

public RedisMessage(String redisCommand) {
int firstHash = redisCommand.indexOf(':');
int lastHash = redisCommand.lastIndexOf(':');
this.action = redisCommand.substring(0, firstHash);
this.initiator = redisCommand.substring(firstHash + 1, lastHash);
this.content = redisCommand.substring(lastHash + 1);
public RedisMessage(ByteBuf src) {
this.action = Action.values()[src.readByte()];
this.initiator = src.readLong();
int length = src.readInt();
this.content = src.readBytes(length);
}

public static RedisMessage insert(CommentEntry entry) {
return new RedisMessage("INSERT", entry.toBinaryString());
return new RedisMessage(Action.INSERT, entry.toBinaryBuffer());
}

public static RedisMessage update(CommentEntry entry) {
return new RedisMessage("UPDATE", entry.toBinaryString());
return new RedisMessage(Action.UPDATE, entry.toBinaryBuffer());
}

public void publishAsync(StatefulRedisConnection<String, ByteBuf> connection) {
ByteBuf buffer = Unpooled.buffer(content.readableBytes() + 1 + Long.BYTES);
buffer.writeByte(action.ordinal());
buffer.writeLong(initiator);
connection.async().publish(COMMAND_CHANNEL, buffer);
}

public void publishAsync(StatefulRedisConnection<String, String> connection) {
connection.async().publish(COMMAND_CHANNEL, String.format("%s:%s:%s", action, initiator, content));
public void handle(RedisSynchronizer synchronizer) throws IOException {
if (isFromSelf()) return;
switch (action) {
case INSERT:
synchronizer.handleInsert(CommentEntry.fromBinaryBuffer(content));
break;
case UPDATE:
synchronizer.handleUpdate(CommentEntry.fromBinaryBuffer(content));
break;
}
}

public boolean isFromSelf() {
return initiator.equals(INSTANCE_ID);
return initiator == INSTANCE_ID;
}

public enum Action {
INSERT, UPDATE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,31 @@
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class RedisSynchronizer implements Synchronizer {

private final StatefulRedisPubSubConnection<String, String> redisSub;
private final StatefulRedisConnection<String, String> redisConn;
private final StatefulRedisPubSubConnection<String, ByteBuf> redisSub;
private final StatefulRedisConnection<String, ByteBuf> redisConn;

public static final String HMAP_ALL_KEY = "WORLD_COMMENT_DATA_ALL";

private final ServerWorldData serverWorldData;

public RedisSynchronizer(String URI, ServerWorldData serverWorldData) {
redisConn = RedisClient.create(URI).connect();
redisSub = RedisClient.create(URI).connectPubSub();
redisConn = RedisClient.create(URI).connect(ByteBufCodec.INSTANCE);
redisSub = RedisClient.create(URI).connectPubSub(ByteBufCodec.INSTANCE);
redisSub.addListener(new Listener());
redisSub.sync().subscribe(RedisMessage.COMMAND_CHANNEL);

Expand All @@ -35,12 +40,12 @@ public RedisSynchronizer(String URI, ServerWorldData serverWorldData) {

@Override
public void kvWriteAll(Long2ObjectSortedMap<CommentEntry> all) {
RedisAsyncCommands<String, String> commands = redisConn.async();
RedisAsyncCommands<String, ByteBuf> commands = redisConn.async();
commands.multi();
commands.del(HMAP_ALL_KEY);
HashMap<String, String> data = new HashMap<>();
HashMap<String, ByteBuf> data = new HashMap<>();
for (CommentEntry entry : all.values()) {
data.put(Long.toHexString(entry.id), entry.toBinaryString());
data.put(Long.toHexString(entry.id), entry.toBinaryBuffer());
}
commands.hset(HMAP_ALL_KEY, data);
commands.exec();
Expand All @@ -51,7 +56,7 @@ public void kvWriteEntry(CommentEntry newEntry) {
if (newEntry.deleted) {
redisConn.async().hdel(HMAP_ALL_KEY, Long.toHexString(newEntry.id));
} else {
redisConn.async().hset(HMAP_ALL_KEY, Long.toHexString(newEntry.id), newEntry.toBinaryString());
redisConn.async().hset(HMAP_ALL_KEY, Long.toHexString(newEntry.id), newEntry.toBinaryBuffer());
}
}

Expand All @@ -60,7 +65,7 @@ public void notifyInsert(CommentEntry newEntry) {
RedisMessage.insert(newEntry).publishAsync(redisConn);
}

private void handleInsert(CommentEntry peerEntry) throws IOException {
protected void handleInsert(CommentEntry peerEntry) throws IOException {
serverWorldData.insert(peerEntry, true);
}

Expand All @@ -69,15 +74,15 @@ public void notifyUpdate(CommentEntry newEntry) {
RedisMessage.update(newEntry).publishAsync(redisConn);
}

private void handleUpdate(CommentEntry peerEntry) throws IOException {
protected void handleUpdate(CommentEntry peerEntry) throws IOException {
serverWorldData.update(peerEntry, true);
}

@Override
public void kvReadAllInto(CommentCache comments) throws IOException {
Map<String, String> data = redisConn.sync().hgetall(HMAP_ALL_KEY);
for (String entry : data.values()) {
comments.insert(CommentEntry.fromBinaryString(entry));
Map<String, ByteBuf> data = redisConn.sync().hgetall(HMAP_ALL_KEY);
for (ByteBuf entry : data.values()) {
comments.insert(CommentEntry.fromBinaryBuffer(entry));
}
}

Expand All @@ -87,23 +92,19 @@ public void close() {
redisConn.close();
}

public class Listener implements RedisPubSubListener<String, String> {
public class Listener implements RedisPubSubListener<String, ByteBuf> {
@Override
public void message(String channel, String rawMessage) {
public void message(String channel, ByteBuf rawMessage) {
RedisMessage message = new RedisMessage(rawMessage);
if (message.isFromSelf()) return;
try {
switch (message.action) {
case "INSERT" -> handleInsert(CommentEntry.fromBinaryString(message.content));
case "UPDATE" -> handleUpdate(CommentEntry.fromBinaryString(message.content));
}
message.handle(RedisSynchronizer.this);
} catch (IOException ex) {
Main.LOGGER.error("Redis handler", ex);
}
}

@Override
public void message(String pattern, String channel, String message) { }
public void message(String pattern, String channel, ByteBuf message) { }

@Override
public void subscribed(String channel, long count) { }
Expand All @@ -118,4 +119,30 @@ public void unsubscribed(String channel, long count) { }
public void punsubscribed(String pattern, long count) { }
}

private static class ByteBufCodec implements RedisCodec<String, ByteBuf> {

public static ByteBufCodec INSTANCE = new ByteBufCodec();

@Override
public String decodeKey(ByteBuffer bytes) {
return StringCodec.UTF8.decodeKey(bytes);
}

@Override
public ByteBuf decodeValue(ByteBuffer bytes) {
ByteBuf result = Unpooled.buffer(bytes.remaining());
result.writeBytes(bytes);
return result;
}

@Override
public ByteBuffer encodeKey(String key) {
return StringCodec.UTF8.encodeKey(key);
}

@Override
public ByteBuffer encodeValue(ByteBuf value) {
return value.nioBuffer();
}
}
}

0 comments on commit c5128cf

Please sign in to comment.