-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbridge_async.go
75 lines (69 loc) · 1.89 KB
/
bridge_async.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package opinionatedevents
import (
"context"
"time"
)
// asyncBridgeDeliveryConfig holds the retry settings the async bridge reads
// while delivering a batch.
type asyncBridgeDeliveryConfig struct {
	// maxAttempts is the upper bound on delivery rounds for a single batch.
	maxAttempts int
	// waitBetween is the pause between two consecutive delivery rounds, in
	// milliseconds (it is multiplied by time.Millisecond before sleeping).
	waitBetween int
}
// asyncBridge delivers message batches to a fixed list of destinations in a
// background goroutine, retrying the destinations that returned an error.
type asyncBridge struct {
	// destinations is the list every batch is initially delivered to.
	destinations []Destination
	// deliveryConfig carries the attempt limit and the wait between attempts.
	deliveryConfig *asyncBridgeDeliveryConfig
}
// newAsyncBridge builds an asyncBridge for the given destinations.
//
// maxDeliveryAttempts is stored as the attempt limit and waitBetweenAttempts
// as the pause between attempts (interpreted as milliseconds by deliver).
// The destinations slice is stored as passed, without copying.
func newAsyncBridge(maxDeliveryAttempts int, waitBetweenAttempts int, destinations ...Destination) *asyncBridge {
	config := new(asyncBridgeDeliveryConfig)
	config.maxAttempts = maxDeliveryAttempts
	config.waitBetween = waitBetweenAttempts

	bridge := new(asyncBridge)
	bridge.destinations = destinations
	bridge.deliveryConfig = config
	return bridge
}
// take wraps batch and ctx in a new envelope, starts delivering it on a
// separate goroutine, and returns the envelope right away without waiting
// for the delivery to finish.
func (b *asyncBridge) take(ctx context.Context, batch []*Message) *envelope {
	inFlight := newEnvelope(ctx, batch)
	// Delivery runs in the background; its result is reported through the
	// envelope that is handed back to the caller.
	go func() {
		b.deliver(inFlight)
	}()
	return inFlight
}
// deliver pushes the envelope's batch to every destination of the bridge,
// retrying the ones that fail, and always closes the envelope exactly once
// with the outcome.
//
// Each round calls Deliver on every still-pending destination. Destinations
// that return a nil error are dropped; the rest are retried in the next
// round after sleeping for waitBetween milliseconds. The envelope is closed
// with the success event as soon as no destination is pending, or with the
// failure event once maxAttempts rounds have been used up.
//
// A non-positive maxAttempts means no round is run at all; the envelope is
// then closed with the failure event instead of being left open forever.
//
// NOTE(review): the pause between rounds is a plain time.Sleep and does not
// observe cancellation of envelope.ctx.
func (b *asyncBridge) deliver(envelope *envelope) {
	// Start from the bridge's list but never modify it in place: the slice is
	// shared with every other delivery started from this bridge.
	pending := b.destinations
	pause := time.Duration(b.deliveryConfig.waitBetween) * time.Millisecond

	for attemptsLeft := b.deliveryConfig.maxAttempts; attemptsLeft > 0; attemptsLeft-- {
		// Try every pending destination and keep only the ones that failed.
		// A fresh slice is built each round so b.destinations stays intact.
		var failed []Destination
		for _, destination := range pending {
			if err := destination.Deliver(envelope.ctx, envelope.batch); err != nil {
				failed = append(failed, destination)
			}
		}
		pending = failed

		// Nothing left to retry: the whole batch has been delivered.
		if len(pending) == 0 {
			envelope.closeWith(newDeliveryEvent(deliveryEventSuccessName))
			return
		}

		// Only wait when another round is actually going to happen.
		if attemptsLeft > 1 {
			time.Sleep(pause)
		}
	}

	// Attempts are exhausted (or none were allowed) while destinations are
	// still pending.
	envelope.closeWith(newDeliveryEvent(deliveryEventFailureName))
}