From f43ea65d506fac390536263cb64967ad6fa9d17a Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Tue, 18 Mar 2025 16:25:38 -0700 Subject: [PATCH 1/8] Don't spin wait for available connections --- go/mysql/server.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/go/mysql/server.go b/go/mysql/server.go index db7202d9248..06fb7e5022c 100644 --- a/go/mysql/server.go +++ b/go/mysql/server.go @@ -312,6 +312,8 @@ func (l *Listener) Addr() net.Addr { // Accept runs an accept loop until the listener is closed. func (l *Listener) Accept() { + sem := make(chan struct{}, l.maxConns) // Semaphore + for { conn, err := l.listener.Accept() if err != nil { @@ -320,24 +322,22 @@ func (l *Listener) Accept() { } acceptTime := time.Now() - connectionID := l.connectionID l.connectionID++ - maxConWarn := false - for l.maxConns > 0 && uint64(connCount.Get()) >= l.maxConns { - if !maxConWarn { - log.Warning("max connections reached. Clients waiting. Increase server max connections") - maxConWarn = true // Logging once for each connection seems adequate. - } - - // TODO: make this behavior configurable (wait v. reject) - time.Sleep(500 * time.Millisecond) + if len(sem) == cap(sem) { + log.Warning("max connections reached. Clients waiting. Increase server max connections") } + // Wait until a connection slot is available + sem <- struct{}{} + connCount.Add(1) connAccept.Add(1) - go l.handle(context.Background(), conn, connectionID, acceptTime) + go func() { + defer func() { <-sem }() // Release slot when done + l.handle(context.Background(), conn, connectionID, acceptTime) + }() } } From ee9acf6891f5a4e0c688798be2106c0e6cd543bd Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Wed, 19 Mar 2025 09:56:50 -0700 Subject: [PATCH 2/8] Handle infinite max conns without blocking --- go/mysql/server.go | 43 +++++++++++++++++++++++++++++++++---------- 1 file changed, 33 insertions(+), 10 deletions(-) diff --git a/go/mysql/server.go b/go/mysql/server.go index 06fb7e5022c..9c5343b5be2 100644 --- a/go/mysql/server.go +++ b/go/mysql/server.go @@ -232,8 +232,8 @@ type Listener struct { // Reads are unbuffered if it's <=0. connReadBufferSize int - // shutdown indicates that Shutdown method was called. - shutdown sync2.AtomicBool + // shutdownCh - open channel until it's not. Used to block and handle shutdown without hanging + shutdownCh chan struct{} // RequireSecureTransport configures the server to reject connections from insecure clients RequireSecureTransport bool @@ -302,6 +302,7 @@ func NewListenerWithConfig(cfg ListenerConfig) (*Listener, error) { connReadBufferSize: cfg.ConnReadBufferSize, maxConns: cfg.MaxConns, AllowClearTextWithoutTLS: sync2.NewAtomicBool(cfg.AllowClearTextWithoutTLS), + shutdownCh: make(chan struct{}), }, nil } @@ -312,9 +313,12 @@ func (l *Listener) Addr() net.Addr { // Accept runs an accept loop until the listener is closed. func (l *Listener) Accept() { - sem := make(chan struct{}, l.maxConns) // Semaphore + var sem chan struct{} + if l.maxConns > 0 { + sem = make(chan struct{}, l.maxConns) + } - for { + for !l.isShutdown() { conn, err := l.listener.Accept() if err != nil { // Close() was probably called. @@ -325,17 +329,28 @@ func (l *Listener) Accept() { connectionID := l.connectionID l.connectionID++ - if len(sem) == cap(sem) { + if sem != nil && len(sem) == cap(sem) { log.Warning("max connections reached. Clients waiting. Increase server max connections") } - // Wait until a connection slot is available - sem <- struct{}{} + if sem != nil { + select { + case sem <- struct{}{}: + case <-l.shutdownCh: + // shutdown while waiting for a slot. give up. + conn.Close() + return + } + } connCount.Add(1) connAccept.Add(1) go func() { - defer func() { <-sem }() // Release slot when done + if sem != nil { + defer func() { + <-sem // release slot. + }() + } l.handle(context.Background(), conn, connectionID, acceptTime) }() } @@ -566,13 +581,21 @@ func (l *Listener) Close() { // Shutdown closes listener and fails any Ping requests from existing connections. // This can be used for graceful shutdown, to let clients know that they should reconnect to another server. func (l *Listener) Shutdown() { - if l.shutdown.CompareAndSwap(false, true) { + select { + case <-l.shutdownCh: + default: + close(l.shutdownCh) l.Close() } } func (l *Listener) isShutdown() bool { - return l.shutdown.Get() + select { + case <-l.shutdownCh: + return true + default: + return false + } } // writeHandshakeV10 writes the Initial Handshake Packet, server side. From 537a98b5c72ef5ac122a5d6c791aa19d7283c7f0 Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Wed, 19 Mar 2025 10:49:17 -0700 Subject: [PATCH 3/8] Allow for servers to shutdown when there are waiting connections --- go/mysql/server.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/mysql/server.go b/go/mysql/server.go index 9c5343b5be2..d39eeb40543 100644 --- a/go/mysql/server.go +++ b/go/mysql/server.go @@ -575,7 +575,7 @@ func (l *Listener) handleConnectionWarning(c *Conn, reason string) { // Close stops the listener, which prevents accept of any new connections. Existing connections won't be closed. func (l *Listener) Close() { - l.listener.Close() + l.Shutdown() } // Shutdown closes listener and fails any Ping requests from existing connections. @@ -585,7 +585,7 @@ func (l *Listener) Shutdown() { case <-l.shutdownCh: default: close(l.shutdownCh) - l.Close() + l.listener.Close() } } From e51ba30eda1bb004b655f4d06e8bf466c072de19 Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Wed, 19 Mar 2025 11:19:57 -0700 Subject: [PATCH 4/8] Allow more connections to queue up --- go/mysql/server.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/go/mysql/server.go b/go/mysql/server.go index d39eeb40543..4a4de75a5a0 100644 --- a/go/mysql/server.go +++ b/go/mysql/server.go @@ -333,24 +333,24 @@ func (l *Listener) Accept() { log.Warning("max connections reached. Clients waiting. Increase server max connections") } - if sem != nil { - select { - case sem <- struct{}{}: - case <-l.shutdownCh: - // shutdown while waiting for a slot. give up. - conn.Close() - return - } - } - - connCount.Add(1) - connAccept.Add(1) go func() { if sem != nil { + } + if sem != nil { + select { + case sem <- struct{}{}: + case <-l.shutdownCh: + // shutdown while waiting for a slot. give up. + conn.Close() + return + } defer func() { <-sem // release slot. }() } + + connCount.Add(1) + connAccept.Add(1) l.handle(context.Background(), conn, connectionID, acceptTime) }() } From 772562d89f3d925ec25720b93f30ed12e5982c4a Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Wed, 19 Mar 2025 14:05:56 -0700 Subject: [PATCH 5/8] Checkpoint - reject connections if we hit 3 waiting. Need tests and configuration --- go/mysql/server.go | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/go/mysql/server.go b/go/mysql/server.go index 4a4de75a5a0..86f9acf5524 100644 --- a/go/mysql/server.go +++ b/go/mysql/server.go @@ -237,6 +237,9 @@ type Listener struct { // RequireSecureTransport configures the server to reject connections from insecure clients RequireSecureTransport bool + + // MaxWaitingConnections specifies the maximum number of connections that can wait to be serviced. + maxWaitingConnections int } // NewFromListener creates a new mysql listener from an existing net.Listener @@ -318,6 +321,8 @@ func (l *Listener) Accept() { sem = make(chan struct{}, l.maxConns) } + waitingConnections := sync2.NewAtomicInt64(0) + for !l.isShutdown() { conn, err := l.listener.Accept() if err != nil { @@ -325,27 +330,34 @@ func (l *Listener) Accept() { return } + if sem != nil && len(sem) == cap(sem) { + log.Warning("max connections reached. Clients waiting. Increase server max_connections") + } + // Make configurable. NM4. + if waitingConnections.Get() >= 3 { + log.Warning("max waiting connections reached. Client rejected. Increase server max_connections and back_log") + conn.Close() + continue + } + acceptTime := time.Now() connectionID := l.connectionID l.connectionID++ - if sem != nil && len(sem) == cap(sem) { - log.Warning("max connections reached. Clients waiting. Increase server max connections") - } - go func() { if sem != nil { - } - if sem != nil { + waitingConnections.Add(1) select { case sem <- struct{}{}: case <-l.shutdownCh: // shutdown while waiting for a slot. give up. conn.Close() + waitingConnections.Add(-1) // probably not needed, but correct. return } + waitingConnections.Add(-1) defer func() { - <-sem // release slot. + <-sem // release slot when l.handle is done. }() } From 972fca765fb397b6791f6524dd9b17e513e9dc1f Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Wed, 19 Mar 2025 15:38:38 -0700 Subject: [PATCH 6/8] Add config for maxWaitConns. --- go/mysql/server.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/go/mysql/server.go b/go/mysql/server.go index 86f9acf5524..4bfe33d7af1 100644 --- a/go/mysql/server.go +++ b/go/mysql/server.go @@ -198,6 +198,9 @@ type Listener struct { // Max limit for connections maxConns uint64 + // maxWaitConns it the number of waiting connections allowed before new connections start getting rejected. + maxWaitConns uint32 + // The following parameters are read by multiple connection go // routines. They are not protected by a mutex, so they // should be set after NewListener, and not changed while @@ -237,9 +240,6 @@ type Listener struct { // RequireSecureTransport configures the server to reject connections from insecure clients RequireSecureTransport bool - - // MaxWaitingConnections specifies the maximum number of connections that can wait to be serviced. - maxWaitingConnections int } // NewFromListener creates a new mysql listener from an existing net.Listener @@ -277,6 +277,7 @@ type ListenerConfig struct { ConnWriteTimeout time.Duration ConnReadBufferSize int MaxConns uint64 + MaxWaitConns uint32 AllowClearTextWithoutTLS bool } @@ -304,6 +305,7 @@ func NewListenerWithConfig(cfg ListenerConfig) (*Listener, error) { connWriteTimeout: cfg.ConnWriteTimeout, connReadBufferSize: cfg.ConnReadBufferSize, maxConns: cfg.MaxConns, + maxWaitConns: cfg.MaxWaitConns, AllowClearTextWithoutTLS: sync2.NewAtomicBool(cfg.AllowClearTextWithoutTLS), shutdownCh: make(chan struct{}), }, nil From 8c317ed6224f86df04c9c05a3ca0334ef81b9e3b Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Fri, 21 Mar 2025 11:09:43 -0700 Subject: [PATCH 7/8] Add max waiting connection timeout. --- go/mysql/server.go | 74 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 55 insertions(+), 19 deletions(-) diff --git a/go/mysql/server.go b/go/mysql/server.go index 4bfe33d7af1..b9a4aa22c65 100644 --- a/go/mysql/server.go +++ b/go/mysql/server.go @@ -201,6 +201,9 @@ type Listener struct { // maxWaitConns it the number of waiting connections allowed before new connections start getting rejected. maxWaitConns uint32 + // maxWaitConnsTimeout is the amount of time to block a new connection before giving up and rejecting it. + maxWaitConnsTimeout time.Duration + // The following parameters are read by multiple connection go // routines. They are not protected by a mutex, so they // should be set after NewListener, and not changed while @@ -278,6 +281,7 @@ type ListenerConfig struct { ConnReadBufferSize int MaxConns uint64 MaxWaitConns uint32 + MaxWaitConnsTimeout time.Duration AllowClearTextWithoutTLS bool } @@ -306,6 +310,7 @@ func NewListenerWithConfig(cfg ListenerConfig) (*Listener, error) { connReadBufferSize: cfg.ConnReadBufferSize, maxConns: cfg.MaxConns, maxWaitConns: cfg.MaxWaitConns, + maxWaitConnsTimeout: cfg.MaxWaitConnsTimeout, AllowClearTextWithoutTLS: sync2.NewAtomicBool(cfg.AllowClearTextWithoutTLS), shutdownCh: make(chan struct{}), }, nil @@ -334,12 +339,13 @@ func (l *Listener) Accept() { if sem != nil && len(sem) == cap(sem) { log.Warning("max connections reached. Clients waiting. Increase server max_connections") - } - // Make configurable. NM4. - if waitingConnections.Get() >= 3 { - log.Warning("max waiting connections reached. Client rejected. Increase server max_connections and back_log") - conn.Close() - continue + + if waitingConnections.Get() >= int64(l.maxWaitConns) { + // There is little hope for this connection to be accepted. Give up early before we spawn a goroutine. + log.Warning("max waiting connections reached. Client rejected. Increase server max_connections and back_log") + conn.Close() + continue + } } acceptTime := time.Now() @@ -347,25 +353,55 @@ func (l *Listener) Accept() { l.connectionID++ go func() { + cont := true if sem != nil { - waitingConnections.Add(1) select { case sem <- struct{}{}: - case <-l.shutdownCh: - // shutdown while waiting for a slot. give up. - conn.Close() - waitingConnections.Add(-1) // probably not needed, but correct. - return + default: // Didn't get a slot right away. Wait for one, but indicate that we're waiting using the waitingConnections counter. + // bool returns true if it's ok to continue to handle the request. + cont = func() bool { + for { // optimistic lock loop. + curWait := waitingConnections.Get() + if curWait+1 > int64(l.maxWaitConns) { + // Too many waiting connections. Reject this one. + log.Warning("max waiting connections reached. Client rejected. Increase server max_connections and back_log") + conn.Close() + return false + } + if waitingConnections.CompareAndSwap(curWait, curWait+1) { + break + } + } + // We'll only get here if we successfully incremented the counter once. Decrement it when we're done. + defer waitingConnections.Add(-1) + + select { + case sem <- struct{}{}: + case <-l.shutdownCh: + // shutdown while waiting for a slot. give up. + conn.Close() + return false + case <-time.After(l.maxWaitConnsTimeout): + // Timed out waiting for a slot + conn.Close() + return false + } + return true + }() + } + if cont { + // We only reach this point if we have a slot in the semaphore. + defer func() { + <-sem // release slot when l.handle is done. + }() } - waitingConnections.Add(-1) - defer func() { - <-sem // release slot when l.handle is done. - }() } - connCount.Add(1) - connAccept.Add(1) - l.handle(context.Background(), conn, connectionID, acceptTime) + if cont { + connCount.Add(1) + connAccept.Add(1) + l.handle(context.Background(), conn, connectionID, acceptTime) + } }() } } From 635bdc983c0990dbb897d247911cbf6d80276c23 Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Mon, 24 Mar 2025 14:08:45 -0700 Subject: [PATCH 8/8] PR Feedback --- go/mysql/server.go | 108 ++++++++++++++++++++------------------------- 1 file changed, 48 insertions(+), 60 deletions(-) diff --git a/go/mysql/server.go b/go/mysql/server.go index b9a4aa22c65..46b3e76e6f0 100644 --- a/go/mysql/server.go +++ b/go/mysql/server.go @@ -23,6 +23,7 @@ import ( "io" "net" "strings" + "sync/atomic" "time" "github.com/dolthub/vitess/go/netutil" @@ -328,81 +329,68 @@ func (l *Listener) Accept() { sem = make(chan struct{}, l.maxConns) } - waitingConnections := sync2.NewAtomicInt64(0) + // don't spam the logs if we have a bunch of waiting connections come in at once + warnOnWait := true + var waitingConnections atomic.Int32 - for !l.isShutdown() { + accepted := func(ctx context.Context, conn net.Conn, id uint32, acceptTime time.Time) { + connCount.Add(1) + connAccept.Add(1) + go func() { + if sem != nil { + defer func() { <-sem }() + } + l.handle(ctx, conn, id, acceptTime) + }() + } + + for { conn, err := l.listener.Accept() if err != nil { // Close() was probably called. return } - if sem != nil && len(sem) == cap(sem) { - log.Warning("max connections reached. Clients waiting. Increase server max_connections") + acceptTime := time.Now() + connectionID := l.connectionID + l.connectionID++ - if waitingConnections.Get() >= int64(l.maxWaitConns) { - // There is little hope for this connection to be accepted. Give up early before we spawn a goroutine. + if sem == nil { + accepted(context.Background(), conn, connectionID, acceptTime) + continue + } + + select { + case sem <- struct{}{}: + accepted(context.Background(), conn, connectionID, acceptTime) + warnOnWait = true + default: + if warnOnWait { + log.Warning("max connections reached. Clients waiting. Increase server max_connections") + warnOnWait = false + continue + } + waitNum := waitingConnections.Add(1) + if uint32(waitNum) > l.maxWaitConns { log.Warning("max waiting connections reached. Client rejected. Increase server max_connections and back_log") conn.Close() + waitingConnections.Add(-1) continue } - } - - acceptTime := time.Now() - connectionID := l.connectionID - l.connectionID++ - - go func() { - cont := true - if sem != nil { + go func(conn net.Conn, connectionID uint32, acceptTime time.Time) { select { case sem <- struct{}{}: - default: // Didn't get a slot right away. Wait for one, but indicate that we're waiting using the waitingConnections counter. - // bool returns true if it's ok to continue to handle the request. - cont = func() bool { - for { // optimistic lock loop. - curWait := waitingConnections.Get() - if curWait+1 > int64(l.maxWaitConns) { - // Too many waiting connections. Reject this one. - log.Warning("max waiting connections reached. Client rejected. Increase server max_connections and back_log") - conn.Close() - return false - } - if waitingConnections.CompareAndSwap(curWait, curWait+1) { - break - } - } - // We'll only get here if we successfully incremented the counter once. Decrement it when we're done. - defer waitingConnections.Add(-1) - - select { - case sem <- struct{}{}: - case <-l.shutdownCh: - // shutdown while waiting for a slot. give up. - conn.Close() - return false - case <-time.After(l.maxWaitConnsTimeout): - // Timed out waiting for a slot - conn.Close() - return false - } - return true - }() - } - if cont { - // We only reach this point if we have a slot in the semaphore. - defer func() { - <-sem // release slot when l.handle is done. - }() + waitingConnections.Add(-1) + accepted(context.Background(), conn, connectionID, acceptTime) + case <-l.shutdownCh: + conn.Close() + waitingConnections.Add(-1) + case <-time.After(l.maxWaitConnsTimeout): + conn.Close() + waitingConnections.Add(-1) } - } - - if cont { - connCount.Add(1) - connAccept.Add(1) - l.handle(context.Background(), conn, connectionID, acceptTime) - } - }() + }(conn, connectionID, acceptTime) + } } }