Skip to content

Commit

Permalink
INTERNAL: Remove transcoder service logic in bulkGet apis.
Browse files Browse the repository at this point in the history
  • Loading branch information
brido4125 committed Jan 24, 2024
1 parent 1aa7fa4 commit f42faec
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 20 deletions.
21 changes: 11 additions & 10 deletions src/main/java/net/spy/memcached/MemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@
import net.spy.memcached.internal.GetFuture;
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.internal.SingleElementInfiniteIterator;
import net.spy.memcached.internal.result.GetsResultImpl;
import net.spy.memcached.internal.result.GetResult;
import net.spy.memcached.internal.result.GetResultImpl;
import net.spy.memcached.internal.result.GetsResultImpl;
import net.spy.memcached.ops.CASOperationStatus;
import net.spy.memcached.ops.CancelledOperationStatus;
import net.spy.memcached.ops.ConcatenationType;
Expand Down Expand Up @@ -1079,7 +1079,7 @@ public Object get(String key) {
*/
public <T> BulkFuture<Map<String, T>> asyncGetBulk(Collection<String> keys,
Iterator<Transcoder<T>> tc_iter) {
final Map<String, Future<T>> rvMap = new ConcurrentHashMap<String, Future<T>>();
final Map<String, GetResult<T>> rvMap = new ConcurrentHashMap<String, GetResult<T>>();

// This map does not need to be a ConcurrentHashMap
// because it is fully populated when it is used and
Expand Down Expand Up @@ -1112,8 +1112,9 @@ public void receivedStatus(OperationStatus status) {

public void gotData(String k, int flags, byte[] data) {
Transcoder<T> tc = tc_map.get(k);
rvMap.put(k, tcService.decode(tc,
new CachedData(flags, data, tc.getMaxSize())));
GetResult<T> result
= new GetResultImpl<T>(new CachedData(flags, data, tc.getMaxSize()), tc);
rvMap.put(k, result);
}

public void complete() {
Expand Down Expand Up @@ -1213,8 +1214,8 @@ public BulkFuture<Map<String, Object>> asyncGetBulk(String... keys) {
*/
public <T> BulkFuture<Map<String, CASValue<T>>> asyncGetsBulk(Collection<String> keys,
Iterator<Transcoder<T>> tc_iter) {
final Map<String, Future<CASValue<T>>> m
= new ConcurrentHashMap<String, Future<CASValue<T>>>();
final Map<String, GetResult<CASValue<T>>> rvMap
= new ConcurrentHashMap<String, GetResult<CASValue<T>>>();

// This map does not need to be a ConcurrentHashMap
// because it is fully populated when it is used and
Expand Down Expand Up @@ -1248,9 +1249,9 @@ public void receivedStatus(OperationStatus status) {

public void gotData(String k, int flags, long cas, byte[] data) {
Transcoder<T> tc = tc_map.get(k);

m.put(k, tcService.decode(tc, cas,
new CachedData(flags, data, tc.getMaxSize())));
GetResult<CASValue<T>> result
= new GetsResultImpl<T>(cas, new CachedData(flags, data, tc.getMaxSize()), tc);
rvMap.put(k, result);
}

public void complete() {
Expand All @@ -1276,7 +1277,7 @@ public void complete() {
ops.add(op);
}
}
return new BulkGetFuture<CASValue<T>>(m, ops, latch, operationTimeout);
return new BulkGetFuture<CASValue<T>>(rvMap, ops, latch, operationTimeout);
}

/**
Expand Down
11 changes: 5 additions & 6 deletions src/main/java/net/spy/memcached/internal/BulkGetFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import net.spy.memcached.MemcachedConnection;
import net.spy.memcached.OperationTimeoutException;
import net.spy.memcached.compat.log.LoggerFactory;
import net.spy.memcached.internal.result.GetResult;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationState;

Expand All @@ -40,13 +40,13 @@
* @param <T> types of objects returned from the GET
*/
public class BulkGetFuture<T> implements BulkFuture<Map<String, T>> {
private final Map<String, Future<T>> rvMap;
private final Map<String, GetResult<T>> rvMap;
private final Collection<Operation> ops;
private final CountDownLatch latch;
private final long timeout;
private boolean isTimeout = false;

public BulkGetFuture(Map<String, Future<T>> rvMap, Collection<Operation> ops,
public BulkGetFuture(Map<String, GetResult<T>> rvMap, Collection<Operation> ops,
CountDownLatch latch, Long timeout) {
super();
this.rvMap = rvMap;
Expand Down Expand Up @@ -164,10 +164,9 @@ private Map<String, T> internalGet(long to, TimeUnit unit,
}

Map<String, T> resultMap = new HashMap<String, T>();
for (Map.Entry<String, Future<T>> me : rvMap.entrySet()) {
for (Map.Entry<String, GetResult<T>> me : rvMap.entrySet()) {
String key = me.getKey();
Future<T> future = me.getValue();
T value = future.get();
T value = me.getValue().getDecodedValue();

// put the key into the result map.
resultMap.put(key, value);
Expand Down
8 changes: 4 additions & 4 deletions src/test/java/net/spy/memcached/ProtocolBaseCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,8 @@ public void testAsyncGetBulkWithTranscoderIterator() throws Exception {
tcs.add(0, decodeTranscoder);
try {
client.asyncGetBulk(keys, tcs.listIterator()).get();
fail("Expected ExecutionException caused by key mismatch");
} catch (java.util.concurrent.ExecutionException e) {
fail("Expected ComparisonFailure caused by key mismatch");
} catch (AssertionError e) {
// pass
}
}
Expand Down Expand Up @@ -552,8 +552,8 @@ public void testAsyncGetsBulkWithTranscoderIterator() throws Exception {
tcs.add(0, decodeTranscoder);
try {
client.asyncGetsBulk(keys, tcs.listIterator()).get();
fail("Expected ExecutionException caused by key mismatch");
} catch (java.util.concurrent.ExecutionException e) {
fail("Expected ComparisonFailure caused by key mismatch");
} catch (AssertionError e) {
// pass
}
}
Expand Down

0 comments on commit f42faec

Please sign in to comment.