diff --git a/docs/03-key-value-API.md b/docs/03-key-value-API.md index b3c442567..9b1de31a4 100644 --- a/docs/03-key-value-API.md +++ b/docs/03-key-value-API.md @@ -106,16 +106,40 @@ future.get(key).getStatusCode() | 설명 --------------------------------| --------- StatusCode.SUCCESS | 조회 성공(key에 해당하는 item 존재하지 않아도 성공) -여러 key들의 value들을 한번에 조회하는 bulk API를 제공한다. +여러 key들의 value를 한번에 조회하는 bulk API를 제공한다. ```java -BulkFuture> asyncGetBulk(Collection keys) -BulkFuture> asyncGetBulk(String... keys) +BulkFuture> asyncGetBulk(Collection keys) +BulkFuture> asyncGetBulk(String... keys) ``` - 다수 key들에 저장된 value를 Map 형태로 반환한다. - 다수 key들은 String 유형의 Collection이거나 String 유형의 나열된 key 목록일 수 있다. +하나의 key를 가진 cache item에 저장된 CASValue를 조회하는 API를 제공한다. + +```java +GetFuture> asyncGets(String key) +``` + +- 주어진 key에 저장된 CASValue(cas, value)를 반환한다. + +수행 결과는 future 객체를 통해 얻는다. + +future.get(key).getStatusCode() | 설명 +--------------------------------| --------- +StatusCode.SUCCESS | 조회 성공(key에 해당하는 item 존재하지 않아도 성공) + +여러 key들의 CASValue를 한번에 조회하는 bulk API를 제공한다. + +```java +BulkFuture>> asyncGetsBulk(Collection keys) +BulkFuture>> asyncGetsBulk(String... keys) +``` + +- 다수 key들에 저장된 CASValue를 Map> 형태로 반환한다. +- 다수 key들은 String 유형의 Collection이거나 String 유형의 나열된 key 목록일 수 있다. + ## Key-Value Item 값의 증감 key-value item에서 value 부분의 값을 증가시키거나 감소시키는 연산이다. diff --git a/src/main/java/net/spy/memcached/ArcusClientPool.java b/src/main/java/net/spy/memcached/ArcusClientPool.java index 95c9732cb..7a72a0ee6 100644 --- a/src/main/java/net/spy/memcached/ArcusClientPool.java +++ b/src/main/java/net/spy/memcached/ArcusClientPool.java @@ -210,6 +210,29 @@ public BulkFuture> asyncGetBulk(String... keys) { return this.getClient().asyncGetBulk(keys); } + public BulkFuture>> asyncGetsBulk(Collection keys, + Iterator> tcs) { + return this.getClient().asyncGetsBulk(keys, tcs); + } + + public BulkFuture>> asyncGetsBulk(Collection keys, + Transcoder tc) { + return this.getClient().asyncGetsBulk(keys, tc); + } + + public BulkFuture>> asyncGetsBulk(Collection keys) { + return this.getClient().asyncGetsBulk(keys); + } + + public BulkFuture>> asyncGetsBulk(Transcoder tc, + String... keys) { + return this.getClient().asyncGetsBulk(tc, keys); + } + + public BulkFuture>> asyncGetsBulk(String... keys) { + return this.getClient().asyncGetsBulk(keys); + } + public Map getBulk(Collection keys, Transcoder tc) throws OperationTimeoutException { return this.getClient().getBulk(keys, tc); @@ -230,6 +253,26 @@ public Map getBulk(String... keys) return this.getClient().getBulk(keys); } + public Map> getsBulk(Collection keys, Transcoder tc) + throws OperationTimeoutException { + return this.getClient().getsBulk(keys, tc); + } + + public Map> getsBulk(Collection keys) + throws OperationTimeoutException { + return this.getClient().getsBulk(keys); + } + + public Map> getsBulk(Transcoder tc, String... keys) + throws OperationTimeoutException { + return this.getClient().getsBulk(tc, keys); + } + + public Map> getsBulk(String... keys) + throws OperationTimeoutException { + return this.getClient().getsBulk(keys); + } + public Map getVersions() { return this.getClient().getVersions(); } diff --git a/src/main/java/net/spy/memcached/MemcachedClient.java b/src/main/java/net/spy/memcached/MemcachedClient.java index c809b76a1..636db34d3 100644 --- a/src/main/java/net/spy/memcached/MemcachedClient.java +++ b/src/main/java/net/spy/memcached/MemcachedClient.java @@ -1003,8 +1003,9 @@ public OperationFuture> asyncGets(final String key) { * is too full to accept any more requests */ public CASValue gets(String key, Transcoder tc) { + OperationFuture> future = asyncGets(key, tc); try { - return asyncGets(key, tc).get( + return future.get( operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for value", e); @@ -1098,9 +1099,6 @@ public BulkFuture> asyncGetBulk(Collection keys, // Break the gets down into groups by key final Map>> chunks = new HashMap>>(); - final Map chunkCount - = new HashMap(); - final NodeLocator locator = conn.getLocator(); Iterator key_iter = keys.iterator(); while (key_iter.hasNext() && tc_iter.hasNext()) { String key = key_iter.next(); @@ -1119,53 +1117,15 @@ public T call() throws Exception { continue; } } - tc_map.put(key, tc); validateKey(key); - final MemcachedNode primaryNode = conn.getPrimaryNode(key); - MemcachedNode node = null; - // FIXME. Support FailureMode. See MemcachedConnection.addOperation. - if (primaryNode == null) { - node = null; - } else if (primaryNode.isActive() || primaryNode.isFirstConnecting()) { - node = primaryNode; - } else { - Iterator iter = conn.getNodeSequence(key); - while (node == null && iter.hasNext()) { - MemcachedNode n = iter.next(); - if (n.isActive()) { - node = n; - } - } - if (node == null) { - node = primaryNode; - } - } - List> lks = chunks.get(node); - if (lks == null) { - lks = new ArrayList>(); - Collection ts = new ArrayList(); - lks.add(0, ts); - chunks.put(node, lks); - chunkCount.put(node, 0); - } - if (lks.get(chunkCount.get(node)).size() >= GET_BULK_CHUNK_SIZE) { - int count = chunkCount.get(node) + 1; - Collection ts = new ArrayList(); - lks.add(count, ts); - chunkCount.put(node, count); - } - Collection ks = lks.get(chunkCount.get(node)); - ks.add(key); + MemcachedNode node = findNodeByKey(key); + addKeyToChunk(chunks, key, node); } - int chunk_size = 0; - for (Map.Entry counts - : chunkCount.entrySet()) { - chunk_size += counts.getValue() + 1; - } - final CountDownLatch latch = new CountDownLatch(chunk_size); - final Collection ops = new ArrayList(chunk_size); + int wholeChunkSize = getWholeChunkSize(chunks); + final CountDownLatch latch = new CountDownLatch(wholeChunkSize); + final Collection ops = new ArrayList(wholeChunkSize); GetOperation.Callback cb = new GetOperation.Callback() { public void receivedStatus(OperationStatus status) { @@ -1191,14 +1151,14 @@ public void complete() { for (Map.Entry>> me : chunks.entrySet()) { MemcachedNode node = me.getKey(); - for (int i = 0; i <= chunkCount.get(node); i++) { + for (Collection lk : me.getValue()) { Operation op; if (node == null) { - op = opFact.mget(me.getValue().get(i), cb); + op = opFact.mget(lk, cb); op.cancel("no node"); } else { - op = node.enabledMGetOp() ? opFact.mget(me.getValue().get(i), cb) - : opFact.get(me.getValue().get(i), cb); + op = node.enabledMGetOp() ? opFact.mget(lk, cb) + : opFact.get(lk, cb); conn.addOperation(node, op); } ops.add(op); @@ -1236,7 +1196,7 @@ public BulkFuture> asyncGetBulk(Collection keys) { } /** - * Varargs wrapper for asynchronous bulk gets. + * Varargs wrapper for asynchronous bulk get. * * @param * @param tc the transcoder to serialize and unserialize value @@ -1251,7 +1211,7 @@ public BulkFuture> asyncGetBulk(Transcoder tc, } /** - * Varargs wrapper for asynchronous bulk gets with the default transcoder. + * Varargs wrapper for asynchronous bulk get with the default transcoder. * * @param keys one more more keys to get * @return the future values of those keys @@ -1262,6 +1222,207 @@ public BulkFuture> asyncGetBulk(String... keys) { return asyncGetBulk(Arrays.asList(keys), transcoder); } + /** + * Asynchronously gets (with CAS support) a bunch of objects from the cache. + * + * @param + * @param keys the keys to request + * @param tc_iter an iterator of transcoders to serialize and + * unserialize values; the transcoders are matched with + * the keys in the same order. The minimum of the key + * collection length and number of transcoders is used + * and no exception is thrown if they do not match + * @return a Future result of that fetch + * @throws IllegalStateException in the rare circumstance where queue + * is too full to accept any more requests + */ + public BulkFuture>> asyncGetsBulk(Collection keys, + Iterator> tc_iter) { + final Map>> m + = new ConcurrentHashMap>>(); + + // This map does not need to be a ConcurrentHashMap + // because it is fully populated when it is used and + // used only to read the transcoder for a key. + final Map> tc_map = new HashMap>(); + + // Break the gets down into groups by key + final Map>> chunks + = new HashMap>>(); + Iterator key_iter = keys.iterator(); + while (key_iter.hasNext() && tc_iter.hasNext()) { + String key = key_iter.next(); + Transcoder tc = tc_iter.next(); + + tc_map.put(key, tc); + validateKey(key); + MemcachedNode node = findNodeByKey(key); + addKeyToChunk(chunks, key, node); + } + + int wholeChunkSize = getWholeChunkSize(chunks); + final CountDownLatch latch = new CountDownLatch(wholeChunkSize); + final Collection ops = new ArrayList(wholeChunkSize); + + GetsOperation.Callback cb = new GetsOperation.Callback() { + public void receivedStatus(OperationStatus status) { + if (!status.isSuccess()) { + getLogger().warn("Unsuccessful gets: %s", status); + } + } + + public void gotData(String k, int flags, long cas, byte[] data) { + Transcoder tc = tc_map.get(k); + + m.put(k, tcService.decode(tc, cas, + new CachedData(flags, data, tc.getMaxSize()))); + } + + public void complete() { + latch.countDown(); + } + }; + + // Now that we know how many servers it breaks down into, and the latch + // is all set up, convert all of these strings collections to operations + checkState(); + for (Map.Entry>> me + : chunks.entrySet()) { + MemcachedNode node = me.getKey(); + for (Collection lk : me.getValue()) { + Operation op; + if (node == null) { + op = opFact.mgets(lk, cb); + op.cancel("no node"); + } else { + op = node.enabledMGetsOp() ? opFact.mgets(lk, cb) + : opFact.gets(lk, cb); + conn.addOperation(node, op); + } + ops.add(op); + } + } + return new BulkGetFuture>(m, ops, latch); + } + + /** + * find node by key + * @param key the key to request + * @return primary node + */ + private MemcachedNode findNodeByKey(String key) { + final MemcachedNode primaryNode = conn.getPrimaryNode(key); + MemcachedNode node = null; + // FIXME. Support FailureMode. See MemcachedConnection.addOperation. + if (primaryNode == null) { + node = null; + } else if (primaryNode.isActive() || primaryNode.isFirstConnecting()) { + node = primaryNode; + } else { + Iterator iter = conn.getNodeSequence(key); + while (node == null && iter.hasNext()) { + MemcachedNode n = iter.next(); + if (n.isActive()) { + node = n; + } + } + if (node == null) { + node = primaryNode; + } + } + return node; + } + + /** + * add key to chunks + * @param chunks collection list that sorted by node + * @param key the key to request + * @param node primary node to request + */ + private void addKeyToChunk(Map>> chunks, + String key, MemcachedNode node) { + List> lks = chunks.get(node); + if (lks == null) { + lks = new ArrayList>(); + Collection ts = new ArrayList(); + lks.add(ts); + chunks.put(node, lks); + } + if (lks.get(lks.size() - 1).size() >= GET_BULK_CHUNK_SIZE) { + lks.add(new ArrayList()); + } + lks.get(lks.size() - 1).add(key); + } + + /** + * get size of whole chunk by node + * @param chunks collection list that sorted by node + * @return size of whole chunk + */ + private int getWholeChunkSize(Map>> chunks) { + int wholeChunkSize = 0; + for (Map.Entry>> counts + : chunks.entrySet()) { + wholeChunkSize += counts.getValue().size(); + } + return wholeChunkSize; + } + + /** + * Asynchronously gets (with CAS support) a bunch of objects from the cache. + * + * @param + * @param keys the keys to request + * @param tc the transcoder to serialize and unserialize values + * @return a Future result of that fetch + * @throws IllegalStateException in the rare circumstance where queue + * is too full to accept any more requests + */ + public BulkFuture>> asyncGetsBulk(Collection keys, + Transcoder tc) { + return asyncGetsBulk(keys, new SingleElementInfiniteIterator>(tc)); + } + + /** + * Asynchronously gets (with CAS support) a bunch of objects from the cache and decode them + * with the given transcoder. + * + * @param keys the keys to request + * @return a Future result of that fetch + * @throws IllegalStateException in the rare circumstance where queue + * is too full to accept any more requests + */ + public BulkFuture>> asyncGetsBulk(Collection keys) { + return asyncGetsBulk(keys, transcoder); + } + + /** + * Varargs wrapper for asynchronous bulk gets. + * + * @param + * @param tc the transcoder to serialize and unserialize value + * @param keys one more more keys to get + * @return the future values of those keys + * @throws IllegalStateException in the rare circumstance where queue + * is too full to accept any more requests + */ + public BulkFuture>> asyncGetsBulk(Transcoder tc, + String... keys) { + return asyncGetsBulk(Arrays.asList(keys), tc); + } + + /** + * Varargs wrapper for asynchronous bulk gets (with CAS support) with the default transcoder. + * + * @param keys one more more keys to get + * @return the future values of those keys + * @throws IllegalStateException in the rare circumstance where queue + * is too full to accept any more requests + */ + public BulkFuture>> asyncGetsBulk(String... keys) { + return asyncGetsBulk(Arrays.asList(keys), transcoder); + } + /** * Get the values for multiple keys from the cache. * @@ -1332,6 +1493,77 @@ public Map getBulk(String... keys) { return getBulk(Arrays.asList(keys), transcoder); } + /** + * Gets (with CAS support) values for multiple keys from the cache. + * + * @param + * @param keys the keys + * @param tc the transcoder to serialize and unserialize value + * @return a map of the CAS values (for each value that exists) + * @throws OperationTimeoutException if the global operation timeout is + * exceeded + * @throws IllegalStateException in the rare circumstance where queue + * is too full to accept any more requests + */ + public Map> getsBulk(Collection keys, + Transcoder tc) { + try { + return asyncGetsBulk(keys, tc).get( + operationTimeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted getting bulk values", e); + } catch (ExecutionException e) { + throw new RuntimeException("Failed getting bulk values", e); + } catch (TimeoutException e) { + throw new OperationTimeoutException(e.toString(), e); + } + } + + /** + * Gets (with CAS support) values for multiple keys from the cache. + * + * @param keys the keys + * @return a map of the CAS values (for each value that exists) + * @throws OperationTimeoutException if the global operation timeout is + * exceeded + * @throws IllegalStateException in the rare circumstance where queue + * is too full to accept any more requests + */ + public Map> getsBulk(Collection keys) { + return getsBulk(keys, transcoder); + } + + /** + * Gets (with CAS support) values for multiple keys from the cache. + * + * @param + * @param tc the transcoder to serialize and unserialize value + * @param keys the keys + * @return a map of the CAS values (for each value that exists) + * @throws OperationTimeoutException if the global operation timeout is + * exceeded + * @throws IllegalStateException in the rare circumstance where queue + * is too full to accept any more requests + */ + public Map> getsBulk(Transcoder tc, String... keys) { + return getsBulk(Arrays.asList(keys), tc); + } + + /** + * Gets (with CAS support) values for multiple keys from the cache. + * + * @param keys the keys + * @return a map of the CAS values (for each value that exists) + * @throws OperationTimeoutException if the global operation timeout is + * exceeded + * @throws IllegalStateException in the rare circumstance where queue + * is too full to accept any more requests + */ + public Map> getsBulk(String... keys) { + return getsBulk(Arrays.asList(keys), transcoder); + } + + /** * Get the versions of all of the connected memcacheds. * diff --git a/src/main/java/net/spy/memcached/MemcachedClientIF.java b/src/main/java/net/spy/memcached/MemcachedClientIF.java index 5fef46762..a07089e6c 100644 --- a/src/main/java/net/spy/memcached/MemcachedClientIF.java +++ b/src/main/java/net/spy/memcached/MemcachedClientIF.java @@ -105,6 +105,19 @@ BulkFuture> asyncGetBulk(Transcoder tc, BulkFuture> asyncGetBulk(String... keys); + BulkFuture>> asyncGetsBulk(Collection keys, + Iterator> tcs); + + BulkFuture>> asyncGetsBulk(Collection keys, + Transcoder tc); + + BulkFuture>> asyncGetsBulk(Collection keys); + + BulkFuture>> asyncGetsBulk(Transcoder tc, + String... keys); + + BulkFuture>> asyncGetsBulk(String... keys); + Map getBulk(Collection keys, Transcoder tc) throws OperationTimeoutException; @@ -117,6 +130,18 @@ Map getBulk(Transcoder tc, String... keys) Map getBulk(String... keys) throws OperationTimeoutException; + Map> getsBulk(Collection keys, Transcoder tc) + throws OperationTimeoutException; + + Map> getsBulk(Collection keys) + throws OperationTimeoutException; + + Map> getsBulk(Transcoder tc, String... keys) + throws OperationTimeoutException; + + Map> getsBulk(String... keys) + throws OperationTimeoutException; + Map getVersions(); Map> getStats(); diff --git a/src/main/java/net/spy/memcached/MemcachedNode.java b/src/main/java/net/spy/memcached/MemcachedNode.java index e5ff2644c..086b422ec 100644 --- a/src/main/java/net/spy/memcached/MemcachedNode.java +++ b/src/main/java/net/spy/memcached/MemcachedNode.java @@ -216,6 +216,11 @@ public interface MemcachedNode { */ boolean enabledMGetOp(); + /** + * Check the enable MGets operation. + */ + boolean enabledMGetsOp(); + /** * Check the enable SpaceSeparate operation. */ diff --git a/src/main/java/net/spy/memcached/MemcachedNodeROImpl.java b/src/main/java/net/spy/memcached/MemcachedNodeROImpl.java index bc8d4114c..d8637e80f 100644 --- a/src/main/java/net/spy/memcached/MemcachedNodeROImpl.java +++ b/src/main/java/net/spy/memcached/MemcachedNodeROImpl.java @@ -168,6 +168,10 @@ public boolean enabledMGetOp() { throw new UnsupportedOperationException(); } + public boolean enabledMGetsOp() { + throw new UnsupportedOperationException(); + } + public boolean enabledSpaceSeparate() { throw new UnsupportedOperationException(); } diff --git a/src/main/java/net/spy/memcached/OperationFactory.java b/src/main/java/net/spy/memcached/OperationFactory.java index 8acdda281..bf5678db4 100644 --- a/src/main/java/net/spy/memcached/OperationFactory.java +++ b/src/main/java/net/spy/memcached/OperationFactory.java @@ -142,6 +142,15 @@ public interface OperationFactory { */ GetOperation get(Collection keys, GetOperation.Callback cb); + /** + * Create a gets operation. + * + * @param keys the collection of keys to get + * @param cb the callback that will contain the results + * @return a new GetsOperation + */ + GetsOperation gets(Collection keys, GetsOperation.Callback cb); + /** * Create a mget operation. * @@ -151,6 +160,15 @@ public interface OperationFactory { */ GetOperation mget(Collection keys, GetOperation.Callback cb); + /** + * Create a mgets operation. + * + * @param keys the collection of keys to get + * @param cb the callback that will contain the results + * @return a new GetOperation + */ + GetsOperation mgets(Collection keys, GetsOperation.Callback cb); + /** * Create a mutator operation. * diff --git a/src/main/java/net/spy/memcached/ops/APIType.java b/src/main/java/net/spy/memcached/ops/APIType.java index 33e5c777d..272dd111c 100644 --- a/src/main/java/net/spy/memcached/ops/APIType.java +++ b/src/main/java/net/spy/memcached/ops/APIType.java @@ -24,7 +24,8 @@ public enum APIType { CAS(OperationType.WRITE), INCR(OperationType.WRITE), DECR(OperationType.WRITE), DELETE(OperationType.WRITE), - GET(OperationType.READ), GETS(OperationType.READ), MGET(OperationType.READ), + GET(OperationType.READ), GETS(OperationType.READ), + MGET(OperationType.READ), MGETS(OperationType.READ), // List API Type LOP_CREATE(OperationType.WRITE), diff --git a/src/main/java/net/spy/memcached/ops/BaseOperationFactory.java b/src/main/java/net/spy/memcached/ops/BaseOperationFactory.java index 749cc0520..25390e9cc 100644 --- a/src/main/java/net/spy/memcached/ops/BaseOperationFactory.java +++ b/src/main/java/net/spy/memcached/ops/BaseOperationFactory.java @@ -44,12 +44,16 @@ public Collection clone(KeyedOperation op) { Collection rv = new ArrayList( op.getKeys().size()); if (op instanceof GetOperation) { - rv.addAll(cloneGet(op)); + GetOperation.Callback getCb = new MultiGetOperationCallback( + op.getCallback(), op.getKeys().size()); + for (String k : op.getKeys()) { + rv.add(get(k, getCb)); + } } else if (op instanceof GetsOperation) { - GetsOperation.Callback callback = - (GetsOperation.Callback) op.getCallback(); + GetsOperation.Callback getsCb = new MultiGetsOperationCallback( + op.getCallback(), op.getKeys().size()); for (String k : op.getKeys()) { - rv.add(gets(k, callback)); + rv.add(gets(k, getsCb)); } } else if (op instanceof CASOperation) { CASOperation cop = (CASOperation) op; @@ -98,11 +102,6 @@ public Collection clone(KeyedOperation op) { } else { assert false : "Unhandled operation type: " + op.getClass(); } - return rv; } - - protected abstract Collection cloneGet( - KeyedOperation op); - } diff --git a/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java b/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java index 02c85b8db..721e49e2b 100644 --- a/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java +++ b/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java @@ -69,6 +69,7 @@ public abstract class TCPMemcachedNodeImpl extends SpyObject private String version = null; private boolean isAsciiProtocol = true; private boolean enabledMGetOp = false; + private boolean enabledMGetsOp = false; private boolean enabledSpaceSeparate = false; // operation Future.get timeout counter @@ -496,37 +497,43 @@ public final SelectionKey getSk() { public final void setVersion(String vr) { version = vr; - setEnableMGetOp(); - setEnableSpaceSeparate(); + StringTokenizer tokens = new StringTokenizer(version, "."); + int majorVersion = Integer.parseInt(tokens.nextToken()); + int minorVersion = Integer.parseInt(tokens.nextToken()); + boolean isEnterprise = version.contains("E"); + if (isAsciiProtocol) { + setEnableMGetOp(majorVersion, minorVersion, isEnterprise); + setEnableMGetsOp(majorVersion, minorVersion, isEnterprise); + setEnableSpaceSeparate(majorVersion, minorVersion, isEnterprise); + } } public final String getVersion() { return version; } - private final void setEnableMGetOp() { - if (isAsciiProtocol) { - StringTokenizer tokens = new StringTokenizer(version, "."); - int majorVersion = Integer.parseInt(tokens.nextToken()); - int minorVersion = Integer.parseInt(tokens.nextToken()); - if (version.contains("E")) { - enabledMGetOp = (majorVersion > 0 || (majorVersion == 0 && minorVersion > 6)); - } else { - enabledMGetOp = (majorVersion > 1 || (majorVersion == 1 && minorVersion > 10)); - } + private void setEnableMGetOp(int majorVersion, int minorVersion, boolean isEnterprise) { + if (isEnterprise) { + enabledMGetOp = (majorVersion > 0 || (majorVersion == 0 && minorVersion > 6)); + } else { + enabledMGetOp = (majorVersion > 1 || (majorVersion == 1 && minorVersion > 10)); } } - private final void setEnableSpaceSeparate() { - if (isAsciiProtocol) { - StringTokenizer tokens = new StringTokenizer(version, "."); - int majorVersion = Integer.parseInt(tokens.nextToken()); - int minorVersion = Integer.parseInt(tokens.nextToken()); - if (version.contains("E")) { - enabledSpaceSeparate = (majorVersion > 0 || (majorVersion == 0 && minorVersion > 6)); - } else { - enabledSpaceSeparate = (majorVersion > 1 || (majorVersion == 1 && minorVersion > 10)); - } + private void setEnableMGetsOp(int majorVersion, int minorVersion, boolean isEnterprise) { + if (isEnterprise) { + enabledMGetsOp = (majorVersion > 0 || (majorVersion == 0 && minorVersion > 8)); + } else { + enabledMGetsOp = (majorVersion > 1 || (majorVersion == 1 && minorVersion > 12)); + } + } + + private void setEnableSpaceSeparate(int majorVersion, int minorVersion, + boolean isEnterprise) { + if (isEnterprise) { + enabledSpaceSeparate = (majorVersion > 0 || (majorVersion == 0 && minorVersion > 6)); + } else { + enabledSpaceSeparate = (majorVersion > 1 || (majorVersion == 1 && minorVersion > 10)); } } @@ -534,6 +541,10 @@ public final boolean enabledMGetOp() { return enabledMGetOp; } + public final boolean enabledMGetsOp() { + return enabledMGetsOp; + } + public final boolean enabledSpaceSeparate() { return enabledSpaceSeparate; } diff --git a/src/main/java/net/spy/memcached/protocol/ascii/AsciiOperationFactory.java b/src/main/java/net/spy/memcached/protocol/ascii/AsciiOperationFactory.java index dfcd5daed..6e14557a9 100644 --- a/src/main/java/net/spy/memcached/protocol/ascii/AsciiOperationFactory.java +++ b/src/main/java/net/spy/memcached/protocol/ascii/AsciiOperationFactory.java @@ -16,7 +16,6 @@ */ package net.spy.memcached.protocol.ascii; -import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; @@ -70,12 +69,9 @@ import net.spy.memcached.ops.GetAttrOperation; import net.spy.memcached.ops.GetOperation; import net.spy.memcached.ops.GetsOperation; -import net.spy.memcached.ops.KeyedOperation; -import net.spy.memcached.ops.MultiGetOperationCallback; import net.spy.memcached.ops.Mutator; import net.spy.memcached.ops.MutatorOperation; import net.spy.memcached.ops.NoopOperation; -import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationCallback; import net.spy.memcached.ops.SASLAuthOperation; import net.spy.memcached.ops.SASLMechsOperation; @@ -111,10 +107,18 @@ public GetsOperation gets(String key, GetsOperation.Callback cb) { return new GetsOperationImpl(key, cb); } + public GetsOperation gets(Collection keys, GetsOperation.Callback cb) { + return new GetsOperationImpl(keys, cb); + } + public GetOperation mget(Collection keys, GetOperation.Callback cb) { return new MGetOperationImpl(keys, cb); } + public GetsOperation mgets(Collection keys, GetsOperation.Callback cb) { + return new MGetsOperationImpl(keys, cb); + } + public MutatorOperation mutate(Mutator m, String key, int by, long def, int exp, OperationCallback cb) { return new MutatorOperationImpl(m, key, by, def, exp, cb); @@ -148,17 +152,6 @@ public ConcatenationOperation cat(ConcatenationType catType, return new ConcatenationOperationImpl(catType, key, data, cb); } - @Override - protected Collection cloneGet(KeyedOperation op) { - Collection rv = new ArrayList(); - GetOperation.Callback callback = new MultiGetOperationCallback( - op.getCallback(), op.getKeys().size()); - for (String k : op.getKeys()) { - rv.add(get(k, callback)); - } - return rv; - } - public SASLMechsOperation saslMechs(OperationCallback cb) { throw new UnsupportedOperationException(); } diff --git a/src/main/java/net/spy/memcached/protocol/ascii/BaseGetOpImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/BaseGetOpImpl.java index 6a4f3fff5..29473b1a2 100644 --- a/src/main/java/net/spy/memcached/protocol/ascii/BaseGetOpImpl.java +++ b/src/main/java/net/spy/memcached/protocol/ascii/BaseGetOpImpl.java @@ -150,7 +150,7 @@ public final void initialize() { commandBuilder.append(keysString); commandBuilder.append(RN_STRING); } else { - assert cmd.equals("mget") : "Unknown Command " + cmd; + assert (cmd.equals("mget") || cmd.equals("mgets")) : "Unknown Command " + cmd; int lenKeys = keysString.getBytes().length; int numKeys = keys.size(); diff --git a/src/main/java/net/spy/memcached/protocol/ascii/GetsOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/GetsOperationImpl.java index d0aa88bb8..d6fcd66a5 100644 --- a/src/main/java/net/spy/memcached/protocol/ascii/GetsOperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/ascii/GetsOperationImpl.java @@ -1,6 +1,8 @@ package net.spy.memcached.protocol.ascii; +import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import net.spy.memcached.ops.APIType; import net.spy.memcached.ops.GetsOperation; @@ -17,4 +19,9 @@ public GetsOperationImpl(String key, GetsOperation.Callback cb) { setAPIType(APIType.GETS); } + public GetsOperationImpl(Collection keys, GetsOperation.Callback cb) { + super(CMD, cb, new HashSet(keys)); + setAPIType(APIType.GETS); + } + } diff --git a/src/main/java/net/spy/memcached/protocol/ascii/MGetsOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/MGetsOperationImpl.java new file mode 100644 index 000000000..f75961d33 --- /dev/null +++ b/src/main/java/net/spy/memcached/protocol/ascii/MGetsOperationImpl.java @@ -0,0 +1,21 @@ +package net.spy.memcached.protocol.ascii; + +import net.spy.memcached.ops.APIType; +import net.spy.memcached.ops.GetsOperation; + +import java.util.Collection; +import java.util.HashSet; + +/** + * Operation for retrieving data. + */ +public class MGetsOperationImpl extends BaseGetOpImpl implements GetsOperation { + + private static final String CMD = "mgets"; + + public MGetsOperationImpl(Collection k, Callback c) { + super(CMD, c, new HashSet(k)); + setAPIType(APIType.MGETS); + } + +} diff --git a/src/main/java/net/spy/memcached/protocol/binary/BinaryOperationFactory.java b/src/main/java/net/spy/memcached/protocol/binary/BinaryOperationFactory.java index 503fc3fad..a80dd1f85 100644 --- a/src/main/java/net/spy/memcached/protocol/binary/BinaryOperationFactory.java +++ b/src/main/java/net/spy/memcached/protocol/binary/BinaryOperationFactory.java @@ -16,7 +16,6 @@ */ package net.spy.memcached.protocol.binary; -import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; @@ -71,13 +70,9 @@ import net.spy.memcached.ops.GetOperation; import net.spy.memcached.ops.GetOperation.Callback; import net.spy.memcached.ops.GetsOperation; -import net.spy.memcached.ops.KeyedOperation; -import net.spy.memcached.ops.MultiGetOperationCallback; -import net.spy.memcached.ops.MultiGetsOperationCallback; import net.spy.memcached.ops.Mutator; import net.spy.memcached.ops.MutatorOperation; import net.spy.memcached.ops.NoopOperation; -import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationCallback; import net.spy.memcached.ops.SASLAuthOperation; import net.spy.memcached.ops.SASLMechsOperation; @@ -114,11 +109,21 @@ public GetsOperation gets(String key, GetsOperation.Callback cb) { return new GetOperationImpl(key, cb); } + public GetsOperation gets(Collection keys, GetsOperation.Callback callback) { + throw new RuntimeException( + "gets is not supported in binary protocol yet."); + } + public GetOperation mget(Collection keys, GetOperation.Callback cb) { throw new RuntimeException( "mget is not supported in binary protocol yet."); } + public GetsOperation mgets(Collection keys, GetsOperation.Callback cb) { + throw new RuntimeException( + "mgets is not supported in binary protocol yet."); + } + public MutatorOperation mutate(Mutator m, String key, int by, long def, int exp, OperationCallback cb) { return new MutatorOperationImpl(m, key, by, def, exp, cb); @@ -153,24 +158,6 @@ public ConcatenationOperation cat(ConcatenationType catType, long casId, return new ConcatenationOperationImpl(catType, key, data, casId, cb); } - @Override - protected Collection cloneGet(KeyedOperation op) { - Collection rv = new ArrayList(); - GetOperation.Callback getCb = null; - GetsOperation.Callback getsCb = null; - if (op.getCallback() instanceof GetOperation.Callback) { - getCb = new MultiGetOperationCallback( - op.getCallback(), op.getKeys().size()); - } else { - getsCb = new MultiGetsOperationCallback( - op.getCallback(), op.getKeys().size()); - } - for (String k : op.getKeys()) { - rv.add(getCb == null ? gets(k, getsCb) : get(k, getCb)); - } - return rv; - } - public SASLAuthOperation saslAuth(String[] mech, String serverName, Map props, CallbackHandler cbh, OperationCallback cb) { diff --git a/src/main/java/net/spy/memcached/transcoders/TranscodeService.java b/src/main/java/net/spy/memcached/transcoders/TranscodeService.java index 985a033b5..6fff3f52a 100644 --- a/src/main/java/net/spy/memcached/transcoders/TranscodeService.java +++ b/src/main/java/net/spy/memcached/transcoders/TranscodeService.java @@ -10,6 +10,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import net.spy.memcached.CASValue; import net.spy.memcached.CachedData; import net.spy.memcached.compat.SpyObject; import net.spy.memcached.internal.BasicThreadFactory; @@ -49,6 +50,28 @@ public T call() { return task; } + /** + * Perform a decode. + */ + public Future> decode(final Transcoder tc, + final long cas, + final CachedData cachedData) { + + assert !pool.isShutdown() : "Pool has already shut down."; + + TranscodeService.Task> task = new TranscodeService.Task>( + new Callable>() { + public CASValue call() { + return new CASValue(cas, tc.decode(cachedData)); + } + }); + + if (tc.asyncDecode(cachedData)) { + this.pool.execute(task); + } + return task; + } + /** * Shut down the pool. */ diff --git a/src/test/java/net/spy/memcached/BinaryClientTest.java b/src/test/java/net/spy/memcached/BinaryClientTest.java index 6c607e17a..115d0fa10 100644 --- a/src/test/java/net/spy/memcached/BinaryClientTest.java +++ b/src/test/java/net/spy/memcached/BinaryClientTest.java @@ -32,6 +32,31 @@ public void testGetStatsCacheDump() throws Exception { assertTrue(true); } + @Override + public void testGetsBulk() throws Exception { + assertTrue(true); + } + + @Override + public void testGetsBulkVararg() throws Exception { + assertTrue(true); + } + + @Override + public void testGetsBulkVarargWithTranscoder() throws Exception { + assertTrue(true); + } + + @Override + public void testAsyncGetsBulkVarargWithTranscoder() throws Exception { + assertTrue(true); + } + + @Override + public void testAsyncGetsBulkWithTranscoderIterator() throws Exception { + assertTrue(true); + } + public void testCASAppendFail() throws Exception { final String key = "append.key"; assertTrue(client.set(key, 5, "test").get()); diff --git a/src/test/java/net/spy/memcached/MockMemcachedNode.java b/src/test/java/net/spy/memcached/MockMemcachedNode.java index 3a8a15b33..8543ac518 100644 --- a/src/test/java/net/spy/memcached/MockMemcachedNode.java +++ b/src/test/java/net/spy/memcached/MockMemcachedNode.java @@ -183,6 +183,11 @@ public boolean enabledMGetOp() { return false; } + @Override + public boolean enabledMGetsOp() { + return false; + } + public boolean enabledSpaceSeparate() { return false; } diff --git a/src/test/java/net/spy/memcached/OperationFactoryTestBase.java b/src/test/java/net/spy/memcached/OperationFactoryTestBase.java index 7961ed269..57131b9db 100644 --- a/src/test/java/net/spy/memcached/OperationFactoryTestBase.java +++ b/src/test/java/net/spy/memcached/OperationFactoryTestBase.java @@ -230,6 +230,55 @@ public void testMultipleGetOperationFanout() { } } + // These are harder cases as they fan out. + public void testMultipleGetsOperationCloning() { + Collection keys = Arrays.asList("k1", "k2", "k3"); + GetsOperation.Callback callback = + (GetsOperation.Callback) mock(GetsOperation.Callback.class).proxy(); + GetsOperation op = ofact.gets(keys, callback); + + Collection ops = ofact.clone(op); + assertEquals(3, ops.size()); + + Collection mutableKeys = new ArrayList(keys); + int i = 3; + for (Operation o : ops) { + assertEquals(i, mutableKeys.size()); // Starting size + GetsOperation go = (GetsOperation) o; + mutableKeys.removeAll(go.getKeys()); + // Verify we matched and removed 1 + assertEquals(--i, mutableKeys.size()); + } + } + + public void testMultipleGetsOperationFanout() { + long[] casId = {82757248, 82757249, 82757250}; + Collection keys = Arrays.asList("k1", "k2", "k3"); + Mock m = mock(GetsOperation.Callback.class); + OperationStatus st = new OperationStatus(true, "blah"); + m.expects(once()).method("complete"); + m.expects(once()).method("receivedStatus").with(same(st)); + m.expects(once()).method("gotData") + .with(eq("k1"), eq(1), eq(casId[0]), isA(byte[].class)); + m.expects(once()).method("gotData") + .with(eq("k2"), eq(2), eq(casId[1]), isA(byte[].class)); + m.expects(once()).method("gotData") + .with(eq("k3"), eq(3), eq(casId[2]), isA(byte[].class)); + + GetsOperation.Callback callback = (GetsOperation.Callback) m.proxy(); + GetsOperation op = ofact.gets(keys, callback); + + // Transition each operation callback into the complete state. + Iterator ki = keys.iterator(); + int i = 0; + for (Operation o : ofact.clone(op)) { + GetsOperation.Callback cb = (GetsOperation.Callback) o.getCallback(); + cb.gotData(ki.next(), ++i, casId[i - 1], new byte[3]); + cb.receivedStatus(st); + cb.complete(); + } + } + protected void assertKey(KeyedOperation op) { assertEquals(TEST_KEY, op.getKeys().iterator().next()); } diff --git a/src/test/java/net/spy/memcached/ProtocolBaseCase.java b/src/test/java/net/spy/memcached/ProtocolBaseCase.java index a79fa49ba..a5b0d057e 100644 --- a/src/test/java/net/spy/memcached/ProtocolBaseCase.java +++ b/src/test/java/net/spy/memcached/ProtocolBaseCase.java @@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit; import net.spy.memcached.compat.SyncThread; +import net.spy.memcached.internal.BulkFuture; import net.spy.memcached.ops.OperationErrorType; import net.spy.memcached.ops.OperationException; import net.spy.memcached.transcoders.SerializingTranscoder; @@ -458,6 +459,106 @@ public void testAsyncGetBulkWithTranscoderIterator() throws Exception { } } + public void testGetsBulk() throws Exception { + Collection keys = Arrays.asList("test1", "test2", "test3"); + assertEquals(0, client.getsBulk(keys).size()); + client.set("test1", 5, "val1"); + client.set("test2", 5, "val2"); + Map> vals = client.getsBulk(keys); + assertEquals(2, vals.size()); + assertEquals("val1", vals.get("test1").getValue()); + assertEquals("val2", vals.get("test2").getValue()); + assertEquals(client.gets("test1").getCas(), vals.get("test1").getCas()); + assertEquals(client.gets("test2").getCas(), vals.get("test2").getCas()); + } + + public void testGetsBulkVararg() throws Exception { + assertEquals(0, client.getsBulk("test1", "test2", "test3").size()); + client.set("test1", 5, "val1"); + client.set("test2", 5, "val2"); + Map> vals = client.getsBulk("test1", "test2", "test3"); + assertEquals(2, vals.size()); + assertEquals("val1", vals.get("test1").getValue()); + assertEquals("val2", vals.get("test2").getValue()); + assertEquals(client.gets("test1").getCas(), vals.get("test1").getCas()); + assertEquals(client.gets("test2").getCas(), vals.get("test2").getCas()); + } + + public void testGetsBulkVarargWithTranscoder() throws Exception { + Transcoder t = new TestTranscoder(); + assertEquals(0, client.getsBulk(t, "test1", "test2", "test3").size()); + client.set("test1", 5, "val1", t); + client.set("test2", 5, "val2", t); + Map> vals = client.getsBulk(t, + "test1", "test2", "test3"); + assertEquals(2, vals.size()); + assertEquals("val1", vals.get("test1").getValue()); + assertEquals("val2", vals.get("test2").getValue()); + assertEquals(client.gets("test1", t).getCas(), vals.get("test1").getCas()); + assertEquals(client.gets("test2", t).getCas(), vals.get("test2").getCas()); + } + + public void testAsyncGetsBulkVarargWithTranscoder() throws Exception { + Transcoder t = new TestTranscoder(); + assertEquals(0, client.getsBulk(t, "test1", "test2", "test3").size()); + client.set("test1", 5, "val1", t); + client.set("test2", 5, "val2", t); + BulkFuture>> vals = client.asyncGetsBulk(t, + "test1", "test2", "test3"); + assertEquals(2, vals.get().size()); + assertEquals("val1", vals.get().get("test1").getValue()); + assertEquals("val2", vals.get().get("test2").getValue()); + assertEquals(client.gets("test1", t).getCas(), vals.get().get("test1").getCas()); + assertEquals(client.gets("test2", t).getCas(), vals.get().get("test2").getCas()); + } + + public void testAsyncGetsBulkWithTranscoderIterator() throws Exception { + ArrayList keys = new ArrayList(); + keys.add("test1"); + keys.add("test2"); + keys.add("test3"); + + ArrayList> tcs = new ArrayList>(keys.size()); + for (String key : keys) { + tcs.add(new TestWithKeyTranscoder(key)); + } + + // Any transcoders listed after list of keys should be + // ignored. + for (String key : keys) { + tcs.add(new TestWithKeyTranscoder(key)); + } + + assertEquals(0, client.asyncGetBulk(keys, tcs.listIterator()).get().size()); + + client.set(keys.get(0), 5, "val1", tcs.get(0)); + client.set(keys.get(1), 5, "val2", tcs.get(1)); + BulkFuture>> vals = client.asyncGetsBulk(keys, tcs.listIterator()); + assertEquals(2, vals.get().size()); + CASValue val1 = vals.get().get(keys.get(0)); + CASValue val2 = vals.get().get(keys.get(1)); + assertEquals("val1", val1.getValue()); + assertEquals("val2", val2.getValue()); + assertEquals(client.gets(keys.get(0), tcs.get(0)).getCas(), val1.getCas()); + assertEquals(client.gets(keys.get(1), tcs.get(1)).getCas(), val2.getCas()); + + // Set with one transcoder with the proper key and get + // with another transcoder with the wrong key. + keys.add(0, "test4"); + Transcoder encodeTranscoder = new TestWithKeyTranscoder(keys.get(0)); + client.set(keys.get(0), 5, "val4", encodeTranscoder).get(); + + Transcoder decodeTranscoder = new TestWithKeyTranscoder("not " + keys.get(0)); + tcs.add(0, decodeTranscoder); + try { + client.asyncGetsBulk(keys, tcs.listIterator()).get(); + fail("Expected ExecutionException caused by key mismatch"); + } catch (java.util.concurrent.ExecutionException e) { + // pass + } + } + + public void testAvailableServers() { if (USE_ZK) { return; // We don't know the server address priori @@ -611,7 +712,7 @@ public long getOperationTimeout() { } } - public void xtestGracefulShutdownTooSlow() throws Exception { + public void testGracefulShutdownTooSlow() throws Exception { for (int i = 0; i < 10000; i++) { client.set("t" + i, 10, i); } diff --git a/src/test/java/net/spy/memcached/protocol/binary/OperationFactoryTest.java b/src/test/java/net/spy/memcached/protocol/binary/OperationFactoryTest.java index 49f9957c9..edbc222b9 100644 --- a/src/test/java/net/spy/memcached/protocol/binary/OperationFactoryTest.java +++ b/src/test/java/net/spy/memcached/protocol/binary/OperationFactoryTest.java @@ -10,4 +10,13 @@ protected OperationFactory getOperationFactory() { return new BinaryOperationFactory(); } + @Override + public void testMultipleGetsOperationCloning() { + assertTrue(true); + } + + @Override + public void testMultipleGetsOperationFanout() { + assertTrue(true); + } }