diff --git a/common/src/main/kotlin/streams/utils/JSONUtils.kt b/common/src/main/kotlin/streams/utils/JSONUtils.kt index 283314b5..8b233851 100644 --- a/common/src/main/kotlin/streams/utils/JSONUtils.kt +++ b/common/src/main/kotlin/streams/utils/JSONUtils.kt @@ -17,9 +17,7 @@ import com.fasterxml.jackson.module.kotlin.convertValue import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import org.neo4j.driver.internal.value.PointValue -import org.neo4j.function.ThrowingBiConsumer import org.neo4j.graphdb.spatial.Point -import org.neo4j.values.AnyValue import org.neo4j.values.storable.CoordinateReferenceSystem import org.neo4j.values.storable.Values import org.neo4j.values.virtual.MapValue @@ -40,35 +38,43 @@ import java.time.temporal.TemporalAccessor import kotlin.reflect.full.isSubclassOf abstract class StreamsPoint { abstract val crs: String } -data class StreamsPointCartesian(override val crs: String, val x: Double, val y: Double, val z: Double? = null): StreamsPoint() -data class StreamsPointWgs(override val crs: String, val latitude: Double, val longitude: Double, val height: Double? = null): StreamsPoint() +data class StreamsPointCartesian2D(override val crs: String, val x: Double, val y: Double): StreamsPoint() +data class StreamsPointCartesian3D(override val crs: String, val x: Double, val y: Double, val z: Double): StreamsPoint() +data class StreamsPointWgs2D(override val crs: String, val latitude: Double, val longitude: Double): StreamsPoint() +data class StreamsPointWgs3D(override val crs: String, val latitude: Double, val longitude: Double, val height: Double): StreamsPoint() fun Point.toStreamsPoint(): StreamsPoint { val crsType = this.crs.type val coordinate = this.coordinates[0].coordinate return when (this.crs) { - CoordinateReferenceSystem.Cartesian -> StreamsPointCartesian(crsType, coordinate[0], coordinate[1]) - CoordinateReferenceSystem.Cartesian_3D -> StreamsPointCartesian(crsType, coordinate[0], coordinate[1], coordinate[2]) - CoordinateReferenceSystem.WGS84 -> StreamsPointWgs(crsType, coordinate[0], coordinate[1]) - CoordinateReferenceSystem.WGS84_3D -> StreamsPointWgs(crsType, coordinate[0], coordinate[1], coordinate[2]) + CoordinateReferenceSystem.Cartesian -> StreamsPointCartesian2D(crsType, coordinate[0], coordinate[1]) + CoordinateReferenceSystem.Cartesian_3D -> StreamsPointCartesian3D(crsType, coordinate[0], coordinate[1], coordinate[2]) + CoordinateReferenceSystem.WGS84 -> StreamsPointWgs2D(crsType, coordinate[0], coordinate[1]) + CoordinateReferenceSystem.WGS84_3D -> StreamsPointWgs3D(crsType, coordinate[0], coordinate[1], coordinate[2]) else -> throw IllegalArgumentException("Point type $crsType not supported") } } -fun Map.toMapValue(): MapValue { - val map = this - val builder = MapValueBuilder() - map.forEach { (t, u) -> builder.add(t, Values.of(u)) } - return builder.build() +fun Map.toMapValue(): MapValue { + return toMapValue(true) +} + +fun Map.toMapValue(allowNulls: Boolean): MapValue { + return this + .filterValues { v -> allowNulls || v != null } + .entries + .fold(MapValueBuilder()) { builder, entry -> + builder.add(entry.key, Values.of(entry.value, allowNulls)); builder} + .build() } fun PointValue.toStreamsPoint(): StreamsPoint { val point = this.asPoint() return when (val crsType = point.srid()) { - CoordinateReferenceSystem.Cartesian.code -> StreamsPointCartesian(CoordinateReferenceSystem.Cartesian.name, point.x(), point.y()) - CoordinateReferenceSystem.Cartesian_3D.code -> StreamsPointCartesian(CoordinateReferenceSystem.Cartesian_3D.name, point.x(), point.y(), point.z()) - CoordinateReferenceSystem.WGS84.code -> StreamsPointWgs(CoordinateReferenceSystem.WGS84.name, point.x(), point.y()) - CoordinateReferenceSystem.WGS84_3D.code -> StreamsPointWgs(CoordinateReferenceSystem.WGS84_3D.name, point.x(), point.y(), point.z()) + CoordinateReferenceSystem.Cartesian.code -> StreamsPointCartesian2D(CoordinateReferenceSystem.Cartesian.name, point.x(), point.y()) + CoordinateReferenceSystem.Cartesian_3D.code -> StreamsPointCartesian3D(CoordinateReferenceSystem.Cartesian_3D.name, point.x(), point.y(), point.z()) + CoordinateReferenceSystem.WGS84.code -> StreamsPointWgs2D(CoordinateReferenceSystem.WGS84.name, point.x(), point.y()) + CoordinateReferenceSystem.WGS84_3D.code -> StreamsPointWgs3D(CoordinateReferenceSystem.WGS84_3D.name, point.x(), point.y(), point.z()) else -> throw IllegalArgumentException("Point type $crsType not supported") } } @@ -76,10 +82,10 @@ fun PointValue.toStreamsPoint(): StreamsPoint { fun org.neo4j.driver.types.Point.toStreamsPoint(): StreamsPoint { val point = this return when (val crsType = point.srid()) { - CoordinateReferenceSystem.Cartesian.code -> StreamsPointCartesian(CoordinateReferenceSystem.Cartesian.name, point.x(), point.y()) - CoordinateReferenceSystem.Cartesian_3D.code -> StreamsPointCartesian(CoordinateReferenceSystem.Cartesian_3D.name, point.x(), point.y(), point.z()) - CoordinateReferenceSystem.WGS84.code -> StreamsPointWgs(CoordinateReferenceSystem.WGS84.name, point.x(), point.y()) - CoordinateReferenceSystem.WGS84_3D.code -> StreamsPointWgs(CoordinateReferenceSystem.WGS84_3D.name, point.x(), point.y(), point.z()) + CoordinateReferenceSystem.Cartesian.code -> StreamsPointCartesian2D(CoordinateReferenceSystem.Cartesian.name, point.x(), point.y()) + CoordinateReferenceSystem.Cartesian_3D.code -> StreamsPointCartesian3D(CoordinateReferenceSystem.Cartesian_3D.name, point.x(), point.y(), point.z()) + CoordinateReferenceSystem.WGS84.code -> StreamsPointWgs2D(CoordinateReferenceSystem.WGS84.name, point.x(), point.y()) + CoordinateReferenceSystem.WGS84_3D.code -> StreamsPointWgs3D(CoordinateReferenceSystem.WGS84_3D.name, point.x(), point.y(), point.z()) else -> throw IllegalArgumentException("Point type $crsType not supported") } } @@ -236,7 +242,7 @@ abstract class StreamsTransactionEventDeserializer : Js ?.properties ?.mapValues { if (points.contains(it.key)) { - org.neo4j.values.storable.PointValue.fromMap((it.value as Map).toMapValue()) + org.neo4j.values.storable.PointValue.fromMap((it.value as Map).toMapValue(false)) } else { it.value } diff --git a/common/src/test/kotlin/streams/utils/JSONUtilsTest.kt b/common/src/test/kotlin/streams/utils/JSONUtilsTest.kt index fcf8ef3c..4f955b05 100644 --- a/common/src/test/kotlin/streams/utils/JSONUtilsTest.kt +++ b/common/src/test/kotlin/streams/utils/JSONUtilsTest.kt @@ -26,9 +26,9 @@ class JSONUtilsTest { @Test fun `should serialize Geometry and Temporal Data Types`() { // Given - val expected = "{\"point2dCartesian\":{\"crs\":\"cartesian\",\"x\":1.0,\"y\":2.0,\"z\":null}," + + val expected = "{\"point2dCartesian\":{\"crs\":\"cartesian\",\"x\":1.0,\"y\":2.0}," + "\"point3dCartesian\":{\"crs\":\"cartesian-3d\",\"x\":1.0,\"y\":2.0,\"z\":3.0}," + - "\"point2dWgs84\":{\"crs\":\"wgs-84\",\"latitude\":1.0,\"longitude\":2.0,\"height\":null}," + + "\"point2dWgs84\":{\"crs\":\"wgs-84\",\"latitude\":1.0,\"longitude\":2.0}," + "\"point3dWgs84\":{\"crs\":\"wgs-84-3d\",\"latitude\":1.0,\"longitude\":2.0,\"height\":3.0}," + "\"time\":\"14:00:00Z\",\"dateTime\":\"2017-12-17T17:14:35.123456789Z\"}" val map = linkedMapOf("point2dCartesian" to pointValue(Cartesian, 1.0, 2.0), @@ -48,9 +48,9 @@ class JSONUtilsTest { @Test fun `should serialize driver Point Data Types`() { // Given - val expected = "{\"point2dCartesian\":{\"crs\":\"cartesian\",\"x\":1.0,\"y\":2.0,\"z\":null}," + + val expected = "{\"point2dCartesian\":{\"crs\":\"cartesian\",\"x\":1.0,\"y\":2.0}," + "\"point3dCartesian\":{\"crs\":\"cartesian-3d\",\"x\":1.0,\"y\":2.0,\"z\":3.0}," + - "\"point2dWgs84\":{\"crs\":\"wgs-84\",\"latitude\":1.0,\"longitude\":2.0,\"height\":null}," + + "\"point2dWgs84\":{\"crs\":\"wgs-84\",\"latitude\":1.0,\"longitude\":2.0}," + "\"point3dWgs84\":{\"crs\":\"wgs-84-3d\",\"latitude\":1.0,\"longitude\":2.0,\"height\":3.0}," + "\"time\":\"14:00:00Z\",\"dateTime\":\"2017-12-17T17:14:35.123456789Z\"}" val map = linkedMapOf("point2dCartesian" to pointValue(Cartesian, 1.0, 2.0), @@ -111,6 +111,102 @@ class JSONUtilsTest { assertEquals(cdcData, fromString) } + @Test + fun `should convert cdcString wgs2D with height null to PointValue`() { + // given + val timestamp = System.currentTimeMillis() + val expectedPointValue = org.neo4j.values.storable.PointValue.fromMap(mapOf("crs" to "wgs-84", "latitude" to 12.78, "longitude" to 56.7).toMapValue()) + + //when + val cdcString = """{ + |"meta":{"timestamp":$timestamp,"username":"user","txId":1,"txEventId":0,"txEventsCount":1,"operation":"created"}, + |"payload":{"id":"0","before":null,"after":{"properties":{"location":{"crs":"wgs-84","latitude":12.78,"longitude":56.7,"height":null}}, + |"labels":["LabelCDC"]},"type":"node"}, + |"schema":{"properties":{"location":"PointValue"}} + |}""".trimMargin() + + //then + val actualEvent = JSONUtils.asStreamsTransactionEvent(cdcString) + val actualPointValue = actualEvent.payload.after?.properties?.get("location") as org.neo4j.values.storable.PointValue + assertEquals(expectedPointValue, actualPointValue) + } + + @Test + fun `should convert cdcMap wgs2D with height null to PointValue`() { + // given + val timestamp = System.currentTimeMillis() + val expectedPointValue = org.neo4j.values.storable.PointValue.fromMap(mapOf("crs" to "wgs-84", "latitude" to 12.78, "longitude" to 56.7).toMapValue()) + + //when + val cdcMap = mapOf( + "meta" to mapOf("timestamp" to timestamp, + "username" to "user", + "txId" to 1, + "txEventId" to 0, + "txEventsCount" to 1, + "operation" to OperationType.created), + "payload" to mapOf("id" to "0", + "before" to null, + "after" to NodeChange(properties = mapOf("location" to mapOf("crs" to "wgs-84", "latitude" to 12.78, "longitude" to 56.7, "height" to null)), + labels = listOf("LabelCDC")), + "type" to EntityType.node), + "schema" to mapOf("properties" to mapOf("location" to "PointValue")) + ) + + //then + val actualEvent = JSONUtils.asStreamsTransactionEvent(cdcMap) + val actualPointValue = actualEvent.payload.after?.properties?.get("location") as org.neo4j.values.storable.PointValue + assertEquals(expectedPointValue, actualPointValue) + } + + @Test + fun `should convert cdcString cartesian2D with z null to PointValue`() { + // given + val timestamp = System.currentTimeMillis() + val expectedPointValue = org.neo4j.values.storable.PointValue.fromMap(mapOf("crs" to "cartesian", "x" to 12.78, "y" to 56.7).toMapValue()) + + //when + val cdcString = """{ + |"meta":{"timestamp":$timestamp,"username":"user","txId":1,"txEventId":0,"txEventsCount":1,"operation":"created"}, + |"payload":{"id":"0","before":null,"after":{"properties":{"location":{"crs":"cartesian","x":12.78,"y":56.7,"z":null}}, + |"labels":["LabelCDC"]},"type":"node"}, + |"schema":{"properties":{"location":"PointValue"}} + |}""".trimMargin() + + //then + val actualEvent = JSONUtils.asStreamsTransactionEvent(cdcString) + val actualPointValue = actualEvent.payload.after?.properties?.get("location") as org.neo4j.values.storable.PointValue + assertEquals(expectedPointValue, actualPointValue) + } + + @Test + fun `should convert cdcMap cartesian2D with z null to PointValue`() { + // given + val timestamp = System.currentTimeMillis() + val expectedPointValue = org.neo4j.values.storable.PointValue.fromMap(mapOf("crs" to "cartesian", "x" to 12.78, "y" to 56.7).toMapValue()) + + //when + val cdcMap = mapOf( + "meta" to mapOf("timestamp" to timestamp, + "username" to "user", + "txId" to 1, + "txEventId" to 0, + "txEventsCount" to 1, + "operation" to OperationType.created), + "payload" to mapOf("id" to "0", + "before" to null, + "after" to NodeChange(properties = mapOf("location" to mapOf("crs" to "cartesian", "x" to 12.78, "y" to 56.7, "z" to null)), + labels = listOf("LabelCDC")), + "type" to EntityType.node), + "schema" to mapOf("properties" to mapOf("location" to "PointValue")) + ) + + //then + val actualEvent = JSONUtils.asStreamsTransactionEvent(cdcMap) + val actualPointValue = actualEvent.payload.after?.properties?.get("location") as org.neo4j.values.storable.PointValue + assertEquals(expectedPointValue, actualPointValue) + } + @Test fun `should deserialize plain values`() { assertEquals("a", JSONUtils.readValue("a", stringWhenFailure = true))