Describe cluster response rewrite #208

Draft. Wants to merge 14 commits into base: master
1 change: 1 addition & 0 deletions .gitignore
@@ -66,3 +66,4 @@ Session.vim
# Auto-generated tag files
tags

.rgignore
20 changes: 19 additions & 1 deletion .goreleaser.yml
@@ -12,7 +12,25 @@ builds:
ignore:
- goos: windows
goarch: arm64
ldflags: -s -w -X github.com/grepplabs/kafka-proxy/config.Version={{.Version}}
ldflags: -s -w -X github.com/limoges/kafka-proxy/config.Version={{.Version}}
kos:
- repositories:
- ghcr.io/limoges/kafka-proxy
platforms:
- linux/arm64
- linux/amd64
tags:
- latest
- "{{.Tag}}"
- "{{.Version}}"
bare: true
preserve_import_paths: false
labels:
"org.opencontainers.image.source": "https://github.com/limoges/kafka-proxy"
"org.opencontainers.image.description": "Proxy connections to Kafka cluster. Connect through SOCKS Proxy, HTTP Proxy or to cluster running in Kubernetes."
annotations:
"org.opencontainers.image.source": "https://github.com/limoges/kafka-proxy"
"org.opencontainers.image.description": "Proxy connections to Kafka cluster. Connect through SOCKS Proxy, HTTP Proxy or to cluster running in Kubernetes."
archives:
- name_template: "{{ .ProjectName }}-{{ .Tag }}-{{ .Os }}-{{ .Arch }}"
files:
3 changes: 1 addition & 2 deletions README.md
@@ -8,8 +8,7 @@ The Kafka Proxy is based on idea of [Cloud SQL Proxy](https://github.com/GoogleC
It allows a service to connect to Kafka brokers without having to deal with SASL/PLAIN authentication and SSL certificates.

It works by opening tcp sockets on the local machine and proxying connections to the associated Kafka brokers
when the sockets are used. The host and port in [Metadata](http://kafka.apache.org/protocol.html#The_Messages_Metadata)
and [FindCoordinator](http://kafka.apache.org/protocol.html#The_Messages_FindCoordinator)
when the sockets are used. The host and port in [Metadata](http://kafka.apache.org/protocol.html#The_Messages_Metadata), [FindCoordinator](http://kafka.apache.org/protocol.html#The_Messages_FindCoordinator) and [DescribeCluster](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/DescribeClusterResponse.json)
responses received from the brokers are replaced by local counterparts.
For discovered brokers (not configured as the bootstrap servers), local listeners are started on random ports.
The dynamic local listeners feature can be disabled and an additional list of external server mappings can be provided.
2 changes: 1 addition & 1 deletion cmd/kafka-proxy/server.go
@@ -483,7 +483,7 @@ func SetLogger() {
}
logrus.SetFormatter(formatter)
} else {
logrus.SetFormatter(&logrus.TextFormatter{FullTimestamp: true})
logrus.SetFormatter(&logrus.TextFormatter{FullTimestamp: false})
}
level, err := logrus.ParseLevel(c.Log.Level)
if err != nil {
6 changes: 5 additions & 1 deletion go.mod
@@ -1,6 +1,9 @@
module github.com/grepplabs/kafka-proxy

go 1.23
go 1.23.0

toolchain go1.23.5

require (
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5
github.com/aws/aws-sdk-go-v2 v1.36.1
@@ -20,6 +23,7 @@ require (
github.com/jcmturner/gofork v1.7.6
github.com/jcmturner/gokrb5/v8 v8.4.3
github.com/oklog/run v1.1.0
github.com/pires/go-proxyproto v0.8.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.1
github.com/samber/slog-logrus/v2 v2.5.2
2 changes: 2 additions & 0 deletions go.sum
@@ -212,6 +212,8 @@ github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA=
github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pires/go-proxyproto v0.8.0 h1:5unRmEAPbHXHuLjDg01CxJWf91cw3lKHc/0xzKpXEe0=
github.com/pires/go-proxyproto v0.8.0/go.mod h1:iknsfgnH8EkjrMeMyvfKByp9TiBZCKZM0jx2xmKqnVY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
12 changes: 12 additions & 0 deletions main.go
@@ -5,7 +5,12 @@ import (
"github.com/grepplabs/kafka-proxy/cmd/kafka-proxy"
"github.com/grepplabs/kafka-proxy/cmd/tools"
"github.com/spf13/cobra"
"log"
"net/http"
"os"
"runtime"

_ "net/http/pprof"
)

var RootCmd = &cobra.Command{
@@ -25,6 +30,13 @@ func init() {
}

func main() {
runtime.SetBlockProfileRate(1) // blocking sync primitives
runtime.SetMutexProfileFraction(1) // contended mutexes

go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()

if err := RootCmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
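The hunk above starts the pprof endpoint and full block/mutex profiling unconditionally. One possible alternative, sketched below, is to make profiling opt-in. The environment variable `KAFKA_PROXY_PPROF_ADDR` and the helpers `pprofAddr` and `startProfiling` are hypothetical and not part of this PR.

```go
package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers /debug/pprof handlers on the default mux
	"os"
	"runtime"
)

// pprofAddr returns the listen address for the profiling endpoint, or ""
// when profiling is disabled. The variable name is an assumption.
func pprofAddr(getenv func(string) string) string {
	return getenv("KAFKA_PROXY_PPROF_ADDR")
}

// startProfiling enables block and mutex profiling and serves pprof on
// addr in the background. It does nothing when addr is empty.
func startProfiling(addr string) {
	if addr == "" {
		return
	}
	runtime.SetBlockProfileRate(1)     // record every blocking event
	runtime.SetMutexProfileFraction(1) // record every contended mutex
	go func() {
		log.Println(http.ListenAndServe(addr, nil))
	}()
}

func main() {
	// With the variable unset, no listener is started and no profiling
	// overhead is added.
	startProfiling(pprofAddr(os.Getenv))
}
```

Running with `KAFKA_PROXY_PPROF_ADDR=localhost:6060` would then reproduce the behaviour added in this hunk.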
20 changes: 15 additions & 5 deletions proxy/client.go
@@ -1,6 +1,7 @@
package proxy

import (
"crypto/tls"
"crypto/x509"
"fmt"
"net"
@@ -295,38 +296,47 @@ func (c *Client) handleConn(conn Conn) {
logrus.Infof("Dial address changed from %s to %s", conn.BrokerAddress, dialAddress)
}

server, err := c.DialAndAuth(dialAddress)
server, err := c.DialAndAuth(conn, dialAddress)
if err != nil {
logrus.Infof("couldn't connect to %s(%s): %v", dialAddress, conn.BrokerAddress, err)
_ = conn.LocalConnection.Close()
return
}

if tcpConn, ok := server.(*net.TCPConn); ok {
if err := c.tcpConnOptions.setTCPConnOptions(tcpConn); err != nil {
logrus.Infof("WARNING: Error while setting TCP options for kafka connection %s on %v: %v", conn.BrokerAddress, server.LocalAddr(), err)
}
}
c.conns.Add(conn.BrokerAddress, conn.LocalConnection)
localDesc := "local connection on " + conn.LocalConnection.LocalAddr().String() + " from " + conn.LocalConnection.RemoteAddr().String() + " (" + conn.BrokerAddress + ")"
copyThenClose(c.processorConfig, server, conn.LocalConnection, conn.BrokerAddress, conn.BrokerAddress, localDesc)
copyThenClose(c.processorConfig, server, conn.LocalConnection, conn.BrokerAddress, conn.LocalConnection.RemoteAddr(), conn.LocalConnection.LocalAddr())
if err := c.conns.Remove(conn.BrokerAddress, conn.LocalConnection); err != nil {
logrus.Info(err)
}
}

func (c *Client) DialAndAuth(brokerAddress string) (net.Conn, error) {
conn, err := c.dialer.Dial("tcp", brokerAddress)
func (c *Client) DialAndAuth(downstream Conn, brokerAddress string) (conn net.Conn, err error) {
conn, err = c.dialer.Dial("tcp", brokerAddress)
if err != nil {
return nil, err
}
if err := conn.SetDeadline(time.Time{}); err != nil {
_ = conn.Close()
return nil, err
}

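if tlsConn, ok := conn.(*tls.Conn); ok {
// Complete the TLS handshake eagerly so failures surface here, and close the connection on error to avoid leaking it.
if err := tlsConn.Handshake(); err != nil {
_ = conn.Close()
return nil, fmt.Errorf("client handshake with upstream broker failed: %w", err)
}
}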
err = c.auth(conn, brokerAddress)
if err != nil {
return nil, err
}

logrus.Infof("%s: Client(%s) Successfully connected to upstream: %s", downstream.LocalConnection.LocalAddr(), downstream.LocalConnection.RemoteAddr(), brokerAddress)
return conn, nil
}

14 changes: 8 additions & 6 deletions proxy/common.go
@@ -4,11 +4,12 @@ import (
"bytes"
"errors"
"fmt"
"github.com/sirupsen/logrus"
"io"
"net"
"sync"
"time"

"github.com/sirupsen/logrus"
)

type DeadlineReadWriteCloser interface {
@@ -112,8 +113,9 @@ func copyError(readDesc, writeDesc string, readErr bool, err error) {
logrus.Infof("%v had error: %s", desc, err.Error())
}

func copyThenClose(cfg ProcessorConfig, remote, local DeadlineReadWriteCloser, brokerAddress string, remoteDesc, localDesc string) {
func copyThenClose(cfg ProcessorConfig, remote, local DeadlineReadWriteCloser, brokerAddress string, remoteAddr, localAddr net.Addr) {

localDesc := "local connection on " + localAddr.String() + " from " + remoteAddr.String() + " (" + brokerAddress + ")"
processor := newProcessor(cfg, brokerAddress)

firstErr := make(chan error, 1)
@@ -123,9 +125,9 @@ func copyThenClose(cfg ProcessorConfig, remote, local DeadlineReadWriteCloser, b
select {
case firstErr <- err:
if readErr && err == io.EOF {
logrus.Infof("Client closed %v", localDesc)
logrus.Infof("%s: Client(%s) closed connection for broker (%s)", localAddr, remoteAddr, brokerAddress)
} else {
copyError(localDesc, remoteDesc, readErr, err)
copyError(localDesc, brokerAddress, readErr, err)
}
remote.Close()
local.Close()
@@ -137,9 +139,9 @@ func copyThenClose(cfg ProcessorConfig, remote, local DeadlineReadWriteCloser, b
select {
case firstErr <- err:
if readErr && err == io.EOF {
logrus.Infof("Server %v closed connection", remoteDesc)
logrus.Infof("Server %v closed connection", brokerAddress)
} else {
copyError(remoteDesc, localDesc, readErr, err)
copyError(brokerAddress, localDesc, readErr, err)
}
remote.Close()
local.Close()
3 changes: 2 additions & 1 deletion proxy/protocol/real_decoder.go
@@ -2,8 +2,9 @@ package protocol

import (
"encoding/binary"
"github.com/google/uuid"
"math"

"github.com/google/uuid"
)

var errInvalidArrayLength = PacketDecodingError{"invalid array length"}
112 changes: 112 additions & 0 deletions proxy/protocol/responses.go
@@ -10,8 +10,10 @@ import (
const (
apiKeyMetadata = 3
apiKeyFindCoordinator = 10
apiKeyDescribeCluster = 60

brokersKeyName = "brokers"
brokerKeyName = "broker_id"
hostKeyName = "host"
portKeyName = "port"
nodeKeyName = "node_id"
@@ -23,6 +25,7 @@ const (
var (
metadataResponseSchemaVersions = createMetadataResponseSchemaVersions()
findCoordinatorResponseSchemaVersions = createFindCoordinatorResponseSchemaVersions()
describeClusterResponseSchemaVersions = createDescribeClusterResponseSchemaVersions()
)

func createMetadataResponseSchemaVersions() []Schema {
@@ -300,6 +303,63 @@ func createFindCoordinatorResponseSchemaVersions() []Schema {
return []Schema{findCoordinatorResponseV0, findCoordinatorResponseV1, findCoordinatorResponseV2, findCoordinatorResponseV3, findCoordinatorResponseV4, findCoordinatorResponseV5, findCoordinatorResponseV6}
}

func createDescribeClusterResponseSchemaVersions() []Schema {
describeClusterBrokerV0 := NewSchema("describe_cluster_broker_v0",
&Mfield{Name: brokerKeyName, Ty: TypeInt32},
&Mfield{Name: hostKeyName, Ty: TypeCompactStr},
&Mfield{Name: portKeyName, Ty: TypeInt32},
&Mfield{Name: "rack", Ty: TypeCompactNullableStr},
// DescribeCluster is a flexible-version API from v0, so every broker entry ends with a
// tagged-fields section. When empty it is a single zero byte, which is easy to mistake for
// is_fenced; that field is only part of the message from v2.
&SchemaTaggedFields{Name: "broker_tagged_fields"},
)

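describeClusterBrokerV2 := NewSchema("describe_cluster_broker_v2",
&Mfield{Name: brokerKeyName, Ty: TypeInt32},
// All DescribeCluster versions are flexible, so strings use the compact encoding.
&Mfield{Name: hostKeyName, Ty: TypeCompactStr},
&Mfield{Name: portKeyName, Ty: TypeInt32},
&Mfield{Name: "rack", Ty: TypeCompactNullableStr},
&Mfield{Name: "is_fenced", Ty: TypeBool},
&SchemaTaggedFields{Name: "broker_tagged_fields"},
)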

describeClusterV0 := NewSchema("describe_cluster_response_v0",
&Mfield{Name: "throttle_time_ms", Ty: TypeInt32},
&Mfield{Name: "error_code", Ty: TypeInt16},
&Mfield{Name: "error_message", Ty: TypeCompactNullableStr},
&Mfield{Name: "cluster_id", Ty: TypeCompactStr},
&Mfield{Name: "controller_id", Ty: TypeInt32},
&CompactArray{Name: brokersKeyName, Ty: describeClusterBrokerV0},
&Mfield{Name: "cluster_authorized_operations", Ty: TypeInt32},
&SchemaTaggedFields{Name: "response_tagged_fields"},
)

describeClusterV1 := NewSchema("describe_cluster_response_v1",
&Mfield{Name: "throttle_time_ms", Ty: TypeInt32},
&Mfield{Name: "error_code", Ty: TypeInt16},
&Mfield{Name: "error_message", Ty: TypeCompactNullableStr},
&Mfield{Name: "endpoint_type", Ty: TypeInt8},
&Mfield{Name: "cluster_id", Ty: TypeCompactStr},
&Mfield{Name: "controller_id", Ty: TypeInt32},
&CompactArray{Name: brokersKeyName, Ty: describeClusterBrokerV0},
&Mfield{Name: "cluster_authorized_operations", Ty: TypeInt32},
&SchemaTaggedFields{Name: "response_tagged_fields"},
)

describeClusterV2 := NewSchema("describe_cluster_response_v2",
&Mfield{Name: "throttle_time_ms", Ty: TypeInt32},
&Mfield{Name: "error_code", Ty: TypeInt16},
// v2 is a flexible version like v0 and v1, so strings stay compact-encoded.
&Mfield{Name: "error_message", Ty: TypeCompactNullableStr},
&Mfield{Name: "endpoint_type", Ty: TypeInt8},
&Mfield{Name: "cluster_id", Ty: TypeCompactStr},
&Mfield{Name: "controller_id", Ty: TypeInt32},
&CompactArray{Name: brokersKeyName, Ty: describeClusterBrokerV2},
&Mfield{Name: "cluster_authorized_operations", Ty: TypeInt32},
&SchemaTaggedFields{Name: "response_tagged_fields"},
)
return []Schema{describeClusterV0, describeClusterV1, describeClusterV2}
}
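The `TypeCompactStr` and `TypeCompactNullableStr` fields in these schemas use Kafka's compact string encoding: an unsigned varint holding the length plus one, followed by the bytes, with a length prefix of zero meaning null. The sketch below decodes that layout with the standard library only. `readCompactNullableString` is a hypothetical helper for illustration and is not the decoder used in `proxy/protocol`.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// readCompactNullableString decodes one compact nullable string from the
// start of buf. It returns the string (nil for null) and the number of
// bytes consumed.
func readCompactNullableString(buf []byte) (*string, int, error) {
	length, read := binary.Uvarint(buf) // unsigned varint: length + 1
	if read <= 0 {
		return nil, 0, errors.New("invalid varint length")
	}
	if length == 0 {
		return nil, read, nil // 0 encodes a null string
	}
	if length-1 > uint64(len(buf)-read) {
		return nil, 0, errors.New("insufficient data")
	}
	size := int(length - 1)
	s := string(buf[read : read+size])
	return &s, read + size, nil
}

func main() {
	// 0x0a = 10 = len("localhost") + 1
	buf := append([]byte{0x0a}, "localhost"...)
	host, n, err := readCompactNullableString(buf)
	if err != nil {
		panic(err)
	}
	fmt.Println(*host, n) // prints "localhost 10"
}
```

The non-compact `TypeStr` encoding instead uses a fixed two-byte big-endian length, so mixing the two within one message misaligns every field that follows.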

func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFunc) error {
if decodedStruct == nil {
return errors.New("decoded struct must not be nil")
@@ -416,6 +476,56 @@ func modifyCoordinator(decodedStruct *Struct, fn config.NetAddressMappingFunc) e
return nil
}

func modifyDescribeClusterResponse(decodedStruct *Struct, fn config.NetAddressMappingFunc) error {
if decodedStruct == nil {
return errors.New("decoded struct must not be nil")
}
if fn == nil {
return errors.New("net address mapper must not be nil")
}
brokersArray, ok := decodedStruct.Get(brokersKeyName).([]interface{})
if !ok {
return errors.New("brokers list not found")
}
for _, brokerElement := range brokersArray {
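// Use a checked assertion so a malformed response returns an error instead of panicking.
broker, ok := brokerElement.(*Struct)
if !ok {
return errors.New("broker element is not a struct")
}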
host, ok := broker.Get(hostKeyName).(string)
if !ok {
return errors.New("broker.host not found")
}
port, ok := broker.Get(portKeyName).(int32)
if !ok {
return errors.New("broker.port not found")
}
brokerId, ok := broker.Get(brokerKeyName).(int32)
if !ok {
return errors.New("broker.broker_id not found")
}

if host == "" && port <= 0 {
continue
}

newHost, newPort, err := fn(host, port, brokerId)
if err != nil {
return err
}
if host != newHost {
err := broker.Replace(hostKeyName, newHost)
if err != nil {
return err
}
}
if port != newPort {
err = broker.Replace(portKeyName, newPort)
if err != nil {
return err
}
}
}
return nil
}

type ResponseModifier interface {
Apply(resp []byte) ([]byte, error)
}
@@ -446,6 +556,8 @@ func GetResponseModifier(apiKey int16, apiVersion int16, addressMappingFunc conf
return newResponseModifier(apiKey, apiVersion, addressMappingFunc, metadataResponseSchemaVersions, modifyMetadataResponse)
case apiKeyFindCoordinator:
return newResponseModifier(apiKey, apiVersion, addressMappingFunc, findCoordinatorResponseSchemaVersions, modifyFindCoordinatorResponse)
case apiKeyDescribeCluster:
return newResponseModifier(apiKey, apiVersion, addressMappingFunc, describeClusterResponseSchemaVersions, modifyDescribeClusterResponse)
default:
return nil, nil
}