Skip to content

Commit

Permalink
INTERNAL: Change asyncGets return future type.
Browse files Browse the repository at this point in the history
  • Loading branch information
brido4125 committed Jan 23, 2024
1 parent a622c36 commit 40da6dd
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 57 deletions.
4 changes: 2 additions & 2 deletions src/main/java/net/spy/memcached/ArcusClientPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,12 @@ public GetFuture<Object> asyncGet(String key) {
}

@Override
public <T> OperationFuture<CASValue<T>> asyncGets(String key, Transcoder<T> tc) {
public <T> GetFuture<CASValue<T>> asyncGets(String key, Transcoder<T> tc) {
return this.getClient().asyncGets(key, tc);
}

@Override
public OperationFuture<CASValue<Object>> asyncGets(String key) {
public GetFuture<CASValue<Object>> asyncGets(String key) {
return this.getClient().asyncGets(key);
}

Expand Down
15 changes: 7 additions & 8 deletions src/main/java/net/spy/memcached/MemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import net.spy.memcached.internal.GetFuture;
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.internal.SingleElementInfiniteIterator;
import net.spy.memcached.internal.result.GetsResultImpl;
import net.spy.memcached.internal.result.GetResult;
import net.spy.memcached.internal.result.GetResultImpl;
import net.spy.memcached.ops.CASOperationStatus;
Expand Down Expand Up @@ -936,16 +937,15 @@ public GetFuture<Object> asyncGet(final String key) {
* @throws IllegalStateException in the rare circumstance where queue
* is too full to accept any more requests
*/
public <T> OperationFuture<CASValue<T>> asyncGets(final String key,
public <T> GetFuture<CASValue<T>> asyncGets(final String key,
final Transcoder<T> tc) {

final CountDownLatch latch = new CountDownLatch(1);
final OperationFuture<CASValue<T>> rv =
new OperationFuture<CASValue<T>>(latch, operationTimeout);
final GetFuture<CASValue<T>> rv = new GetFuture<CASValue<T>>(latch, operationTimeout);

Operation op = opFact.gets(key,
new GetsOperation.Callback() {
private CASValue<T> val = null;
private GetResult<CASValue<T>> val = null;

public void receivedStatus(OperationStatus status) {
rv.set(val, status);
Expand All @@ -954,8 +954,7 @@ public void receivedStatus(OperationStatus status) {
public void gotData(String k, int flags, long cas, byte[] data) {
assert key.equals(k) : "Wrong key returned";
assert cas > 0 : "CAS was less than zero: " + cas;
val = new CASValue<T>(cas, tc.decode(
new CachedData(flags, data, tc.getMaxSize())));
val = new GetsResultImpl<T>(cas, new CachedData(flags, data, tc.getMaxSize()), tc);
}

public void complete() {
Expand All @@ -976,7 +975,7 @@ public void complete() {
* @throws IllegalStateException in the rare circumstance where queue
* is too full to accept any more requests
*/
public OperationFuture<CASValue<Object>> asyncGets(final String key) {
public GetFuture<CASValue<Object>> asyncGets(final String key) {
return asyncGets(key, transcoder);
}

Expand All @@ -993,7 +992,7 @@ public OperationFuture<CASValue<Object>> asyncGets(final String key) {
* is too full to accept any more requests
*/
public <T> CASValue<T> gets(String key, Transcoder<T> tc) {
OperationFuture<CASValue<T>> future = asyncGets(key, tc);
GetFuture<CASValue<T>> future = asyncGets(key, tc);
try {
return future.get(operationTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Expand Down
45 changes: 7 additions & 38 deletions src/main/java/net/spy/memcached/internal/GetFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import net.spy.memcached.internal.result.GetResult;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationStatus;

/**
Expand All @@ -17,51 +15,22 @@
*
* @param <T> Type of object returned from the get
*/
public class GetFuture<T> implements Future<T> {

private final OperationFuture<GetResult<T>> rv;
public class GetFuture<T> extends OperationFuture<T> {
private GetResult<T> result;

public GetFuture(CountDownLatch l, long opTimeout) {
this.rv = new OperationFuture<GetResult<T>>(l, opTimeout);
}

public GetFuture(GetFuture<T> parent) {
this.rv = parent.rv;
}

public boolean cancel(boolean ign) {
return rv.cancel(ign);
}

public T get() throws InterruptedException, ExecutionException {
GetResult<T> result = rv.get();
return result == null ? null : result.getDecodedValue();
super(l, opTimeout);
}

@Override
public T get(long duration, TimeUnit units)
throws InterruptedException, TimeoutException, ExecutionException {
GetResult<T> result = rv.get(duration, units);
super.get(duration, units); // for waiting latch.
return result == null ? null : result.getDecodedValue();
}

public OperationStatus getStatus() {
return rv.getStatus();
}

public void set(GetResult<T> result, OperationStatus status) {
rv.set(result, status);
}

public void setOperation(Operation to) {
rv.setOperation(to);
}

public boolean isCancelled() {
return rv.isCancelled();
}

public boolean isDone() {
return rv.isDone();
super.set(null, status);
this.result = result;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package net.spy.memcached.internal.result;

import net.spy.memcached.CASValue;
import net.spy.memcached.CachedData;
import net.spy.memcached.transcoders.Transcoder;

public class GetsResultImpl<T> implements GetResult<CASValue<T>> {
private final long cas;
private final CachedData cachedData;
private final Transcoder<T> transcoder;
private volatile CASValue<T> decodedValue = null;

public GetsResultImpl(long cas, CachedData cachedData, Transcoder<T> transcoder) {
this.cas = cas;
this.cachedData = cachedData;
this.transcoder = transcoder;
}

@Override
public CASValue<T> getDecodedValue() {
if (decodedValue == null) {
decodedValue = new CASValue<T>(cas, transcoder.decode(cachedData));
}
return decodedValue;
}
}
34 changes: 25 additions & 9 deletions src/main/java/net/spy/memcached/plugin/FrontCacheGetFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
*/
package net.spy.memcached.plugin;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import net.spy.memcached.internal.GetFuture;
import net.spy.memcached.ops.OperationStatus;

/**
* Future returned for GET operations.
Expand All @@ -30,28 +32,42 @@
* @param <T> Type of object returned from the get
*/
public class FrontCacheGetFuture<T> extends GetFuture<T> {
private final GetFuture<T> parent;
private final LocalCacheManager localCacheManager;

private final String key;

public FrontCacheGetFuture(LocalCacheManager localCacheManager, String key, GetFuture<T> parent) {
super(parent);
super(new CountDownLatch(0), 1);
this.parent = parent;
this.localCacheManager = localCacheManager;
this.key = key;
}

@Override
public T get() throws InterruptedException, ExecutionException {
T t = super.get();
public T get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
T t = parent.get(timeout, unit);
localCacheManager.put(key, t);
return t;
}

@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
T t = super.get(timeout, unit);
localCacheManager.put(key, t);
return t;
public OperationStatus getStatus() {
return parent.getStatus();
}

@Override
public boolean cancel(boolean ign) {
return parent.cancel(ign);
}

@Override
public boolean isDone() {
return parent.isDone();
}

@Override
public boolean isCancelled() {
return parent.isCancelled();
}
}

0 comments on commit 40da6dd

Please sign in to comment.