@@ -23,6 +23,7 @@ import (
23
23
"io"
24
24
"net"
25
25
"strings"
26
+ "sync/atomic"
26
27
"time"
27
28
28
29
"github.com/dolthub/vitess/go/netutil"
@@ -198,6 +199,12 @@ type Listener struct {
198
199
// Max limit for connections
199
200
maxConns uint64
200
201
202
+ // maxWaitConns it the number of waiting connections allowed before new connections start getting rejected.
203
+ maxWaitConns uint32
204
+
205
+ // maxWaitConnsTimeout is the amount of time to block a new connection before giving up and rejecting it.
206
+ maxWaitConnsTimeout time.Duration
207
+
201
208
// The following parameters are read by multiple connection go
202
209
// routines. They are not protected by a mutex, so they
203
210
// should be set after NewListener, and not changed while
@@ -232,8 +239,8 @@ type Listener struct {
232
239
// Reads are unbuffered if it's <=0.
233
240
connReadBufferSize int
234
241
235
- // shutdown indicates that Shutdown method was called.
236
- shutdown sync2. AtomicBool
242
+ // shutdownCh - open channel until it's not. Used to block and handle shutdown without hanging
243
+ shutdownCh chan struct {}
237
244
238
245
// RequireSecureTransport configures the server to reject connections from insecure clients
239
246
RequireSecureTransport bool
@@ -274,6 +281,8 @@ type ListenerConfig struct {
274
281
ConnWriteTimeout time.Duration
275
282
ConnReadBufferSize int
276
283
MaxConns uint64
284
+ MaxWaitConns uint32
285
+ MaxWaitConnsTimeout time.Duration
277
286
AllowClearTextWithoutTLS bool
278
287
}
279
288
@@ -301,7 +310,10 @@ func NewListenerWithConfig(cfg ListenerConfig) (*Listener, error) {
301
310
connWriteTimeout : cfg .ConnWriteTimeout ,
302
311
connReadBufferSize : cfg .ConnReadBufferSize ,
303
312
maxConns : cfg .MaxConns ,
313
+ maxWaitConns : cfg .MaxWaitConns ,
314
+ maxWaitConnsTimeout : cfg .MaxWaitConnsTimeout ,
304
315
AllowClearTextWithoutTLS : sync2 .NewAtomicBool (cfg .AllowClearTextWithoutTLS ),
316
+ shutdownCh : make (chan struct {}),
305
317
}, nil
306
318
}
307
319
@@ -312,6 +324,26 @@ func (l *Listener) Addr() net.Addr {
312
324
313
325
// Accept runs an accept loop until the listener is closed.
314
326
func (l * Listener ) Accept () {
327
+ var sem chan struct {}
328
+ if l .maxConns > 0 {
329
+ sem = make (chan struct {}, l .maxConns )
330
+ }
331
+
332
+ // don't spam the logs if we have a bunch of waiting connections come in at once
333
+ warnOnWait := true
334
+ var waitingConnections atomic.Int32
335
+
336
+ accepted := func (ctx context.Context , conn net.Conn , id uint32 , acceptTime time.Time ) {
337
+ connCount .Add (1 )
338
+ connAccept .Add (1 )
339
+ go func () {
340
+ if sem != nil {
341
+ defer func () { <- sem }()
342
+ }
343
+ l .handle (ctx , conn , id , acceptTime )
344
+ }()
345
+ }
346
+
315
347
for {
316
348
conn , err := l .listener .Accept ()
317
349
if err != nil {
@@ -320,24 +352,45 @@ func (l *Listener) Accept() {
320
352
}
321
353
322
354
acceptTime := time .Now ()
323
-
324
355
connectionID := l .connectionID
325
356
l .connectionID ++
326
357
327
- maxConWarn := false
328
- for l .maxConns > 0 && uint64 (connCount .Get ()) >= l .maxConns {
329
- if ! maxConWarn {
330
- log .Warning ("max connections reached. Clients waiting. Increase server max connections" )
331
- maxConWarn = true // Logging once for each connection seems adequate.
332
- }
333
-
334
- // TODO: make this behavior configurable (wait v. reject)
335
- time .Sleep (500 * time .Millisecond )
358
+ if sem == nil {
359
+ accepted (context .Background (), conn , connectionID , acceptTime )
360
+ continue
336
361
}
337
362
338
- connCount .Add (1 )
339
- connAccept .Add (1 )
340
- go l .handle (context .Background (), conn , connectionID , acceptTime )
363
+ select {
364
+ case sem <- struct {}{}:
365
+ accepted (context .Background (), conn , connectionID , acceptTime )
366
+ warnOnWait = true
367
+ default :
368
+ if warnOnWait {
369
+ log .Warning ("max connections reached. Clients waiting. Increase server max_connections" )
370
+ warnOnWait = false
371
+ continue
372
+ }
373
+ waitNum := waitingConnections .Add (1 )
374
+ if uint32 (waitNum ) > l .maxWaitConns {
375
+ log .Warning ("max waiting connections reached. Client rejected. Increase server max_connections and back_log" )
376
+ conn .Close ()
377
+ waitingConnections .Add (- 1 )
378
+ continue
379
+ }
380
+ go func (conn net.Conn , connectionID uint32 , acceptTime time.Time ) {
381
+ select {
382
+ case sem <- struct {}{}:
383
+ waitingConnections .Add (- 1 )
384
+ accepted (context .Background (), conn , connectionID , acceptTime )
385
+ case <- l .shutdownCh :
386
+ conn .Close ()
387
+ waitingConnections .Add (- 1 )
388
+ case <- time .After (l .maxWaitConnsTimeout ):
389
+ conn .Close ()
390
+ waitingConnections .Add (- 1 )
391
+ }
392
+ }(conn , connectionID , acceptTime )
393
+ }
341
394
}
342
395
}
343
396
@@ -560,19 +613,27 @@ func (l *Listener) handleConnectionWarning(c *Conn, reason string) {
560
613
561
614
// Close stops the listener, which prevents accept of any new connections. Existing connections won't be closed.
562
615
func (l * Listener ) Close () {
563
- l .listener . Close ()
616
+ l .Shutdown ()
564
617
}
565
618
566
619
// Shutdown closes listener and fails any Ping requests from existing connections.
567
620
// This can be used for graceful shutdown, to let clients know that they should reconnect to another server.
568
621
func (l * Listener ) Shutdown () {
569
- if l .shutdown .CompareAndSwap (false , true ) {
570
- l .Close ()
622
+ select {
623
+ case <- l .shutdownCh :
624
+ default :
625
+ close (l .shutdownCh )
626
+ l .listener .Close ()
571
627
}
572
628
}
573
629
574
630
func (l * Listener ) isShutdown () bool {
575
- return l .shutdown .Get ()
631
+ select {
632
+ case <- l .shutdownCh :
633
+ return true
634
+ default :
635
+ return false
636
+ }
576
637
}
577
638
578
639
// writeHandshakeV10 writes the Initial Handshake Packet, server side.
0 commit comments