From 1e727f571cb85e0aaa6fd9336a4fd2084f30abba Mon Sep 17 00:00:00 2001 From: "pu.wang" Date: Wed, 15 Jan 2025 20:54:53 +0800 Subject: [PATCH 1/4] feat(csc): support csc on broadcasting mode --- .../java/redis/clients/jedis/Connection.java | 10 ++++ .../clients/jedis/ConnectionFactory.java | 57 +++++++++++++++++++ .../jedis/DefaultJedisClientConfig.java | 50 ++++++++++++++-- .../clients/jedis/JedisClientConfig.java | 9 +++ .../java/redis/clients/jedis/JedisPooled.java | 4 +- .../clients/jedis/csc/CacheConnection.java | 17 +++++- .../java/redis/clients/jedis/csc/CSCTest.java | 57 +++++++++++++++++++ 7 files changed, 193 insertions(+), 11 deletions(-) create mode 100644 src/test/java/redis/clients/jedis/csc/CSCTest.java diff --git a/src/main/java/redis/clients/jedis/Connection.java b/src/main/java/redis/clients/jedis/Connection.java index de473d0b8e..adc1a43f0c 100644 --- a/src/main/java/redis/clients/jedis/Connection.java +++ b/src/main/java/redis/clients/jedis/Connection.java @@ -195,6 +195,16 @@ public void sendCommand(final ProtocolCommand cmd, final String... args) { sendCommand(new CommandArguments(cmd).addObjects((Object[]) args)); } + @Experimental + public void sendCommandWithTracking(final ProtocolCommand cmd, List prefixList, final String... args) { + List list = new ArrayList<>(); + for (String prefix : prefixList) { + list.add("PREFIX"); + list.add(prefix); + } + sendCommand(new CommandArguments(cmd).addObjects((Object[]) args).addObjects(list.toArray())); + } + public void sendCommand(final ProtocolCommand cmd, final byte[]... args) { sendCommand(new CommandArguments(cmd).addObjects((Object[]) args)); } diff --git a/src/main/java/redis/clients/jedis/ConnectionFactory.java b/src/main/java/redis/clients/jedis/ConnectionFactory.java index 7440417152..c20f8f7731 100644 --- a/src/main/java/redis/clients/jedis/ConnectionFactory.java +++ b/src/main/java/redis/clients/jedis/ConnectionFactory.java @@ -6,6 +6,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import redis.clients.jedis.annots.Experimental; @@ -29,6 +34,8 @@ public class ConnectionFactory implements PooledObjectFactory { private final Supplier objectMaker; private final AuthXEventListener authXEventListener; + private CacheConnection trackingConnection = null; + private ScheduledExecutorService invalidationListeningExecutor = null; public ConnectionFactory(final HostAndPort hostAndPort) { this(hostAndPort, DefaultJedisClientConfig.builder().build(), null); @@ -42,6 +49,11 @@ public ConnectionFactory(final HostAndPort hostAndPort, final JedisClientConfig public ConnectionFactory(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, Cache csCache) { this(new DefaultJedisSocketFactory(hostAndPort, clientConfig), clientConfig, csCache); + if (!clientConfig.getTrackingModeOnDefault()) { + invalidationListeningExecutor = Executors.newSingleThreadScheduledExecutor(); + // initialize tracking connection + initializeTrackingConnection(); + } } public ConnectionFactory(final JedisSocketFactory jedisSocketFactory, @@ -68,6 +80,51 @@ private ConnectionFactory(final JedisSocketFactory jedisSocketFactory, } } + @Experimental + private void initializeTrackingConnection() { + trackingConnection = new CacheConnection(jedisSocketFactory, clientConfig, clientSideCache); + tracking(); + startInvalidationListenerThread(); + } + + /** + * tracking on broadcasting mode + */ + @Experimental + private void tracking() { + List trackingPrefixList = clientConfig.getTrackingPrefixList(); + // If no prefix is set, the prefix is "" + if (trackingPrefixList == null) { + trackingPrefixList = new ArrayList<>(); + trackingPrefixList.add(""); + } + trackingConnection.sendCommandWithTracking(Protocol.Command.CLIENT, trackingPrefixList, "TRACKING", "ON", "BCAST"); + String reply = trackingConnection.getStatusCodeReply(); + if (!"OK".equals(reply)) { + throw new JedisException("Could not enable client tracking. Reply: " + reply); + } + } + + /** + * Start a scheduled task to listen for invalidation event + */ + @Experimental + private void startInvalidationListenerThread() { + invalidationListeningExecutor.scheduleAtFixedRate(() -> { + if (trackingConnection.isBroken() || !trackingConnection.isConnected() || !trackingConnection.ping()) { + // flush cache(broadcasting mode only trackingConnection disconnect) + clientSideCache.flush(); + try { + trackingConnection.connect(); + } catch (Exception e) { + // TODO + } + tracking(); + } + trackingConnection.readPushesWithCheckingBroken(); + }, 2, 2, TimeUnit.SECONDS); + } + private Supplier connectionSupplier() { return clientSideCache == null ? () -> new Connection(jedisSocketFactory, clientConfig) : () -> new CacheConnection(jedisSocketFactory, clientConfig, clientSideCache); diff --git a/src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java b/src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java index 5f0e050ef4..77a702979d 100644 --- a/src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java +++ b/src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java @@ -1,5 +1,6 @@ package redis.clients.jedis; +import java.util.List; import java.util.function.Supplier; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLParameters; @@ -32,13 +33,23 @@ public final class DefaultJedisClientConfig implements JedisClientConfig { private final AuthXManager authXManager; + /** + * tracking prefix list + */ + private final List trackingPrefixList; + + /** + * tracking mode(true:default; false:broadcast) + */ + private final boolean trackingModeOnDefault; + private DefaultJedisClientConfig(RedisProtocol protocol, int connectionTimeoutMillis, int soTimeoutMillis, int blockingSocketTimeoutMillis, Supplier credentialsProvider, int database, String clientName, boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier, HostAndPortMapper hostAndPortMapper, ClientSetInfoConfig clientSetInfoConfig, boolean readOnlyForRedisClusterReplicas, - AuthXManager authXManager) { + AuthXManager authXManager, List trackingPrefixList, boolean trackingModeOnDefault) { this.redisProtocol = protocol; this.connectionTimeoutMillis = connectionTimeoutMillis; this.socketTimeoutMillis = soTimeoutMillis; @@ -54,7 +65,8 @@ private DefaultJedisClientConfig(RedisProtocol protocol, int connectionTimeoutMi this.clientSetInfoConfig = clientSetInfoConfig; this.readOnlyForRedisClusterReplicas = readOnlyForRedisClusterReplicas; this.authXManager = authXManager; - + this.trackingPrefixList = trackingPrefixList; + this.trackingModeOnDefault = trackingModeOnDefault; } @Override @@ -138,6 +150,16 @@ public ClientSetInfoConfig getClientSetInfoConfig() { return clientSetInfoConfig; } + @Override + public List getTrackingPrefixList() { + return trackingPrefixList; + } + + @Override + public boolean getTrackingModeOnDefault() { + return trackingModeOnDefault; + } + @Override public boolean isReadOnlyForRedisClusterReplicas() { return readOnlyForRedisClusterReplicas; @@ -174,6 +196,10 @@ public static class Builder { private AuthXManager authXManager; + private List trackingPrefixList; + + private boolean trackingModeOnDefault = true; + private Builder() { } @@ -186,7 +212,7 @@ public DefaultJedisClientConfig build() { return new DefaultJedisClientConfig(redisProtocol, connectionTimeoutMillis, socketTimeoutMillis, blockingSocketTimeoutMillis, credentialsProvider, database, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, - clientSetInfoConfig, readOnlyForRedisClusterReplicas, authXManager); + clientSetInfoConfig, readOnlyForRedisClusterReplicas, authXManager, trackingPrefixList, trackingModeOnDefault); } /** @@ -293,6 +319,16 @@ public Builder authXManager(AuthXManager authXManager) { return this; } + public Builder trackingPrefixList(List trackingPrefixList) { + this.trackingPrefixList = trackingPrefixList; + return this; + } + + public Builder trackingModeOnDefault(boolean trackingModeOnDefault) { + this.trackingModeOnDefault = trackingModeOnDefault; + return this; + } + public Builder from(JedisClientConfig instance) { this.redisProtocol = instance.getRedisProtocol(); this.connectionTimeoutMillis = instance.getConnectionTimeoutMillis(); @@ -317,12 +353,13 @@ public static DefaultJedisClientConfig create(int connectionTimeoutMillis, int s int blockingSocketTimeoutMillis, String user, String password, int database, String clientName, boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier, - HostAndPortMapper hostAndPortMapper, AuthXManager authXManager) { + HostAndPortMapper hostAndPortMapper, AuthXManager authXManager, + List trackingPrefixList, boolean trackingModeOnDefault) { return new DefaultJedisClientConfig(null, connectionTimeoutMillis, soTimeoutMillis, blockingSocketTimeoutMillis, new DefaultRedisCredentialsProvider(new DefaultRedisCredentials(user, password)), database, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, null, - false, authXManager); + false, authXManager, trackingPrefixList, trackingModeOnDefault); } public static DefaultJedisClientConfig copyConfig(JedisClientConfig copy) { @@ -331,6 +368,7 @@ public static DefaultJedisClientConfig copyConfig(JedisClientConfig copy) { copy.getCredentialsProvider(), copy.getDatabase(), copy.getClientName(), copy.isSsl(), copy.getSslSocketFactory(), copy.getSslParameters(), copy.getHostnameVerifier(), copy.getHostAndPortMapper(), copy.getClientSetInfoConfig(), - copy.isReadOnlyForRedisClusterReplicas(), copy.getAuthXManager()); + copy.isReadOnlyForRedisClusterReplicas(), copy.getAuthXManager(), + copy.getTrackingPrefixList(), copy.getTrackingModeOnDefault()); } } diff --git a/src/main/java/redis/clients/jedis/JedisClientConfig.java b/src/main/java/redis/clients/jedis/JedisClientConfig.java index 82e9eb8e7f..77b673d778 100644 --- a/src/main/java/redis/clients/jedis/JedisClientConfig.java +++ b/src/main/java/redis/clients/jedis/JedisClientConfig.java @@ -1,5 +1,6 @@ package redis.clients.jedis; +import java.util.List; import java.util.function.Supplier; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLParameters; @@ -55,6 +56,14 @@ default AuthXManager getAuthXManager() { return null; } + default List getTrackingPrefixList() { + return null; + } + + default boolean getTrackingModeOnDefault() { + return true; + } + default int getDatabase() { return Protocol.DEFAULT_DATABASE; } diff --git a/src/main/java/redis/clients/jedis/JedisPooled.java b/src/main/java/redis/clients/jedis/JedisPooled.java index c735fda7c1..23aa698c33 100644 --- a/src/main/java/redis/clients/jedis/JedisPooled.java +++ b/src/main/java/redis/clients/jedis/JedisPooled.java @@ -295,7 +295,7 @@ public JedisPooled(final GenericObjectPoolConfig poolConfig, final S final int connectionTimeout, final int soTimeout, final int infiniteSoTimeout, final String user, final String password, final int database, final String clientName) { this(new HostAndPort(host, port), DefaultJedisClientConfig.create(connectionTimeout, soTimeout, - infiniteSoTimeout, user, password, database, clientName, false, null, null, null, null, null), + infiniteSoTimeout, user, password, database, clientName, false, null, null, null, null, null, null, true), poolConfig); } @@ -306,7 +306,7 @@ public JedisPooled(final GenericObjectPoolConfig poolConfig, final S final HostnameVerifier hostnameVerifier) { this(new HostAndPort(host, port), DefaultJedisClientConfig.create(connectionTimeout, soTimeout, infiniteSoTimeout, user, password, database, clientName, ssl, sslSocketFactory, sslParameters, - hostnameVerifier, null, null), poolConfig); + hostnameVerifier, null, null, null, true), poolConfig); } public JedisPooled(final URI uri) { diff --git a/src/main/java/redis/clients/jedis/csc/CacheConnection.java b/src/main/java/redis/clients/jedis/csc/CacheConnection.java index f157d95a94..ce44a88ec8 100644 --- a/src/main/java/redis/clients/jedis/csc/CacheConnection.java +++ b/src/main/java/redis/clients/jedis/csc/CacheConnection.java @@ -14,6 +14,7 @@ public class CacheConnection extends Connection { + private boolean isDefaultMode = true; private final Cache cache; private ReentrantLock lock; private static final String REDIS = "redis"; @@ -21,7 +22,6 @@ public class CacheConnection extends Connection { public CacheConnection(final JedisSocketFactory socketFactory, JedisClientConfig clientConfig, Cache cache) { super(socketFactory, clientConfig); - if (protocol != RedisProtocol.RESP3) { throw new JedisException("Client side caching is only supported with RESP3."); } @@ -33,7 +33,10 @@ public CacheConnection(final JedisSocketFactory socketFactory, JedisClientConfig } } this.cache = Objects.requireNonNull(cache); - initializeClientSideCache(); + this.isDefaultMode = clientConfig.getTrackingModeOnDefault(); + if (isDefaultMode) { + initializeClientSideCache(); + } } @Override @@ -66,7 +69,10 @@ protected void protocolReadPushes(RedisInputStream inputStream) { @Override public void disconnect() { super.disconnect(); - cache.flush(); + if (isDefaultMode) { + // The cache is cleared when the connection is disconnected only in default mode + cache.flush(); + } } @Override @@ -79,6 +85,11 @@ public T executeCommand(final CommandObject commandObject) { CacheEntry cacheEntry = cache.get(cacheKey); if (cacheEntry != null) { // (probable) CACHE HIT !! + if (!isDefaultMode) { + // Broadcast mode returns directly + cache.getStats().hit(); + return cacheEntry.getValue(); + } cacheEntry = validateEntry(cacheEntry); if (cacheEntry != null) { // CACHE HIT confirmed !!! diff --git a/src/test/java/redis/clients/jedis/csc/CSCTest.java b/src/test/java/redis/clients/jedis/csc/CSCTest.java new file mode 100644 index 0000000000..24bf76f7b7 --- /dev/null +++ b/src/test/java/redis/clients/jedis/csc/CSCTest.java @@ -0,0 +1,57 @@ +package redis.clients.jedis.csc; + +import org.junit.Test; +import redis.clients.jedis.DefaultJedisClientConfig; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.JedisClientConfig; +import redis.clients.jedis.UnifiedJedis; +import redis.clients.jedis.commands.ProtocolCommand; + +import java.util.ArrayList; +import java.util.List; + +public class CSCTest { + private static JedisClientConfig clientConfig; + private static CacheConfig cacheConfig; + + static { + List trackingPrefixList = new ArrayList<>(); + trackingPrefixList.add("v1"); + trackingPrefixList.add("v2"); + + clientConfig = DefaultJedisClientConfig.builder() + .resp3() // RESP3 protocol is required for client-side caching + .trackingModeOnDefault(false) // tracking mode(true:default; false:broadcast) + .trackingPrefixList(trackingPrefixList) // tracking prefix list(only broadcast mode) + .build(); + + cacheConfig = getCacheConfig(); + } + + private static CacheConfig getCacheConfig() { + + // This is a simple cacheable implementation that ignores keys starting with "ignore_me" + Cacheable cacheable = new DefaultCacheable() { + @Override + public boolean isCacheable(ProtocolCommand command, List keys) { + return isDefaultCacheableCommand(command); + } + }; + + // Create a cache with a maximum size of 10000 entries + return CacheConfig.builder() + .maxSize(10000) + .cacheable(cacheable) + .build(); + } + + @Test + public void testTrackingOnBroadcastMode() { + HostAndPort node = HostAndPort.from("127.0.0.1:6379"); + try (UnifiedJedis client = new UnifiedJedis(node, clientConfig, CacheFactory.getCache(cacheConfig))) { + String a = client.get("a"); + System.out.println(); + + } + } +} From c89017b6ce2815f2137de048df4b83b1d1430bc2 Mon Sep 17 00:00:00 2001 From: "pu.wang" Date: Thu, 16 Jan 2025 14:22:24 +0800 Subject: [PATCH 2/4] feat: process conflict --- .../jedis/DefaultJedisClientConfig.java | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java b/src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java index 25a4737ec0..59f020406a 100644 --- a/src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java +++ b/src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java @@ -1,5 +1,6 @@ package redis.clients.jedis; +import java.util.List; import java.util.function.Supplier; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLParameters; @@ -33,6 +34,16 @@ public final class DefaultJedisClientConfig implements JedisClientConfig { private final AuthXManager authXManager; + /** + * tracking prefix list + */ + private final List trackingPrefixList; + + /** + * tracking mode(true:default; false:broadcast) + */ + private final boolean trackingModeOnDefault; + private DefaultJedisClientConfig(DefaultJedisClientConfig.Builder builder) { this.redisProtocol = builder.redisProtocol; this.connectionTimeoutMillis = builder.connectionTimeoutMillis; @@ -50,6 +61,8 @@ private DefaultJedisClientConfig(DefaultJedisClientConfig.Builder builder) { this.clientSetInfoConfig = builder.clientSetInfoConfig; this.readOnlyForRedisClusterReplicas = builder.readOnlyForRedisClusterReplicas; this.authXManager = builder.authXManager; + this.trackingPrefixList = builder.trackingPrefixList; + this.trackingModeOnDefault = builder.trackingModeOnDefault; } @Override @@ -143,6 +156,16 @@ public boolean isReadOnlyForRedisClusterReplicas() { return readOnlyForRedisClusterReplicas; } + @Override + public List getTrackingPrefixList() { + return trackingPrefixList; + } + + @Override + public boolean getTrackingModeOnDefault() { + return trackingModeOnDefault; + } + public static Builder builder() { return new Builder(); } @@ -175,6 +198,10 @@ public static class Builder { private AuthXManager authXManager = null; + private List trackingPrefixList = null; + + private boolean trackingModeOnDefault = true; + private Builder() { } @@ -297,6 +324,16 @@ public Builder authXManager(AuthXManager authXManager) { return this; } + public Builder trackingPrefixList(List trackingPrefixList) { + this.trackingPrefixList = trackingPrefixList; + return this; + } + + public Builder trackingModeOnDefault(boolean trackingModeOnDefault) { + this.trackingModeOnDefault = trackingModeOnDefault; + return this; + } + public Builder from(JedisClientConfig instance) { this.redisProtocol = instance.getRedisProtocol(); this.connectionTimeoutMillis = instance.getConnectionTimeoutMillis(); @@ -314,6 +351,8 @@ public Builder from(JedisClientConfig instance) { this.clientSetInfoConfig = instance.getClientSetInfoConfig(); this.readOnlyForRedisClusterReplicas = instance.isReadOnlyForRedisClusterReplicas(); this.authXManager = instance.getAuthXManager(); + this.trackingPrefixList = instance.getTrackingPrefixList(); + this.trackingModeOnDefault = instance.getTrackingModeOnDefault(); return this; } } @@ -375,6 +414,8 @@ public static DefaultJedisClientConfig copyConfig(JedisClientConfig copy) { } builder.authXManager(copy.getAuthXManager()); + builder.trackingPrefixList(copy.getTrackingPrefixList()); + builder.trackingModeOnDefault(copy.getTrackingModeOnDefault()); return builder.build(); } From 3072312ad8be43e4ac10daf10584fb44bebf3521 Mon Sep 17 00:00:00 2001 From: "pu.wang" Date: Thu, 16 Jan 2025 14:55:06 +0800 Subject: [PATCH 3/4] docs(csc): add docs --- .../clients/jedis/ConnectionFactory.java | 22 +++++++++++++++---- .../clients/jedis/JedisClientConfig.java | 6 +++++ .../clients/jedis/csc/CacheConnection.java | 6 ++++- 3 files changed, 29 insertions(+), 5 deletions(-) diff --git a/src/main/java/redis/clients/jedis/ConnectionFactory.java b/src/main/java/redis/clients/jedis/ConnectionFactory.java index c20f8f7731..bc1de6832a 100644 --- a/src/main/java/redis/clients/jedis/ConnectionFactory.java +++ b/src/main/java/redis/clients/jedis/ConnectionFactory.java @@ -34,7 +34,17 @@ public class ConnectionFactory implements PooledObjectFactory { private final Supplier objectMaker; private final AuthXEventListener authXEventListener; + /** + * Only one connection is maintained between a client and a server node for tracking and receiving invalidation messages. + *

+ * This is done to avoid the server sending duplicate messages to multiple connections, + * thereby reducing the CPU consumption of the server. + */ private CacheConnection trackingConnection = null; + + /** + * The single thread executor for listening invalidation messages. + */ private ScheduledExecutorService invalidationListeningExecutor = null; public ConnectionFactory(final HostAndPort hostAndPort) { @@ -80,6 +90,9 @@ private ConnectionFactory(final JedisSocketFactory jedisSocketFactory, } } + /** + * Create a "tracking" connection and start tracking and listen invalidation messages periodically. + */ @Experimental private void initializeTrackingConnection() { trackingConnection = new CacheConnection(jedisSocketFactory, clientConfig, clientSideCache); @@ -88,12 +101,12 @@ private void initializeTrackingConnection() { } /** - * tracking on broadcasting mode + * Tracking on broadcasting mode. */ @Experimental private void tracking() { List trackingPrefixList = clientConfig.getTrackingPrefixList(); - // If no prefix is set, the prefix is "" + // if no prefix is set, the prefix is "". if (trackingPrefixList == null) { trackingPrefixList = new ArrayList<>(); trackingPrefixList.add(""); @@ -106,7 +119,7 @@ private void tracking() { } /** - * Start a scheduled task to listen for invalidation event + * Start a scheduled task to listen for invalidation event. */ @Experimental private void startInvalidationListenerThread() { @@ -114,10 +127,11 @@ private void startInvalidationListenerThread() { if (trackingConnection.isBroken() || !trackingConnection.isConnected() || !trackingConnection.ping()) { // flush cache(broadcasting mode only trackingConnection disconnect) clientSideCache.flush(); + // reconnect and enable tracking try { trackingConnection.connect(); } catch (Exception e) { - // TODO + // do something } tracking(); } diff --git a/src/main/java/redis/clients/jedis/JedisClientConfig.java b/src/main/java/redis/clients/jedis/JedisClientConfig.java index bb71901567..9e2e47f8c5 100644 --- a/src/main/java/redis/clients/jedis/JedisClientConfig.java +++ b/src/main/java/redis/clients/jedis/JedisClientConfig.java @@ -57,10 +57,16 @@ default AuthXManager getAuthXManager() { return null; } + /** + * @return tracking prefix list. + */ default List getTrackingPrefixList() { return null; } + /** + * @return {@code true} - default mode. {@code false} - broadcasting mode. + */ default boolean getTrackingModeOnDefault() { return true; } diff --git a/src/main/java/redis/clients/jedis/csc/CacheConnection.java b/src/main/java/redis/clients/jedis/csc/CacheConnection.java index ce44a88ec8..70f32d3214 100644 --- a/src/main/java/redis/clients/jedis/csc/CacheConnection.java +++ b/src/main/java/redis/clients/jedis/csc/CacheConnection.java @@ -14,6 +14,9 @@ public class CacheConnection extends Connection { + /** + * tracking mode(true:default; false:broadcast) + */ private boolean isDefaultMode = true; private final Cache cache; private ReentrantLock lock; @@ -34,6 +37,7 @@ public CacheConnection(final JedisSocketFactory socketFactory, JedisClientConfig } this.cache = Objects.requireNonNull(cache); this.isDefaultMode = clientConfig.getTrackingModeOnDefault(); + // only default mode if (isDefaultMode) { initializeClientSideCache(); } @@ -86,7 +90,7 @@ public T executeCommand(final CommandObject commandObject) { CacheEntry cacheEntry = cache.get(cacheKey); if (cacheEntry != null) { // (probable) CACHE HIT !! if (!isDefaultMode) { - // Broadcast mode returns directly + // broadcast mode returns directly cache.getStats().hit(); return cacheEntry.getValue(); } From f810b812d151da6b5860f7b2daddf1112ed662f5 Mon Sep 17 00:00:00 2001 From: "pu.wang" Date: Thu, 20 Mar 2025 19:58:46 +0800 Subject: [PATCH 4/4] feat(csc): add tracking config --- .../clients/jedis/ConnectionFactory.java | 9 ++-- .../jedis/DefaultJedisClientConfig.java | 43 +++++-------------- .../clients/jedis/JedisClientConfig.java | 24 ++++------- .../clients/jedis/csc/CacheConnection.java | 2 +- .../clients/jedis/csc/TrackingConfig.java | 37 ++++++++++++++++ .../java/redis/clients/jedis/csc/CSCTest.java | 13 ++---- 6 files changed, 65 insertions(+), 63 deletions(-) create mode 100644 src/main/java/redis/clients/jedis/csc/TrackingConfig.java diff --git a/src/main/java/redis/clients/jedis/ConnectionFactory.java b/src/main/java/redis/clients/jedis/ConnectionFactory.java index bc1de6832a..87a8031225 100644 --- a/src/main/java/redis/clients/jedis/ConnectionFactory.java +++ b/src/main/java/redis/clients/jedis/ConnectionFactory.java @@ -59,7 +59,7 @@ public ConnectionFactory(final HostAndPort hostAndPort, final JedisClientConfig public ConnectionFactory(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, Cache csCache) { this(new DefaultJedisSocketFactory(hostAndPort, clientConfig), clientConfig, csCache); - if (!clientConfig.getTrackingModeOnDefault()) { + if (!clientConfig.getTrackingConfig().isTrackingModeOnDefault()) { invalidationListeningExecutor = Executors.newSingleThreadScheduledExecutor(); // initialize tracking connection initializeTrackingConnection(); @@ -105,7 +105,7 @@ private void initializeTrackingConnection() { */ @Experimental private void tracking() { - List trackingPrefixList = clientConfig.getTrackingPrefixList(); + List trackingPrefixList = clientConfig.getTrackingConfig().getTrackingPrefixList(); // if no prefix is set, the prefix is "". if (trackingPrefixList == null) { trackingPrefixList = new ArrayList<>(); @@ -127,15 +127,16 @@ private void startInvalidationListenerThread() { if (trackingConnection.isBroken() || !trackingConnection.isConnected() || !trackingConnection.ping()) { // flush cache(broadcasting mode only trackingConnection disconnect) clientSideCache.flush(); - // reconnect and enable tracking + // create a new connection and enable tracking try { - trackingConnection.connect(); + trackingConnection = new CacheConnection(jedisSocketFactory, clientConfig, clientSideCache); } catch (Exception e) { // do something } tracking(); } trackingConnection.readPushesWithCheckingBroken(); + // period? }, 2, 2, TimeUnit.SECONDS); } diff --git a/src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java b/src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java index 59f020406a..5a6723731c 100644 --- a/src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java +++ b/src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java @@ -1,12 +1,12 @@ package redis.clients.jedis; -import java.util.List; import java.util.function.Supplier; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLParameters; import javax.net.ssl.SSLSocketFactory; import redis.clients.jedis.authentication.AuthXManager; +import redis.clients.jedis.csc.TrackingConfig; public final class DefaultJedisClientConfig implements JedisClientConfig { @@ -34,15 +34,7 @@ public final class DefaultJedisClientConfig implements JedisClientConfig { private final AuthXManager authXManager; - /** - * tracking prefix list - */ - private final List trackingPrefixList; - - /** - * tracking mode(true:default; false:broadcast) - */ - private final boolean trackingModeOnDefault; + private final TrackingConfig trackingConfig; private DefaultJedisClientConfig(DefaultJedisClientConfig.Builder builder) { this.redisProtocol = builder.redisProtocol; @@ -61,8 +53,7 @@ private DefaultJedisClientConfig(DefaultJedisClientConfig.Builder builder) { this.clientSetInfoConfig = builder.clientSetInfoConfig; this.readOnlyForRedisClusterReplicas = builder.readOnlyForRedisClusterReplicas; this.authXManager = builder.authXManager; - this.trackingPrefixList = builder.trackingPrefixList; - this.trackingModeOnDefault = builder.trackingModeOnDefault; + this.trackingConfig = builder.trackingConfig; } @Override @@ -157,13 +148,8 @@ public boolean isReadOnlyForRedisClusterReplicas() { } @Override - public List getTrackingPrefixList() { - return trackingPrefixList; - } - - @Override - public boolean getTrackingModeOnDefault() { - return trackingModeOnDefault; + public TrackingConfig getTrackingConfig() { + return trackingConfig; } public static Builder builder() { @@ -198,9 +184,7 @@ public static class Builder { private AuthXManager authXManager = null; - private List trackingPrefixList = null; - - private boolean trackingModeOnDefault = true; + private TrackingConfig trackingConfig = TrackingConfig.DEFAULT; private Builder() { } @@ -324,13 +308,8 @@ public Builder authXManager(AuthXManager authXManager) { return this; } - public Builder trackingPrefixList(List trackingPrefixList) { - this.trackingPrefixList = trackingPrefixList; - return this; - } - - public Builder trackingModeOnDefault(boolean trackingModeOnDefault) { - this.trackingModeOnDefault = trackingModeOnDefault; + public Builder trackingConfig(TrackingConfig trackingConfig) { + this.trackingConfig = trackingConfig; return this; } @@ -351,8 +330,7 @@ public Builder from(JedisClientConfig instance) { this.clientSetInfoConfig = instance.getClientSetInfoConfig(); this.readOnlyForRedisClusterReplicas = instance.isReadOnlyForRedisClusterReplicas(); this.authXManager = instance.getAuthXManager(); - this.trackingPrefixList = instance.getTrackingPrefixList(); - this.trackingModeOnDefault = instance.getTrackingModeOnDefault(); + this.trackingConfig = instance.getTrackingConfig(); return this; } } @@ -414,8 +392,7 @@ public static DefaultJedisClientConfig copyConfig(JedisClientConfig copy) { } builder.authXManager(copy.getAuthXManager()); - builder.trackingPrefixList(copy.getTrackingPrefixList()); - builder.trackingModeOnDefault(copy.getTrackingModeOnDefault()); + builder.trackingConfig(copy.getTrackingConfig()); return builder.build(); } diff --git a/src/main/java/redis/clients/jedis/JedisClientConfig.java b/src/main/java/redis/clients/jedis/JedisClientConfig.java index 9e2e47f8c5..0ff8ced74f 100644 --- a/src/main/java/redis/clients/jedis/JedisClientConfig.java +++ b/src/main/java/redis/clients/jedis/JedisClientConfig.java @@ -1,12 +1,12 @@ package redis.clients.jedis; -import java.util.List; import java.util.function.Supplier; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLParameters; import javax.net.ssl.SSLSocketFactory; import redis.clients.jedis.authentication.AuthXManager; +import redis.clients.jedis.csc.TrackingConfig; public interface JedisClientConfig { @@ -57,20 +57,6 @@ default AuthXManager getAuthXManager() { return null; } - /** - * @return tracking prefix list. - */ - default List getTrackingPrefixList() { - return null; - } - - /** - * @return {@code true} - default mode. {@code false} - broadcasting mode. - */ - default boolean getTrackingModeOnDefault() { - return true; - } - default int getDatabase() { return Protocol.DEFAULT_DATABASE; } @@ -130,4 +116,12 @@ default boolean isReadOnlyForRedisClusterReplicas() { default ClientSetInfoConfig getClientSetInfoConfig() { return ClientSetInfoConfig.DEFAULT; } + + /** + * Modify Tracking config(tracking mode, tracking prefixes) + * @return Tracking config + */ + default TrackingConfig getTrackingConfig() { + return TrackingConfig.DEFAULT; + } } diff --git a/src/main/java/redis/clients/jedis/csc/CacheConnection.java b/src/main/java/redis/clients/jedis/csc/CacheConnection.java index 70f32d3214..09ca39978e 100644 --- a/src/main/java/redis/clients/jedis/csc/CacheConnection.java +++ b/src/main/java/redis/clients/jedis/csc/CacheConnection.java @@ -36,7 +36,7 @@ public CacheConnection(final JedisSocketFactory socketFactory, JedisClientConfig } } this.cache = Objects.requireNonNull(cache); - this.isDefaultMode = clientConfig.getTrackingModeOnDefault(); + this.isDefaultMode = clientConfig.getTrackingConfig().isTrackingModeOnDefault(); // only default mode if (isDefaultMode) { initializeClientSideCache(); diff --git a/src/main/java/redis/clients/jedis/csc/TrackingConfig.java b/src/main/java/redis/clients/jedis/csc/TrackingConfig.java new file mode 100644 index 0000000000..675ee00dc4 --- /dev/null +++ b/src/main/java/redis/clients/jedis/csc/TrackingConfig.java @@ -0,0 +1,37 @@ +package redis.clients.jedis.csc; + +import java.util.Collections; +import java.util.List; + +public final class TrackingConfig { + + /** + * tracking mode(true:default; false:broadcast) + */ + private final boolean trackingModeOnDefault; + + /** + * tracking prefix list(only broadcast mode) + */ + private final List trackingPrefixList; + + public TrackingConfig(List trackingPrefixList, boolean trackingModeOnDefault) { + this.trackingPrefixList = trackingPrefixList; + this.trackingModeOnDefault = trackingModeOnDefault; + } + + public boolean isTrackingModeOnDefault() { + return trackingModeOnDefault; + } + + public List getTrackingPrefixList() { + return trackingPrefixList; + } + + public static final TrackingConfig DEFAULT = new TrackingConfig(null, true); + + /** + * prefix: "" + */ + public static final TrackingConfig BROADCAST = new TrackingConfig(Collections.emptyList(), false); +} diff --git a/src/test/java/redis/clients/jedis/csc/CSCTest.java b/src/test/java/redis/clients/jedis/csc/CSCTest.java index 24bf76f7b7..294c15370d 100644 --- a/src/test/java/redis/clients/jedis/csc/CSCTest.java +++ b/src/test/java/redis/clients/jedis/csc/CSCTest.java @@ -7,7 +7,6 @@ import redis.clients.jedis.UnifiedJedis; import redis.clients.jedis.commands.ProtocolCommand; -import java.util.ArrayList; import java.util.List; public class CSCTest { @@ -15,14 +14,9 @@ public class CSCTest { private static CacheConfig cacheConfig; static { - List trackingPrefixList = new ArrayList<>(); - trackingPrefixList.add("v1"); - trackingPrefixList.add("v2"); - clientConfig = DefaultJedisClientConfig.builder() .resp3() // RESP3 protocol is required for client-side caching - .trackingModeOnDefault(false) // tracking mode(true:default; false:broadcast) - .trackingPrefixList(trackingPrefixList) // tracking prefix list(only broadcast mode) + .trackingConfig(TrackingConfig.BROADCAST) .build(); cacheConfig = getCacheConfig(); @@ -49,9 +43,8 @@ public boolean isCacheable(ProtocolCommand command, List keys) { public void testTrackingOnBroadcastMode() { HostAndPort node = HostAndPort.from("127.0.0.1:6379"); try (UnifiedJedis client = new UnifiedJedis(node, clientConfig, CacheFactory.getCache(cacheConfig))) { - String a = client.get("a"); - System.out.println(); - + String a1 = client.get("a"); + String a2 = client.get("a"); } } }