Skip to content

Commit c85f8d4

Browse files
author
Steven.Osborne
committed
Use an atomic pointer to track if batcher is disposed
1 parent dc01c0f commit c85f8d4

File tree

1 file changed

+30
-21
lines changed

1 file changed

+30
-21
lines changed

batcher/batcher.go

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ import (
2323
"time"
2424
)
2525

26+
const (
27+
batcherActive = uint32(0)
28+
batcherDisposed = uint32(1)
29+
)
30+
2631
// Batcher provides an API for accumulating items into a batch for processing.
2732
type Batcher interface {
2833
// Put adds items to the batcher.
@@ -55,10 +60,11 @@ type basicBatcher struct {
5560
maxItems uint
5661
maxBytes uint
5762
calculateBytes CalculateBytes
58-
disposed bool
63+
disposed uint32
5964
items []interface{}
6065
lock sync.RWMutex
6166
batchChan chan []interface{}
67+
disposeChan chan struct{}
6268
availableBytes uint
6369
waiting int32
6470
}
@@ -82,19 +88,19 @@ func New(maxTime time.Duration, maxItems, maxBytes, queueLen uint, calculate Cal
8288
calculateBytes: calculate,
8389
items: make([]interface{}, 0, maxItems),
8490
batchChan: make(chan []interface{}, queueLen),
91+
disposeChan: make(chan struct{}),
8592
}, nil
8693
}
8794

8895
// Put adds items to the batcher. If Put is continually called without calls to
8996
// Get, an unbounded number of go-routines will be generated.
9097
// Note: there is no order guarantee for items entering/leaving the batcher.
9198
func (b *basicBatcher) Put(item interface{}) error {
92-
b.lock.Lock()
93-
if b.disposed {
94-
b.lock.Unlock()
99+
// Check to see if disposed before putting
100+
if b.IsDisposed() {
95101
return ErrDisposed
96102
}
97-
103+
b.lock.Lock()
98104
b.items = append(b.items, item)
99105
if b.calculateBytes != nil {
100106
b.availableBytes += b.calculateBytes(item)
@@ -121,18 +127,25 @@ func (b *basicBatcher) Get() ([]interface{}, error) {
121127
timeout = time.After(b.maxTime)
122128
}
123129

130+
// Check to see if disposed before blocking
131+
if b.IsDisposed() {
132+
return nil, ErrDisposed
133+
}
134+
124135
select {
125-
case items, ok := <-b.batchChan:
136+
case items := <-b.batchChan:
137+
return items, nil
138+
case _, ok := <-b.disposeChan:
126139
if !ok {
127140
return nil, ErrDisposed
128141
}
129-
return items, nil
142+
return nil, nil
130143
case <-timeout:
131-
b.lock.Lock()
132-
if b.disposed {
133-
b.lock.Unlock()
144+
// Check to see if disposed before getting lock
145+
if b.IsDisposed() {
134146
return nil, ErrDisposed
135147
}
148+
b.lock.Lock()
136149
items := b.items
137150
b.items = make([]interface{}, 0, b.maxItems)
138151
b.availableBytes = 0
@@ -143,11 +156,10 @@ func (b *basicBatcher) Get() ([]interface{}, error) {
143156

144157
// Flush forcibly completes the batch currently being built
145158
func (b *basicBatcher) Flush() error {
146-
b.lock.Lock()
147-
if b.disposed {
148-
b.lock.Unlock()
159+
if b.IsDisposed() {
149160
return ErrDisposed
150161
}
162+
b.lock.Lock()
151163
b.flush()
152164
b.lock.Unlock()
153165
return nil
@@ -157,14 +169,14 @@ func (b *basicBatcher) Flush() error {
157169
// will return ErrDisposed, calls to Get will return an error iff
158170
// there are no more ready batches.
159171
func (b *basicBatcher) Dispose() {
160-
b.lock.Lock()
161-
if b.disposed {
162-
b.lock.Unlock()
172+
// Check to see if disposed before attempting to dispose
173+
if atomic.CompareAndSwapUint32(&b.disposed, batcherActive, batcherDisposed) {
163174
return
164175
}
176+
b.lock.Lock()
165177
b.flush()
166-
b.disposed = true
167178
b.items = nil
179+
close(b.disposeChan)
168180

169181
// Drain the batch channel and all routines waiting to put on the channel
170182
for len(b.batchChan) > 0 || atomic.LoadInt32(&b.waiting) > 0 {
@@ -176,10 +188,7 @@ func (b *basicBatcher) Dispose() {
176188

177189
// IsDisposed will determine if the batcher is disposed
178190
func (b *basicBatcher) IsDisposed() bool {
179-
b.lock.RLock()
180-
disposed := b.disposed
181-
b.lock.RUnlock()
182-
return disposed
191+
return atomic.LoadUint32(&b.disposed) == batcherDisposed
183192
}
184193

185194
// flush adds the batch currently being built to the queue of completed batches.

0 commit comments

Comments
 (0)