Skip to content

Commit c77a816

Browse files
committed
FEATURE: Create new api asyncBopPipedUpsert.
1 parent 54a8417 commit c77a816

File tree

7 files changed

+476
-6
lines changed

7 files changed

+476
-6
lines changed

src/main/java/net/spy/memcached/ArcusClient.java

+61
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@
9191
import net.spy.memcached.collection.CollectionPipedUpdate;
9292
import net.spy.memcached.collection.CollectionPipedUpdate.BTreePipedUpdate;
9393
import net.spy.memcached.collection.CollectionPipedUpdate.MapPipedUpdate;
94+
import net.spy.memcached.collection.CollectionPipedUpsert;
95+
import net.spy.memcached.collection.CollectionPipedUpsert.BTreePipedUpsert;
96+
import net.spy.memcached.collection.CollectionPipedUpsert.ByteArrayBTreePipedUpsert;
9497
import net.spy.memcached.collection.CollectionResponse;
9598
import net.spy.memcached.collection.CollectionUpdate;
9699
import net.spy.memcached.collection.Element;
@@ -2707,6 +2710,64 @@ public void complete() {
27072710
return rv;
27082711
}
27092712

2713+
@Override
2714+
public CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
2715+
String key, Map<Long, Object> elements,
2716+
CollectionAttributes attributesForCreate) {
2717+
return asyncBopPipedUpsertBulk(key, elements, attributesForCreate, collectionTranscoder);
2718+
}
2719+
2720+
@Override
2721+
public CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
2722+
String key, List<Element<Object>> elements, CollectionAttributes collectionAttributes) {
2723+
return asyncBopPipedUpsertBulk(key, elements, collectionAttributes, collectionTranscoder);
2724+
}
2725+
2726+
@Override
2727+
public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
2728+
String key, Map<Long, T> elements, CollectionAttributes attributesForCreate, Transcoder<T> tc) {
2729+
2730+
if (elements.isEmpty()) {
2731+
throw new IllegalArgumentException("The number of piped operations must be larger than 0.");
2732+
}
2733+
2734+
List<CollectionPipedInsert<T>> upsertList = new ArrayList<CollectionPipedInsert<T>>();
2735+
2736+
if (elements.size() <= CollectionPipedUpsert.MAX_PIPED_ITEM_COUNT) {
2737+
upsertList.add(new BTreePipedUpsert<T>(key, elements, attributesForCreate, tc));
2738+
} else {
2739+
PartitionedMap<Long, T> list = new PartitionedMap<Long, T>(
2740+
elements, CollectionPipedUpsert.MAX_PIPED_ITEM_COUNT);
2741+
for (Map<Long, T> elementMap : list) {
2742+
upsertList.add(new BTreePipedUpsert<T>(key, elementMap, attributesForCreate, tc));
2743+
}
2744+
}
2745+
return asyncCollectionPipedInsert(key, upsertList);
2746+
}
2747+
2748+
@Override
2749+
public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
2750+
String key, List<Element<T>> elements, CollectionAttributes attributesForCreate, Transcoder<T> tc) {
2751+
2752+
if (elements.isEmpty()) {
2753+
throw new IllegalArgumentException("The number of piped operations must be larger than 0.");
2754+
}
2755+
2756+
List<CollectionPipedInsert<T>> upsertList = new ArrayList<CollectionPipedInsert<T>>();
2757+
2758+
2759+
if (elements.size() <= CollectionPipedUpsert.MAX_PIPED_ITEM_COUNT) {
2760+
upsertList.add(new ByteArrayBTreePipedUpsert<T>(key, elements, attributesForCreate, tc));
2761+
} else {
2762+
PartitionedList<Element<T>> list = new PartitionedList<Element<T>>(
2763+
elements, CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);
2764+
for (List<Element<T>> elementList : list) {
2765+
upsertList.add(new ByteArrayBTreePipedUpsert<T>(key, elementList, attributesForCreate, tc));
2766+
}
2767+
}
2768+
return asyncCollectionPipedInsert(key, upsertList);
2769+
}
2770+
27102771
@Override
27112772
public CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpdateBulk(
27122773
String key, List<Element<Object>> elements) {

src/main/java/net/spy/memcached/ArcusClientIF.java

+55
Original file line numberDiff line numberDiff line change
@@ -1526,6 +1526,61 @@ CollectionFuture<Boolean> asyncMopUpdate(String key, String mkey,
15261526
<T> CollectionFuture<Boolean> asyncMopUpdate(String key, String mkey,
15271527
T value, Transcoder<T> tc);
15281528

1529+
/**
1530+
* Upsert elements into the b+tree
1531+
*
1532+
* @param key key of a b+tree
1533+
* @param elements list of b+tree elements
1534+
* @param attributesForCreate create a b+tree with this attributes,
1535+
* if given key of b+tree is not exists.
1536+
* @return a future indicating success
1537+
*/
1538+
public CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
1539+
String key, List<Element<Object>> elements, CollectionAttributes attributesForCreate);
1540+
1541+
1542+
/**
1543+
* Upsert elements into the b+tree
1544+
*
1545+
* @param key key of a b+tree
1546+
* @param elements map of b+tree elements
1547+
* @param attributesForCreate create a b+tree with this attributes,
1548+
* if given key of b+tree is not exists.
1549+
* @return a future indicating success
1550+
*/
1551+
public CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
1552+
String key, Map<Long, Object> elements,
1553+
CollectionAttributes attributesForCreate);
1554+
1555+
/**
1556+
* Upsert elements into the b+tree
1557+
*
1558+
* @param key key of a b+tree
1559+
* @param elements list of b+tree elements
1560+
* @param attributesForCreate create a b+tree with this attributes,
1561+
* if given key of b+tree is not exists.
1562+
* @param tc a transcoder to encode the value of element
1563+
* @return a future indicating success
1564+
*/
1565+
public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
1566+
String key, List<Element<T>> elements,
1567+
CollectionAttributes attributesForCreate, Transcoder<T> tc);
1568+
1569+
1570+
/**
1571+
* Upsert elements into the b+tree
1572+
*
1573+
* @param key key of a b+tree
1574+
* @param elements map of b+tree elements
1575+
* @param attributesForCreate create a b+tree with this attributes,
1576+
* if given key of b+tree is not exists.
1577+
* @param tc a transcoder to encode the value of element
1578+
* @return a future indicating success
1579+
*/
1580+
public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
1581+
String key, Map<Long, T> elements,
1582+
CollectionAttributes attributesForCreate, Transcoder<T> tc);
1583+
15291584
/**
15301585
* Update elements from the b+tree
15311586
*

src/main/java/net/spy/memcached/ArcusClientPool.java

+25
Original file line numberDiff line numberDiff line change
@@ -1586,4 +1586,29 @@ public <E> BTreeStoreAndGetFuture<Boolean, E> asyncBopUpsertAndGetTrimmed(
15861586
value, attributesForCreate, transcoder);
15871587
}
15881588

1589+
@Override
1590+
public CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
1591+
String key, List<Element<Object>> elements, CollectionAttributes attributesForCreate) {
1592+
return this.getClient().asyncBopPipedUpsertBulk(key, elements, attributesForCreate);
1593+
}
1594+
1595+
@Override
1596+
public CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
1597+
String key, Map<Long, Object> elements, CollectionAttributes attributesForCreate) {
1598+
return this.getClient().asyncBopPipedUpsertBulk(key, elements, attributesForCreate);
1599+
}
1600+
1601+
@Override
1602+
public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
1603+
String key, List<Element<T>> elements,
1604+
CollectionAttributes attributesForCreate, Transcoder<T> tc) {
1605+
return this.getClient().asyncBopPipedUpsertBulk(key, elements, attributesForCreate, tc);
1606+
}
1607+
1608+
@Override
1609+
public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpsertBulk(
1610+
String key, Map<Long, T> elements,
1611+
CollectionAttributes attributesForCreate, Transcoder<T> tc) {
1612+
return this.getClient().asyncBopPipedUpsertBulk(key, elements, attributesForCreate, tc);
1613+
}
15891614
}

src/main/java/net/spy/memcached/collection/CollectionPipedInsert.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ public ByteBuffer getAsciiCommand() {
212212
for (i = this.nextOpIndex; i < keySize; i++) {
213213
Long bkey = keyList.get(i);
214214
byte[] value = encodedList.get(i);
215-
setArguments(bb, COMMAND, key, bkey, value.length,
215+
setArguments(bb, getCommand(), key, bkey, value.length,
216216
createOption, (i < keySize - 1) ? PIPE : "");
217217
bb.put(value);
218218
bb.put(CRLF);
@@ -223,6 +223,10 @@ public ByteBuffer getAsciiCommand() {
223223

224224
return bb;
225225
}
226+
227+
public String getCommand() {
228+
return COMMAND;
229+
}
226230
}
227231

228232
/**
@@ -274,7 +278,7 @@ public ByteBuffer getAsciiCommand() {
274278
for (i = this.nextOpIndex; i < eSize; i++) {
275279
Element<T> element = elements.get(i);
276280
byte[] value = encodedList.get(i);
277-
setArguments(bb, COMMAND, key,
281+
setArguments(bb, getCommand(), key,
278282
element.getStringBkey(), element.getStringEFlag(), value.length,
279283
createOption, (i < eSize - 1) ? PIPE : "");
280284
bb.put(value);
@@ -286,6 +290,10 @@ public ByteBuffer getAsciiCommand() {
286290

287291
return bb;
288292
}
293+
294+
public String getCommand() {
295+
return COMMAND;
296+
}
289297
}
290298

291299
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package net.spy.memcached.collection;
2+
3+
import java.util.List;
4+
import java.util.Map;
5+
6+
import net.spy.memcached.transcoders.Transcoder;
7+
8+
public abstract class CollectionPipedUpsert<T> extends CollectionPipedInsert<T> {
9+
10+
public CollectionPipedUpsert(String key, CollectionAttributes attribute,
11+
Transcoder<T> tc, int itemCount) {
12+
super(key, attribute, tc, itemCount);
13+
}
14+
15+
public static class BTreePipedUpsert<T> extends BTreePipedInsert<T> {
16+
private static final String COMMAND = "bop upsert";
17+
18+
public BTreePipedUpsert(String key, Map<Long, T> map,
19+
CollectionAttributes attr, Transcoder<T> tc) {
20+
super(key, map, attr, tc);
21+
}
22+
23+
@Override
24+
public String getCommand() {
25+
return COMMAND;
26+
}
27+
}
28+
29+
public static class ByteArrayBTreePipedUpsert<T> extends ByteArraysBTreePipedInsert<T> {
30+
private static final String COMMAND = "bop upsert";
31+
32+
public ByteArrayBTreePipedUpsert(String key, List<Element<T>> elements,
33+
CollectionAttributes attr, Transcoder<T> tc) {
34+
super(key, elements, attr, tc);
35+
}
36+
37+
@Override
38+
public String getCommand() {
39+
return COMMAND;
40+
}
41+
}
42+
}

src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public class CollectionPipedInsertOperationImpl extends OperationImpl
4949
true, "CREATED_STORED", CollectionResponse.CREATED_STORED);
5050
private static final OperationStatus STORED = new CollectionOperationStatus(
5151
true, "STORED", CollectionResponse.STORED);
52+
private static final OperationStatus REPLACED = new CollectionOperationStatus(
53+
true, "REPLACED", CollectionResponse.REPLACED);
5254
private static final OperationStatus NOT_FOUND = new CollectionOperationStatus(
5355
false, "NOT_FOUND", CollectionResponse.NOT_FOUND);
5456
private static final OperationStatus ELEMENT_EXISTS = new CollectionOperationStatus(
@@ -117,8 +119,8 @@ assert getState() == OperationState.READING
117119

118120
if (insert.isNotPiped()) {
119121
OperationStatus status = matchStatus(line, STORED, CREATED_STORED,
120-
NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE,
121-
TYPE_MISMATCH, BKEY_MISMATCH);
122+
REPLACED, NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED,
123+
OUT_OF_RANGE, TYPE_MISMATCH, BKEY_MISMATCH);
122124
if (status.isSuccess()) {
123125
cb.receivedStatus((successAll) ? END : FAILED_END);
124126
} else {
@@ -157,8 +159,8 @@ assert getState() == OperationState.READING
157159
count = Integer.parseInt(stuff[1]);
158160
} else {
159161
OperationStatus status = matchStatus(line, STORED, CREATED_STORED,
160-
NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE,
161-
TYPE_MISMATCH, BKEY_MISMATCH);
162+
REPLACED, NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED,
163+
OUT_OF_RANGE, TYPE_MISMATCH, BKEY_MISMATCH);
162164

163165
if (!status.isSuccess()) {
164166
cb.gotStatus(index, status);

0 commit comments

Comments
 (0)