Skip to content

Commit

Permalink
feat: add consumer group deletion support
Browse files Browse the repository at this point in the history
  • Loading branch information
jonas-grgt committed Jan 29, 2025
1 parent 1487371 commit 48ba56c
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 14 deletions.
53 changes: 53 additions & 0 deletions kadmin/cgroup_deleter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package kadmin

import tea "github.com/charmbracelet/bubbletea"

type CGroupDeleter interface {
DeleteCGroup(name string) tea.Msg
}

type CGroupDeletionStartedMsg struct {
Deleted chan bool
Err chan error
}

func (c *CGroupDeletionStartedMsg) AwaitCompletion() tea.Msg {
select {
case <-c.Deleted:
return CGroupDeletedMsg{}
case err := <-c.Err:
return CGroupDeletionErrMsg{Err: err}
}
}

type CGroupDeletionErrMsg struct {
Err error
}

type CGroupDeletedMsg struct {
}

func (ka *SaramaKafkaAdmin) DeleteCGroup(name string) tea.Msg {
errChan := make(chan error)
deletedChan := make(chan bool)

go ka.doDeleteCGroup(name, deletedChan, errChan)

return CGroupDeletionStartedMsg{
Deleted: deletedChan,
Err: errChan,
}
}

func (ka *SaramaKafkaAdmin) doDeleteCGroup(
name string,
deletedChan chan bool,
errChan chan error,
) {
err := ka.admin.DeleteConsumerGroup(name)
if err != nil {
errChan <- err
return
}
deletedChan <- true
}
59 changes: 59 additions & 0 deletions kadmin/cgroup_deleter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package kadmin

import (
"context"
"github.com/IBM/sarama"
"testing"
"time"
)

