Skip to content

Commit 3a861b7

Browse files
committed
[DEVEX-250] Added the initial implementation of Jackson Serializer
1 parent 4dc914d commit 3a861b7

File tree

6 files changed

+105
-19
lines changed

6 files changed

+105
-19
lines changed

db-client-java/build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,11 @@ dependencies {
4646
implementation "io.grpc:grpc-stub:${grpcVersion}"
4747
implementation "io.grpc:grpc-protobuf:${grpcVersion}"
4848
implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"
49+
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
4950
implementation "org.slf4j:slf4j-api:2.0.17"
5051
implementation "org.bouncycastle:bcprov-jdk18on:1.80"
5152
implementation "org.bouncycastle:bcpkix-jdk18on:1.80"
52-
53+
5354
implementation platform("io.opentelemetry:opentelemetry-bom:${openTelemetryVersion}")
5455
implementation "io.opentelemetry:opentelemetry-api"
5556
implementation "io.opentelemetry.semconv:opentelemetry-semconv:${openTelemetrySemConvVersion}"
@@ -64,7 +65,6 @@ dependencies {
6465
testImplementation "org.reactivestreams:reactive-streams-tck:${reactiveStreamsApiVersion}"
6566
testImplementation "org.testcontainers:testcontainers:${testcontainersVersion}"
6667
testImplementation platform("com.fasterxml.jackson:jackson-bom:${jacksonVersion}")
67-
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
6868
testImplementation "com.github.javafaker:javafaker:1.0.2"
6969
testImplementation 'org.slf4j:slf4j-simple:2.0.17'
7070
testImplementation "io.opentelemetry:opentelemetry-sdk"

db-client-java/src/main/java/io/kurrent/dbclient/MessageData.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,15 @@ public final class MessageData {
1212
private final byte[] messageData;
1313
private final byte[] messageMetadata;
1414

15-
MessageData(String messageType, byte[] messageData) {
15+
public MessageData(String messageType, byte[] messageData) {
1616
this(messageType, messageData, null, UUID.randomUUID(), ContentType.JSON);
1717
}
1818

19-
MessageData(String messageType, byte[] messageData, byte[] userMetadata) {
19+
public MessageData(String messageType, byte[] messageData, byte[] userMetadata) {
2020
this(messageType, messageData, userMetadata, UUID.randomUUID(), ContentType.JSON);
2121
}
22-
23-
MessageData(String messageType, byte[] messageData, byte[] userMetadata, UUID messageId, String contentType) {
22+
23+
public MessageData(String messageType, byte[] messageData, byte[] userMetadata, UUID messageId, String contentType) {
2424
this.messageId = messageId;
2525
this.messageType = messageType;
2626
this.contentType = contentType;
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package io.kurrent.dbclient.serialization;
2+
3+
import com.fasterxml.jackson.annotation.JsonInclude;
4+
import com.fasterxml.jackson.core.JsonProcessingException;
5+
import com.fasterxml.jackson.databind.*;
6+
import com.fasterxml.jackson.databind.json.JsonMapper;
7+
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
import java.io.IOException;
12+
import java.util.Optional;
13+
14+
public class JacksonSerializer implements Serializer {
15+
public static final JsonMapper.Builder defaultBuilder = JsonMapper.builder()
16+
.addModule(new JavaTimeModule())
17+
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
18+
.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
19+
.propertyNamingStrategy(PropertyNamingStrategies.LOWER_CAMEL_CASE)
20+
.enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES)
21+
.serializationInclusion(JsonInclude.Include.NON_NULL)
22+
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
23+
24+
private static final Logger logger = LoggerFactory.getLogger(JacksonSerializer.class);
25+
26+
private final JsonMapper jsonMapper;
27+
28+
public JacksonSerializer(){
29+
this(defaultBuilder);
30+
}
31+
32+
public JacksonSerializer(JsonMapper.Builder builder){
33+
jsonMapper = builder != null ? builder.build() : defaultBuilder.build();
34+
}
35+
36+
@Override
37+
public byte[] serialize(Object value) {
38+
try {
39+
return jsonMapper.writeValueAsBytes(value);
40+
} catch (JsonProcessingException e) {
41+
throw new RuntimeException(e);
42+
}
43+
}
44+
45+
@Override
46+
public <MessageType> Optional<MessageType> deserialize(Class<MessageType> eventClass, byte[] data) {
47+
try {
48+
MessageType result = jsonMapper.readValue(data, eventClass);
49+
50+
return Optional.ofNullable(result);
51+
} catch (IOException e) {
52+
logger.warn("Error deserializing event {}", eventClass.getName(), e);
53+
return Optional.empty();
54+
}
55+
}
56+
}

db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializerImpl.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,18 @@
77
import java.util.stream.Collectors;
88

99
class MessageSerializerImpl implements MessageSerializer {
10+
Serializer serializer = new JacksonSerializer();
11+
1012
@Override
1113
public MessageSerializer with(OperationSerializationSettings serializationSettings) {
12-
return null;
14+
return this;
1315
}
1416

1517
@Override
1618
public MessageData serialize(Message value, MessageSerializationContext context) {
17-
return null;
19+
return MessageData
20+
.builderAsJson(value.getData().getClass().getTypeName(), serializer.serialize(value))
21+
.build();
1822
}
1923

2024
@Override
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package io.kurrent.dbclient.serialization;
2+
3+
import java.util.Optional;
4+
5+
/// <summary>
6+
/// Defines the core serialization capabilities required by the KurrentDB client.
7+
/// Implementations of this interface handle the conversion between Java objects and their
8+
/// binary representation for storage in and retrieval from the event store.
9+
/// <br />
10+
/// The client ships default Jackson implementation, but custom implementations can be provided or other formats.
11+
/// </summary>
12+
public interface Serializer {
13+
/// <summary>
14+
/// Converts a Java object to its binary representation for storage in the event store.
15+
/// </summary>
16+
/// <param name="value">The object to serialize. This could be an event, command, or metadata object.</param>
17+
/// <returns>
18+
/// A binary representation of the object that can be stored in KurrentDB.
19+
/// </returns>
20+
byte[] serialize(Object value);
21+
22+
/// <summary>
23+
/// Reconstructs a Java object from its binary representation retrieved from the event store.
24+
/// </summary>
25+
/// <param name="data">The binary data to deserialize, typically retrieved from a KurrentDB event.</param>
26+
/// <param name="type">The target Java type to deserialize the data into, determined from message type mappings.</param>
27+
/// <returns>
28+
/// The deserialized object cast to the specified type, or null if the data cannot be deserialized.
29+
/// The returned object will be an instance of the specified type or a compatible subtype.
30+
/// </returns>
31+
<MessageType> Optional<MessageType> deserialize(Class<MessageType> eventClass, byte[] data);
32+
}

db-client-java/src/test/java/io/kurrent/dbclient/streams/serialization/SerializationTests.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,19 @@ default void testPlainJavaObjectsAreSerializedAndDeserializedUsingAutoSerializat
1616

1717
// Given
1818
final String streamName = generateName();
19-
final List<UserRegistered> expected = generateMessages(2);
19+
final Object[] expected = generateMessages(2).toArray();
2020

21-
2221
// When
2322
AppendToStreamOptions appendOptions = AppendToStreamOptions.get()
2423
.expectedRevision(ExpectedRevision.noStream());
2524

26-
WriteResult appendResult = client.appendToStream(streamName, appendOptions, expected.iterator())
25+
WriteResult appendResult = client.appendToStream(streamName, appendOptions, expected)
2726
.get();
2827

29-
Assertions.assertEquals(ExpectedRevision.expectedRevision(0), appendResult.getNextExpectedRevision());
30-
31-
ReadStreamOptions readStreamOptions = ReadStreamOptions.get()
32-
.fromEnd()
33-
.backwards()
34-
.maxCount(1);
35-
28+
Assertions.assertEquals(ExpectedRevision.expectedRevision(1), appendResult.getNextExpectedRevision());
29+
3630
// Ensure appended event is readable
37-
ReadResult result = client.readStream(streamName, readStreamOptions)
31+
ReadResult result = client.readStream(streamName, ReadStreamOptions.get())
3832
.get();
3933

4034
Assertions.assertEquals(2, result.getEvents().size());

0 commit comments

Comments
 (0)