Skip to content

Commit

Permalink
Always defer stop
Browse files Browse the repository at this point in the history
  • Loading branch information
alok87 committed May 5, 2021
1 parent 91c39a4 commit dd2bb21
Showing 1 changed file with 2 additions and 1 deletion.
3 changes: 2 additions & 1 deletion redshiftsink/pkg/redshiftloader/load_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,8 @@ func (b *loadProcessor) processBatch(
func (b *loadProcessor) Process(session sarama.ConsumerGroupSession, msgBuf []*serializer.Message) error {
start := time.Now()
b.metric.setStartRunning()
defer b.metric.setStopRunning()

b.setBatchId()
ctx := session.Context()

Expand All @@ -796,7 +798,6 @@ func (b *loadProcessor) Process(session sarama.ConsumerGroupSession, msgBuf []*s
return err
}
b.markOffset(session, msgBuf)
b.metric.setStopRunning()

var timeTaken string
secondsTaken := time.Since(start).Seconds()
Expand Down

0 comments on commit dd2bb21

Please sign in to comment.