[FIXED] Preserve max delivered messages with Interest retention (#6575)
Resolves #6538

If a consumer reaches max deliveries for a message, the message should be preserved along with its redelivered state so that its content can still be inspected. However, if a new consumer was created and consumed this message as well, the message would still be removed under Interest retention.

This PR fixes that by using the redelivered state to keep marking that there's interest.

The only downside is that the redelivered state gets cleaned up after a restart (this PR does not change/fix that). So if the consumer that hit max deliveries on a message keeps acknowledging other messages and its acknowledgement floor moves up, the redelivered state below that ack floor gets cleaned up, as sketched below.
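
A rough illustration of that cleanup, assuming redelivered state is a map from stream sequence to delivery count; this is a sketch of the described behavior, not the server's actual implementation:

// Sketch only: once the acknowledgement floor (asflr) has advanced past a
// sequence, per-sequence tracking below it is pruned, so a max-delivery
// marker below the floor is lost with it.
func pruneBelowAckFloor(asflr uint64, rdc map[uint64]uint64) {
    for seq := range rdc {
        if seq <= asflr {
            delete(rdc, seq)
        }
    }
}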

Honestly, I feel like keeping messages around once max delivery is reached makes the code very complex. It would be a lot cleaner if we only had the acknowledgement floor, the starting sequence, and the pending messages in between, and not also redelivered state that can sit below the ack floor. It's not something we can change now, I suppose, but I'd be in favor of having messages be removed automatically once max delivery is reached and all consumers have consumed the message. DLQ-style behavior would then be handled more explicitly (and reliably) by the client, for example by publishing the message into another stream and then TERMing it (see the sketch below), instead of relying on advisories that could be missed.
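
A minimal sketch of that client-side DLQ pattern, assuming the nats.go pull-subscription API; the DLQ subject ("DLQ.events"), the retry limit, and the handle function are hypothetical:

package dlqexample

import (
    "time"

    "github.com/nats-io/nats.go"
)

// handle is a hypothetical placeholder for application-specific processing.
func handle(msg *nats.Msg) error { return nil }

// processWithDLQ sketches explicit client-side DLQ handling: rather than
// relying on max-delivery advisories, the client copies a repeatedly
// failing message into a DLQ stream and then TERMs the original, so it is
// never redelivered and its interest is released.
func processWithDLQ(js nats.JetStreamContext, sub *nats.Subscription, maxRetries uint64) error {
    msgs, err := sub.Fetch(1, nats.MaxWait(time.Second))
    if err != nil {
        return err
    }
    for _, msg := range msgs {
        meta, err := msg.Metadata()
        if err != nil {
            return err
        }
        if err := handle(msg); err == nil {
            if err := msg.Ack(); err != nil {
                return err
            }
            continue
        }
        if meta.NumDelivered < maxRetries {
            // Not at the retry limit yet; ask for redelivery.
            if err := msg.Nak(); err != nil {
                return err
            }
            continue
        }
        // Publish into the (hypothetical) DLQ stream first, then TERM the
        // original. If the publish fails, the message stays pending and
        // will be redelivered.
        if _, err := js.Publish("DLQ.events", msg.Data); err != nil {
            return err
        }
        if err := msg.Term(); err != nil {
            return err
        }
    }
    return nil
}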

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
derekcollison authored Feb 24, 2025
2 parents a42b2e9 + 6caa858 commit e938ac0
Showing 5 changed files with 185 additions and 21 deletions.
44 changes: 25 additions & 19 deletions server/consumer.go
@@ -1985,11 +1985,16 @@ func (o *consumer) hasMaxDeliveries(seq uint64) bool {
         if o.maxp > 0 && len(o.pending) >= o.maxp {
             o.signalNewMessages()
         }
-        // Cleanup our tracking.
-        delete(o.pending, seq)
-        if o.rdc != nil {
-            delete(o.rdc, seq)
+        // Make sure to remove from pending.
+        if p, ok := o.pending[seq]; ok && p != nil {
+            delete(o.pending, seq)
+            o.updateDelivered(p.Sequence, seq, dc, p.Timestamp)
         }
+        // Ensure redelivered state is set, if not already.
+        if o.rdc == nil {
+            o.rdc = make(map[uint64]uint64)
+        }
+        o.rdc[seq] = dc
         return true
     }
     return false
@@ -3264,6 +3269,7 @@ func (o *consumer) needAck(sseq uint64, subj string) bool {
     var needAck bool
     var asflr, osseq uint64
     var pending map[uint64]*Pending
+    var rdc map[uint64]uint64

     o.mu.RLock()
     defer o.mu.RUnlock()
@@ -3288,7 +3294,7 @@ func (o *consumer) needAck(sseq uint64, subj string) bool {
     }
     if o.isLeader() {
         asflr, osseq = o.asflr, o.sseq
-        pending = o.pending
+        pending, rdc = o.pending, o.rdc
     } else {
         if o.store == nil {
             return false
@@ -3299,7 +3305,7 @@ func (o *consumer) needAck(sseq uint64, subj string) bool {
             return sseq > o.asflr && !o.isFiltered()
         }
         // If loading state as here, the osseq is +1.
-        asflr, osseq, pending = state.AckFloor.Stream, state.Delivered.Stream+1, state.Pending
+        asflr, osseq, pending, rdc = state.AckFloor.Stream, state.Delivered.Stream+1, state.Pending, state.Redelivered
     }

     switch o.cfg.AckPolicy {
@@ -3315,6 +3321,12 @@ func (o *consumer) needAck(sseq uint64, subj string) bool {
         }
     }

+    // Finally check if redelivery of this message is tracked.
+    // If the message is not pending, it should be preserved if it reached max delivery.
+    if !needAck {
+        _, needAck = rdc[sseq]
+    }
+
     return needAck
 }

@@ -3917,7 +3929,10 @@ func (o *consumer) deliveryCount(seq uint64) uint64 {
     if o.rdc == nil {
         return 1
     }
-    return o.rdc[seq]
+    if dc := o.rdc[seq]; dc >= 1 {
+        return dc
+    }
+    return 1
 }

 // Increase the delivery count for this message.
@@ -4231,10 +4246,7 @@ func (o *consumer) checkAckFloor() {
             // Check if this message was pending.
             o.mu.RLock()
             p, isPending := o.pending[seq]
-            var rdc uint64 = 1
-            if o.rdc != nil {
-                rdc = o.rdc[seq]
-            }
+            rdc := o.deliveryCount(seq)
             o.mu.RUnlock()
             // If it was pending for us, get rid of it.
             if isPending {
@@ -4252,10 +4264,7 @@ func (o *consumer) checkAckFloor() {
             if p != nil {
                 dseq = p.Sequence
             }
-            var rdc uint64 = 1
-            if o.rdc != nil {
-                rdc = o.rdc[seq]
-            }
+            rdc := o.deliveryCount(seq)
             toTerm = append(toTerm, seq, dseq, rdc)
         }
     }
@@ -5861,10 +5870,7 @@ func (o *consumer) decStreamPending(sseq uint64, subj string) {

     // Check if this message was pending.
     p, wasPending := o.pending[sseq]
-    var rdc uint64 = 1
-    if o.rdc != nil {
-        rdc = o.rdc[sseq]
-    }
+    rdc := o.deliveryCount(sseq)

     o.mu.Unlock()
2 changes: 1 addition & 1 deletion server/jetstream_cluster_1_test.go
@@ -6624,7 +6624,7 @@ func TestJetStreamClusterMaxDeliveriesOnInterestStreams(t *testing.T) {
         require_Equal(t, ci.AckFloor.Consumer, 1)
         require_Equal(t, ci.AckFloor.Stream, 1)
         require_Equal(t, ci.NumAckPending, 0)
-        require_Equal(t, ci.NumRedelivered, 0)
+        require_Equal(t, ci.NumRedelivered, 1)
         require_Equal(t, ci.NumPending, 0)
     }
 }
2 changes: 1 addition & 1 deletion server/jetstream_cluster_3_test.go
@@ -5426,7 +5426,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
         require_Equal(t, a.AckFloor.Stream, 10)
     }
     require_Equal(t, a.NumPending, 40)
-    require_Equal(t, a.NumRedelivered, 0)
+    require_Equal(t, a.NumRedelivered, 10)
     a.Cluster, b.Cluster = nil, nil
     a.Delivered.Last, b.Delivered.Last = nil, nil
     if !reflect.DeepEqual(a, b) {
53 changes: 53 additions & 0 deletions server/jetstream_consumer_test.go
@@ -2714,3 +2714,56 @@ func TestJetStreamConsumerMessageDeletedDuringRedelivery(t *testing.T) {
         })
     }
 }
+
+func TestJetStreamConsumerDeliveryCount(t *testing.T) {
+    s := RunBasicJetStreamServer(t)
+    defer s.Shutdown()
+
+    nc, js := jsClientConnect(t, s)
+    defer nc.Close()
+
+    _, err := js.AddStream(&nats.StreamConfig{
+        Name:     "TEST",
+        Subjects: []string{"foo"},
+    })
+    require_NoError(t, err)
+
+    for i := 0; i < 2; i++ {
+        _, err = js.Publish("foo", nil)
+        require_NoError(t, err)
+    }
+
+    sub, err := js.PullSubscribe(
+        "foo",
+        "CONSUMER",
+        nats.ManualAck(),
+        nats.AckExplicit(),
+        nats.AckWait(time.Second),
+        nats.MaxDeliver(1),
+    )
+    require_NoError(t, err)
+
+    acc, err := s.lookupAccount(globalAccountName)
+    require_NoError(t, err)
+    mset, err := acc.lookupStream("TEST")
+    require_NoError(t, err)
+    o := mset.lookupConsumer("CONSUMER")
+    require_NotNil(t, o)
+
+    msgs, err := sub.Fetch(2)
+    require_NoError(t, err)
+    require_Len(t, len(msgs), 2)
+    require_NoError(t, msgs[1].Nak())
+
+    require_Equal(t, o.deliveryCount(1), 1)
+    require_Equal(t, o.deliveryCount(2), 1)
+
+    // Max deliver is 1, so this fetch will fail.
+    _, err = sub.Fetch(1, nats.MaxWait(250*time.Millisecond))
+    require_Error(t, err)
+
+    // This would previously report a delivery count of 0, because o.rdc != nil
+    // while the entries for these sequences had already been deleted.
+    require_Equal(t, o.deliveryCount(1), 1)
+    require_Equal(t, o.deliveryCount(2), 1)
+}
105 changes: 105 additions & 0 deletions server/jetstream_test.go
@@ -25542,3 +25542,108 @@ func TestJetStreamSubjectDeleteMarkersAfterPurgeNoMarkers(t *testing.T) {
         })
     }
 }
+
+// https://github.com/nats-io/nats-server/issues/6538
+func TestJetStreamInterestMaxDeliveryReached(t *testing.T) {
+    maxWait := 250 * time.Millisecond
+    for _, useNak := range []bool{true, false} {
+        for _, test := range []struct {
+            title  string
+            action func(s *Server, sub *nats.Subscription)
+        }{
+            {
+                title: "fetch",
+                action: func(s *Server, sub *nats.Subscription) {
+                    time.Sleep(time.Second)
+
+                    // Max deliver is 1, so this fetch will fail.
+                    _, err := sub.Fetch(1, nats.MaxWait(maxWait))
+                    require_Error(t, err)
+                },
+            },
+            {
+                title: "expire pending",
+                action: func(s *Server, sub *nats.Subscription) {
+                    acc, err := s.lookupAccount(globalAccountName)
+                    require_NoError(t, err)
+                    mset, err := acc.lookupStream("TEST")
+                    require_NoError(t, err)
+                    o := mset.lookupConsumer("consumer")
+                    require_NotNil(t, o)
+
+                    o.mu.Lock()
+                    o.forceExpirePending()
+                    o.mu.Unlock()
+                },
+            },
+        } {
+            title := fmt.Sprintf("nak/%s", test.title)
+            if !useNak {
+                title = fmt.Sprintf("no-%s", title)
+            }
+            t.Run(title, func(t *testing.T) {
+                s := RunBasicJetStreamServer(t)
+                defer s.Shutdown()
+
+                nc, js := jsClientConnect(t, s)
+                defer nc.Close()
+
+                _, err := js.AddStream(&nats.StreamConfig{
+                    Name:      "TEST",
+                    Storage:   nats.FileStorage,
+                    Subjects:  []string{"test"},
+                    Replicas:  1,
+                    Retention: nats.InterestPolicy,
+                })
+                require_NoError(t, err)
+
+                sub, err := js.PullSubscribe("test", "consumer", nats.AckWait(time.Second), nats.MaxDeliver(1))
+                require_NoError(t, err)
+
+                _, err = nc.Request("test", []byte("hello"), maxWait)
+                require_NoError(t, err)
+
+                nfo, err := js.StreamInfo("TEST")
+                require_NoError(t, err)
+                require_Equal(t, nfo.State.Msgs, uint64(1))
+
+                msg, err := sub.Fetch(1, nats.MaxWait(maxWait))
+                require_NoError(t, err)
+                require_Len(t, len(msg), 1)
+                if useNak {
+                    require_NoError(t, msg[0].Nak())
+                }
+
+                cnfo, err := js.ConsumerInfo("TEST", "consumer")
+                require_NoError(t, err)
+                require_Equal(t, cnfo.NumAckPending, 1)
+
+                test.action(s, sub)
+
+                // Max deliver is 1, so this fetch will fail.
+                _, err = sub.Fetch(1, nats.MaxWait(maxWait))
+                require_Error(t, err)
+
+                cnfo, err = js.ConsumerInfo("TEST", "consumer")
+                require_NoError(t, err)
+                require_Equal(t, cnfo.NumAckPending, 0)
+
+                nfo, err = js.StreamInfo("TEST")
+                require_NoError(t, err)
+                require_Equal(t, nfo.State.Msgs, uint64(1))
+
+                sub2, err := js.PullSubscribe("test", "consumer2")
+                require_NoError(t, err)
+
+                msg, err = sub2.Fetch(1)
+                require_NoError(t, err)
+                require_Len(t, len(msg), 1)
+                require_NoError(t, msg[0].AckSync())
+
+                nfo, err = js.StreamInfo("TEST")
+                require_NoError(t, err)
+                require_Equal(t, nfo.State.Msgs, uint64(1))
+            })
+        }
+    }
+}
