Skip to content

Commit

Permalink
feat: check cluster connectivity when upserting
Browse files Browse the repository at this point in the history
  • Loading branch information
jonas-grgt committed Feb 5, 2025
1 parent 53a9431 commit 83aa7b7
Show file tree
Hide file tree
Showing 10 changed files with 331 additions and 279 deletions.
48 changes: 27 additions & 21 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,33 +89,13 @@ type ClusterRegisterer interface {
//
// It returns a ClusterRegisteredMsg with the registered cluster.
func (c *Config) RegisterCluster(details RegistrationDetails) tea.Msg {
cluster := Cluster{
Name: details.Name,
Color: details.Color,
BootstrapServers: []string{details.Host},
}
cluster := ToCluster(details)

// When no clusters exist yet, the first one created becomes the active one by default.
if len(c.Clusters) == 0 {
cluster.Active = true
}

if details.AuthMethod == SASLAuthMethod {
cluster.SASLConfig = &SASLConfig{
Username: details.Username,
Password: details.Password,
SecurityProtocol: details.SecurityProtocol,
}
}

if details.SchemaRegistry != nil {
cluster.SchemaRegistry = &SchemaRegistryConfig{
Url: details.SchemaRegistry.Url,
Username: details.SchemaRegistry.Username,
Password: details.SchemaRegistry.Password,
}
}

// did the newly registered cluster update an existing one
var isUpdated bool

Expand All @@ -142,6 +122,31 @@ func (c *Config) RegisterCluster(details RegistrationDetails) tea.Msg {
return ClusterRegisteredMsg{&cluster}
}

func ToCluster(details RegistrationDetails) Cluster {
cluster := Cluster{
Name: details.Name,
Color: details.Color,
BootstrapServers: []string{details.Host},
}

if details.AuthMethod == SASLAuthMethod {
cluster.SASLConfig = &SASLConfig{
Username: details.Username,
Password: details.Password,
SecurityProtocol: details.SecurityProtocol,
}
}

if details.SchemaRegistry != nil {
cluster.SchemaRegistry = &SchemaRegistryConfig{
Url: details.SchemaRegistry.Url,
Username: details.SchemaRegistry.Username,
Password: details.SchemaRegistry.Password,
}
}
return cluster
}

func (c *Config) ActiveCluster() *Cluster {
for _, c := range c.Clusters {
if c.Active {
Expand Down Expand Up @@ -235,5 +240,6 @@ func New(configIO IO) *Config {
}

func ReLoadConfig() tea.Msg {
// TODO
return nil
}
2 changes: 1 addition & 1 deletion kadmin/dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ package kadmin
import "time"

func maybeIntroduceLatency() {
time.Sleep(1 * time.Second)
time.Sleep(2 * time.Second)
}
4 changes: 4 additions & 0 deletions kadmin/kadmin.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package kadmin

import (
tea "github.com/charmbracelet/bubbletea"
"ktea/config"
"time"
)

Expand Down Expand Up @@ -53,6 +55,8 @@ type Kadmin interface {

type Instantiator func(cd ConnectionDetails) (Kadmin, error)

type ConnChecker func(cluster *config.Cluster) tea.Msg

func SaramaInstantiator() Instantiator {
return func(cd ConnectionDetails) (Kadmin, error) {
return NewSaramaKadmin(cd)
Expand Down
105 changes: 92 additions & 13 deletions kadmin/sarama_kadmin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package kadmin

import (
"github.com/IBM/sarama"
tea "github.com/charmbracelet/bubbletea"
"github.com/charmbracelet/log"
"ktea/config"
"ktea/sradmin"
)

Expand All @@ -14,27 +17,64 @@ type SaramaKafkaAdmin struct {
sra sradmin.SrAdmin
}

type ConnectivityCheckStartedMsg struct {
Connected chan bool
Err chan error
}

func (c *ConnectivityCheckStartedMsg) AwaitCompletion() tea.Msg {
select {
case <-c.Connected:
return ConnectionCheckSucceeded{}
case err := <-c.Err:
return ConnectivityCheckErrMsg{Err: err}
}
}

type ConnectionCheckSucceeded struct{}

type ConnectivityCheckErrMsg struct {
Err error
}

func ToConnectionDetails(cluster *config.Cluster) ConnectionDetails {
var saslConfig *SASLConfig
if cluster.SASLConfig != nil {
saslConfig = &SASLConfig{
Username: cluster.SASLConfig.Username,
Password: cluster.SASLConfig.Password,
Protocol: SSL,
}
}

connDetails := ConnectionDetails{
BootstrapServers: cluster.BootstrapServers,
SASLConfig: saslConfig,
}
return connDetails
}

func NewSaramaKadmin(cd ConnectionDetails) (Kadmin, error) {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRoundRobinPartitioner
config.Consumer.Offsets.Initial = sarama.OffsetOldest
cfg := sarama.NewConfig()
cfg.Producer.Return.Successes = true
cfg.Producer.RequiredAcks = sarama.WaitForAll
cfg.Producer.Partitioner = sarama.NewRoundRobinPartitioner
cfg.Consumer.Offsets.Initial = sarama.OffsetOldest

if cd.SASLConfig != nil {
config.Net.TLS.Enable = true
config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
config.Net.SASL.User = cd.SASLConfig.Username
config.Net.SASL.Password = cd.SASLConfig.Password
cfg.Net.TLS.Enable = true
cfg.Net.SASL.Enable = true
cfg.Net.SASL.Mechanism = sarama.SASLTypePlaintext
cfg.Net.SASL.User = cd.SASLConfig.Username
cfg.Net.SASL.Password = cd.SASLConfig.Password
}

client, err := sarama.NewClient(cd.BootstrapServers, config)
client, err := sarama.NewClient(cd.BootstrapServers, cfg)
if err != nil {
return nil, err
}

admin, err := sarama.NewClusterAdmin(cd.BootstrapServers, config)
admin, err := sarama.NewClusterAdmin(cd.BootstrapServers, cfg)
if err != nil {
return nil, err
}
Expand All @@ -49,6 +89,45 @@ func NewSaramaKadmin(cd ConnectionDetails) (Kadmin, error) {
admin: admin,
addrs: cd.BootstrapServers,
producer: producer,
config: config,
config: cfg,
}, nil
}

func SaramaConnectivityChecker(cluster *config.Cluster) tea.Msg {
connectedChan := make(chan bool)
errChan := make(chan error)

cd := ToConnectionDetails(cluster)
cfg := sarama.NewConfig()

if cd.SASLConfig != nil {
cfg.Net.TLS.Enable = true
cfg.Net.SASL.Enable = true
cfg.Net.SASL.Mechanism = sarama.SASLTypePlaintext
cfg.Net.SASL.User = cd.SASLConfig.Username
cfg.Net.SASL.Password = cd.SASLConfig.Password
}

go doCheckConnectivity(cd, cfg, errChan, connectedChan)

return ConnectivityCheckStartedMsg{
Connected: connectedChan,
Err: errChan,
}
}

func doCheckConnectivity(cd ConnectionDetails, config *sarama.Config, errChan chan error, connectedChan chan bool) {
maybeIntroduceLatency()
c, err := sarama.NewClient(cd.BootstrapServers, config)
if err != nil {
errChan <- err
return
}
defer func(c sarama.Client) {
err := c.Close()
if err != nil {
log.Error("Unable to close connectivity check connection", err)
}
}(c)
connectedChan <- true
}
18 changes: 3 additions & 15 deletions ktea.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (m *Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
cmds = append(cmds, cmd)
return m, tea.Batch(cmds...)
} else {
t, c := clusters_tab.New(m.ktx)
t, c := clusters_tab.New(m.ktx, kadmin.SaramaConnectivityChecker)
m.tabCtrl = t
m.tabs.GoToTab(tabs.ClustersTab)
return m, c
Expand Down Expand Up @@ -202,7 +202,7 @@ func (m *Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
case 3:
if m.clustersTabCtrl == nil {
var cmd tea.Cmd
m.clustersTabCtrl, cmd = clusters_tab.New(m.ktx)
m.clustersTabCtrl, cmd = clusters_tab.New(m.ktx, kadmin.SaramaConnectivityChecker)
cmds = append(cmds, cmd)
}
m.tabCtrl = m.clustersTabCtrl
Expand Down Expand Up @@ -235,19 +235,7 @@ func (m *Model) createTabs(cluster *config.Cluster) {
// activateCluster creates the kadmin.Model and kadmin.SrAdmin
// based on the given cluster
func (m *Model) activateCluster(cluster *config.Cluster) error {
var saslConfig *kadmin.SASLConfig
if cluster.SASLConfig != nil {
saslConfig = &kadmin.SASLConfig{
Username: cluster.SASLConfig.Username,
Password: cluster.SASLConfig.Password,
Protocol: kadmin.SSL,
}
}

connDetails := kadmin.ConnectionDetails{
BootstrapServers: cluster.BootstrapServers,
SASLConfig: saslConfig,
}
connDetails := kadmin.ToConnectionDetails(cluster)
if ka, err := m.kaInstantiator(connDetails); err != nil {
return err
} else {
Expand Down
1 change: 1 addition & 0 deletions ui/components/cmdbar/notifier_cmdbar.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func (n *NotifierCmdBar) IsFocussed() bool {
}

func (n *NotifierCmdBar) Shortcuts() []statusbar.Shortcut {
// TODO
return nil
}

Expand Down
Loading

0 comments on commit 83aa7b7

Please sign in to comment.