@@ -526,41 +526,51 @@ final class RDKafkaClient: Sendable {
526
526
}
527
527
}
528
528
529
- /// Non-blocking "fire-and-forget" commit of a `message`'s offset to Kafka.
529
+ /// Non-blocking "fire-and-forget" commit of a `topic`, `partition`, and ` offset` to Kafka.
530
530
/// Schedules a commit and returns immediately.
531
531
/// Any errors encountered after scheduling the commit will be discarded.
532
532
///
533
- /// - Parameter message: Last received message that shall be marked as read.
533
+ /// - Parameter topic: Topic to commit to
534
+ /// - Parameter partition: Partition to commit to
535
+ /// - Parameter offset: Offset to commit
534
536
/// - Throws: A ``KafkaError`` if scheduling the commit failed.
535
- func scheduleCommit( _ message: KafkaConsumerMessage ) throws {
536
- // The offset committed is always the offset of the next requested message.
537
- // Thus, we increase the offset of the current message by one before committing it.
538
- // See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
539
- let changesList = RDKafkaTopicPartitionList ( )
540
- changesList. setOffset (
541
- topic: message. topic,
542
- partition: message. partition,
543
- offset: Int64 ( message. offset. rawValue + 1 )
544
- )
545
-
546
- let error = changesList. withListPointer { listPointer in
547
- return rd_kafka_commit (
548
- self . kafkaHandle,
549
- listPointer,
550
- 1 // async = true
537
+ func scheduleCommit(
538
+ topic: String ,
539
+ partition: KafkaPartition ,
540
+ offset: KafkaOffset ) throws {
541
+ // The offset committed is always the offset of the next requested message.
542
+ // Thus, we increase the offset of the current message by one before committing it.
543
+ // See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
544
+ let changesList = RDKafkaTopicPartitionList ( )
545
+ changesList. setOffset (
546
+ topic: topic,
547
+ partition: partition,
548
+ offset: Int64 ( offset. rawValue + 1 )
551
549
)
552
- }
553
550
554
- if error != RD_KAFKA_RESP_ERR_NO_ERROR {
555
- throw KafkaError . rdKafkaError ( wrapping: error)
556
- }
551
+ let error = changesList. withListPointer { listPointer in
552
+ return rd_kafka_commit (
553
+ self . kafkaHandle,
554
+ listPointer,
555
+ 1 // async = true
556
+ )
557
+ }
558
+
559
+ if error != RD_KAFKA_RESP_ERR_NO_ERROR {
560
+ throw KafkaError . rdKafkaError ( wrapping: error)
561
+ }
557
562
}
558
563
559
564
/// Non-blocking **awaitable** commit of a `message`'s offset to Kafka.
560
565
///
561
- /// - Parameter message: Last received message that shall be marked as read.
566
+ /// - Parameter topic: Topic to commit to
567
+ /// - Parameter partition: Partition to commit to
568
+ /// - Parameter offset: Offset to commit
562
569
/// - Throws: A ``KafkaError`` if the commit failed.
563
- func commit( _ message: KafkaConsumerMessage ) async throws {
570
+ func commit(
571
+ topic: String ,
572
+ partition: KafkaPartition ,
573
+ offset: KafkaOffset ) async throws {
564
574
// Declare captured closure outside of withCheckedContinuation.
565
575
// We do that because do an unretained pass of the captured closure to
566
576
// librdkafka which means we have to keep a reference to the closure
@@ -577,9 +587,9 @@ final class RDKafkaClient: Sendable {
577
587
// See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
578
588
let changesList = RDKafkaTopicPartitionList ( )
579
589
changesList. setOffset (
580
- topic: message . topic,
581
- partition: message . partition,
582
- offset: Int64 ( message . offset. rawValue + 1 )
590
+ topic: topic,
591
+ partition: partition,
592
+ offset: Int64 ( offset. rawValue + 1 )
583
593
)
584
594
585
595
// Unretained pass because the reference that librdkafka holds to capturedClosure
@@ -597,7 +607,7 @@ final class RDKafkaClient: Sendable {
597
607
}
598
608
}
599
609
}
600
-
610
+
601
611
/// Flush any outstanding produce requests.
602
612
///
603
613
/// - Parameters:
0 commit comments