Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid spin waits and dead connections in mysql server #406

Merged
merged 8 commits into from
Mar 24, 2025
99 changes: 80 additions & 19 deletions go/mysql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"io"
"net"
"strings"
"sync/atomic"
"time"

"github.com/dolthub/vitess/go/netutil"
Expand Down Expand Up @@ -198,6 +199,12 @@ type Listener struct {
// Max limit for connections
maxConns uint64

// maxWaitConns it the number of waiting connections allowed before new connections start getting rejected.
maxWaitConns uint32

// maxWaitConnsTimeout is the amount of time to block a new connection before giving up and rejecting it.
maxWaitConnsTimeout time.Duration

// The following parameters are read by multiple connection go
// routines. They are not protected by a mutex, so they
// should be set after NewListener, and not changed while
Expand Down Expand Up @@ -232,8 +239,8 @@ type Listener struct {
// Reads are unbuffered if it's <=0.
connReadBufferSize int

// shutdown indicates that Shutdown method was called.
shutdown sync2.AtomicBool
// shutdownCh - open channel until it's not. Used to block and handle shutdown without hanging
shutdownCh chan struct{}

// RequireSecureTransport configures the server to reject connections from insecure clients
RequireSecureTransport bool
Expand Down Expand Up @@ -274,6 +281,8 @@ type ListenerConfig struct {
ConnWriteTimeout time.Duration
ConnReadBufferSize int
MaxConns uint64
MaxWaitConns uint32
MaxWaitConnsTimeout time.Duration
AllowClearTextWithoutTLS bool
}

Expand Down Expand Up @@ -301,7 +310,10 @@ func NewListenerWithConfig(cfg ListenerConfig) (*Listener, error) {
connWriteTimeout: cfg.ConnWriteTimeout,
connReadBufferSize: cfg.ConnReadBufferSize,
maxConns: cfg.MaxConns,
maxWaitConns: cfg.MaxWaitConns,
maxWaitConnsTimeout: cfg.MaxWaitConnsTimeout,
AllowClearTextWithoutTLS: sync2.NewAtomicBool(cfg.AllowClearTextWithoutTLS),
shutdownCh: make(chan struct{}),
}, nil
}

Expand All @@ -312,6 +324,26 @@ func (l *Listener) Addr() net.Addr {

// Accept runs an accept loop until the listener is closed.
func (l *Listener) Accept() {
var sem chan struct{}
if l.maxConns > 0 {
sem = make(chan struct{}, l.maxConns)
}

// don't spam the logs if we have a bunch of waiting connections come in at once
warnOnWait := true
var waitingConnections atomic.Int32

accepted := func(ctx context.Context, conn net.Conn, id uint32, acceptTime time.Time) {
connCount.Add(1)
connAccept.Add(1)
go func() {
if sem != nil {
defer func() { <-sem }()
}
l.handle(ctx, conn, id, acceptTime)
}()
}

for {
conn, err := l.listener.Accept()
if err != nil {
Expand All @@ -320,24 +352,45 @@ func (l *Listener) Accept() {
}

acceptTime := time.Now()

connectionID := l.connectionID
l.connectionID++

maxConWarn := false
for l.maxConns > 0 && uint64(connCount.Get()) >= l.maxConns {
if !maxConWarn {
log.Warning("max connections reached. Clients waiting. Increase server max connections")
maxConWarn = true // Logging once for each connection seems adequate.
}

// TODO: make this behavior configurable (wait v. reject)
time.Sleep(500 * time.Millisecond)
if sem == nil {
accepted(context.Background(), conn, connectionID, acceptTime)
continue
}

connCount.Add(1)
connAccept.Add(1)
go l.handle(context.Background(), conn, connectionID, acceptTime)
select {
case sem <- struct{}{}:
accepted(context.Background(), conn, connectionID, acceptTime)
warnOnWait = true
default:
if warnOnWait {
log.Warning("max connections reached. Clients waiting. Increase server max_connections")
warnOnWait = false
continue
}
waitNum := waitingConnections.Add(1)
if uint32(waitNum) > l.maxWaitConns {
log.Warning("max waiting connections reached. Client rejected. Increase server max_connections and back_log")
conn.Close()
waitingConnections.Add(-1)
continue
}
go func(conn net.Conn, connectionID uint32, acceptTime time.Time) {
select {
case sem <- struct{}{}:
waitingConnections.Add(-1)
accepted(context.Background(), conn, connectionID, acceptTime)
case <-l.shutdownCh:
conn.Close()
waitingConnections.Add(-1)
case <-time.After(l.maxWaitConnsTimeout):
conn.Close()
waitingConnections.Add(-1)
}
}(conn, connectionID, acceptTime)
}
}
}

Expand Down Expand Up @@ -560,19 +613,27 @@ func (l *Listener) handleConnectionWarning(c *Conn, reason string) {

// Close stops the listener, which prevents accept of any new connections. Existing connections won't be closed.
func (l *Listener) Close() {
l.listener.Close()
l.Shutdown()
}

// Shutdown closes listener and fails any Ping requests from existing connections.
// This can be used for graceful shutdown, to let clients know that they should reconnect to another server.
func (l *Listener) Shutdown() {
if l.shutdown.CompareAndSwap(false, true) {
l.Close()
select {
case <-l.shutdownCh:
default:
close(l.shutdownCh)
l.listener.Close()
}
}

func (l *Listener) isShutdown() bool {
return l.shutdown.Get()
select {
case <-l.shutdownCh:
return true
default:
return false
}
}

// writeHandshakeV10 writes the Initial Handshake Packet, server side.
Expand Down