Skip to content

Commit

Permalink
changed msgCount mutex to atomic int
Browse files Browse the repository at this point in the history
  • Loading branch information
Pineapple217 authored and jonas-grgt committed Feb 19, 2025
1 parent f032720 commit 6473cde
Showing 1 changed file with 6 additions and 8 deletions.
14 changes: 6 additions & 8 deletions kadmin/record _reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package kadmin

import (
"context"
"github.com/IBM/sarama"
tea "github.com/charmbracelet/bubbletea"
"ktea/serdes"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/IBM/sarama"
tea "github.com/charmbracelet/bubbletea"
)

type FilterType string
Expand Down Expand Up @@ -104,8 +106,7 @@ func (ka *SaramaKafkaAdmin) ReadRecords(ctx context.Context, rd ReadDetails) tea
}

var (
msgCount int
mu sync.Mutex
msgCount atomic.Int64
closeOnce sync.Once
wg sync.WaitGroup
offsets map[int]offsets
Expand Down Expand Up @@ -180,12 +181,9 @@ func (ka *SaramaKafkaAdmin) ReadRecords(ctx context.Context, rd ReadDetails) tea

var shouldClose bool

mu.Lock()
msgCount++
if msgCount >= rd.Limit {
if msgCount.Add(1) >= int64(rd.Limit) {
shouldClose = true
}
mu.Unlock()

select {
case startedMsg.ConsumerRecord <- consumerRecord:
Expand Down

0 comments on commit 6473cde

Please sign in to comment.