Skip to content

Commit

Permalink
fix(aws dynamoDb) fix update item operation (#1818)
Browse files Browse the repository at this point in the history
  • Loading branch information
Oleksiivanov authored Jan 26, 2024
1 parent 50a0b9f commit 3a54502
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,27 @@
*/
package io.camunda.connector.aws.dynamodb.operation.item;

import com.amazonaws.services.dynamodbv2.document.AttributeUpdate;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
import com.amazonaws.services.dynamodbv2.model.AttributeAction;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.document.UpdateItemOutcome;
import com.amazonaws.services.dynamodbv2.document.spec.UpdateItemSpec;
import com.amazonaws.services.dynamodbv2.document.utils.ValueMap;
import com.amazonaws.services.dynamodbv2.model.ReturnValue;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.camunda.connector.api.error.ConnectorException;
import io.camunda.connector.aws.ObjectMapperSupplier;
import io.camunda.connector.aws.dynamodb.model.UpdateItem;
import io.camunda.connector.aws.dynamodb.operation.AwsDynamoDbOperation;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

public class UpdateItemOperation implements AwsDynamoDbOperation {
private static final String ACTION_PUT = "put";
private static final String ACTION_DELETE = "delete";
private static final String UPDATE_EXPRESSION_SET = "set ";
private static final String UPDATE_EXPRESSION_REMOVE = "remove ";

private final UpdateItem updateItemModel;
private final ObjectMapper objectMapper;

Expand All @@ -31,39 +37,85 @@ public UpdateItemOperation(final UpdateItem updateItemModel) {

@Override
public Object invoke(final DynamoDB dynamoDB) {
Map<String, Object> primaryKeyMap =
objectMapper.convertValue(updateItemModel.primaryKeyComponents(), new TypeReference<>() {});
Map<String, Object> attributesMap =
objectMapper.convertValue(updateItemModel.keyAttributes(), new TypeReference<>() {});

String tableName = updateItemModel.tableName();
String action = updateItemModel.attributeAction();

return updateItem(dynamoDB, primaryKeyMap, attributesMap, action, tableName);
}

private UpdateItemOutcome updateItem(
DynamoDB dynamoDB,
Map<String, Object> primaryKeyMap,
Map<String, Object> attributesMap,
String action,
String tableName) {
try {
Table table = dynamoDB.getTable(tableName);
PrimaryKey primaryKey = buildPrimaryKey(primaryKeyMap);

UpdateItemSpec updateItemSpec = buildUpdateItemSpec(primaryKey, attributesMap, action);
return table.updateItem(updateItemSpec);
} catch (Exception e) {
throw new ConnectorException("Error in updateItem operation: " + e.getMessage(), e);
}
}

private PrimaryKey buildPrimaryKey(Map<String, Object> primaryKeyMap) {
PrimaryKey primaryKey = new PrimaryKey();
objectMapper
.convertValue(
updateItemModel.keyAttributes(), new TypeReference<HashMap<String, Object>>() {})
.forEach(primaryKey::addComponent);

return dynamoDB
.getTable(updateItemModel.tableName())
.updateItem(primaryKey, getAttributeUpdatesArray());
primaryKeyMap.forEach(primaryKey::addComponent);
return primaryKey;
}

private UpdateItemSpec buildUpdateItemSpec(
PrimaryKey primaryKey, Map<String, Object> attributesMap, String action) {
UpdateItemSpec updateItemSpec = new UpdateItemSpec().withPrimaryKey(primaryKey);
ValueMap valueMap = new ValueMap();
StringBuilder updateExpression = new StringBuilder();

if (ACTION_PUT.equalsIgnoreCase(action)) {
buildPutUpdateExpression(attributesMap, updateExpression, valueMap);
} else if (ACTION_DELETE.equalsIgnoreCase(action)) {
buildDeleteUpdateExpression(attributesMap, updateExpression);
} else {
throw new ConnectorException("Invalid action: " + action);
}

updateItemSpec
.withUpdateExpression(updateExpression.toString())
.withReturnValues(ReturnValue.ALL_NEW);
if (!valueMap.isEmpty()) {
updateItemSpec.withValueMap(valueMap);
}
return updateItemSpec;
}

private void buildPutUpdateExpression(
Map<String, Object> attributesMap, StringBuilder updateExpression, ValueMap valueMap) {
updateExpression.append(UPDATE_EXPRESSION_SET);
attributesMap.forEach(
(k, v) -> {
String attrKey = ":" + k;
updateExpression.append(k).append(" = ").append(attrKey).append(", ");
valueMap.withString(attrKey, String.valueOf(v));
});
trimTrailingComma(updateExpression);
}

private void buildDeleteUpdateExpression(
Map<String, Object> attributesMap, StringBuilder updateExpression) {
updateExpression.append(UPDATE_EXPRESSION_REMOVE);
attributesMap.keySet().forEach(k -> updateExpression.append(k).append(", "));
trimTrailingComma(updateExpression);
}

private AttributeUpdate[] getAttributeUpdatesArray() {
List<AttributeUpdate> attributeUpdates = new ArrayList<>();
objectMapper
.convertValue(
updateItemModel.keyAttributes(), new TypeReference<HashMap<String, Object>>() {})
.forEach(
(key, value) -> {
AttributeUpdate attributeUpdate;
AttributeAction attributeAction =
AttributeAction.fromValue(
updateItemModel.attributeAction().toUpperCase(Locale.ROOT));
attributeUpdate =
switch (attributeAction) {
case PUT -> new AttributeUpdate(key).put(value);
case DELETE -> new AttributeUpdate(key).delete();
default -> throw new IllegalArgumentException(
"Unsupported action [" + attributeAction + "]");
};

attributeUpdates.add(attributeUpdate);
});
return attributeUpdates.toArray(AttributeUpdate[]::new);
private void trimTrailingComma(StringBuilder updateExpression) {
if (!updateExpression.isEmpty()) {
updateExpression.setLength(updateExpression.length() - 2); // Remove last comma and space
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

import com.amazonaws.services.dynamodbv2.document.AttributeUpdate;
import com.amazonaws.services.dynamodbv2.document.KeyAttribute;
import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
import com.amazonaws.services.dynamodbv2.document.UpdateItemOutcome;
import com.amazonaws.services.dynamodbv2.document.spec.UpdateItemSpec;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.camunda.connector.api.outbound.OutboundConnectorContext;
import io.camunda.connector.aws.dynamodb.BaseDynamoDbOperationTest;
Expand All @@ -28,23 +28,20 @@

class UpdateItemOperationTest extends BaseDynamoDbOperationTest {
private UpdateItemOperation updateItemOperation;
private UpdateItem updateItem;
@Mock private UpdateItemOutcome updateItemOutcome;
@Captor private ArgumentCaptor<PrimaryKey> primaryKeyArgumentCaptor;
@Captor private ArgumentCaptor<AttributeUpdate> attributeUpdateArgumentCaptor;
@Captor private ArgumentCaptor<UpdateItemSpec> updateItemSpecArgumentCaptor;
private KeyAttribute keyAttribute;
private AttributeUpdate attributeUpdate;

@BeforeEach
public void setUp() {

keyAttribute = new KeyAttribute("id", "123");
attributeUpdate = new AttributeUpdate("name").addElements("John Doe");
AttributeUpdate attributeUpdate = new AttributeUpdate("name").addElements("John Doe");

Map<String, Object> primaryKey = Map.of(keyAttribute.getName(), keyAttribute.getValue());
Map<String, Object> attributeUpdates = Map.of(attributeUpdate.getAttributeName(), "John Doe");

updateItem =
UpdateItem updateItem =
new UpdateItem(
TestDynamoDBData.ActualValue.TABLE_NAME, primaryKey, attributeUpdates, "PUT");
updateItemOperation = new UpdateItemOperation(updateItem);
Expand All @@ -53,19 +50,15 @@ public void setUp() {
@Test
public void testInvoke() {
// Given
when(table.updateItem(
primaryKeyArgumentCaptor.capture(), attributeUpdateArgumentCaptor.capture()))
.thenReturn(updateItemOutcome);
when(table.updateItem(updateItemSpecArgumentCaptor.capture())).thenReturn(updateItemOutcome);
// When
Object result = updateItemOperation.invoke(dynamoDB);
// Then
assertThat(result).isInstanceOf(UpdateItemOutcome.class);
assertThat(((UpdateItemOutcome) result).getItem()).isEqualTo(updateItemOutcome.getItem());
// TODO: Uncomment the following assertion after fixing the bug reported in
// https://github.com/camunda/connectors/issues/1804
// assertThat(primaryKeyArgumentCaptor.getValue().getComponents()).contains(keyAttribute);
assertThat(attributeUpdateArgumentCaptor.getValue().getAttributeName())
.isEqualTo(attributeUpdate.getAttributeName());
UpdateItemSpec value = updateItemSpecArgumentCaptor.getValue();
assertThat(value.getKeyComponents()).contains(keyAttribute);
assertThat(value.getValueMap()).isEqualTo(Map.of(":name", "John Doe"));
}

@Test
Expand Down

0 comments on commit 3a54502

Please sign in to comment.