Skip to content

Commit

Permalink
feat(kafka): add avro publication support to example
Browse files Browse the repository at this point in the history
  • Loading branch information
timonback committed Jan 5, 2024
1 parent 282735f commit e3cec1d
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 10 deletions.
22 changes: 21 additions & 1 deletion springwolf-examples/springwolf-kafka-example/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ services:
environment:
BOOTSTRAP_SERVER: kafka:29092
BOOTSTRAP_SERVER_SASL: kafka:29093
KAFKA_SCHEMA_REGISTRY_URL: http://kafka-schema-registry:8081
ports:
- "8080:8080"
depends_on:
Expand All @@ -30,14 +31,30 @@ services:
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29099'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_SCHEMA_REGISTRY_URL: 'kafka-schema-registry:8081'
KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/broker_jaas.conf

kafka-schema-registry:
image: confluentinc/cp-schema-registry:latest
ports:
- "8081:8081"
profiles:
- test
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
depends_on:
- kafka
links:
- kafka

akhq:
image: tchiotludo/akhq:latest
ports:
- "8081:8080"
- "8085:8080"
profiles:
- test
environment:
Expand All @@ -47,3 +64,6 @@ services:
local:
properties:
bootstrap.servers: kafka:29092
schema-registry:
url: http://kafka-schema-registry:8081
type: confluent
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// SPDX-License-Identifier: Apache-2.0
package io.github.stavshamir.springwolf.example.kafka.configuration;

import io.github.stavshamir.springwolf.producer.SpringwolfKafkaTemplateFromProperties;
import io.github.stavshamir.springwolf.producer.SpringwolfKafkaTemplateProvider;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

@Component
@Primary
@RequiredArgsConstructor
public class CustomSpringwolfKafkaProducer implements SpringwolfKafkaTemplateProvider {
private final SpringwolfKafkaTemplateFromProperties springwolfKafkaTemplateFromProperties;

@Value("${KAFKA_SCHEMA_REGISTRY_URL:http://localhost:8081}")
private String kafkaSchemaRegistryUrl;

@Override
public boolean isPresent() {
return springwolfKafkaTemplateFromProperties.isPresent();
}

@Override
public Optional<KafkaTemplate<Object, Object>> get(String topic) {
return springwolfKafkaTemplateFromProperties
.get(topic)
.map(objectObjectKafkaTemplate -> customize(objectObjectKafkaTemplate, topic));
}

private KafkaTemplate<Object, Object> customize(KafkaTemplate<Object, Object> kafkaTemplate, String topic) {
if (topic.contains("avro")) {
Map<String, Object> producerProperties =
new HashMap<>(kafkaTemplate.getProducerFactory().getConfigurationProperties());

// configure the producerProperties to use avro serializer/deserializer
producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProperties.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
producerProperties.put("schema.registry.url", kafkaSchemaRegistryUrl);

DefaultKafkaProducerFactory<Object, Object> producerFactory =
new DefaultKafkaProducerFactory<>(producerProperties);
return new KafkaTemplate<>(producerFactory);
}
return kafkaTemplate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ public void receiveAnotherPayloadBatched(List<AnotherPayloadDto> payloads) {
}

@KafkaListener(
topics = "avro-topic-without-publishing-support",
topics = "avro-topic",
properties = {
"specific.avro.reader=true",
"schema.registry.url=http://localhost:8081",
"key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer",
"value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer"
"schema.registry.url=${KAFKA_SCHEMA_REGISTRY_URL:http://localhost:8081}",
"key.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer",
"value.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer",
"spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer",
"spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer"
})
public void receiveExampleAvroPayload(List<ExamplePayloadAvroDto> payloads) {
log.info("Received new message in avro-topic: {}", payloads.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@
}
}
},
"avro-topic-without-publishing-support": {
"avro-topic": {
"publish": {
"operationId": "avro-topic-without-publishing-support_publish_receiveExampleAvroPayload",
"operationId": "avro-topic_publish_receiveExampleAvroPayload",
"description": "Auto-generated description",
"bindings": {
"kafka": {
Expand Down Expand Up @@ -770,4 +770,4 @@
}
},
"tags": [ ]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.github.stavshamir.springwolf.configuration.properties.SpringwolfKafkaConfigProperties;
import io.github.stavshamir.springwolf.producer.SpringwolfKafkaProducer;
import io.github.stavshamir.springwolf.producer.SpringwolfKafkaTemplateFromProperties;
import io.github.stavshamir.springwolf.producer.SpringwolfKafkaTemplateProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
Expand All @@ -31,8 +32,8 @@ public SpringwolfKafkaController springwolfKafkaController(
@Bean
@ConditionalOnMissingBean
public SpringwolfKafkaProducer springwolfKafkaProducer(
SpringwolfKafkaTemplateFromProperties springwolfKafkaTemplateFromProperties) {
return new SpringwolfKafkaProducer(springwolfKafkaTemplateFromProperties);
SpringwolfKafkaTemplateProvider springwolfKafkaTemplateProvider) {
return new SpringwolfKafkaProducer(springwolfKafkaTemplateProvider);
}

@Bean
Expand Down

0 comments on commit e3cec1d

Please sign in to comment.