diff --git a/springwolf-core/src/main/java/io/github/springwolf/core/asyncapi/scanners/common/headers/HeaderClassExtractor.java b/springwolf-core/src/main/java/io/github/springwolf/core/asyncapi/scanners/common/headers/HeaderClassExtractor.java index 1adcae868..96c769d04 100644 --- a/springwolf-core/src/main/java/io/github/springwolf/core/asyncapi/scanners/common/headers/HeaderClassExtractor.java +++ b/springwolf-core/src/main/java/io/github/springwolf/core/asyncapi/scanners/common/headers/HeaderClassExtractor.java @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 package io.github.springwolf.core.asyncapi.scanners.common.headers; +import io.github.springwolf.asyncapi.v3.model.schema.Schema; import io.github.springwolf.asyncapi.v3.model.schema.SchemaObject; import io.github.springwolf.asyncapi.v3.model.schema.SchemaType; import io.github.springwolf.core.asyncapi.scanners.common.payload.PayloadSchemaObject; @@ -33,12 +34,21 @@ public SchemaObject extractHeader(Method method, PayloadSchemaObject payload) { Header headerAnnotation = argument.getAnnotation(Header.class); String headerName = getHeaderAnnotationName(headerAnnotation); - SchemaObject schema = schemaService - .extractSchema(argument.getType()) - .rootSchema() - .getSchema(); + SwaggerSchemaService.ExtractedSchemas extractedSchema = schemaService.extractSchema(argument.getType()); + Schema schema = extractedSchema.rootSchema().getSchema(); + if (schema == null && extractedSchema.referencedSchemas().size() == 1) { + schema = extractedSchema + .referencedSchemas() + .values() + .iterator() + .next(); + } - headers.getProperties().put(headerName, schema); + if (schema != null) { + headers.getProperties().put(headerName, schema); + } else { + log.debug("Unable to extract schema for header {} in method {}", headerName, methodName); + } } } diff --git a/springwolf-core/src/main/java/io/github/springwolf/core/controller/dtos/MessageDto.java b/springwolf-core/src/main/java/io/github/springwolf/core/controller/dtos/MessageDto.java index 16d46a6f7..034f289db 100644 --- a/springwolf-core/src/main/java/io/github/springwolf/core/controller/dtos/MessageDto.java +++ b/springwolf-core/src/main/java/io/github/springwolf/core/controller/dtos/MessageDto.java @@ -1,11 +1,17 @@ // SPDX-License-Identifier: Apache-2.0 package io.github.springwolf.core.controller.dtos; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.extern.jackson.Jacksonized; +import java.io.IOException; import java.util.Map; @Data @@ -17,11 +23,27 @@ public class MessageDto { private final Map bindings; - private final Map headers; + private final Map headers; @Builder.Default private final String payloadType = String.class.getCanonicalName(); @Builder.Default private final String payload = EMPTY; + + @JsonDeserialize(using = HeaderValueDeserializer.class) + public record HeaderValue(String stringValue) {} + + public static class HeaderValueDeserializer extends JsonDeserializer { + @Override + public HeaderValue deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + JsonNode node = p.getCodec().readTree(p); + if (node.isNumber()) { + return new HeaderValue(node.numberValue().toString()); + } else if (node.isTextual()) { + return new HeaderValue(node.textValue()); + } + return new HeaderValue(node.toString()); + } + } } diff --git a/springwolf-core/src/test/java/io/github/springwolf/core/asyncapi/controller/dtos/MessageDtoDeserializationTest.java b/springwolf-core/src/test/java/io/github/springwolf/core/asyncapi/controller/dtos/MessageDtoDeserializationTest.java index 89cf725d2..1c09625e5 100644 --- a/springwolf-core/src/test/java/io/github/springwolf/core/asyncapi/controller/dtos/MessageDtoDeserializationTest.java +++ b/springwolf-core/src/test/java/io/github/springwolf/core/asyncapi/controller/dtos/MessageDtoDeserializationTest.java @@ -24,7 +24,8 @@ void testCanBeSerialized() throws IOException, ClassNotFoundException { MessageDto value = objectMapper.readValue(content, MessageDto.class); assertThat(value).isNotNull(); - assertThat(value.getHeaders()).isEqualTo(singletonMap("some-header-key", "some-header-value")); + assertThat(value.getHeaders()) + .isEqualTo(singletonMap("some-header-key", new MessageDto.HeaderValue("some-header-value"))); assertThat(value.getPayload()) .isEqualTo( new ObjectMapper().writeValueAsString(singletonMap("some-payload-key", "some-payload-value"))); diff --git a/springwolf-core/src/test/java/io/github/springwolf/core/asyncapi/scanners/common/headers/HeaderClassExtractorIntegrationTest.java b/springwolf-core/src/test/java/io/github/springwolf/core/asyncapi/scanners/common/headers/HeaderClassExtractorIntegrationTest.java new file mode 100644 index 000000000..f219ee87c --- /dev/null +++ b/springwolf-core/src/test/java/io/github/springwolf/core/asyncapi/scanners/common/headers/HeaderClassExtractorIntegrationTest.java @@ -0,0 +1,136 @@ +// SPDX-License-Identifier: Apache-2.0 +package io.github.springwolf.core.asyncapi.scanners.common.headers; + +import io.github.springwolf.asyncapi.v3.model.components.ComponentSchema; +import io.github.springwolf.asyncapi.v3.model.schema.SchemaObject; +import io.github.springwolf.asyncapi.v3.model.schema.SchemaType; +import io.github.springwolf.core.asyncapi.scanners.common.payload.PayloadSchemaObject; +import io.github.springwolf.core.asyncapi.schemas.SwaggerSchemaService; +import io.github.springwolf.core.asyncapi.schemas.SwaggerSchemaUtil; +import io.github.springwolf.core.configuration.properties.SpringwolfConfigProperties; +import lombok.val; +import org.junit.jupiter.api.Test; +import org.springframework.messaging.handler.annotation.Header; + +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class HeaderClassExtractorIntegrationTest { + + private final SwaggerSchemaService schemaService = + new SwaggerSchemaService(List.of(), List.of(), new SwaggerSchemaUtil(), new SpringwolfConfigProperties()); + private final HeaderClassExtractor headerClassExtractor = new HeaderClassExtractor(schemaService); + + private final PayloadSchemaObject payloadSchemaName = new PayloadSchemaObject( + "payloadSchemaName", String.class.getSimpleName(), ComponentSchema.of(new SchemaObject())); + private final SchemaObject stringSchema = + SchemaObject.builder().type(SchemaType.STRING).build(); + + @Test + void getNoDocumentedHeaders() throws NoSuchMethodException { + // when + Method m = TestClass.class.getDeclaredMethod("consumeWithoutHeadersAnnotation", String.class); + val result = headerClassExtractor.extractHeader(m, payloadSchemaName); + + // then + assertEquals(AsyncHeadersNotDocumented.NOT_DOCUMENTED, result); + } + + @Test + void getHeaderWithSingleHeaderAnnotation() throws NoSuchMethodException { + // when + Method m = TestClass.class.getDeclaredMethod("consumeWithSingleHeaderAnnotation", String.class); + val result = headerClassExtractor.extractHeader(m, payloadSchemaName); + + // then + SchemaObject expectedHeaders = SchemaObject.builder() + .type(SchemaType.OBJECT) + .title("payloadSchemaNameHeaders") + .properties(new HashMap<>()) + .build(); + expectedHeaders.getProperties().put("kafka_receivedMessageKey", stringSchema); + + assertEquals(expectedHeaders, result); + } + + @Test + void getHeaderWithMultipleHeaderAnnotation() throws NoSuchMethodException { + // when + Method m = TestClass.class.getDeclaredMethod("consumeWithMultipleHeaderAnnotation", String.class, String.class); + val result = headerClassExtractor.extractHeader(m, payloadSchemaName); + + // then + SchemaObject expectedHeaders = SchemaObject.builder() + .type(SchemaType.OBJECT) + .title("payloadSchemaNameHeaders") + .properties(new HashMap<>()) + .build(); + expectedHeaders.getProperties().put("kafka_receivedMessageKey", stringSchema); + expectedHeaders.getProperties().put("non-exist", stringSchema); + + assertEquals(expectedHeaders, result); + } + + @Test + void getHeaderWithObjectHeaderAnnotation() throws NoSuchMethodException { + // when + Method m = TestClass.class.getDeclaredMethod("consumeWithObjectHeaderAnnotation", TestClass.MyHeader.class); + val result = headerClassExtractor.extractHeader(m, payloadSchemaName); + + // then + SchemaObject expectedHeaders = SchemaObject.builder() + .type(SchemaType.OBJECT) + .title("payloadSchemaNameHeaders") + .properties(new HashMap<>()) + .build(); + expectedHeaders + .getProperties() + .put( + "myHeader", + SchemaObject.builder() + .type(SchemaType.OBJECT) + .title("MyHeader") + .properties(Map.of("key", ComponentSchema.of(stringSchema))) + .build()); + + assertEquals(expectedHeaders, result); + } + + @Test + void getHeaderWithNestedHeaderAnnotation() throws NoSuchMethodException { + // when + Method m = TestClass.class.getDeclaredMethod( + "consumeWithNestedObjectHeaderAnnotation", TestClass.MyNestedHeader.class); + val result = headerClassExtractor.extractHeader(m, payloadSchemaName); + + // then + assertEquals(AsyncHeadersNotDocumented.NOT_DOCUMENTED, result); // currently not supported + } + + public static class TestClass { + + public void consumeWithoutHeadersAnnotation(String simpleValue) {} + + public void consumeWithSingleHeaderAnnotation(@Header("kafka_receivedMessageKey") String key) {} + + public void consumeWithMultipleHeaderAnnotation( + @Header("kafka_receivedMessageKey") String key, + @Header(name = "non-exist", defaultValue = "default") String nonExistingHeader) {} + + public void consumeWithObjectHeaderAnnotation(@Header("myHeader") MyHeader key) {} + + public void consumeWithNestedObjectHeaderAnnotation(@Header("myNestedHeader") MyNestedHeader key) {} + + static class MyHeader { + public String key; + } + + static class MyNestedHeader { + public MyHeader header; + } + } +} diff --git a/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/springwolf/examples/kafka/consumers/ExampleClassLevelKafkaListener.java b/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/springwolf/examples/kafka/consumers/ExampleClassLevelKafkaListener.java index 4a7ae050c..029e0977b 100644 --- a/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/springwolf/examples/kafka/consumers/ExampleClassLevelKafkaListener.java +++ b/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/springwolf/examples/kafka/consumers/ExampleClassLevelKafkaListener.java @@ -10,6 +10,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaHandler; import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; @@ -30,6 +31,7 @@ public class ExampleClassLevelKafkaListener { public void receiveExamplePayload( @Header(KafkaHeaders.RECEIVED_KEY) String key, @Header(KafkaHeaders.OFFSET) Integer offset, + @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta, @Payload ExamplePayloadDto payload) { log.info("Received new message in {}: {}", TOPIC, payload.toString()); } diff --git a/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/springwolf/examples/kafka/consumers/ExampleConsumer.java b/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/springwolf/examples/kafka/consumers/ExampleConsumer.java index 4e8c2f020..b8637e0a0 100644 --- a/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/springwolf/examples/kafka/consumers/ExampleConsumer.java +++ b/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/springwolf/examples/kafka/consumers/ExampleConsumer.java @@ -7,6 +7,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; @@ -25,6 +26,7 @@ public class ExampleConsumer { public void receiveExamplePayload( @Header(KafkaHeaders.RECEIVED_KEY) String key, @Header(KafkaHeaders.OFFSET) Integer offset, + @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta, @Payload ExamplePayloadDto payload) { log.info("Received new message in example-topic: {}", payload.toString()); diff --git a/springwolf-examples/springwolf-kafka-example/src/test/resources/asyncapi.json b/springwolf-examples/springwolf-kafka-example/src/test/resources/asyncapi.json index fe008326e..807b688ce 100644 --- a/springwolf-examples/springwolf-kafka-example/src/test/resources/asyncapi.json +++ b/springwolf-examples/springwolf-kafka-example/src/test/resources/asyncapi.json @@ -444,8 +444,8 @@ "type": "object" } }, - "SpringKafkaDefaultHeaders-ExamplePayloadDto-1316119953": { - "title": "SpringKafkaDefaultHeaders-ExamplePayloadDto-1316119953", + "SpringKafkaDefaultHeaders-ExamplePayloadDto-1119017590": { + "title": "SpringKafkaDefaultHeaders-ExamplePayloadDto-1119017590", "type": "object", "properties": { "__TypeId__": { @@ -469,13 +469,20 @@ "examples": [ "\"string\"" ] + }, + "kafka_recordMetadata": { + "type": "object", + "examples": [ + { } + ] } }, "examples": [ { "__TypeId__": "io.github.springwolf.examples.kafka.dtos.ExamplePayloadDto", "kafka_offset": 0, - "kafka_receivedMessageKey": "string" + "kafka_receivedMessageKey": "string", + "kafka_recordMetadata": { } } ], "x-json-schema": { @@ -493,9 +500,12 @@ }, "kafka_receivedMessageKey": { "type": "string" + }, + "kafka_recordMetadata": { + "type": "object" } }, - "title": "SpringKafkaDefaultHeaders-ExamplePayloadDto-1316119953", + "title": "SpringKafkaDefaultHeaders-ExamplePayloadDto-1119017590", "type": "object" } }, @@ -1723,7 +1733,7 @@ }, "io.github.springwolf.examples.kafka.dtos.ExamplePayloadDto": { "headers": { - "$ref": "#/components/schemas/SpringKafkaDefaultHeaders-ExamplePayloadDto-1316119953" + "$ref": "#/components/schemas/SpringKafkaDefaultHeaders-ExamplePayloadDto-1119017590" }, "payload": { "schemaFormat": "application/vnd.aai.asyncapi+json;version=3.0.0", @@ -2147,4 +2157,4 @@ ] } } -} +} \ No newline at end of file diff --git a/springwolf-examples/springwolf-kafka-example/src/test/resources/asyncapi.yaml b/springwolf-examples/springwolf-kafka-example/src/test/resources/asyncapi.yaml index 96b3f1495..b6985194b 100644 --- a/springwolf-examples/springwolf-kafka-example/src/test/resources/asyncapi.yaml +++ b/springwolf-examples/springwolf-kafka-example/src/test/resources/asyncapi.yaml @@ -304,8 +304,8 @@ components: type: string title: SpringKafkaDefaultHeaders-AnotherPayloadDto type: object - SpringKafkaDefaultHeaders-ExamplePayloadDto-1316119953: - title: SpringKafkaDefaultHeaders-ExamplePayloadDto-1316119953 + SpringKafkaDefaultHeaders-ExamplePayloadDto-1119017590: + title: SpringKafkaDefaultHeaders-ExamplePayloadDto-1119017590 type: object properties: __TypeId__: @@ -323,10 +323,15 @@ components: type: string examples: - '"string"' + kafka_recordMetadata: + type: object + examples: + - {} examples: - __TypeId__: io.github.springwolf.examples.kafka.dtos.ExamplePayloadDto kafka_offset: 0 kafka_receivedMessageKey: string + kafka_recordMetadata: {} x-json-schema: $schema: https://json-schema.org/draft-04/schema# properties: @@ -339,7 +344,9 @@ components: type: integer kafka_receivedMessageKey: type: string - title: SpringKafkaDefaultHeaders-ExamplePayloadDto-1316119953 + kafka_recordMetadata: + type: object + title: SpringKafkaDefaultHeaders-ExamplePayloadDto-1119017590 type: object SpringKafkaDefaultHeaders-Message: title: SpringKafkaDefaultHeaders-Message @@ -1237,7 +1244,7 @@ components: bindingVersion: 0.5.0 io.github.springwolf.examples.kafka.dtos.ExamplePayloadDto: headers: - $ref: "#/components/schemas/SpringKafkaDefaultHeaders-ExamplePayloadDto-1316119953" + $ref: "#/components/schemas/SpringKafkaDefaultHeaders-ExamplePayloadDto-1119017590" payload: schemaFormat: application/vnd.aai.asyncapi+json;version=3.0.0 schema: @@ -1501,4 +1508,4 @@ operations: kafka: bindingVersion: 0.5.0 messages: - - $ref: "#/channels/yaml-topic/messages/io.github.springwolf.examples.kafka.dtos.YamlPayloadDto" + - $ref: "#/channels/yaml-topic/messages/io.github.springwolf.examples.kafka.dtos.YamlPayloadDto" \ No newline at end of file diff --git a/springwolf-plugins/springwolf-jms-plugin/src/main/java/io/github/springwolf/plugins/jms/producer/SpringwolfJmsProducer.java b/springwolf-plugins/springwolf-jms-plugin/src/main/java/io/github/springwolf/plugins/jms/producer/SpringwolfJmsProducer.java index 62aa723c9..d30a4fd53 100644 --- a/springwolf-plugins/springwolf-jms-plugin/src/main/java/io/github/springwolf/plugins/jms/producer/SpringwolfJmsProducer.java +++ b/springwolf-plugins/springwolf-jms-plugin/src/main/java/io/github/springwolf/plugins/jms/producer/SpringwolfJmsProducer.java @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 package io.github.springwolf.plugins.jms.producer; +import io.github.springwolf.core.controller.dtos.MessageDto; import jakarta.jms.JMSException; import lombok.extern.slf4j.Slf4j; import org.springframework.jms.core.JmsTemplate; @@ -22,13 +23,13 @@ public boolean isEnabled() { return template.isPresent(); } - public void send(String channelName, Map headers, Object payload) { + public void send(String channelName, Map headers, Object payload) { if (template.isPresent()) { template.get().convertAndSend(channelName, payload, message -> { if (headers != null) { headers.forEach((name, value) -> { try { - message.setStringProperty(name, value); + message.setStringProperty(name, value.stringValue()); } catch (JMSException ex) { log.warn("Unable to set JMS Header key={} value={}", name, value, ex); } diff --git a/springwolf-plugins/springwolf-jms-plugin/src/test/java/io/github/springwolf/plugins/jms/controller/SpringwolfJmsControllerIntegrationTest.java b/springwolf-plugins/springwolf-jms-plugin/src/test/java/io/github/springwolf/plugins/jms/controller/SpringwolfJmsControllerIntegrationTest.java index 50d299356..3b0b4e846 100644 --- a/springwolf-plugins/springwolf-jms-plugin/src/test/java/io/github/springwolf/plugins/jms/controller/SpringwolfJmsControllerIntegrationTest.java +++ b/springwolf-plugins/springwolf-jms-plugin/src/test/java/io/github/springwolf/plugins/jms/controller/SpringwolfJmsControllerIntegrationTest.java @@ -15,6 +15,7 @@ import io.github.springwolf.core.asyncapi.schemas.SwaggerSchemaUtil; import io.github.springwolf.core.configuration.properties.SpringwolfConfigProperties; import io.github.springwolf.core.controller.PublishingPayloadCreator; +import io.github.springwolf.core.controller.dtos.MessageDto; import io.github.springwolf.plugins.jms.producer.SpringwolfJmsProducer; import lombok.AllArgsConstructor; import lombok.Builder; @@ -87,7 +88,7 @@ class SpringwolfJmsControllerIntegrationTest { private ArgumentCaptor payloadCaptor; @Captor - private ArgumentCaptor> headerCaptor; + private ArgumentCaptor> headerCaptor; @BeforeEach void setup() { @@ -213,7 +214,8 @@ void testControllerShouldCallJmsProducerIfPayloadAndHeadersAreSend() throws Exce verify(springwolfJmsProducer).send(eq("test-topic"), headerCaptor.capture(), payloadCaptor.capture()); - assertThat(headerCaptor.getValue()).isEqualTo(singletonMap("some-header-key", "some-header-value")); + assertThat(headerCaptor.getValue()) + .isEqualTo(singletonMap("some-header-key", new MessageDto.HeaderValue("some-header-value"))); assertThat(payloadCaptor.getValue()).isEqualTo(new PayloadDto("some-payload-value")); } diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/springwolf/plugins/kafka/producer/SpringwolfKafkaProducer.java b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/springwolf/plugins/kafka/producer/SpringwolfKafkaProducer.java index 552470f5a..e7b6b174d 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/springwolf/plugins/kafka/producer/SpringwolfKafkaProducer.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/springwolf/plugins/kafka/producer/SpringwolfKafkaProducer.java @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 package io.github.springwolf.plugins.kafka.producer; +import io.github.springwolf.core.controller.dtos.MessageDto; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerRecord; @@ -26,7 +27,7 @@ public boolean isEnabled() { return kafkaTemplateProvider.isPresent(); } - public void send(String topic, String key, Map headers, Object payload) { + public void send(String topic, String key, Map headers, Object payload) { Optional> kafkaTemplate = kafkaTemplateProvider.get(topic); if (kafkaTemplate.isPresent()) { kafkaTemplate @@ -40,16 +41,16 @@ public void send(String topic, String key, Map headers, Object p } private ProducerRecord buildProducerRecord( - String topic, String key, Map headers, Object payload) { + String topic, String key, Map headers, Object payload) { List
recordHeaders = headers != null ? buildHeaders(headers) : Collections.emptyList(); return new ProducerRecord<>(topic, null, null, key, payload, recordHeaders); } - private List
buildHeaders(Map headers) { + private List
buildHeaders(Map headers) { return headers.entrySet().stream() - .map(header -> - new RecordHeader(header.getKey(), header.getValue().getBytes(UTF_8))) + .map(header -> new RecordHeader( + header.getKey(), header.getValue().stringValue().getBytes(UTF_8))) .collect(Collectors.toList()); } } diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/springwolf/plugins/kafka/controller/SpringwolfKafkaControllerIntegrationTest.java b/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/springwolf/plugins/kafka/controller/SpringwolfKafkaControllerIntegrationTest.java index d59e5f4f7..b0098ca04 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/springwolf/plugins/kafka/controller/SpringwolfKafkaControllerIntegrationTest.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/springwolf/plugins/kafka/controller/SpringwolfKafkaControllerIntegrationTest.java @@ -15,6 +15,7 @@ import io.github.springwolf.core.asyncapi.schemas.SwaggerSchemaUtil; import io.github.springwolf.core.configuration.properties.SpringwolfConfigProperties; import io.github.springwolf.core.controller.PublishingPayloadCreator; +import io.github.springwolf.core.controller.dtos.MessageDto; import io.github.springwolf.plugins.kafka.producer.SpringwolfKafkaProducer; import lombok.AllArgsConstructor; import lombok.Builder; @@ -87,7 +88,7 @@ class SpringwolfKafkaControllerIntegrationTest { private ArgumentCaptor payloadCaptor; @Captor - private ArgumentCaptor> headerCaptor; + private ArgumentCaptor> headerCaptor; @BeforeEach void setup() { @@ -214,7 +215,8 @@ void testControllerShouldCallKafkaProducerIfPayloadAndHeadersAreSend() throws Ex verify(springwolfKafkaProducer) .send(eq("test-topic"), isNull(), headerCaptor.capture(), payloadCaptor.capture()); - assertThat(headerCaptor.getValue()).isEqualTo(singletonMap("some-header-key", "some-header-value")); + assertThat(headerCaptor.getValue()) + .isEqualTo(singletonMap("some-header-key", new MessageDto.HeaderValue("some-header-value"))); assertThat(payloadCaptor.getValue()).isEqualTo(new PayloadDto("some-payload-value")); } @@ -243,7 +245,8 @@ void testControllerShouldCallKafkaProducerIfPayloadAndHeadersAndBindingsAreSend( verify(springwolfKafkaProducer) .send(eq("test-topic"), eq("kafka-key-value"), headerCaptor.capture(), payloadCaptor.capture()); - assertThat(headerCaptor.getValue()).isEqualTo(singletonMap("some-header-key", "some-header-value")); + assertThat(headerCaptor.getValue()) + .isEqualTo(singletonMap("some-header-key", new MessageDto.HeaderValue("some-header-value"))); assertThat(payloadCaptor.getValue()).isEqualTo(new PayloadDto("some-payload-value")); } diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/springwolf/plugins/kafka/producer/SpringwolfKafkaProducerTest.java b/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/springwolf/plugins/kafka/producer/SpringwolfKafkaProducerTest.java index 72aeff4e2..73729723d 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/springwolf/plugins/kafka/producer/SpringwolfKafkaProducerTest.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/springwolf/plugins/kafka/producer/SpringwolfKafkaProducerTest.java @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 package io.github.springwolf.plugins.kafka.producer; +import io.github.springwolf.core.controller.dtos.MessageDto; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.Header; import org.assertj.core.util.Lists; @@ -97,7 +98,8 @@ void testSendingKafkaMessageWithHeaders() { future.complete(mock(SendResult.class)); Map payload = Collections.singletonMap("some", "field"); - Map headers = Collections.singletonMap("header-key", "header"); + Map headers = + Collections.singletonMap("header-key", new MessageDto.HeaderValue("header")); // when springwolfKafkaProducer.send("test-topic", null, headers, payload); @@ -115,6 +117,7 @@ void testSendingKafkaMessageWithHeaders() { assertThat(headersFromRecord).hasSize(1); assertThat(headersFromRecord.get(0).key()).isEqualTo("header-key"); - assertThat(new String(headersFromRecord.get(0).value())).isEqualTo(headers.get("header-key")); + assertThat(new String(headersFromRecord.get(0).value())) + .isEqualTo(headers.get("header-key").stringValue()); } }