From bdd345754c7a9088da7c66a6581506395af3f5c3 Mon Sep 17 00:00:00 2001 From: Shaun Cooley Date: Sat, 20 Nov 2021 15:03:26 -0800 Subject: [PATCH] fix: 64-bit struct member alignment issues for sync/atomic for #158 --- consumer.go | 176 ++++++++++++++++++++++----------------------- consumer_config.go | 12 ++-- memqueue/queue.go | 8 +-- queue.go | 12 ++-- redisq/queue.go | 6 +- 5 files changed, 107 insertions(+), 107 deletions(-) diff --git a/consumer.go b/consumer.go index 8a31c15..141ac5e 100644 --- a/consumer.go +++ b/consumer.go @@ -24,16 +24,16 @@ type Delayer interface { } type ConsumerStats struct { - NumWorker uint32 - NumFetcher uint32 + NumWorker uint64 + NumFetcher uint64 - BufferSize uint32 - Buffered uint32 + BufferSize uint64 + Buffered uint64 - InFlight uint32 - Processed uint32 - Retries uint32 - Fails uint32 + InFlight uint64 + Processed uint64 + Retries uint64 + Fails uint64 Timing time.Duration } @@ -56,23 +56,23 @@ type Consumer struct { limiter *limiter startStopMu sync.Mutex - state int32 // atomic + state int64 // atomic stopCh chan struct{} cfgs *configRoulette - numWorker int32 // atomic - numFetcher int32 // atomic + numWorker int64 // atomic + numFetcher int64 // atomic fetchersWG sync.WaitGroup workersWG sync.WaitGroup - consecutiveNumErr uint32 - queueEmptyVote int32 + consecutiveNumErr uint64 + queueEmptyVote int64 - inFlight uint32 - processed uint32 - fails uint32 - retries uint32 + inFlight uint64 + processed uint64 + fails uint64 + retries uint64 timings sync.Map hooks []ConsumerHook @@ -125,16 +125,16 @@ func (c *Consumer) Len() int { // Stats returns processor stats. func (c *Consumer) Stats() *ConsumerStats { return &ConsumerStats{ - NumWorker: uint32(atomic.LoadInt32(&c.numWorker)), - NumFetcher: uint32(atomic.LoadInt32(&c.numFetcher)), + NumWorker: uint64(atomic.LoadInt64(&c.numWorker)), + NumFetcher: uint64(atomic.LoadInt64(&c.numFetcher)), - BufferSize: uint32(cap(c.buffer)), - Buffered: uint32(len(c.buffer)), + BufferSize: uint64(cap(c.buffer)), + Buffered: uint64(len(c.buffer)), - InFlight: atomic.LoadUint32(&c.inFlight), - Processed: atomic.LoadUint32(&c.processed), - Retries: atomic.LoadUint32(&c.retries), - Fails: atomic.LoadUint32(&c.fails), + InFlight: atomic.LoadUint64(&c.inFlight), + Processed: atomic.LoadUint64(&c.processed), + Retries: atomic.LoadUint64(&c.retries), + Fails: atomic.LoadUint64(&c.fails), Timing: c.timing(), } @@ -176,9 +176,9 @@ func (c *Consumer) start() error { c.startStopMu.Lock() defer c.startStopMu.Unlock() - switch atomic.LoadInt32(&c.state) { + switch atomic.LoadInt64(&c.state) { case stateInit: - atomic.StoreInt32(&c.state, stateStarted) + atomic.StoreInt64(&c.state, stateStarted) c.stopCh = make(chan struct{}) case stateStarted: return fmt.Errorf("taskq: Consumer is already started") @@ -186,8 +186,8 @@ func (c *Consumer) start() error { return fmt.Errorf("taskq: Consumer is stopping") } - atomic.StoreInt32(&c.numFetcher, 0) - atomic.StoreInt32(&c.numWorker, 0) + atomic.StoreInt64(&c.numFetcher, 0) + atomic.StoreInt64(&c.numWorker, 0) return nil } @@ -203,21 +203,21 @@ func (c *Consumer) StopTimeout(timeout time.Duration) error { c.startStopMu.Lock() defer c.startStopMu.Unlock() - switch atomic.LoadInt32(&c.state) { + switch atomic.LoadInt64(&c.state) { case stateInit: return fmt.Errorf("taskq: Consumer is not started") case stateStarted: - atomic.StoreInt32(&c.state, stateStoppingFetchers) + atomic.StoreInt64(&c.state, stateStoppingFetchers) close(c.stopCh) case stateStoppingFetchers, stateStoppingWorkers: return 
fmt.Errorf("taskq: Consumer is stopping") } // Stop all fetchers. - atomic.StoreInt32(&c.numFetcher, -1) + atomic.StoreInt64(&c.numFetcher, -1) defer func() { - atomic.StoreInt32(&c.numWorker, -1) - atomic.StoreInt32(&c.state, stateInit) + atomic.StoreInt64(&c.numWorker, -1) + atomic.StoreInt64(&c.state, stateInit) }() timer := time.NewTimer(timeout) @@ -236,7 +236,7 @@ func (c *Consumer) StopTimeout(timeout time.Duration) error { firstErr = fmt.Errorf("taskq: %s: fetchers are not stopped after %s", c, timeout) } - if !atomic.CompareAndSwapInt32(&c.state, stateStoppingFetchers, stateStoppingWorkers) { + if !atomic.CompareAndSwapInt64(&c.state, stateStoppingFetchers, stateStoppingWorkers) { panic("not reached") } if firstErr != nil { @@ -259,17 +259,17 @@ func (c *Consumer) StopTimeout(timeout time.Duration) error { func (c *Consumer) paused() time.Duration { if c.opt.PauseErrorsThreshold == 0 || - atomic.LoadUint32(&c.consecutiveNumErr) < uint32(c.opt.PauseErrorsThreshold) { + atomic.LoadUint64(&c.consecutiveNumErr) < uint64(c.opt.PauseErrorsThreshold) { return 0 } return time.Minute } -func (c *Consumer) addWorker(ctx context.Context, id int32) bool { +func (c *Consumer) addWorker(ctx context.Context, id int64) bool { c.startStopMu.Lock() defer c.startStopMu.Unlock() - if atomic.CompareAndSwapInt32(&c.numWorker, id, id+1) { + if atomic.CompareAndSwapInt64(&c.numWorker, id, id+1) { c.workersWG.Add(1) go func() { defer c.workersWG.Done() @@ -280,15 +280,15 @@ func (c *Consumer) addWorker(ctx context.Context, id int32) bool { return false } -func (c *Consumer) removeWorker(id int32) bool { //nolint:unused - return atomic.CompareAndSwapInt32(&c.numWorker, id+1, id) +func (c *Consumer) removeWorker(id int64) bool { //nolint:unused + return atomic.CompareAndSwapInt64(&c.numWorker, id+1, id) } -func (c *Consumer) addFetcher(ctx context.Context, id int32) bool { +func (c *Consumer) addFetcher(ctx context.Context, id int64) bool { c.startStopMu.Lock() defer c.startStopMu.Unlock() - if atomic.CompareAndSwapInt32(&c.numFetcher, id, id+1) { + if atomic.CompareAndSwapInt64(&c.numFetcher, id, id+1) { c.fetchersWG.Add(1) go func() { defer c.fetchersWG.Done() @@ -300,13 +300,13 @@ func (c *Consumer) addFetcher(ctx context.Context, id int32) bool { } func (c *Consumer) ensureFetcher(ctx context.Context) { - if atomic.LoadInt32(&c.numFetcher) == 0 { + if atomic.LoadInt64(&c.numFetcher) == 0 { c.addFetcher(ctx, 0) } } -func (c *Consumer) removeFetcher(num int32) bool { - return atomic.CompareAndSwapInt32(&c.numFetcher, num+1, num) +func (c *Consumer) removeFetcher(num int64) bool { + return atomic.CompareAndSwapInt64(&c.numFetcher, num+1, num) } // ProcessAll starts workers to process messages in the queue and then stops @@ -372,7 +372,7 @@ func (c *Consumer) reserveOne(ctx context.Context) (*Message, error) { return &msgs[0], nil } -func (c *Consumer) fetcher(ctx context.Context, fetcherID int32) { +func (c *Consumer) fetcher(ctx context.Context, fetcherID int64) { timer := time.NewTimer(time.Minute) timer.Stop() @@ -380,7 +380,7 @@ func (c *Consumer) fetcher(ctx context.Context, fetcherID int32) { fetchTimeout -= fetchTimeout / 10 for { - if fetcherID >= atomic.LoadInt32(&c.numFetcher) { + if fetcherID >= atomic.LoadInt64(&c.numFetcher) { return } @@ -394,7 +394,7 @@ func (c *Consumer) fetcher(ctx context.Context, fetcherID int32) { timeout, err := c.fetchMessages(ctx, timer, fetchTimeout) if err != nil { if err == internal.ErrNotSupported { - atomic.StoreInt32(&c.numFetcher, -1) + 
atomic.StoreInt64(&c.numFetcher, -1) continue } @@ -449,7 +449,7 @@ func (c *Consumer) fetchMessages( return false, nil } -func (c *Consumer) worker(ctx context.Context, workerID int32) { +func (c *Consumer) worker(ctx context.Context, workerID int64) { var lock *redislock.Lock defer func() { if lock != nil { @@ -461,7 +461,7 @@ func (c *Consumer) worker(ctx context.Context, workerID int32) { timer.Stop() for { - if workerID >= atomic.LoadInt32(&c.numWorker) { + if workerID >= atomic.LoadInt64(&c.numWorker) { return } if c.opt.WorkerLimit > 0 { @@ -470,7 +470,7 @@ func (c *Consumer) worker(ctx context.Context, workerID int32) { msg := c.waitMessage(ctx, timer) if msg == nil { - if atomic.LoadInt32(&c.state) >= stateStoppingWorkers { + if atomic.LoadInt64(&c.state) >= stateStoppingWorkers { return } continue @@ -509,7 +509,7 @@ func (c *Consumer) waitMessage(ctx context.Context, timer *time.Timer) *Message // Process is low-level API to process message bypassing the internal queue. func (c *Consumer) Process(msg *Message) error { - atomic.AddUint32(&c.inFlight, 1) + atomic.AddUint64(&c.inFlight, 1) if msg.Delay > 0 { err := c.q.Add(msg) @@ -595,19 +595,19 @@ func (c *Consumer) Put(msg *Message) { if msg.Err == nil { c.resetPause() - atomic.AddUint32(&c.processed, 1) + atomic.AddUint64(&c.processed, 1) c.delete(msg) return } - atomic.AddUint32(&c.consecutiveNumErr, 1) + atomic.AddUint64(&c.consecutiveNumErr, 1) if msg.Delay <= 0 { - atomic.AddUint32(&c.fails, 1) + atomic.AddUint64(&c.fails, 1) c.delete(msg) return } - atomic.AddUint32(&c.retries, 1) + atomic.AddUint64(&c.retries, 1) c.release(msg) } @@ -621,7 +621,7 @@ func (c *Consumer) release(msg *Message) { if err != nil { internal.Logger.Printf("task=%q Release failed: %s", msg.TaskName, err) } - atomic.AddUint32(&c.inFlight, ^uint32(0)) + atomic.AddUint64(&c.inFlight, ^uint64(0)) } func (c *Consumer) delete(msg *Message) { @@ -639,7 +639,7 @@ func (c *Consumer) delete(msg *Message) { if err != nil { internal.Logger.Printf("task=%q Delete failed: %s", msg.TaskName, err) } - atomic.AddUint32(&c.inFlight, ^uint32(0)) + atomic.AddUint64(&c.inFlight, ^uint64(0)) } // Purge discards messages from the internal queue. 
@@ -700,13 +700,13 @@ func (c *Consumer) afterProcessMessage(msg *Message) error { } func (c *Consumer) resetPause() { - atomic.StoreUint32(&c.consecutiveNumErr, 0) + atomic.StoreUint64(&c.consecutiveNumErr, 0) } func (c *Consumer) lockWorker( ctx context.Context, lock *redislock.Lock, - workerID int32, + workerID int64, ) *redislock.Lock { lockTimeout := c.opt.ReservationTimeout + 10*time.Second @@ -746,12 +746,12 @@ func (c *Consumer) lockWorker( } func (c *Consumer) String() string { - fnum := atomic.LoadInt32(&c.numFetcher) - wnum := atomic.LoadInt32(&c.numWorker) - inFlight := atomic.LoadUint32(&c.inFlight) - processed := atomic.LoadUint32(&c.processed) - retries := atomic.LoadUint32(&c.retries) - fails := atomic.LoadUint32(&c.fails) + fnum := atomic.LoadInt64(&c.numFetcher) + wnum := atomic.LoadInt64(&c.numWorker) + inFlight := atomic.LoadUint64(&c.inFlight) + processed := atomic.LoadUint64(&c.processed) + retries := atomic.LoadUint64(&c.retries) + fails := atomic.LoadUint64(&c.fails) timing := c.timing() return fmt.Sprintf( @@ -771,22 +771,22 @@ func (c *Consumer) voteQueueFull() { c.changeQueueEmptyVote(-1) } -func (c *Consumer) changeQueueEmptyVote(d int32) { +func (c *Consumer) changeQueueEmptyVote(d int64) { const quorum = 7 for i := 0; i < 100; i++ { - n := atomic.LoadInt32(&c.queueEmptyVote) + n := atomic.LoadInt64(&c.queueEmptyVote) if (d < 0 && n <= -quorum) || (d > 0 && n >= quorum) { break } - if atomic.CompareAndSwapInt32(&c.queueEmptyVote, n, n+d) { + if atomic.CompareAndSwapInt64(&c.queueEmptyVote, n, n+d) { break } } } func (c *Consumer) queueEmpty() bool { - return atomic.LoadInt32(&c.queueEmptyVote) >= 3 + return atomic.LoadInt64(&c.queueEmptyVote) >= 3 } func (c *Consumer) autotune(ctx context.Context, cfg *consumerConfig) { @@ -829,8 +829,8 @@ func (c *Consumer) autotuneInterval() time.Duration { } func (c *Consumer) autotuneTick(ctx context.Context, cfg *consumerConfig) *consumerConfig { - processed := int(atomic.LoadUint32(&c.processed)) - retries := int(atomic.LoadUint32(&c.retries)) + processed := int(atomic.LoadUint64(&c.processed)) + retries := int(atomic.LoadUint64(&c.retries)) cfg.Update(processed, retries, c.timing()) if newCfg := c.cfgs.Select(cfg, c.queueEmpty()); newCfg != nil { @@ -842,10 +842,10 @@ func (c *Consumer) autotuneTick(ctx context.Context, cfg *consumerConfig) *consu } func (c *Consumer) replaceConfig(ctx context.Context, cfg *consumerConfig) { - if numFetcher := atomic.LoadInt32(&c.numFetcher); numFetcher != -1 { + if numFetcher := atomic.LoadInt64(&c.numFetcher); numFetcher != -1 { if numFetcher > cfg.NumFetcher { // Remove extra fetchers. - atomic.StoreInt32(&c.numFetcher, cfg.NumFetcher) + atomic.StoreInt64(&c.numFetcher, cfg.NumFetcher) } else { for id := numFetcher; id < cfg.NumFetcher; id++ { if !c.addFetcher(ctx, id) { @@ -855,10 +855,10 @@ func (c *Consumer) replaceConfig(ctx context.Context, cfg *consumerConfig) { } } - numWorker := atomic.LoadInt32(&c.numWorker) + numWorker := atomic.LoadInt64(&c.numWorker) if numWorker > cfg.NumWorker { // Remove extra workers. 
- atomic.StoreInt32(&c.numWorker, cfg.NumWorker) + atomic.StoreInt64(&c.numWorker, cfg.NumWorker) } else { for id := numWorker; id < cfg.NumWorker; id++ { if !c.addWorker(ctx, id) { @@ -868,8 +868,8 @@ func (c *Consumer) replaceConfig(ctx context.Context, cfg *consumerConfig) { } cfg.Reset( - int(atomic.LoadUint32(&c.processed)), - int(atomic.LoadUint32(&c.retries))) + int(atomic.LoadUint64(&c.processed)), + int(atomic.LoadUint64(&c.retries))) } //------------------------------------------------------------------------------ @@ -879,8 +879,8 @@ type limiter struct { limiter *redis_rate.Limiter limit redis_rate.Limit - allowedCount uint32 // atomic - cancelled uint32 // atomic + allowedCount uint64 // atomic + cancelled uint64 // atomic } func (l *limiter) Reserve(ctx context.Context, max int) int { @@ -889,19 +889,19 @@ func (l *limiter) Reserve(ctx context.Context, max int) int { } for { - cancelled := atomic.LoadUint32(&l.cancelled) + cancelled := atomic.LoadUint64(&l.cancelled) if cancelled == 0 { break } - if cancelled >= uint32(max) { - if atomic.CompareAndSwapUint32(&l.cancelled, cancelled, uint32(max)-1) { + if cancelled >= uint64(max) { + if atomic.CompareAndSwapUint64(&l.cancelled, cancelled, uint64(max)-1) { return max } continue } - if atomic.CompareAndSwapUint32(&l.cancelled, cancelled, uint32(cancelled)-1) { + if atomic.CompareAndSwapUint64(&l.cancelled, cancelled, uint64(cancelled)-1) { return int(cancelled) } } @@ -914,11 +914,11 @@ func (l *limiter) Reserve(ctx context.Context, max int) int { } if res.Allowed > 0 { - atomic.AddUint32(&l.allowedCount, 1) + atomic.AddUint64(&l.allowedCount, 1) return res.Allowed } - atomic.StoreUint32(&l.allowedCount, 0) + atomic.StoreUint64(&l.allowedCount, 0) time.Sleep(res.RetryAfter) } } @@ -927,11 +927,11 @@ func (l *limiter) Cancel(n int) { if l.limiter == nil { return } - atomic.AddUint32(&l.cancelled, uint32(n)) + atomic.AddUint64(&l.cancelled, uint64(n)) } func (l *limiter) Limited() bool { - return l.limiter != nil && atomic.LoadUint32(&l.allowedCount) < 3 + return l.limiter != nil && atomic.LoadUint64(&l.allowedCount) < 3 } //------------------------------------------------------------------------------ diff --git a/consumer_config.go b/consumer_config.go index 4beaf33..4e1881f 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -59,8 +59,8 @@ func (p *perfProfile) Timing() time.Duration { //------------------------------------------------------------------------------ type consumerConfig struct { - NumFetcher int32 - NumWorker int32 + NumFetcher int64 + NumWorker int64 perfProfile @@ -68,7 +68,7 @@ type consumerConfig struct { Score float64 } -func newConsumerConfig(numFetcher, numWorker int32) *consumerConfig { +func newConsumerConfig(numFetcher, numWorker int64) *consumerConfig { return &consumerConfig{ NumFetcher: numFetcher, NumWorker: numWorker, @@ -124,7 +124,7 @@ func newConfigRoulette(opt *QueueOptions) *configRoulette { rnd: rand.New(rand.NewSource(time.Now().UnixNano())), } - cfg := newConsumerConfig(1, int32(opt.MinNumWorker)) + cfg := newConsumerConfig(1, int64(opt.MinNumWorker)) r.resetConfigs(cfg, false) return r @@ -272,7 +272,7 @@ func (r *configRoulette) addConfig(cfg *consumerConfig) { r.cfgs = append(r.cfgs, cfg) } -func (r *configRoulette) withLessWorkers(cfg *consumerConfig, n int32) *consumerConfig { +func (r *configRoulette) withLessWorkers(cfg *consumerConfig, n int64) *consumerConfig { if n <= 0 { n = 1 } else if n > 10 { @@ -286,7 +286,7 @@ func (r *configRoulette) withLessWorkers(cfg 
*consumerConfig, n int32) *consumer return cfg } -func (r *configRoulette) withMoreWorkers(cfg *consumerConfig, n int32) *consumerConfig { +func (r *configRoulette) withMoreWorkers(cfg *consumerConfig, n int64) *consumerConfig { if n <= 0 { n = 1 } else if n > 10 { diff --git a/memqueue/queue.go b/memqueue/queue.go index 844dafe..82ede74 100644 --- a/memqueue/queue.go +++ b/memqueue/queue.go @@ -82,7 +82,7 @@ type Queue struct { scheduler scheduler - _state int32 + _state int64 } var _ taskq.Queue = (*Queue)(nil) @@ -133,12 +133,12 @@ func (q *Queue) Close() error { // CloseTimeout closes the queue waiting for pending messages to be processed. func (q *Queue) CloseTimeout(timeout time.Duration) error { - if !atomic.CompareAndSwapInt32(&q._state, stateRunning, stateClosing) { + if !atomic.CompareAndSwapInt64(&q._state, stateRunning, stateClosing) { return fmt.Errorf("taskq: %s is already closed", q) } err := q.WaitTimeout(timeout) - if !atomic.CompareAndSwapInt32(&q._state, stateClosing, stateClosed) { + if !atomic.CompareAndSwapInt64(&q._state, stateClosing, stateClosed) { panic("not reached") } @@ -251,7 +251,7 @@ func (q *Queue) Purge() error { } func (q *Queue) closed() bool { - return atomic.LoadInt32(&q._state) == stateClosed + return atomic.LoadInt64(&q._state) == stateClosed } func (q *Queue) isDuplicate(msg *taskq.Message) bool { diff --git a/queue.go b/queue.go index 5d11ce9..44934b4 100644 --- a/queue.go +++ b/queue.go @@ -15,16 +15,16 @@ type QueueOptions struct { // Minimum number of goroutines processing messages. // Default is 1. - MinNumWorker int32 + MinNumWorker int64 // Maximum number of goroutines processing messages. // Default is 32 * number of CPUs. - MaxNumWorker int32 + MaxNumWorker int64 // Global limit of concurrently running workers across all servers. // Overrides MaxNumWorker. - WorkerLimit int32 + WorkerLimit int64 // Maximum number of goroutines fetching messages. // Default is 8 * number of CPUs. - MaxNumFetcher int32 + MaxNumFetcher int64 // Number of messages reserved by a fetcher in the queue in one request. // Default is 10 messages. @@ -84,11 +84,11 @@ func (opt *QueueOptions) Init() { opt.MinNumWorker = 1 } if opt.MaxNumWorker == 0 { - opt.MaxNumWorker = 32 * int32(runtime.NumCPU()) + opt.MaxNumWorker = 32 * int64(runtime.NumCPU()) } } if opt.MaxNumFetcher == 0 { - opt.MaxNumFetcher = 8 * int32(runtime.NumCPU()) + opt.MaxNumFetcher = 8 * int64(runtime.NumCPU()) } switch opt.PauseErrorsThreshold { diff --git a/redisq/queue.go b/redisq/queue.go index 60af781..4add2fe 100644 --- a/redisq/queue.go +++ b/redisq/queue.go @@ -58,7 +58,7 @@ type Queue struct { streamConsumer string schedulerLockPrefix string - _closed uint32 + _closed uint64 } var _ taskq.Queue = (*Queue)(nil) @@ -269,7 +269,7 @@ func (q *Queue) Close() error { // CloseTimeout closes the queue waiting for pending messages to be processed. func (q *Queue) CloseTimeout(timeout time.Duration) error { - if !atomic.CompareAndSwapUint32(&q._closed, 0, 1) { + if !atomic.CompareAndSwapUint64(&q._closed, 0, 1) { return nil } @@ -284,7 +284,7 @@ func (q *Queue) CloseTimeout(timeout time.Duration) error { } func (q *Queue) closed() bool { - return atomic.LoadUint32(&q._closed) == 1 + return atomic.LoadUint64(&q._closed) == 1 } func (q *Queue) scheduler(name string, fn func(ctx context.Context) (int, error)) {
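
A note on the failure mode this class of change guards against: the 64-bit
functions in sync/atomic (atomic.LoadInt64, atomic.AddUint64,
atomic.CompareAndSwapInt64, ...) panic with "unaligned 64-bit atomic
operation" on 32-bit platforms (x86-32, ARM, 32-bit MIPS) when the addressed
word is not 8-byte aligned. Per the sync/atomic documentation, only the first
word in a variable or in an allocated struct, array, or slice can be relied
upon to be 64-bit aligned, so a 64-bit field sitting behind 4-byte-wide
neighbors can land at a misaligned offset. Keeping the atomically accessed
fields a uniform 64 bits wide, as this patch does, lets consecutive atomic
fields pack without padding: once the first of them is 8-byte aligned, the
rest follow.

The sketch below is illustrative only -- the counters struct and its field
names are hypothetical, not taskq code. It shows how a misaligned offset
arises and how a startup guard with unsafe.Offsetof can fail fast instead of
panicking later inside an atomic call:

	package main

	import (
		"fmt"
		"sync/atomic"
		"unsafe"
	)

	// counters is a hypothetical layout: a 4-byte field ahead of a
	// 64-bit counter. On amd64 the compiler pads processed out to
	// offset 8; on 386/arm it sits at offset 4 and is misaligned.
	type counters struct {
		flag      uint32
		processed uint64 // accessed with sync/atomic
	}

	func main() {
		var c counters
		off := unsafe.Offsetof(c.processed)
		fmt.Println("offset of processed:", off) // 8 on amd64, 4 on 386

		// Fail fast if the layout regresses, rather than panicking
		// later inside atomic.AddUint64 on a 32-bit build.
		if off%8 != 0 {
			panic("counters.processed is not 64-bit aligned")
		}
		atomic.AddUint64(&c.processed, 1)
		fmt.Println("processed:", atomic.LoadUint64(&c.processed))
	}

Since Go 1.19 the same guarantee is available from the type system: the
atomic.Int64 and atomic.Uint64 wrapper types are always 64-bit aligned, even
as struct fields, so a later cleanup could swap the raw counters for those
and drop the manual layout discipline entirely.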
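
One idiom in the diff worth a gloss: atomic.AddUint64(&c.inFlight,
^uint64(0)) decrements the in-flight counter. Unsigned atomics take no
negative delta, so, as the sync/atomic docs describe, you add the two's
complement instead: ^uint64(0) is 2^64-1, and addition wraps around, which
is equivalent to subtracting 1 (and ^uint64(c-1) subtracts c). A standalone
demonstration, not taskq code:

	package main

	import (
		"fmt"
		"sync/atomic"
	)

	func main() {
		var inFlight uint64 = 3
		atomic.AddUint64(&inFlight, ^uint64(0))   // wraps around: subtract 1
		atomic.AddUint64(&inFlight, ^uint64(2-1)) // the docs' ^uint64(c-1) form: subtract 2
		fmt.Println(atomic.LoadUint64(&inFlight)) // prints 0
	}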
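
The CompareAndSwap calls whose width changed above implement taskq's
worker-scaling handshake: numWorker is both the target count and the lease,
a goroutine with a given id runs while id < numWorker, and scaling up CASes
numWorker from id to id+1 so each id is started exactly once even with
concurrent scalers. Reduced to a standalone toy (the pool type below is
hypothetical, not the taskq API):

	package main

	import (
		"fmt"
		"sync"
		"sync/atomic"
		"time"
	)

	type pool struct {
		numWorker int64 // atomic; plays the role of Consumer.numWorker
		wg        sync.WaitGroup
	}

	// addWorker starts goroutine number id, but only if exactly id
	// workers exist right now -- the CAS rejects concurrent duplicates.
	func (p *pool) addWorker(id int64) bool {
		if atomic.CompareAndSwapInt64(&p.numWorker, id, id+1) {
			p.wg.Add(1)
			go func() {
				defer p.wg.Done()
				for atomic.LoadInt64(&p.numWorker) > id {
					time.Sleep(10 * time.Millisecond) // stand-in for one unit of work
				}
			}()
			return true
		}
		return false
	}

	func main() {
		p := &pool{}
		for id := int64(0); id < 4; id++ {
			p.addWorker(id)
		}
		fmt.Println("workers:", atomic.LoadInt64(&p.numWorker)) // 4
		atomic.StoreInt64(&p.numWorker, 0) // ask every worker to exit
		p.wg.Wait()
	}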