Merge pull request #190 from conker84/issue_186
fixes #186: Kafka event sink with manual commit fails with multiple topic subscriptions
mneedham authored Jun 5, 2019
2 parents 515fd85 + f4928b1 commit 99edf5c
Showing 2 changed files with 73 additions and 9 deletions.
25 changes: 16 additions & 9 deletions consumer/src/main/kotlin/streams/kafka/KafkaEventSink.kt
@@ -95,7 +95,7 @@ class KafkaEventSink(private val config: Config,

private fun createJob(): Job {
log.info("Creating Sink daemon Job")
- return GlobalScope.launch(Dispatchers.IO) {
+ return GlobalScope.launch(Dispatchers.IO) { // TODO improve exception management
try {
while (isActive) {
if (Neo4jUtils.isWriteableInstance(db)) {
@@ -177,7 +177,11 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati
if (!records.isEmpty) {
try {
this.topics.forEach { topic ->
- action(topic, records.records(topic).map { JSONUtils.readValue<Any>(it.value()) })
+ val topicRecords = records.records(topic)
+ if (!topicRecords.iterator().hasNext()) {
+     return@forEach
+ }
+ action(topic, topicRecords.map { JSONUtils.readValue<Any>(it.value()) })
}
} catch (e: Exception) {
// TODO add dead letter queue
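
A standalone sketch (not part of the commit) of the guard just added: kafka-clients' public ConsumerRecords/ConsumerRecord constructors are used here to fake a poll that only returned data for one of several subscribed topics, the multi-topic situation behind issue #186. records.records(topic) yields an empty iterable for topics that got nothing, which is exactly what the new hasNext() check skips.

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.common.TopicPartition

fun main() {
    // Fake a poll that only returned data for "product", while the sink is
    // subscribed to all three topics used in the tests further down.
    val batch = ConsumerRecords(mapOf(
            TopicPartition("product", 0) to listOf(
                    ConsumerRecord("product", 0, 0L, "key", """{"id": 1}""".toByteArray()))))
    listOf("product", "customer", "bought").forEach { topic ->
        val topicRecords = batch.records(topic)
        if (!topicRecords.iterator().hasNext()) {
            // "customer" and "bought" are skipped instead of being handed an empty batch
            return@forEach
        }
        println("$topic -> ${topicRecords.count()} record(s)")
    }
}
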
@@ -210,10 +214,10 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati
fun toConsumerRecordsMap(topicPartitionsMap: Map<TopicPartition, Long>,
records: ConsumerRecords<String, ByteArray>)
: Map<TopicPartition, List<ConsumerRecord<String, ByteArray>>> = topicPartitionsMap
- .mapValues {
-     records.records(it.key)
- }
- .filterValues { it.isNotEmpty() }
+ .mapValues {
+     records.records(it.key)
+ }
+ .filterValues { it.isNotEmpty() }

fun setSeek(topicPartitionsMap: Map<TopicPartition, Long>) {
if (isSeekSet) {
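
The toConsumerRecordsMap helper in the hunk above (re-indented in this commit) pairs each requested partition with the records fetched for it and drops the empty ones. A hypothetical usage sketch, with made-up partitions and payload:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.common.TopicPartition

fun main() {
    val productP0 = TopicPartition("product", 0)
    val customerP0 = TopicPartition("customer", 0)
    // Only the product partition returned data in this (faked) poll.
    val batch = ConsumerRecords(mapOf(
            productP0 to listOf(ConsumerRecord("product", 0, 0L, "key", "{}".toByteArray()))))
    val byPartition = mapOf(productP0 to 0L, customerP0 to 0L)
            .mapValues { batch.records(it.key) }   // List<ConsumerRecord> per partition
            .filterValues { it.isNotEmpty() }      // partitions with no records are dropped
    println(byPartition.keys)                      // [product-0]; customer-0 never shows up
}
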
@@ -250,6 +254,9 @@ class KafkaManualCommitEventConsumer(private val config: KafkaSinkConfiguration,
if (!records.isEmpty) {
this.topics.forEach { topic ->
val topicRecords = records.records(topic)
+ if (!topicRecords.iterator().hasNext()) {
+     return@forEach
+ }
val lastRecord = topicRecords.last()
val offsetAndMetadata = OffsetAndMetadata(lastRecord.offset(), "")
val topicPartition = TopicPartition(lastRecord.topic(), lastRecord.partition())
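
A condensed, hypothetical rendering of the manual-commit bookkeeping above (not the project's actual KafkaManualCommitEventConsumer): the last offset seen per partition is collected only for topics that actually appear in the batch, since Kotlin's last() on an empty iterable throws NoSuchElementException, the failure mode this guard avoids.

import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

fun lastOffsetsByPartition(topics: List<String>,
                           records: ConsumerRecords<String, ByteArray>): Map<TopicPartition, OffsetAndMetadata> {
    val offsets = mutableMapOf<TopicPartition, OffsetAndMetadata>()
    topics.forEach { topic ->
        val topicRecords = records.records(topic)
        if (!topicRecords.iterator().hasNext()) {
            return@forEach // without this guard, last() below would throw NoSuchElementException
        }
        val lastRecord = topicRecords.last()
        offsets[TopicPartition(lastRecord.topic(), lastRecord.partition())] =
                OffsetAndMetadata(lastRecord.offset(), "")
    }
    return offsets // what a manual-commit consumer would pass to commitSync/commitAsync
}
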
@@ -315,7 +322,7 @@ class StreamsConsumerRebalanceListener(private val topicPartitionOffsetMap: Map<
.map {
val offset = consumer.position(it)
if (log.isDebugEnabled) {
log.debug("for topic ${it.topic()} partition ${it.partition()}, the last saved offset is: $offset")
log.debug("onPartitionsRevoked: for topic ${it.topic()} partition ${it.partition()}, the last saved offset is: $offset")
}
it to OffsetAndMetadata(offset, "")
}
@@ -325,9 +332,9 @@ class StreamsConsumerRebalanceListener(private val topicPartitionOffsetMap: Map<

override fun onPartitionsAssigned(partitions: Collection<TopicPartition>) {
for (partition in partitions) {
- val offset = topicPartitionOffsetMap[partition]?.offset()
+ val offset = (topicPartitionOffsetMap[partition] ?: consumer.committed(partition))?.offset()
if (log.isDebugEnabled) {
log.debug("for ${partition.topic()} partition ${partition.partition()}, the retrieved offset is: $offset")
log.debug("onPartitionsAssigned: for ${partition.topic()} partition ${partition.partition()}, the retrieved offset is: $offset")
}
if (offset == null) {
when (autoOffsetReset) {
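
The onPartitionsAssigned change falls back to the broker's committed offset when the sink has no locally tracked offset for a newly assigned partition; only when both lookups return null does it fall through to the auto.offset.reset handling shown above. A trimmed-down, hypothetical listener (class and field names are made up, not the project's StreamsConsumerRebalanceListener) illustrating just that fallback:

import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

class OffsetFallbackListener(private val tracked: Map<TopicPartition, OffsetAndMetadata>,
                             private val consumer: Consumer<*, *>) : ConsumerRebalanceListener {

    override fun onPartitionsRevoked(partitions: Collection<TopicPartition>) {
        // no-op in this sketch; the listener above records the current positions here
    }

    override fun onPartitionsAssigned(partitions: Collection<TopicPartition>) {
        for (partition in partitions) {
            // Prefer the locally tracked offset, otherwise ask the broker for the committed one.
            val offset = (tracked[partition] ?: consumer.committed(partition))?.offset()
            if (offset != null) {
                consumer.seek(partition, offset)
            }
            // else: leave the position to the consumer's auto.offset.reset policy (earliest/latest)
        }
    }
}
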
57 changes: 57 additions & 0 deletions consumer/src/test/kotlin/integrations/KafkaEventSinkIT.kt
@@ -401,6 +401,63 @@ class KafkaEventSinkIT {
}, equalTo(true), 30, TimeUnit.SECONDS)
}

@Test
fun `should fix issue 186 with auto commit false`() {
val product = "product" to "MERGE (p:Product {id: event.id}) ON CREATE SET p.name = event.name"
val customer = "customer" to "MERGE (c:Customer {id: event.id}) ON CREATE SET c.name = event.name"
val bought = "bought" to """
MERGE (c:Customer {id: event.id})
MERGE (p:Product {id: event.id})
MERGE (c)-[:BOUGHT]->(p)
""".trimIndent()
graphDatabaseBuilder.setConfig("streams.sink.topic.cypher.${product.first}", product.second)
graphDatabaseBuilder.setConfig("streams.sink.topic.cypher.${customer.first}", customer.second)
graphDatabaseBuilder.setConfig("streams.sink.topic.cypher.${bought.first}", bought.second)
graphDatabaseBuilder.setConfig("kafka.${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG}", "false")
db = graphDatabaseBuilder.newGraphDatabase() as GraphDatabaseAPI

val props = mapOf("id" to 1, "name" to "My Awesome Product")
var producerRecord = ProducerRecord(product.first, UUID.randomUUID().toString(),
JSONUtils.writeValueAsBytes(props))
kafkaProducer.send(producerRecord).get()
assertEventually(ThrowingSupplier<Boolean, Exception> {
val query = """
MATCH (p:Product)
WHERE properties(p) = {props}
RETURN count(p) AS count
""".trimIndent()
val result = db.execute(query, mapOf("props" to props)).columnAs<Long>("count")
result.hasNext() && result.next() == 1L && !result.hasNext()
}, equalTo(true), 30, TimeUnit.SECONDS)
}

@Test
fun `should fix issue 186 with auto commit true`() {
val product = "product" to "MERGE (p:Product {id: event.id}) ON CREATE SET p.name = event.name"
val customer = "customer" to "MERGE (c:Customer {id: event.id}) ON CREATE SET c.name = event.name"
val bought = "bought" to """
MERGE (c:Customer {id: event.id})
MERGE (p:Product {id: event.id})
MERGE (c)-[:BOUGHT]->(p)
""".trimIndent()
graphDatabaseBuilder.setConfig("streams.sink.topic.cypher.${product.first}", product.second)
graphDatabaseBuilder.setConfig("streams.sink.topic.cypher.${customer.first}", customer.second)
graphDatabaseBuilder.setConfig("streams.sink.topic.cypher.${bought.first}", bought.second)
db = graphDatabaseBuilder.newGraphDatabase() as GraphDatabaseAPI

val props = mapOf("id" to 1, "name" to "My Awesome Product")
var producerRecord = ProducerRecord(product.first, UUID.randomUUID().toString(),
JSONUtils.writeValueAsBytes(props))
kafkaProducer.send(producerRecord).get()
assertEventually(ThrowingSupplier<Boolean, Exception> {
val query = """
MATCH (p:Product)
WHERE properties(p) = {props}
RETURN count(p) AS count
""".trimIndent()
val result = db.execute(query, mapOf("props" to props)).columnAs<Long>("count")
result.hasNext() && result.next() == 1L && !result.hasNext()
}, equalTo(true), 30, TimeUnit.SECONDS)
}

}