func TestCGroupDeleter(t *testing.T) {
t.Run("successful deletion", func(t *testing.T) {
// create a topic
topic := topicName()
msg := ka.CreateTopic(TopicCreationDetails{
Name: topic,
NumPartitions: 1,
Properties: nil,
}).(TopicCreationStartedMsg)

switch msg := msg.AwaitCompletion().(type) {
case TopicCreatedMsg:
case TopicCreationErrMsg:
t.Fatal("Unable to create topic", msg.Err)
}

// publish some data on the topic
for i := 0; i < 10; i++ {
ka.PublishRecord(&ProducerRecord{
Key: "key",
Value: "value",
Topic: topic,
Partition: nil,
})
}

// consume from topic using test-group consumer group
consumerGroup, err := sarama.NewConsumerGroupFromClient("test-group", kafkaClient())
if err != nil {
t.Fatal("Unable to create Consumer Group.", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Adjust timeout as needed
defer cancel()
handler := testConsumer{ExpectedMsgCount: 10}
consumerGroup.Consume(ctx, []string{topic}, &handler)
err = consumerGroup.Close()
if err != nil {
t.Fatal("Unable to close group", err)
}

// delete the group
cgroupDeletionStartedMsg := ka.DeleteCGroup("test-group").(CGroupDeletionStartedMsg)

switch cgroupDeletionStartedMsg.AwaitCompletion().(type) {
case CGroupDeletionErrMsg:
t.Fatal("Failed to delete cgroup", msg.Err)
case CGroupDeletedMsg:
}
})
}
2 changes: 1 addition & 1 deletion ktea.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (m *Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
case 1:
if m.cgroupsTabCtrl == nil {
var cmd tea.Cmd
m.cgroupsTabCtrl, cmd = cgroups_tab.New(m.ka, m.ka)
m.cgroupsTabCtrl, cmd = cgroups_tab.New(m.ka, m.ka, m.ka)
cmds = append(cmds, cmd)
}
m.tabCtrl = m.cgroupsTabCtrl
Expand Down
41 changes: 38 additions & 3 deletions ui/pages/cgroups_page/cgroups_page.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ func (m *Model) Update(msg tea.Msg) tea.Cmd {
}
case kadmin.ConsumerGroupsListedMsg:
m.groups = msg.ConsumerGroups
case kadmin.CGroupDeletionStartedMsg:
cmds = append(cmds, msg.AwaitCompletion)
}

var cmd tea.Cmd
Expand Down Expand Up @@ -140,7 +142,7 @@ func (m *Model) Title() string {
return "Consumer Groups"
}

func New(lister kadmin.CGroupLister) (*Model, tea.Cmd) {
func New(lister kadmin.CGroupLister, deleter kadmin.CGroupDeleter) (*Model, tea.Cmd) {
m := &Model{}
m.lister = lister
m.table = table.New(
Expand All @@ -155,9 +157,9 @@ func New(lister kadmin.CGroupLister) (*Model, tea.Cmd) {
return message
}

deleteFunc := func(topic string) tea.Cmd {
deleteFunc := func(group string) tea.Cmd {
return func() tea.Msg {
return nil
return deleter.DeleteCGroup(group)
}
}
notifierCmdBar := cmdbar.NewNotifierCmdBar()
Expand All @@ -183,6 +185,39 @@ func New(lister kadmin.CGroupLister) (*Model, tea.Cmd) {
},
)

cmdbar.WithMsgHandler(
notifierCmdBar,
func(
msg kadmin.CGroupDeletionStartedMsg,
m *notifier.Model,
) (bool, tea.Cmd) {
cmd := m.SpinWithLoadingMsg("Deleting Consumer Group")
return true, cmd
},
)

cmdbar.WithMsgHandler(
notifierCmdBar,
func(
msg kadmin.CGroupDeletedMsg,
m *notifier.Model,
) (bool, tea.Cmd) {
m.Idle()
return false, nil
},
)

cmdbar.WithMsgHandler(
notifierCmdBar,
func(
msg kadmin.CGroupDeletionErrMsg,
m *notifier.Model,
) (bool, tea.Cmd) {
cmd := m.ShowErrorMsg("Failed to delete group", msg.Err)
return true, cmd
},
)

m.cmdBar = cmdbar.NewTableCmdsBar[string](
cmdbar.NewDeleteCmdBar(deleteMsgFunc, deleteFunc, nil),
cmdbar.NewSearchCmdBar("Search Consumer Group"),
Expand Down
21 changes: 12 additions & 9 deletions ui/tabs/cgroups_tab/cgroups_tab.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ import (
)

type Model struct {
active nav.Page
statusbar *statusbar.Model
offsetLister kadmin.OffsetLister
consumerGroupLister kadmin.CGroupLister
cgroupsPage *cgroups_page.Model
active nav.Page
statusbar *statusbar.Model
offsetLister kadmin.OffsetLister
cgroupLister kadmin.CGroupLister
cgroupDeleter kadmin.CGroupDeleter
cgroupsPage *cgroups_page.Model
}

func (m *Model) View(ktx *kontext.ProgramKtx, renderer *ui.Renderer) string {
Expand All @@ -39,7 +40,7 @@ func (m *Model) Update(msg tea.Msg) tea.Cmd {
case nav.LoadCGroupsPageMsg:
var cmd tea.Cmd
if m.cgroupsPage == nil {
m.cgroupsPage, cmd = cgroups_page.New(m.consumerGroupLister)
m.cgroupsPage, cmd = cgroups_page.New(m.cgroupLister, m.cgroupDeleter)
}
m.active = m.cgroupsPage
return cmd
Expand All @@ -57,14 +58,16 @@ func (m *Model) Update(msg tea.Msg) tea.Cmd {
}

func New(
consumerGroupLister kadmin.CGroupLister,
cgroupLister kadmin.CGroupLister,
cgroupDeleter kadmin.CGroupDeleter,
consumerGroupOffsetLister kadmin.OffsetLister,
) (*Model, tea.Cmd) {
cgroupsPage, cmd := cgroups_page.New(consumerGroupLister)
cgroupsPage, cmd := cgroups_page.New(cgroupLister, cgroupDeleter)

m := &Model{}
m.offsetLister = consumerGroupOffsetLister
m.consumerGroupLister = consumerGroupLister
m.cgroupLister = cgroupLister
m.cgroupDeleter = cgroupDeleter
m.cgroupsPage = cgroupsPage
m.active = cgroupsPage
m.statusbar = statusbar.New(m.active)
Expand Down
8 changes: 7 additions & 1 deletion ui/tabs/cgroups_tab/cgroups_tab_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,15 @@ func (m *MockConsumerGroupLister) ListCGroups() tea.Msg {
return nil
}

type MockConsumerGroupDeleter struct{}

func (m *MockConsumerGroupDeleter) DeleteCGroup(name string) tea.Msg {
return nil
}

func TestGroupsTab(t *testing.T) {
t.Run("List consumer groups", func(t *testing.T) {
groupsTab, _ := New(&MockConsumerGroupLister{}, &MockConsumerGroupOffsetLister{})
groupsTab, _ := New(&MockConsumerGroupLister{}, &MockConsumerGroupDeleter{}, &MockConsumerGroupOffsetLister{})

groupsTab.Update(kadmin.ConsumerGroupsListedMsg{
ConsumerGroups: []*kadmin.ConsumerGroup{
Expand Down

0 comments on commit 48ba56c

Please sign in to comment.