Skip to content

Commit 89077c2

Browse files
committed
INTREANL: Create new api asyncBopPipedUpsert.
1 parent a7ed5e2 commit 89077c2

File tree

7 files changed

+475
-6
lines changed

7 files changed

+475
-6
lines changed

src/main/java/net/spy/memcached/ArcusClient.java

+61
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@
9292
import net.spy.memcached.collection.CollectionPipedUpdate;
9393
import net.spy.memcached.collection.CollectionPipedUpdate.BTreePipedUpdate;
9494
import net.spy.memcached.collection.CollectionPipedUpdate.MapPipedUpdate;
95+
import net.spy.memcached.collection.CollectionPipedUpsert;
96+
import net.spy.memcached.collection.CollectionPipedUpsert.BTreePipedUpsert;
97+
import net.spy.memcached.collection.CollectionPipedUpsert.ByteArrayBTreePipedUpsert;
9598
import net.spy.memcached.collection.CollectionResponse;
9699
import net.spy.memcached.collection.CollectionUpdate;
97100
import net.spy.memcached.collection.Element;
@@ -2734,6 +2737,64 @@ public void complete() {
27342737
return rv;
27352738
}
27362739

2740+
@Override
2741+
public CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
2742+
String key, Map<Long, Object> elements,
2743+
CollectionAttributes attributesForCreate) {
2744+
return asyncBopPipedUpsertBulk(key, elements, attributesForCreate, collectionTranscoder);
2745+
}
2746+
2747+
@Override
2748+
public CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
2749+
String key, List<Element<Object>> elements, CollectionAttributes collectionAttributes) {
2750+
return asyncBopPipedUpsertBulk(key, elements, collectionAttributes, collectionTranscoder);
2751+
}
2752+
2753+
@Override
2754+
public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
2755+
String key, Map<Long, T> elements, CollectionAttributes attributesForCreate, Transcoder<T> tc) {
2756+
2757+
if (elements.isEmpty()) {
2758+
throw new IllegalArgumentException("The number of piped operations must be larger than 0.");
2759+
}
2760+
2761+
List<CollectionPipedInsert<T>> upsertList = new ArrayList<CollectionPipedInsert<T>>();
2762+
2763+
if (elements.size() <= CollectionPipedUpsert.MAX_PIPED_ITEM_COUNT) {
2764+
upsertList.add(new BTreePipedUpsert<T>(key, elements, attributesForCreate, tc));
2765+
} else {
2766+
PartitionedMap<Long, T> list = new PartitionedMap<Long, T>(
2767+
elements, CollectionPipedUpsert.MAX_PIPED_ITEM_COUNT);
2768+
for (Map<Long, T> elementMap : list) {
2769+
upsertList.add(new BTreePipedUpsert<T>(key, elementMap, attributesForCreate, tc));
2770+
}
2771+
}
2772+
return asyncCollectionPipedInsert(key, upsertList);
2773+
}
2774+
2775+
@Override
2776+
public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
2777+
String key, List<Element<T>> elements, CollectionAttributes attributesForCreate, Transcoder<T> tc) {
2778+
2779+
if (elements.isEmpty()) {
2780+
throw new IllegalArgumentException("The number of piped operations must be larger than 0.");
2781+
}
2782+
2783+
List<CollectionPipedInsert<T>> upsertList = new ArrayList<CollectionPipedInsert<T>>();
2784+
2785+
2786+
if (elements.size() <= CollectionPipedUpsert.MAX_PIPED_ITEM_COUNT) {
2787+
upsertList.add(new ByteArrayBTreePipedUpsert<T>(key, elements, attributesForCreate, tc));
2788+
} else {
2789+
PartitionedList<Element<T>> list = new PartitionedList<Element<T>>(
2790+
elements, CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);
2791+
for (List<Element<T>> elementList : list) {
2792+
upsertList.add(new ByteArrayBTreePipedUpsert<T>(key, elementList, attributesForCreate, tc));
2793+
}
2794+
}
2795+
return asyncCollectionPipedInsert(key, upsertList);
2796+
}
2797+
27372798
@Override
27382799
public CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpdateBulk(
27392800
String key, List<Element<Object>> elements) {

src/main/java/net/spy/memcached/ArcusClientIF.java

+55
Original file line numberDiff line numberDiff line change
@@ -1540,6 +1540,61 @@ public CollectionFuture<Boolean> asyncMopUpdate(String key, String mkey,
15401540
public <T> CollectionFuture<Boolean> asyncMopUpdate(String key, String mkey,
15411541
T value, Transcoder<T> tc);
15421542

1543+
/**
1544+
* Upsert elements into the b+tree
1545+
*
1546+
* @param key key of a b+tree
1547+
* @param elements list of b+tree elements
1548+
* @param attributesForCreate create a b+tree with this attributes,
1549+
* if given key of b+tree is not exists.
1550+
* @return a future indicating success
1551+
*/
1552+
public CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
1553+
String key, List<Element<Object>> elements, CollectionAttributes attributesForCreate);
1554+
1555+
1556+
/**
1557+
* Upsert elements into the b+tree
1558+
*
1559+
* @param key key of a b+tree
1560+
* @param elements map of b+tree elements
1561+
* @param attributesForCreate create a b+tree with this attributes,
1562+
* if given key of b+tree is not exists.
1563+
* @return a future indicating success
1564+
*/
1565+
public CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
1566+
String key, Map<Long, Object> elements,
1567+
CollectionAttributes attributesForCreate);
1568+
1569+
/**
1570+
* Upsert elements into the b+tree
1571+
*
1572+
* @param key key of a b+tree
1573+
* @param elements list of b+tree elements
1574+
* @param attributesForCreate create a b+tree with this attributes,
1575+
* if given key of b+tree is not exists.
1576+
* @param tc a transcoder to encode the value of element
1577+
* @return a future indicating success
1578+
*/
1579+
public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
1580+
String key, List<Element<T>> elements,
1581+
CollectionAttributes attributesForCreate, Transcoder<T> tc);
1582+
1583+
1584+
/**
1585+
* Upsert elements into the b+tree
1586+
*
1587+
* @param key key of a b+tree
1588+
* @param elements map of b+tree elements
1589+
* @param attributesForCreate create a b+tree with this attributes,
1590+
* if given key of b+tree is not exists.
1591+
* @param tc a transcoder to encode the value of element
1592+
* @return a future indicating success
1593+
*/
1594+
public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
1595+
String key, Map<Long, T> elements,
1596+
CollectionAttributes attributesForCreate, Transcoder<T> tc);
1597+
15431598
/**
15441599
* Update elements from the b+tree
15451600
*

src/main/java/net/spy/memcached/ArcusClientPool.java

+25
Original file line numberDiff line numberDiff line change
@@ -1585,4 +1585,29 @@ public <E> BTreeStoreAndGetFuture<Boolean, E> asyncBopUpsertAndGetTrimmed(
15851585
value, attributesForCreate, transcoder);
15861586
}
15871587

1588+
@Override
1589+
public CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
1590+
String key, List<Element<Object>> elements, CollectionAttributes attributesForCreate) {
1591+
return this.getClient().asyncBopPipedUpsertBulk(key, elements, attributesForCreate);
1592+
}
1593+
1594+
@Override
1595+
public CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
1596+
String key, Map<Long, Object> elements, CollectionAttributes attributesForCreate) {
1597+
return this.getClient().asyncBopPipedUpsertBulk(key, elements, attributesForCreate);
1598+
}
1599+
1600+
@Override
1601+
public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
1602+
String key, List<Element<T>> elements,
1603+
CollectionAttributes attributesForCreate, Transcoder<T> tc) {
1604+
return this.getClient().asyncBopPipedUpsertBulk(key, elements, attributesForCreate, tc);
1605+
}
1606+
1607+
@Override
1608+
public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
1609+
String key, Map<Long, T> elements,
1610+
CollectionAttributes attributesForCreate, Transcoder<T> tc) {
1611+
return this.getClient().asyncBopPipedUpsertBulk(key, elements, attributesForCreate, tc);
1612+
}
15881613
}

src/main/java/net/spy/memcached/collection/CollectionPipedInsert.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ public ByteBuffer getAsciiCommand() {
212212
for (i = this.nextOpIndex; i < keySize; i++) {
213213
Long bkey = keyList.get(i);
214214
byte[] value = encodedList.get(i);
215-
setArguments(bb, COMMAND, key, bkey, value.length,
215+
setArguments(bb, getCommand(), key, bkey, value.length,
216216
createOption, (i < keySize - 1) ? PIPE : "");
217217
bb.put(value);
218218
bb.put(CRLF);
@@ -223,6 +223,10 @@ public ByteBuffer getAsciiCommand() {
223223

224224
return bb;
225225
}
226+
227+
public String getCommand() {
228+
return COMMAND;
229+
}
226230
}
227231

228232
/**
@@ -274,7 +278,7 @@ public ByteBuffer getAsciiCommand() {
274278
for (i = this.nextOpIndex; i < eSize; i++) {
275279
Element<T> element = elements.get(i);
276280
byte[] value = encodedList.get(i);
277-
setArguments(bb, COMMAND, key,
281+
setArguments(bb, getCommand(), key,
278282
element.getStringBkey(), element.getStringEFlag(), value.length,
279283
createOption, (i < eSize - 1) ? PIPE : "");
280284
bb.put(value);
@@ -286,6 +290,10 @@ public ByteBuffer getAsciiCommand() {
286290

287291
return bb;
288292
}
293+
294+
public String getCommand() {
295+
return COMMAND;
296+
}
289297
}
290298

291299
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package net.spy.memcached.collection;
2+
3+
import java.util.List;
4+
import java.util.Map;
5+
6+
import net.spy.memcached.transcoders.Transcoder;
7+
8+
public abstract class CollectionPipedUpsert<T> extends CollectionPipedInsert<T> {
9+
10+
public CollectionPipedUpsert(String key, CollectionAttributes attribute,
11+
Transcoder<T> tc, int itemCount) {
12+
super(key, attribute, tc, itemCount);
13+
}
14+
15+
public static class BTreePipedUpsert<T> extends BTreePipedInsert<T> {
16+
private static final String COMMAND = "bop upsert";
17+
18+
public BTreePipedUpsert(String key, Map<Long, T> map,
19+
CollectionAttributes attr, Transcoder<T> tc) {
20+
super(key, map, attr, tc);
21+
}
22+
23+
@Override
24+
public String getCommand() {
25+
return COMMAND;
26+
}
27+
}
28+
29+
public static class ByteArrayBTreePipedUpsert<T> extends ByteArraysBTreePipedInsert<T> {
30+
private static final String COMMAND = "bop upsert";
31+
32+
public ByteArrayBTreePipedUpsert(String key, List<Element<T>> elements,
33+
CollectionAttributes attr, Transcoder<T> tc) {
34+
super(key, elements, attr, tc);
35+
}
36+
37+
@Override
38+
public String getCommand() {
39+
return COMMAND;
40+
}
41+
}
42+
}

src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public class CollectionPipedInsertOperationImpl extends OperationImpl
4949
true, "CREATED_STORED", CollectionResponse.CREATED_STORED);
5050
private static final OperationStatus STORED = new CollectionOperationStatus(
5151
true, "STORED", CollectionResponse.STORED);
52+
private static final OperationStatus REPLACED = new CollectionOperationStatus(
53+
true, "REPLACED", CollectionResponse.REPLACED);
5254
private static final OperationStatus NOT_FOUND = new CollectionOperationStatus(
5355
false, "NOT_FOUND", CollectionResponse.NOT_FOUND);
5456
private static final OperationStatus ELEMENT_EXISTS = new CollectionOperationStatus(
@@ -117,8 +119,8 @@ assert getState() == OperationState.READING
117119

118120
if (insert.isNotPiped()) {
119121
OperationStatus status = matchStatus(line, STORED, CREATED_STORED,
120-
NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE,
121-
TYPE_MISMATCH, BKEY_MISMATCH);
122+
REPLACED, NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED,
123+
OUT_OF_RANGE, TYPE_MISMATCH, BKEY_MISMATCH);
122124
if (status.isSuccess()) {
123125
cb.receivedStatus((successAll) ? END : FAILED_END);
124126
} else {
@@ -157,8 +159,8 @@ assert getState() == OperationState.READING
157159
count = Integer.parseInt(stuff[1]);
158160
} else {
159161
OperationStatus status = matchStatus(line, STORED, CREATED_STORED,
160-
NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE,
161-
TYPE_MISMATCH, BKEY_MISMATCH);
162+
REPLACED, NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED,
163+
OUT_OF_RANGE, TYPE_MISMATCH, BKEY_MISMATCH);
162164

163165
if (!status.isSuccess()) {
164166
cb.gotStatus(index, status);

0 commit comments

Comments
 (0)