diff --git a/src/main/java/net/spy/memcached/MemcachedClient.java b/src/main/java/net/spy/memcached/MemcachedClient.java index 636db34d3..d8d7717bd 100644 --- a/src/main/java/net/spy/memcached/MemcachedClient.java +++ b/src/main/java/net/spy/memcached/MemcachedClient.java @@ -622,14 +622,17 @@ public CASResponse cas(String key, long casId, T value, */ public CASResponse cas(String key, long casId, int exp, T value, Transcoder tc) { + OperationFuture future = asyncCAS(key, casId, exp, value, tc); try { - return asyncCAS(key, casId, exp, value, tc).get(operationTimeout, - TimeUnit.MILLISECONDS); + return future.get(operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { + future.cancel(true); throw new RuntimeException("Interrupted waiting for value", e); } catch (ExecutionException e) { + future.cancel(true); throw new RuntimeException("Exception waiting for value", e); } catch (TimeoutException e) { + future.cancel(true); throw new OperationTimeoutException(e); } } @@ -1005,13 +1008,15 @@ public OperationFuture> asyncGets(final String key) { public CASValue gets(String key, Transcoder tc) { OperationFuture> future = asyncGets(key, tc); try { - return future.get( - operationTimeout, TimeUnit.MILLISECONDS); + return future.get(operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { + future.cancel(true); throw new RuntimeException("Interrupted waiting for value", e); } catch (ExecutionException e) { + future.cancel(true); throw new RuntimeException("Exception waiting for value", e); } catch (TimeoutException e) { + future.cancel(true); throw new OperationTimeoutException(e); } } @@ -1045,8 +1050,7 @@ public CASValue gets(String key) { public T get(String key, Transcoder tc) { Future future = asyncGet(key, tc); try { - return future.get( - operationTimeout, TimeUnit.MILLISECONDS); + return future.get(operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { future.cancel(true); throw new RuntimeException("Interrupted waiting for value", e); @@ -1437,14 +1441,17 @@ public BulkFuture>> asyncGetsBulk(String... keys) { */ public Map getBulk(Collection keys, Transcoder tc) { + BulkFuture> future = asyncGetBulk(keys, tc); try { - return asyncGetBulk(keys, tc).get( - operationTimeout, TimeUnit.MILLISECONDS); + return future.get(operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { + future.cancel(true); throw new RuntimeException("Interrupted getting bulk values", e); } catch (ExecutionException e) { + future.cancel(true); throw new RuntimeException("Failed getting bulk values", e); } catch (TimeoutException e) { + future.cancel(true); throw new OperationTimeoutException(e); } } @@ -1507,14 +1514,17 @@ public Map getBulk(String... keys) { */ public Map> getsBulk(Collection keys, Transcoder tc) { + BulkFuture>> future = asyncGetsBulk(keys, tc); try { - return asyncGetsBulk(keys, tc).get( - operationTimeout, TimeUnit.MILLISECONDS); + return future.get(operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { + future.cancel(true); throw new RuntimeException("Interrupted getting bulk values", e); } catch (ExecutionException e) { + future.cancel(true); throw new RuntimeException("Failed getting bulk values", e); } catch (TimeoutException e) { + future.cancel(true); throw new OperationTimeoutException(e.toString(), e); } } @@ -1672,9 +1682,11 @@ public void complete() { })); try { if (!latch.await(operationTimeout, TimeUnit.MILLISECONDS)) { + op.cancel("by applcation."); throw new OperationTimeoutException(operationTimeout, TimeUnit.MILLISECONDS, op); } } catch (InterruptedException e) { + op.cancel("by applcation."); throw new RuntimeException("Interrupted", e); } getLogger().debug("Mutation returned %s", rv); @@ -1778,10 +1790,13 @@ private long mutateWithDefault(Mutator t, String key, assert rv != -1 : "Failed to mutate or init value"; } } catch (InterruptedException e) { + f.cancel(true); throw new RuntimeException("Interrupted waiting for store", e); } catch (ExecutionException e) { + f.cancel(true); throw new RuntimeException("Failed waiting for store", e); } catch (TimeoutException e) { + f.cancel(true); throw new OperationTimeoutException(e); } }