diff --git a/go.mod b/go.mod index f588eb1..8f91ef7 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.23.2 replace github.com/charmbracelet/huh v0.6.0 => github.com/jonas-grgt/huh v0.0.0-20250128201054-5def46fb981f require ( - github.com/IBM/sarama v1.42.1 + github.com/IBM/sarama v1.45.0 github.com/alecthomas/chroma/v2 v2.15.0 github.com/atotto/clipboard v0.1.4 github.com/charmbracelet/bubbles v0.20.0 @@ -18,7 +18,7 @@ require ( github.com/muesli/reflow v0.3.0 github.com/riferrei/srclient v0.7.1 github.com/segmentio/kafka-go v0.4.47 - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 github.com/testcontainers/testcontainers-go/modules/kafka v0.34.0 golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 gopkg.in/yaml.v3 v3.0.1 @@ -44,7 +44,7 @@ require ( github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect - github.com/eapache/go-resiliency v1.4.0 // indirect + github.com/eapache/go-resiliency v1.7.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f // indirect @@ -64,7 +64,7 @@ require ( github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect - github.com/klauspost/compress v1.17.10 // indirect + github.com/klauspost/compress v1.17.11 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/lucasb-eyer/go-colorful v1.2.0 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect @@ -84,7 +84,7 @@ require ( github.com/muesli/termenv v0.15.3-0.20240618155329-98d742f6907a // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect - github.com/pierrec/lz4/v4 v4.1.18 // indirect + github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect @@ -104,11 +104,11 @@ require ( go.opentelemetry.io/otel/metric v1.24.0 // indirect go.opentelemetry.io/otel/sdk v1.24.0 // indirect go.opentelemetry.io/otel/trace v1.24.0 // indirect - golang.org/x/crypto v0.27.0 // indirect + golang.org/x/crypto v0.32.0 // indirect golang.org/x/mod v0.21.0 // indirect - golang.org/x/net v0.29.0 // indirect + golang.org/x/net v0.34.0 // indirect golang.org/x/sync v0.10.0 // indirect - golang.org/x/sys v0.28.0 // indirect - golang.org/x/text v0.18.0 // indirect + golang.org/x/sys v0.29.0 // indirect + golang.org/x/text v0.21.0 // indirect golang.org/x/time v0.6.0 // indirect ) diff --git a/go.sum b/go.sum index 78de76f..b7af35b 100644 --- a/go.sum +++ b/go.sum @@ -4,8 +4,8 @@ github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9 github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= -github.com/IBM/sarama v1.42.1 h1:wugyWa15TDEHh2kvq2gAy1IHLjEjuYOYgXz/ruC/OSQ= -github.com/IBM/sarama v1.42.1/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= +github.com/IBM/sarama v1.45.0 h1:IzeBevTn809IJ/dhNKhP5mpxEXTmELuezO2tgHD9G5E= +github.com/IBM/sarama v1.45.0/go.mod h1:EEay63m8EZkeumco9TDXf2JT3uDnZsZqFgV46n4yZdY= github.com/MakeNowJust/heredoc v1.0.0 h1:cXCdzVdstXyiTqTvfqk9SDHpKNjxuom+DOlyEeQ4pzQ= github.com/MakeNowJust/heredoc v1.0.0/go.mod h1:mG5amYoWBHf8vpLOuehzbGGw0EHxpZZ6lCpQ4fNJ8LE= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= @@ -69,8 +69,8 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4 github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= -github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0= -github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= +github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= @@ -132,8 +132,8 @@ github.com/jonas-grgt/huh v0.0.0-20250128201054-5def46fb981f/go.mod h1:Ue6iOm4AY github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= -github.com/klauspost/compress v1.17.10 h1:oXAz+Vh0PMUvJczoi+flxpnBEPxoER1IaAnU/NMPtT0= -github.com/klauspost/compress v1.17.10/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -181,8 +181,8 @@ github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3I github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= -github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= -github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= +github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -226,8 +226,8 @@ github.com/stretchr/testify v1.7.5/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/testcontainers/testcontainers-go v0.34.0 h1:5fbgF0vIN5u+nD3IWabQwRybuB4GY8G2HHgCkbMzMHo= github.com/testcontainers/testcontainers-go v0.34.0/go.mod h1:6P/kMkQe8yqPHfPWNulFGdFHTD8HB2vLq/231xY2iPQ= github.com/testcontainers/testcontainers-go/modules/kafka v0.34.0 h1:LrMlsBH+nKJ2c6M7rOjbi7UivgofgAQo+LAwsWttR+Q= @@ -269,8 +269,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= -golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= -golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= +golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= +golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 h1:hNQpMuAJe5CtcUqCXaWga3FHu+kQvCqcsoVaQgSV60o= golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -290,8 +290,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= -golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= -golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= +golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -318,15 +318,15 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= -golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= -golang.org/x/term v0.24.0 h1:Mh5cbb+Zk2hqqXNO7S1iTjEphVL+jb8ZWaqh/g+JWkM= -golang.org/x/term v0.24.0/go.mod h1:lOBK/LVxemqiMij05LGJ0tzNr8xlmwBRJ81PX6wVLH8= +golang.org/x/term v0.28.0 h1:/Ts8HFuMR2E6IP/jlo7QVLZHggjKQbhu/7H0LJFr3Gg= +golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= @@ -334,8 +334,8 @@ golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= -golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/kadmin/config_updater_test.go b/kadmin/config_updater_test.go index cef575f..218fa3a 100644 --- a/kadmin/config_updater_test.go +++ b/kadmin/config_updater_test.go @@ -63,7 +63,7 @@ func TestUpdateTopicConfig(t *testing.T) { // then assert.IsType(t, KAdminErrorMsg{}, msg) - assert.Equal(t, "Invalid value -172800000 for configuration delete.retention.ms: Value must be at least 0", msg.(KAdminErrorMsg).Error.Error()) + assert.Equal(t, "kafka server: Configuration is invalid - Invalid value -172800000 for configuration delete.retention.ms: Value must be at least 0", msg.(KAdminErrorMsg).Error.Error()) // clean up ka.DeleteTopic(topic) diff --git a/kadmin/record _reader.go b/kadmin/record _reader.go index 652c738..8b58937 100644 --- a/kadmin/record _reader.go +++ b/kadmin/record _reader.go @@ -5,10 +5,30 @@ import ( "github.com/IBM/sarama" tea "github.com/charmbracelet/bubbletea" "ktea/serdes" + "strings" "sync" "time" ) +type FilterType string + +func (filterDetails *Filter) Filter(value string) bool { + switch filterDetails.KeyFilter { + case ContainsFilterType: + return strings.Contains(value, filterDetails.KeySearchTerm) + case StartsWithFilterType: + return strings.HasPrefix(value, filterDetails.KeySearchTerm) + default: + return true + } +} + +const ( + ContainsFilterType FilterType = "contains" + StartsWithFilterType FilterType = "starts with" + NoFilterType FilterType = "none" +) + type StartPoint int const ( @@ -23,6 +43,14 @@ type RecordReader interface { type ReadingStartedMsg struct { ConsumerRecord chan ConsumerRecord Err chan error + CancelFunc context.CancelFunc +} + +type Filter struct { + KeyFilter FilterType + KeySearchTerm string + ValueFilter FilterType + ValueSearchTerm string } type ReadDetails struct { @@ -30,6 +58,7 @@ type ReadDetails struct { Partitions []int StartPoint StartPoint Limit int + Filter *Filter } type Header struct { @@ -64,6 +93,7 @@ func (ka *SaramaKafkaAdmin) ReadRecords(ctx context.Context, rd ReadDetails) tea startedMsg := ReadingStartedMsg{ ConsumerRecord: make(chan ConsumerRecord, len(rd.Partitions)), Err: make(chan error), + CancelFunc: cancelFunc, } client, err := sarama.NewConsumerFromClient(ka.client) @@ -132,9 +162,16 @@ func (ka *SaramaKafkaAdmin) ReadRecords(ctx context.Context, rd ReadDetails) tea }) } + key := string(msg.Key) + value := ka.deserialize(err, msg) + + if !ka.matchesFilter(key, value, rd.Filter) { + continue + } + consumerRecord := ConsumerRecord{ - Key: string(msg.Key), - Value: ka.deserialize(err, msg), + Key: key, + Value: value, Partition: int64(msg.Partition), Offset: msg.Offset, Headers: headers, @@ -180,10 +217,27 @@ func (ka *SaramaKafkaAdmin) ReadRecords(ctx context.Context, rd ReadDetails) tea if atLeastOnePartitionReadable { return startedMsg } else { + cancelFunc() return EmptyTopicMsg{} } } +func (ka *SaramaKafkaAdmin) matchesFilter(key, value string, filterDetails *Filter) bool { + if filterDetails == nil { + return true + } + + if filterDetails.KeyFilter != NoFilterType { + return filterDetails.Filter(key) + } + + if filterDetails.ValueSearchTerm != "" && !strings.Contains(value, filterDetails.ValueSearchTerm) { + return false + } + + return true +} + func (ka *SaramaKafkaAdmin) deserialize( err error, msg *sarama.ConsumerMessage, diff --git a/kadmin/record _reader_test.go b/kadmin/record _reader_test.go index ffc419b..4fccb8c 100644 --- a/kadmin/record _reader_test.go +++ b/kadmin/record _reader_test.go @@ -257,6 +257,146 @@ func TestReadRecords(t *testing.T) { assert.Equal(t, 54, slices.Max(receivedRecords)) } + // clean up + ka.DeleteTopic(topic) + }) + }) + + t.Run("Read filtered", func(t *testing.T) { + t.Run("with key filter", func(t *testing.T) { + t.Run("containing", func(t *testing.T) { + topic := topicName() + // given + msg := ka.CreateTopic(TopicCreationDetails{ + Name: topic, + NumPartitions: 1, + }).(TopicCreationStartedMsg) + + switch msg.AwaitCompletion().(type) { + case TopicCreatedMsg: + case TopicCreationErrMsg: + t.Fatal("Unable to create topic", msg.Err) + } + + // when + assert.EventuallyWithT(t, func(c *assert.CollectT) { + for i := 0; i < 55; i++ { + psm := ka.PublishRecord(&ProducerRecord{ + Topic: topic, + Key: strconv.Itoa(i), + Value: "{\"id\":\"3\"}", + }) + + select { + case err := <-psm.Err: + t.Fatal(c, "Unable to publish", err) + case p := <-psm.Published: + assert.True(c, p) + } + } + }, 10*time.Second, 10*time.Millisecond) + + // then + rsm := ka.ReadRecords(context.Background(), ReadDetails{ + Topic: &Topic{topic, 1, 1, 1}, + Partitions: []int{}, + StartPoint: MostRecent, + Limit: 55, + Filter: &Filter{ + KeySearchTerm: "1", + KeyFilter: ContainsFilterType, + }, + }).(ReadingStartedMsg) + + var receivedRecords []int + for { + select { + case r, ok := <-rsm.ConsumerRecord: + if !ok { + goto assertRecords + } + key, _ := strconv.Atoi(r.Key) + receivedRecords = append(receivedRecords, key) + if len(receivedRecords) == 15 { + rsm.CancelFunc() + } + case <-time.After(5 * time.Second): + rsm.CancelFunc() + } + } + + assertRecords: + assert.Equal(t, []int{1, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 21, 31, 41, 51}, receivedRecords) + + // clean up + ka.DeleteTopic(topic) + }) + }) + + t.Run("starts with", func(t *testing.T) { + topic := topicName() + // given + msg := ka.CreateTopic(TopicCreationDetails{ + Name: topic, + NumPartitions: 1, + }).(TopicCreationStartedMsg) + + switch msg.AwaitCompletion().(type) { + case TopicCreatedMsg: + case TopicCreationErrMsg: + t.Fatal("Unable to create topic", msg.Err) + } + + // when + assert.EventuallyWithT(t, func(c *assert.CollectT) { + for i := 0; i < 55; i++ { + psm := ka.PublishRecord(&ProducerRecord{ + Topic: topic, + Key: strconv.Itoa(i), + Value: "{\"id\":\"3\"}", + }) + + select { + case err := <-psm.Err: + t.Fatal(c, "Unable to publish", err) + case p := <-psm.Published: + assert.True(c, p) + } + } + }, 10*time.Second, 10*time.Millisecond) + + // then + rsm := ka.ReadRecords(context.Background(), ReadDetails{ + Topic: &Topic{topic, 1, 1, 1}, + Partitions: []int{}, + StartPoint: MostRecent, + Limit: 55, + Filter: &Filter{ + KeySearchTerm: "1", + KeyFilter: StartsWithFilterType, + }, + }).(ReadingStartedMsg) + + var receivedRecords []int + for { + select { + case r, ok := <-rsm.ConsumerRecord: + if !ok { + goto assertRecords + } + key, _ := strconv.Atoi(r.Key) + receivedRecords = append(receivedRecords, key) + if len(receivedRecords) == 11 { + rsm.CancelFunc() + } + case <-time.After(5 * time.Second): + rsm.CancelFunc() + } + } + + assertRecords: + assert.Equal(t, []int{1, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19}, receivedRecords) + // clean up ka.DeleteTopic(topic) }) diff --git a/ui/pages/consumption_form_page/consumption_form_page.go b/ui/pages/consumption_form_page/consumption_form_page.go index cd8794b..f665028 100644 --- a/ui/pages/consumption_form_page/consumption_form_page.go +++ b/ui/pages/consumption_form_page/consumption_form_page.go @@ -3,6 +3,7 @@ package consumption_form_page import ( tea "github.com/charmbracelet/bubbletea" "github.com/charmbracelet/huh" + "github.com/charmbracelet/lipgloss" "ktea/kadmin" "ktea/kontext" "ktea/styles" @@ -12,26 +13,43 @@ import ( "strconv" ) +type selectionState int + +const ( + notSelected selectionState = iota + selected +) + type Model struct { - form *huh.Form - topic *kadmin.Topic - formValues *formValues - windowResized bool + form *huh.Form + topic *kadmin.Topic + formValues *formValues + windowResized bool + keyFilterSelectionState selectionState + valueFilterSelectionState selectionState + ktx *kontext.ProgramKtx + availableHeight int } type formValues struct { - startPoint kadmin.StartPoint - limit int - partitions []int + startPoint kadmin.StartPoint + limit int + partitions []int + keyFilter kadmin.FilterType + keyFilterTerm string + valueFilter kadmin.FilterType + valueFilterTerm string } func (m *Model) View(ktx *kontext.ProgramKtx, renderer *ui.Renderer) string { if m.form == nil { - m.form = m.newForm(m.topic.Partitions, ktx) + m.availableHeight = ktx.AvailableHeight + m.form = m.newForm(m.topic.Partitions, m.ktx) } if m.windowResized { m.windowResized = false + m.availableHeight = ktx.AvailableHeight m.form = m.newForm(m.topic.Partitions, ktx) } @@ -43,6 +61,41 @@ func (m *Model) Update(msg tea.Msg) tea.Cmd { return nil } + form, cmd := m.form.Update(msg) + if f, ok := form.(*huh.Form); ok { + m.form = f + } + + if m.formValues.keyFilter != kadmin.NoFilterType && m.keyFilterSelectionState == notSelected { + // if key filter type is selected and previously not selected + m.keyFilterSelectionState = selected + m.form = m.newForm(m.topic.Partitions, m.ktx) + m.NextField(3) + m.form.NextGroup() + } else if m.formValues.keyFilter == kadmin.NoFilterType && m.keyFilterSelectionState == selected { + // if no key filter type is selected and previously selected + m.keyFilterSelectionState = notSelected + m.form = m.newForm(m.topic.Partitions, m.ktx) + m.NextField(3) + m.form.NextGroup() + } + + if m.formValues.valueFilter != kadmin.NoFilterType && m.valueFilterSelectionState == notSelected { + // if value filter type is selected and previously not selected + m.valueFilterSelectionState = selected + m.form = m.newForm(m.topic.Partitions, m.ktx) + m.NextField(3) + m.form.NextGroup() + m.NextField(1) + } else if m.formValues.valueFilter == kadmin.NoFilterType && m.valueFilterSelectionState == selected { + // if no key filter type is selected and previously selected + m.valueFilterSelectionState = notSelected + m.form = m.newForm(m.topic.Partitions, m.ktx) + m.NextField(3) + m.form.NextGroup() + m.NextField(1) + } + switch msg.(type) { case tea.WindowSizeMsg: m.windowResized = true @@ -56,9 +109,14 @@ func (m *Model) Update(msg tea.Msg) tea.Cmd { } } - form, cmd := m.form.Update(msg) - if f, ok := form.(*huh.Form); ok { - m.form = f + filter := kadmin.Filter{} + if m.formValues.keyFilter != kadmin.NoFilterType { + filter.KeySearchTerm = m.formValues.keyFilterTerm + filter.KeyFilter = m.formValues.keyFilter + } + if m.formValues.valueFilter != kadmin.NoFilterType { + filter.ValueSearchTerm = m.formValues.valueFilterTerm + filter.ValueFilter = m.formValues.valueFilter } if m.form.State == huh.StateCompleted { return ui.PublishMsg(nav.LoadConsumptionPageMsg{ @@ -67,6 +125,7 @@ func (m *Model) Update(msg tea.Msg) tea.Cmd { Partitions: m.formValues.partitions, StartPoint: m.formValues.startPoint, Limit: m.formValues.limit, + Filter: &filter, }, }) } @@ -96,43 +155,119 @@ func (m *Model) newForm(partitions int, ktx *kontext.ProgramKtx) *huh.Form { if len(partOptions) < 13 { optionsHeight = len(partOptions) + 2 // 2 for field title + padding } else { - optionsHeight = ktx.AvailableHeight - optionsHeight + optionsHeight = m.availableHeight - optionsHeight } + topicGroup := huh.NewGroup( + huh.NewSelect[kadmin.StartPoint](). + Value(&m.formValues.startPoint). + Title("Start form"). + Options( + huh.NewOption("Beginning", kadmin.Beginning), + huh.NewOption("Most Recent", kadmin.MostRecent)), + huh.NewMultiSelect[int](). + Value(&m.formValues.partitions). + Height(optionsHeight). + Title("Partitions"). + Description(m.getPartitionDescription(ktx)). + Options(partOptions...), + huh.NewSelect[int](). + Value(&m.formValues.limit). + Title("Limit"). + Options( + huh.NewOption("50", 50), + huh.NewOption("500", 500), + huh.NewOption("5000", 5000)), + ) + filterGroup := m.createFilterGroup() form := huh.NewForm( - huh.NewGroup( - huh.NewSelect[kadmin.StartPoint](). - Value(&m.formValues.startPoint). - Title("Start form"). - Options( - huh.NewOption("Beginning", kadmin.Beginning), - huh.NewOption("Most Recent", kadmin.MostRecent)), - huh.NewMultiSelect[int](). - Value(&m.formValues.partitions). - Height(optionsHeight). - Title("Partitions"). - Description("Select none to consume from all available partitions"). - Options(partOptions...), - huh.NewSelect[int](). - Value(&m.formValues.limit). - Title("Limit"). - Options( - huh.NewOption("50", 50), - huh.NewOption("500", 500), - huh.NewOption("5000", 5000)), - ), + topicGroup.WithWidth(ktx.WindowWidth/2), + filterGroup, ) + form.WithLayout(huh.LayoutColumns(2)) form.Init() return form } -func NewWithDetails(details *kadmin.ReadDetails) *Model { - return &Model{topic: details.Topic, formValues: &formValues{ - startPoint: details.StartPoint, - limit: details.Limit, - partitions: details.Partitions, - }} +func (m *Model) createFilterGroup() *huh.Group { + var fields []huh.Field + + fields = append(fields, m.keyFilterTypeField()) + if m.formValues.keyFilter != kadmin.NoFilterType { + fields = append(fields, m.keyFilterTermField()) + } + + fields = append(fields, m.valueFilterTypeField()) + if m.formValues.valueFilter != kadmin.NoFilterType { + fields = append(fields, m.valueFilterTermField()) + } + + return huh.NewGroup(fields...) +} + +func (m *Model) valueFilterTermField() *huh.Input { + return huh.NewInput(). + Value(&m.formValues.valueFilterTerm). + Title("Value Filter Term") +} + +func (m *Model) valueFilterTypeField() *huh.Select[kadmin.FilterType] { + return huh.NewSelect[kadmin.FilterType](). + Value(&m.formValues.valueFilter). + Title("Value Filter Type"). + Options( + huh.NewOption("None", kadmin.NoFilterType), + huh.NewOption("Contains", kadmin.ContainsFilterType), + huh.NewOption("Starts With", kadmin.StartsWithFilterType)) +} + +func (m *Model) keyFilterTermField() *huh.Input { + return huh.NewInput(). + Value(&m.formValues.keyFilterTerm). + Title("Key Filter Term") +} + +func (m *Model) keyFilterTypeField() *huh.Select[kadmin.FilterType] { + return huh.NewSelect[kadmin.FilterType](). + Value(&m.formValues.keyFilter). + Title("Key Filter Type"). + Options( + huh.NewOption("None", kadmin.NoFilterType), + huh.NewOption("Contains", kadmin.ContainsFilterType), + huh.NewOption("Starts With", kadmin.StartsWithFilterType)) +} + +// hack until https://github.com/charmbracelet/huh/issues/525 has been resolved +func (m *Model) getPartitionDescription(ktx *kontext.ProgramKtx) string { + partitionDescription := "Select none to consume from all available partitions" + columnWidth := ktx.WindowWidth / 2 + extraSpaces := columnWidth - lipgloss.Width(partitionDescription) + for i := 0; i < extraSpaces; i++ { + partitionDescription += " " + } + return partitionDescription +} + +func (m *Model) NextField(count int) { + for i := 0; i < count; i++ { + m.form.NextField() + } +} + +func NewWithDetails(details *kadmin.ReadDetails, ktx *kontext.ProgramKtx) *Model { + return &Model{ + topic: details.Topic, + ktx: ktx, + formValues: &formValues{ + startPoint: details.StartPoint, + limit: details.Limit, + partitions: details.Partitions, + keyFilter: details.Filter.KeyFilter, + keyFilterTerm: details.Filter.KeySearchTerm, + valueFilter: details.Filter.ValueFilter, + valueFilterTerm: details.Filter.ValueSearchTerm, + }} } -func New(topic *kadmin.Topic) *Model { - return &Model{topic: topic, formValues: &formValues{}} +func New(topic *kadmin.Topic, ktx *kontext.ProgramKtx) *Model { + return &Model{topic: topic, formValues: &formValues{}, ktx: ktx} } diff --git a/ui/pages/consumption_form_page/consumption_form_page_test.go b/ui/pages/consumption_form_page/consumption_form_page_test.go index 9e9057e..82a04e0 100644 --- a/ui/pages/consumption_form_page/consumption_form_page_test.go +++ b/ui/pages/consumption_form_page/consumption_form_page_test.go @@ -1,7 +1,9 @@ package consumption_form_page import ( + "fmt" tea "github.com/charmbracelet/bubbletea" + "github.com/charmbracelet/huh" "github.com/stretchr/testify/assert" "ktea/kadmin" "ktea/kontext" @@ -19,9 +21,9 @@ func TestConsumeForm_Navigation(t *testing.T) { Replicas: 1, Isr: 1, Partitions: 10, - }) + }, ui.NewTestKontext()) // make sure form has been initialized - m.View(ui.TestKontext, ui.TestRenderer) + m.View(ui.NewTestKontext(), ui.TestRenderer) cmd := m.Update(keys.Key(tea.KeyEsc)) @@ -34,56 +36,43 @@ func TestConsumeForm_Navigation(t *testing.T) { Replicas: 1, Isr: 1, Partitions: 10, - }) + }, ui.NewTestKontext()) + // make sure form has been initialized - render := m.View(&kontext.ProgramKtx{ - Config: nil, - WindowWidth: 100, - WindowHeight: 100, - AvailableHeight: 100, - }, ui.TestRenderer) + m.View(ui.TestKontext, ui.TestRenderer) - assert.Contains(t, render, ` - Partitions - Select none to consume from all available partitions - > • 0 - • 1 - • 2 - • 3 - • 4 - • 5 - • 6 - • 7 - • 8 - • 9`) + render := m.View(ui.NewTestKontext(), ui.TestRenderer) + + assert.Contains(t, render, "> • 0") + for i := 1; i < 10; i++ { + assert.Regexp(t, fmt.Sprintf("• %d", i), render) + } assert.NotContains(t, render, "• 10") }) t.Run("renders subset of partitions when there is not enough height", func(t *testing.T) { + ktx := &kontext.ProgramKtx{ + Config: nil, + WindowWidth: 100, + WindowHeight: 20, + AvailableHeight: 20, + } m := New(&kadmin.Topic{ Name: "topic1", Replicas: 1, Isr: 1, Partitions: 100, - }) + }, ktx) // make sure form has been initialized - render := m.View(&kontext.ProgramKtx{ - Config: nil, - WindowWidth: 100, - WindowHeight: 20, - AvailableHeight: 20, - }, ui.TestRenderer) + m.View(ktx, ui.TestRenderer) - assert.Contains(t, render, ` - Partitions - Select none to consume from all available partitions - > • 0 - • 1 - • 2 - • 3 - • 4 - - Limit`) + render := m.View(ktx, ui.TestRenderer) + + assert.Contains(t, render, `> • 0`) + assert.Contains(t, render, `• 1`) + assert.Contains(t, render, `• 2`) + assert.Contains(t, render, `• 3`) + assert.Contains(t, render, `• 4`) assert.NotContains(t, render, "• 5") }) @@ -97,8 +86,17 @@ func TestConsumeForm_Navigation(t *testing.T) { }, Partitions: []int{3, 6}, StartPoint: kadmin.MostRecent, - Limit: 500, - }) + Filter: &kadmin.Filter{ + KeyFilter: kadmin.StartsWithFilterType, + KeySearchTerm: "starts-with-key-term", + ValueFilter: kadmin.ContainsFilterType, + ValueSearchTerm: "contains-value-term", + }, + Limit: 500, + }, ui.NewTestKontext()) + + // make sure form has been initialized + m.View(ui.NewTestKontext(), ui.TestRenderer) render := m.View(ui.TestKontext, ui.TestRenderer) @@ -107,6 +105,8 @@ func TestConsumeForm_Navigation(t *testing.T) { assert.Contains(t, render, "✓ 3") assert.Contains(t, render, "✓ 6") assert.Contains(t, render, "> ") + assert.Contains(t, render, "starts-with-key-term") + assert.Contains(t, render, "contains-value-term") }) t.Run("submitting form loads consumption page with consumption information", func(t *testing.T) { @@ -115,9 +115,9 @@ func TestConsumeForm_Navigation(t *testing.T) { Partitions: 10, Replicas: 1, Isr: 1, - }) + }, ui.NewTestKontext()) // make sure form has been initialized - m.View(ui.TestKontext, ui.TestRenderer) + m.View(ui.NewTestKontext(), ui.TestRenderer) // select start from most recent cmd := m.Update(keys.Key(tea.KeyDown)) @@ -141,7 +141,13 @@ func TestConsumeForm_Navigation(t *testing.T) { // next field cmd = m.Update(cmd()) // next group + m.Update(cmd()) + // no key filter + cmd = m.Update(keys.Key(tea.KeyEnter)) + // next field cmd = m.Update(cmd()) + // no value filter + msgs := keys.Submit(m) assert.Equal(t, nav.LoadConsumptionPageMsg{ ReadDetails: kadmin.ReadDetails{ @@ -151,11 +157,15 @@ func TestConsumeForm_Navigation(t *testing.T) { Replicas: 1, Isr: 1, }, + Filter: &kadmin.Filter{ + KeySearchTerm: "", + ValueSearchTerm: "", + }, Limit: 500, Partitions: []int{3, 5}, StartPoint: kadmin.MostRecent, }, - }, cmd()) + }, msgs[0]) }) t.Run("selecting partitions is optional", func(t *testing.T) { @@ -164,9 +174,9 @@ func TestConsumeForm_Navigation(t *testing.T) { Partitions: 10, Replicas: 1, Isr: 1, - }) + }, ui.NewTestKontext()) // make sure form has been initialized - m.View(ui.TestKontext, ui.TestRenderer) + m.View(ui.NewTestKontext(), ui.TestRenderer) // select start from most recent cmd := m.Update(keys.Key(tea.KeyDown)) @@ -183,7 +193,13 @@ func TestConsumeForm_Navigation(t *testing.T) { // next field cmd = m.Update(cmd()) // next group + m.Update(cmd()) + // no key filter + cmd = m.Update(keys.Key(tea.KeyEnter)) + // next field cmd = m.Update(cmd()) + // no value filter + msgs := keys.Submit(m) assert.Equal(t, nav.LoadConsumptionPageMsg{ ReadDetails: kadmin.ReadDetails{ @@ -193,11 +209,273 @@ func TestConsumeForm_Navigation(t *testing.T) { Replicas: 1, Isr: 1, }, + Filter: &kadmin.Filter{ + KeySearchTerm: "", + ValueSearchTerm: "", + }, + Limit: 500, + Partitions: []int{}, + StartPoint: kadmin.MostRecent, + }, + }, msgs[0]) + }) + + t.Run("selecting key filter type starts-with displays key filter value field", func(t *testing.T) { + m := New(&kadmin.Topic{ + Name: "topic1", + Partitions: 10, + Replicas: 1, + Isr: 1, + }, ui.NewTestKontext()) + // make sure form has been initialized + m.View(ui.NewTestKontext(), ui.TestRenderer) + + // select start from most recent + cmd := m.Update(keys.Key(tea.KeyDown)) + cmd = m.Update(keys.Key(tea.KeyEnter)) + // next field + m.Update(cmd()) + // select no partitions + cmd = m.Update(keys.Key(tea.KeyEnter)) + // next field + cmd = m.Update(cmd()) + // select limit 500 + m.Update(keys.Key(tea.KeyDown)) + cmd = m.Update(keys.Key(tea.KeyEnter)) + // next field + cmd = m.Update(cmd()) + // next group + m.Update(cmd()) + // starts-with key filter + m.Update(keys.Key(tea.KeyDown)) + m.Update(keys.Key(tea.KeyDown)) + + render := m.View(&kontext.ProgramKtx{ + Config: nil, + WindowWidth: 100, + WindowHeight: 20, + AvailableHeight: 20, + }, ui.TestRenderer) + + assert.Contains(t, render, "Key Filter Term") + + t.Run("selecting no key filter type hides key value field again", func(t *testing.T) { + m.Update(keys.Key(tea.KeyUp)) + m.Update(keys.Key(tea.KeyUp)) + + render := m.View(&kontext.ProgramKtx{ + Config: nil, + WindowWidth: 100, + WindowHeight: 20, + AvailableHeight: 20, + }, ui.TestRenderer) + + assert.NotContains(t, render, "Key Filter Value") + }) + + t.Run("selecting no key filter after filling in key filter term does not search for entered value", func(t *testing.T) { + // select starts-with + m.Update(keys.Key(tea.KeyDown)) + m.Update(keys.Key(tea.KeyDown)) + + keys.UpdateKeys(m, "search-term") + + // selects none + m.Update(keys.Key(tea.KeyUp)) + m.Update(keys.Key(tea.KeyUp)) + + cmd = m.Update(keys.Key(tea.KeyEnter)) + // next field + cmd = m.Update(cmd()) + // no value filter + msgs := keys.Submit(m) + + assert.Equal(t, nav.LoadConsumptionPageMsg{ + ReadDetails: kadmin.ReadDetails{ + Topic: &kadmin.Topic{ + Name: "topic1", + Partitions: 10, + Replicas: 1, + Isr: 1, + }, + Filter: &kadmin.Filter{ + KeySearchTerm: "", + ValueSearchTerm: "", + }, + Limit: 500, + Partitions: []int{}, + StartPoint: kadmin.MostRecent, + }, + }, msgs[0]) + }) + }) + + t.Run("filter on key value", func(t *testing.T) { + m := New(&kadmin.Topic{ + Name: "topic1", + Partitions: 10, + Replicas: 1, + Isr: 1, + }, ui.NewTestKontext()) + // make sure form has been initialized + m.View(ui.NewTestKontext(), ui.TestRenderer) + + // select start from most recent + cmd := m.Update(keys.Key(tea.KeyDown)) + cmd = m.Update(keys.Key(tea.KeyEnter)) + // next field + m.Update(cmd()) + // select no partitions + cmd = m.Update(keys.Key(tea.KeyEnter)) + // next field + cmd = m.Update(cmd()) + // select limit 500 + m.Update(keys.Key(tea.KeyDown)) + cmd = m.Update(keys.Key(tea.KeyEnter)) + // next field + cmd = m.Update(cmd()) + // next group + m.Update(cmd()) + // starts-with key filter + m.Update(keys.Key(tea.KeyDown)) + m.Update(keys.Key(tea.KeyDown)) + cmd = m.Update(keys.Key(tea.KeyEnter)) + // next field + cmd = m.Update(cmd()) + // filter on key value search-term + keys.UpdateKeys(m, "search-term") + cmd = m.Update(keys.Key(tea.KeyEnter)) + // next field + cmd = m.Update(cmd()) + // no value filter + msgs := keys.Submit(m) + + assert.EqualValues(t, nav.LoadConsumptionPageMsg{ + ReadDetails: kadmin.ReadDetails{ + Topic: &kadmin.Topic{ + Name: "topic1", + Partitions: 10, + Replicas: 1, + Isr: 1, + }, + Filter: &kadmin.Filter{ + KeyFilter: kadmin.StartsWithFilterType, + KeySearchTerm: "search-term", + ValueSearchTerm: "", + }, Limit: 500, Partitions: []int{}, StartPoint: kadmin.MostRecent, }, - }, cmd()) + }, msgs[0]) }) + t.Run("selecting value filter type starts-with displays filter value field", func(t *testing.T) { + m := New(&kadmin.Topic{ + Name: "topic1", + Partitions: 10, + Replicas: 1, + Isr: 1, + }, ui.NewTestKontext()) + // make sure form has been initialized + m.View(ui.NewTestKontext(), ui.TestRenderer) + + // select start from most recent + cmd := m.Update(keys.Key(tea.KeyDown)) + cmd = m.Update(keys.Key(tea.KeyEnter)) + // next field + m.Update(cmd()) + // select no partitions + cmd = m.Update(keys.Key(tea.KeyEnter)) + // next field + cmd = m.Update(cmd()) + // select limit 500 + m.Update(keys.Key(tea.KeyDown)) + cmd = m.Update(keys.Key(tea.KeyEnter)) + // next field + cmd = m.Update(cmd()) + // next group + m.Update(cmd()) + // no key filter + cmd = m.Update(keys.Key(tea.KeyEnter)) + // next field + cmd = m.Update(cmd()) + // starts-with value filter + m.Update(keys.Key(tea.KeyDown)) + m.Update(keys.Key(tea.KeyDown)) + + render := m.View(&kontext.ProgramKtx{ + Config: nil, + WindowWidth: 100, + WindowHeight: 20, + AvailableHeight: 20, + }, ui.TestRenderer) + + assert.Contains(t, render, "Value Filter Term") + + // make sure the value filter term field is focussed + cmd = m.Update(keys.Key(tea.KeyEnter)) + // next field + cmd = m.Update(cmd()) + // next group + m.Update(cmd()) + + field := m.form.GetFocusedField() + assert.IsType(t, &huh.Input{}, field) + assert.Contains(t, field.View(), "Value Filter Term") + + t.Run("selecting no value filter type hides key filter value field again", func(t *testing.T) { + cmd = m.Update(keys.Key(tea.KeyShiftTab)) + // prev field + cmd = m.Update(cmd()) + + m.Update(keys.Key(tea.KeyUp)) + m.Update(keys.Key(tea.KeyUp)) + + render := m.View(&kontext.ProgramKtx{ + Config: nil, + WindowWidth: 100, + WindowHeight: 20, + AvailableHeight: 20, + }, ui.TestRenderer) + + assert.NotContains(t, render, "Value Filter Term") + }) + + t.Run("selecting no value filter after filling in a value filter term does not search for entered value", func(t *testing.T) { + // select starts-with + m.Update(keys.Key(tea.KeyDown)) + m.Update(keys.Key(tea.KeyDown)) + + keys.UpdateKeys(m, "search-term") + + // selects none + m.Update(keys.Key(tea.KeyUp)) + m.Update(keys.Key(tea.KeyUp)) + + cmd = m.Update(keys.Key(tea.KeyEnter)) + // next field + cmd = m.Update(cmd()) + // no value filter + msgs := keys.Submit(m) + + assert.Equal(t, nav.LoadConsumptionPageMsg{ + ReadDetails: kadmin.ReadDetails{ + Topic: &kadmin.Topic{ + Name: "topic1", + Partitions: 10, + Replicas: 1, + Isr: 1, + }, + Filter: &kadmin.Filter{ + KeySearchTerm: "", + ValueSearchTerm: "", + }, + Limit: 500, + Partitions: []int{}, + StartPoint: kadmin.MostRecent, + }, + }, msgs[0]) + }) + }) } diff --git a/ui/pages/create_cluster_page/upsert_cluster_page.go b/ui/pages/create_cluster_page/upsert_cluster_page.go index 6149590..ab0a61a 100644 --- a/ui/pages/create_cluster_page/upsert_cluster_page.go +++ b/ui/pages/create_cluster_page/upsert_cluster_page.go @@ -320,7 +320,9 @@ func (m *Model) createForm() *huh.Form { } form := huh.NewForm( - huh.NewGroup(clusterFields...).Title("Cluster").Description("de").WithWidth(m.ktx.WindowWidth/2), + huh.NewGroup(clusterFields...). + Title("Cluster"). + WithWidth(m.ktx.WindowWidth/2), huh.NewGroup(schemaRegistryFields...), ) form.WithLayout(huh.LayoutColumns(2)) diff --git a/ui/pages/create_cluster_page/upsert_cluster_page_test.go b/ui/pages/create_cluster_page/upsert_cluster_page_test.go index 78affad..ff3a337 100644 --- a/ui/pages/create_cluster_page/upsert_cluster_page_test.go +++ b/ui/pages/create_cluster_page/upsert_cluster_page_test.go @@ -301,7 +301,7 @@ func TestCreateClusterPage(t *testing.T) { }, msgs[0]) }) - t.Run("Selecting SASL_SSL auth method displays username and password fields", func(t *testing.T) { + t.Run("Selecting SASL auth method displays username and password fields", func(t *testing.T) { // given programKtx := kontext.ProgramKtx{ WindowWidth: 100, @@ -331,14 +331,12 @@ func TestCreateClusterPage(t *testing.T) { keys.UpdateKeys(createEnvPage, "localhost:9092") cmd = createEnvPage.Update(keys.Key(tea.KeyEnter)) createEnvPage.Update(cmd()) - // and: auth method none is selected - cmd = createEnvPage.Update(keys.Key(tea.KeyDown)) - cmd = createEnvPage.Update(keys.Key(tea.KeyEnter)) - // next field - createEnvPage.Update(cmd()) + // and: auth method SASL is selected + createEnvPage.Update(keys.Key(tea.KeyDown)) // then render := createEnvPage.View(&programKtx, ui.TestRenderer) + assert.Contains(t, render, "SASL_SSL") assert.Contains(t, render, "Username") assert.Contains(t, render, "Password") }) diff --git a/ui/pages/record_details_page/record_details_page.go b/ui/pages/record_details_page/record_details_page.go index 3d61851..d73d2f2 100644 --- a/ui/pages/record_details_page/record_details_page.go +++ b/ui/pages/record_details_page/record_details_page.go @@ -130,11 +130,11 @@ func (m *Model) createPayloadViewPort(payloadWidth int, height int) { if m.payloadVp == nil { payloadVp := viewport.New(payloadWidth, height) m.payloadVp = &payloadVp + m.payloadVp.SetContent(m.payload) } else { m.payloadVp.Height = height m.payloadVp.Width = payloadWidth } - m.payloadVp.SetContent(m.payload) } func (m *Model) determineStyles() (lipgloss.Style, lipgloss.Style) { diff --git a/ui/tabs/topics_tab/topics_tab.go b/ui/tabs/topics_tab/topics_tab.go index efb850b..7e79298 100644 --- a/ui/tabs/topics_tab/topics_tab.go +++ b/ui/tabs/topics_tab/topics_tab.go @@ -60,9 +60,9 @@ func (m *Model) Update(msg tea.Msg) tea.Cmd { case nav.LoadConsumptionFormPageMsg: if msg.ReadDetails != nil { - m.active = consumption_form_page.NewWithDetails(msg.ReadDetails) + m.active = consumption_form_page.NewWithDetails(msg.ReadDetails, m.ktx) } else { - m.active = consumption_form_page.New(msg.Topic) + m.active = consumption_form_page.New(msg.Topic, m.ktx) } case nav.LoadRecordDetailPageMsg: