Skip to content

Commit

Permalink
Merge pull request #332 from sUpniverse/supniverse/cancel-operation
Browse files Browse the repository at this point in the history
FIX: the operation was canceled when the exception occured
  • Loading branch information
jhpark816 authored Feb 22, 2021
2 parents 138a57a + 7edbe25 commit d013768
Showing 1 changed file with 25 additions and 10 deletions.
35 changes: 25 additions & 10 deletions src/main/java/net/spy/memcached/MemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -622,14 +622,17 @@ public <T> CASResponse cas(String key, long casId, T value,
*/
public <T> CASResponse cas(String key, long casId, int exp, T value,
Transcoder<T> tc) {
OperationFuture<CASResponse> future = asyncCAS(key, casId, exp, value, tc);
try {
return asyncCAS(key, casId, exp, value, tc).get(operationTimeout,
TimeUnit.MILLISECONDS);
return future.get(operationTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
future.cancel(true);
throw new RuntimeException("Interrupted waiting for value", e);
} catch (ExecutionException e) {
future.cancel(true);
throw new RuntimeException("Exception waiting for value", e);
} catch (TimeoutException e) {
future.cancel(true);
throw new OperationTimeoutException(e);
}
}
Expand Down Expand Up @@ -1005,13 +1008,15 @@ public OperationFuture<CASValue<Object>> asyncGets(final String key) {
public <T> CASValue<T> gets(String key, Transcoder<T> tc) {
OperationFuture<CASValue<T>> future = asyncGets(key, tc);
try {
return future.get(
operationTimeout, TimeUnit.MILLISECONDS);
return future.get(operationTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
future.cancel(true);
throw new RuntimeException("Interrupted waiting for value", e);
} catch (ExecutionException e) {
future.cancel(true);
throw new RuntimeException("Exception waiting for value", e);
} catch (TimeoutException e) {
future.cancel(true);
throw new OperationTimeoutException(e);
}
}
Expand Down Expand Up @@ -1045,8 +1050,7 @@ public CASValue<Object> gets(String key) {
public <T> T get(String key, Transcoder<T> tc) {
Future<T> future = asyncGet(key, tc);
try {
return future.get(
operationTimeout, TimeUnit.MILLISECONDS);
return future.get(operationTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
future.cancel(true);
throw new RuntimeException("Interrupted waiting for value", e);
Expand Down Expand Up @@ -1437,14 +1441,17 @@ public BulkFuture<Map<String, CASValue<Object>>> asyncGetsBulk(String... keys) {
*/
public <T> Map<String, T> getBulk(Collection<String> keys,
Transcoder<T> tc) {
BulkFuture<Map<String, T>> future = asyncGetBulk(keys, tc);
try {
return asyncGetBulk(keys, tc).get(
operationTimeout, TimeUnit.MILLISECONDS);
return future.get(operationTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
future.cancel(true);
throw new RuntimeException("Interrupted getting bulk values", e);
} catch (ExecutionException e) {
future.cancel(true);
throw new RuntimeException("Failed getting bulk values", e);
} catch (TimeoutException e) {
future.cancel(true);
throw new OperationTimeoutException(e);
}
}
Expand Down Expand Up @@ -1507,14 +1514,17 @@ public Map<String, Object> getBulk(String... keys) {
*/
public <T> Map<String, CASValue<T>> getsBulk(Collection<String> keys,
Transcoder<T> tc) {
BulkFuture<Map<String, CASValue<T>>> future = asyncGetsBulk(keys, tc);
try {
return asyncGetsBulk(keys, tc).get(
operationTimeout, TimeUnit.MILLISECONDS);
return future.get(operationTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
future.cancel(true);
throw new RuntimeException("Interrupted getting bulk values", e);
} catch (ExecutionException e) {
future.cancel(true);
throw new RuntimeException("Failed getting bulk values", e);
} catch (TimeoutException e) {
future.cancel(true);
throw new OperationTimeoutException(e.toString(), e);
}
}
Expand Down Expand Up @@ -1672,9 +1682,11 @@ public void complete() {
}));
try {
if (!latch.await(operationTimeout, TimeUnit.MILLISECONDS)) {
op.cancel("by applcation.");
throw new OperationTimeoutException(operationTimeout, TimeUnit.MILLISECONDS, op);
}
} catch (InterruptedException e) {
op.cancel("by applcation.");
throw new RuntimeException("Interrupted", e);
}
getLogger().debug("Mutation returned %s", rv);
Expand Down Expand Up @@ -1778,10 +1790,13 @@ private long mutateWithDefault(Mutator t, String key,
assert rv != -1 : "Failed to mutate or init value";
}
} catch (InterruptedException e) {
f.cancel(true);
throw new RuntimeException("Interrupted waiting for store", e);
} catch (ExecutionException e) {
f.cancel(true);
throw new RuntimeException("Failed waiting for store", e);
} catch (TimeoutException e) {
f.cancel(true);
throw new OperationTimeoutException(e);
}
}
Expand Down

0 comments on commit d013768

Please sign in to comment.