Skip to content

Commit

Permalink
feat: add replication factor to create topic form
Browse files Browse the repository at this point in the history
  • Loading branch information
jonas-grgt committed Feb 16, 2025
1 parent 9f25e8b commit fd8e7e2
Show file tree
Hide file tree
Showing 10 changed files with 230 additions and 53 deletions.
2 changes: 1 addition & 1 deletion .run/go build ktea.go.run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<configuration default="false" name="go build ktea.go" type="GoApplicationRunConfiguration" factoryName="Go Application" nameIsGenerated="true">
<module name="ktea" />
<working_directory value="$PROJECT_DIR$" />
<go_parameters value="-tags prd" />
<go_parameters value="-tags dev" />
<kind value="FILE" />
<package value="ktea" />
<directory value="$PROJECT_DIR$" />
Expand Down
7 changes: 4 additions & 3 deletions kadmin/cgroup_deleter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ func TestCGroupDeleter(t *testing.T) {
// create a topic
topic := topicName()
msg := ka.CreateTopic(TopicCreationDetails{
Name: topic,
NumPartitions: 1,
Properties: nil,
Name: topic,
NumPartitions: 1,
Properties: nil,
ReplicationFactor: 1,
}).(TopicCreationStartedMsg)

switch msg := msg.AwaitCompletion().(type) {
Expand Down
7 changes: 4 additions & 3 deletions kadmin/cgroup_lister_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ func TestConsumerGroups(t *testing.T) {
topic := topicName()
// given
msg := ka.CreateTopic(TopicCreationDetails{
Name: topic,
NumPartitions: 1,
Properties: nil,
Name: topic,
NumPartitions: 1,
Properties: nil,
ReplicationFactor: 1,
}).(TopicCreationStartedMsg)

switch msg := msg.AwaitCompletion().(type) {
Expand Down
7 changes: 4 additions & 3 deletions kadmin/offset_lister_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ func TestConsumerGroupOffsets(t *testing.T) {
topic := topicName()
// given
msg := ka.CreateTopic(TopicCreationDetails{
Name: topic,
NumPartitions: 1,
Properties: nil,
Name: topic,
NumPartitions: 1,
Properties: nil,
ReplicationFactor: 1,
}).(TopicCreationStartedMsg)

switch msg.AwaitCompletion().(type) {
Expand Down
30 changes: 18 additions & 12 deletions kadmin/record _reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ func TestReadRecords(t *testing.T) {
topic := topicName()
// given
msg := ka.CreateTopic(TopicCreationDetails{
Name: topic,
NumPartitions: 1,
Name: topic,
NumPartitions: 1,
ReplicationFactor: 1,
}).(TopicCreationStartedMsg)

switch msg.AwaitCompletion().(type) {
Expand Down Expand Up @@ -77,8 +78,9 @@ func TestReadRecords(t *testing.T) {
topic := topicName()
// given
msg := ka.CreateTopic(TopicCreationDetails{
Name: topic,
NumPartitions: 4,
Name: topic,
NumPartitions: 4,
ReplicationFactor: 1,
}).(TopicCreationStartedMsg)

switch msg.AwaitCompletion().(type) {
Expand Down Expand Up @@ -143,8 +145,9 @@ func TestReadRecords(t *testing.T) {
topic := topicName()
// given
msg := ka.CreateTopic(TopicCreationDetails{
Name: topic,
NumPartitions: 1,
Name: topic,
NumPartitions: 1,
ReplicationFactor: 1,
}).(TopicCreationStartedMsg)

switch msg.AwaitCompletion().(type) {
Expand Down Expand Up @@ -204,8 +207,9 @@ func TestReadRecords(t *testing.T) {
topic := topicName()
// given
msg := ka.CreateTopic(TopicCreationDetails{
Name: topic,
NumPartitions: 1,
Name: topic,
NumPartitions: 1,
ReplicationFactor: 1,
}).(TopicCreationStartedMsg)

switch msg.AwaitCompletion().(type) {
Expand Down Expand Up @@ -268,8 +272,9 @@ func TestReadRecords(t *testing.T) {
topic := topicName()
// given
msg := ka.CreateTopic(TopicCreationDetails{
Name: topic,
NumPartitions: 1,
Name: topic,
NumPartitions: 1,
ReplicationFactor: 1,
}).(TopicCreationStartedMsg)

switch msg.AwaitCompletion().(type) {
Expand Down Expand Up @@ -337,8 +342,9 @@ func TestReadRecords(t *testing.T) {
topic := topicName()
// given
msg := ka.CreateTopic(TopicCreationDetails{
Name: topic,
NumPartitions: 1,
Name: topic,
NumPartitions: 1,
ReplicationFactor: 1,
}).(TopicCreationStartedMsg)

switch msg.AwaitCompletion().(type) {
Expand Down
10 changes: 6 additions & 4 deletions kadmin/topic_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ type TopicCreator interface {
}

type TopicCreationDetails struct {
Name string
NumPartitions int
Properties map[string]string
Name string
NumPartitions int
Properties map[string]string
ReplicationFactor int16
}

type TopicCreatedMsg struct {
Expand Down Expand Up @@ -49,13 +50,14 @@ func (ka *SaramaKafkaAdmin) CreateTopic(tcd TopicCreationDetails) tea.Msg {
}

func (ka *SaramaKafkaAdmin) doCreateTopic(tcd TopicCreationDetails, created chan bool, errChan chan error) {
maybeIntroduceLatency()
properties := make(map[string]*string)
for k, v := range tcd.Properties {
properties[k] = &v
}
err := ka.admin.CreateTopic(tcd.Name, &sarama.TopicDetail{
NumPartitions: int32(tcd.NumPartitions),
ReplicationFactor: 1,
ReplicationFactor: tcd.ReplicationFactor,
ReplicaAssignment: nil,
ConfigEntries: properties,
}, false)
Expand Down
14 changes: 12 additions & 2 deletions kadmin/topic_creator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ func TestCreateTopic(t *testing.T) {
// when
properties := map[string]string{}
properties["compression.type"] = "lz4"
topicCreatedMsg := ka.CreateTopic(TopicCreationDetails{topic, 2, properties}).(TopicCreationStartedMsg)
topicCreatedMsg := ka.CreateTopic(TopicCreationDetails{
topic,
2,
properties,
1,
}).(TopicCreationStartedMsg)

select {
case <-topicCreatedMsg.Created:
Expand Down Expand Up @@ -51,7 +56,12 @@ func TestCreateTopic(t *testing.T) {

t.Run("Creation fails", func(t *testing.T) {
// when
topicCreatedMsg := ka.CreateTopic(TopicCreationDetails{topic, 2, map[string]string{}}).(TopicCreationStartedMsg)
topicCreatedMsg := ka.CreateTopic(TopicCreationDetails{
topic,
2,
map[string]string{},
3,
}).(TopicCreationStartedMsg)

msg = topicCreatedMsg.AwaitCompletion()
switch msg := msg.(type) {
Expand Down
5 changes: 3 additions & 2 deletions kadmin/topic_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ func TestPublish(t *testing.T) {
topic := topicName()
// given
msg := ka.CreateTopic(TopicCreationDetails{
Name: topic,
NumPartitions: 3,
Name: topic,
NumPartitions: 3,
ReplicationFactor: 1,
}).(TopicCreationStartedMsg)

switch msg.AwaitCompletion().(type) {
Expand Down
85 changes: 66 additions & 19 deletions ui/pages/create_topic_page/create_topic_page.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"ktea/kontext"
"ktea/styles"
"ktea/ui"
"ktea/ui/components/cmdbar"
"ktea/ui/components/notifier"
"ktea/ui/components/statusbar"
"ktea/ui/pages/nav"
Expand All @@ -30,7 +31,7 @@ const (
type Model struct {
shortcuts []statusbar.Shortcut
form *huh.Form
notifier *notifier.Model
notifier *cmdbar.NotifierCmdBar
topicCreator kadmin.TopicCreator
formValues topicFormValues
formState formState
Expand All @@ -43,11 +44,12 @@ type config struct {
}

type topicFormValues struct {
name string
numPartitions string
config string
configs []config
cleanupPolicy string
name string
numPartitions string
config string
configs []config
cleanupPolicy string
replicationFactor string
}

func (m *Model) View(ktx *kontext.ProgramKtx, renderer *ui.Renderer) string {
Expand All @@ -67,15 +69,20 @@ func (m *Model) View(ktx *kontext.ProgramKtx, renderer *ui.Renderer) string {
}

func (m *Model) Update(msg tea.Msg) tea.Cmd {
var cmds []tea.Cmd

_, _, cmd := m.notifier.Update(msg)
cmds = append(cmds, cmd)

switch msg := msg.(type) {
case kadmin.TopicCreationStartedMsg:
return msg.AwaitCompletion
cmds = append(cmds, msg.AwaitCompletion)
return tea.Batch(cmds...)
case kadmin.TopicCreationErrMsg:
m.initForm(initial)
return m.notifier.ShowErrorMsg("Topic creation failure", msg.Err)
return tea.Batch(cmds...)
case bsp.TickMsg:
cmd := m.notifier.Update(msg)
return cmd
return tea.Batch(cmds...)
case tea.KeyMsg:
if msg.String() == "esc" && m.formState != loading {
return ui.PublishMsg(nav.LoadTopicsPageMsg{Refresh: m.createdAtLeastOneTopic})
Expand All @@ -84,14 +91,14 @@ func (m *Model) Update(msg tea.Msg) tea.Cmd {
m.formValues.cleanupPolicy = ""
m.formValues.config = ""
m.formValues.numPartitions = ""
m.formValues.replicationFactor = ""
m.formValues.configs = []config{}
m.initForm(initial)
return propagateMsgToForm(m, msg)
} else {
return propagateMsgToForm(m, msg)
}
case kadmin.TopicCreatedMsg:
m.notifier.ShowSuccessMsg("Topic created!")
m.formValues.name = ""
m.formValues.cleanupPolicy = ""
m.formValues.config = ""
Expand All @@ -103,6 +110,7 @@ func (m *Model) Update(msg tea.Msg) tea.Cmd {
default:
return propagateMsgToForm(m, msg)
}

}

func propagateMsgToForm(m *Model, msg tea.Msg) tea.Cmd {
Expand All @@ -117,7 +125,7 @@ func propagateMsgToForm(m *Model, msg tea.Msg) tea.Cmd {
if m.formValues.config == "" {
m.formState = loading
return tea.Batch(
m.notifier.SpinWithRocketMsg("Creating topic"),
//m.notifier.SpinWithRocketMsg("Creating topic"),
func() tea.Msg {
numPartitions, _ := strconv.Atoi(m.formValues.numPartitions)
configs := map[string]string{
Expand All @@ -126,19 +134,21 @@ func propagateMsgToForm(m *Model, msg tea.Msg) tea.Cmd {
for _, c := range m.formValues.configs {
configs[c.key] = c.value
}
replicationFactor, _ := strconv.Atoi(m.formValues.replicationFactor)
return m.topicCreator.CreateTopic(
kadmin.TopicCreationDetails{
Name: m.formValues.name,
NumPartitions: numPartitions,
Properties: configs,
Name: m.formValues.name,
NumPartitions: numPartitions,
Properties: configs,
ReplicationFactor: int16(replicationFactor),
})
})
} else {
m.formState = configEntered
split := strings.Split(m.formValues.config, "=")
m.formValues.configs = append(m.formValues.configs, config{split[0], split[1]})
m.formValues.config = ""
m.initForm(0)
m.initForm(configEntered)
return cmd
}
}
Expand All @@ -164,7 +174,7 @@ func (m *Model) initForm(fs formState) {
return nil
})

validateInput := huh.NewInput().
numPartField := huh.NewInput().
Title("Number of Partitions").
Value(&m.formValues.numPartitions).
Validate(func(str string) error {
Expand All @@ -178,6 +188,22 @@ func (m *Model) initForm(fs formState) {
}
return nil
})

replicationFactorField := huh.NewInput().
Title("Replication Factor").
Validate(func(r string) error {
if r == "" {
return errors.New("replication factory cannot be empty")
}
if n, e := strconv.Atoi(r); e != nil {
return errors.New(fmt.Sprintf("'%s' is not a valid numeric replication factor value", r))
} else if n <= 0 {
return errors.New("value must be greater than zero")
}
return nil
}).
Value(&m.formValues.replicationFactor)

cleanupPolicySelect := huh.NewSelect[string]().
Title("Cleanup Policy").
Value(&m.formValues.cleanupPolicy).
Expand Down Expand Up @@ -235,12 +261,19 @@ func (m *Model) initForm(fs formState) {
}).
Value(&m.formValues.config)

form := huh.NewForm(huh.NewGroup(topicNameInput, validateInput, cleanupPolicySelect, configInput))
form := huh.NewForm(huh.NewGroup(
topicNameInput,
numPartField,
replicationFactorField,
cleanupPolicySelect,
configInput,
))
form.QuitAfterSubmit = false
if m.formState == configEntered {
form.NextField()
form.NextField()
form.NextField()
form.NextField()
}
form.Init()
m.formState = fs
Expand All @@ -258,6 +291,20 @@ func New(tc kadmin.TopicCreator) *Model {
{"Go Back", "esc"},
}
t.initForm(initial)
t.notifier = notifier.New()
notifierCmdBar := cmdbar.NewNotifierCmdBar()
cmdbar.WithMsgHandler(notifierCmdBar, func(msg kadmin.TopicCreationStartedMsg, m *notifier.Model) (bool, tea.Cmd) {
cmd := m.SpinWithLoadingMsg("Creating Topic")
return true, cmd
})
cmdbar.WithMsgHandler(notifierCmdBar, func(msg kadmin.TopicCreationErrMsg, m *notifier.Model) (bool, tea.Cmd) {
m.ShowErrorMsg("Failed to create Topic", msg.Err)
return true, nil
})
cmdbar.WithMsgHandler(notifierCmdBar, func(msg kadmin.TopicCreatedMsg, m *notifier.Model) (bool, tea.Cmd) {
m.ShowSuccessMsg("Topic created!")
return true, m.AutoHideCmd()
})
t.notifier = notifierCmdBar

return &t
}
Loading

0 comments on commit fd8e7e2

Please sign in to comment.