diff --git a/redshiftsink/pkg/redshiftloader/load_processor.go b/redshiftsink/pkg/redshiftloader/load_processor.go index 4bc426587..ce7e79a75 100644 --- a/redshiftsink/pkg/redshiftloader/load_processor.go +++ b/redshiftsink/pkg/redshiftloader/load_processor.go @@ -776,6 +776,8 @@ func (b *loadProcessor) processBatch( func (b *loadProcessor) Process(session sarama.ConsumerGroupSession, msgBuf []*serializer.Message) error { start := time.Now() b.metric.setStartRunning() + defer b.metric.setStopRunning() + b.setBatchId() ctx := session.Context() @@ -796,7 +798,6 @@ func (b *loadProcessor) Process(session sarama.ConsumerGroupSession, msgBuf []*s return err } b.markOffset(session, msgBuf) - b.metric.setStopRunning() var timeTaken string secondsTaken := time.Since(start).Seconds()