-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlayer_uniform_transformer.go
105 lines (93 loc) · 2.94 KB
/
layer_uniform_transformer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package sinoname
import (
"context"
"errors"
"sync"
"sync/atomic"
"golang.org/x/sync/errgroup"
)
// UniformTransformerLayer synchronises writes to the downstream layer whilst not blocking
// the upstream layer.
//
// UniformTransformerLayer should be used when the transformers have very different
// transforming speeds. Instead of letting the faster transformers flood the pipeline with
// their messages, the UniformTransformerLayer waits for the slower and faster transformers
// to write at the same time.
//
// Even though the layer waits for the messages to be written at the same time it doesn't
// "sleep". Each transformer has a buffer so that when new messages come while a previous
// message is still being synced, the layer doesn't wait for the previous message to be
// synced: it takes the new message, processes it and then writes it to the buffer. When the
// previous message is synced the layer pulls messages from each transformer's buffer and
// syncs them.
type UniformTransformerLayer struct {
// cfg is handed to the packet broadcaster and to every transformer factory.
cfg *Config
// init is swapped atomically from 0 to 1 by the first getStatefullTransformers call.
init int32
// transformers are appended as-is to the transformer list of every PumpOut call.
// Until the first getStatefullTransformers call the tail of this slice additionally
// holds one entry per factory (presumably the initial stateful instances — confirm
// against the constructor); that call moves them out and truncates the slice.
transformers []Transformer
// transformerFactories build fresh transformers for every getStatefullTransformers
// call after the first one.
transformerFactories []TransformerFactory
}
// PumpOut connects the layer to the in channel and returns the channel on which
// the layer's output is published.
//
// It returns an error when the layer has neither transformers nor transformer
// factories. Otherwise it assembles a per-call transformer list, creates an
// unbuffered output channel wrapped in a sync-out sized to the number of
// transformers, hands both to a packet broadcaster together with the value,
// skip and exit callbacks, and starts the broadcaster.
//
// NOTE(review): l.transformers is read here without synchronisation while
// getStatefullTransformers may truncate it on its first call — confirm that
// PumpOut is never invoked concurrently on the same layer.
func (l *UniformTransformerLayer) PumpOut(ctx context.Context, g *errgroup.Group, in <-chan MessagePacket) (<-chan MessagePacket, error) {
	if len(l.transformers)+len(l.transformerFactories) == 0 {
		return nil, errors.New("sinoname: layer has no transformers")
	}

	// The per-call transformers come first, followed by the layer's shared ones.
	// getStatefullTransformers must run before l.transformers is read: on its
	// first call it truncates that slice, so the two steps stay separate statements.
	active := l.getStatefullTransformers()
	active = append(active, l.transformers...)

	outC := make(chan MessagePacket)
	synced := newSyncOut(len(l.transformers)+len(l.transformerFactories), outC)

	// onValue stores the result produced by transformer id in the sync-out.
	onValue := func(_ context.Context, wg *sync.WaitGroup, id int, v MessagePacket) error {
		defer wg.Done()
		synced.Write(id, v)
		return nil
	}

	// onSkip advances transformer id in the sync-out without writing a value.
	// An id of -1 forwards the packet straight to the output channel instead
	// (presumably a packet that bypasses every transformer — confirm against
	// the broadcaster).
	onSkip := func(ctx context.Context, wg *sync.WaitGroup, id int, v MessagePacket) error {
		if id != -1 {
			defer wg.Done()
			synced.Advance(id)
			return nil
		}

		select {
		case outC <- v:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	// onExit closes the sync-out; unless the exit is forced it first waits
	// for the wait group to drain.
	onExit := func(wg *sync.WaitGroup, forced bool) {
		if !forced {
			wg.Wait()
		}
		synced.Close()
	}

	newPacketBroadcatser(
		ctx,
		l.cfg,
		in,
		g,
		active,
		onValue,
		onSkip,
		onExit,
	).StartListen()

	return outC, nil
}
// getStatefullTransformers returns one transformer per registered factory, or
// nil when the layer has no factories.
//
// The first caller (the one that wins the 0 -> 1 swap on l.init) receives the
// entries currently stored at the tail of l.transformers and removes them from
// that slice. Every later caller receives transformers freshly built by the
// factories.
func (l *UniformTransformerLayer) getStatefullTransformers() []Transformer {
	count := len(l.transformerFactories)
	if count == 0 {
		return nil
	}

	result := make([]Transformer, count)

	if !atomic.CompareAndSwapInt32(&l.init, 0, 1) {
		// Not the first caller: build a new instance from each factory.
		// The factory's second result is discarded.
		for idx, factory := range l.transformerFactories {
			result[idx], _ = factory(l.cfg)
		}
		return result
	}

	// First caller: take over the last `count` entries of l.transformers.
	// Assumes len(l.transformers) >= count — TODO confirm against the constructor.
	split := len(l.transformers) - count
	copy(result, l.transformers[split:])
	l.transformers = l.transformers[:split]
	return result
}