diff --git a/src/backend/kafka.rs b/src/backend/kafka.rs index d72933d..2328c5b 100644 --- a/src/backend/kafka.rs +++ b/src/backend/kafka.rs @@ -1,5 +1,6 @@ use futures::future; -use rdkafka::client::ClientContext; +use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}; +use rdkafka::client::{ClientContext, DefaultClientContext}; use rdkafka::config::{ClientConfig, FromClientConfigAndContext, RDKafkaLogLevel}; use rdkafka::consumer::BaseConsumer; use rdkafka::consumer::{Consumer, ConsumerContext}; @@ -60,6 +61,13 @@ impl KafkaFetch { pub const VALUES: [Self; 2] = [Self::Newest, Self::Oldest]; } +#[derive(Debug, Clone)] +pub struct CreateTopicRequest { + pub name: String, + pub partition_count: u16, + pub replica_count: u8, +} + #[derive(Debug, Clone)] pub struct KafkaBackend { pub config: KrustConnection, @@ -84,10 +92,11 @@ impl KafkaBackend { info!("kafka::connection::timeout: {:?}", timeout); timeout } - fn producer(&self) -> Result { - let producer: Result = match self.config.security_type { + fn create_config(&self) -> ClientConfig { + let mut config = ClientConfig::new(); + match self.config.security_type { KrustConnectionSecurityType::SASL_PLAINTEXT => { - ClientConfig::new() + config .set("bootstrap.servers", self.config.brokers_list.clone()) .set("group.id", GROUP_ID) .set("enable.partition.eof", "false") @@ -111,10 +120,9 @@ impl KafkaBackend { ) //.set("sasl.jaas.config", self.config.jaas_config.clone().unwrap_or_default()) .set_log_level(RDKafkaLogLevel::Debug) - .create() } _ => { - ClientConfig::new() + config .set("bootstrap.servers", self.config.brokers_list.clone()) .set("group.id", GROUP_ID) .set("enable.partition.eof", "false") @@ -123,56 +131,44 @@ impl KafkaBackend { .set("message.timeout.ms", "5000") //.set("statistics.interval.ms", "30000") .set("auto.offset.reset", "earliest") - .create() } - }; - producer + } + .to_owned() + } + fn producer(&self) -> Result { + self.create_config().create() } fn consumer(&self, context: C) -> KafkaResult where C: ClientContext, T: FromClientConfigAndContext, { - match self.config.security_type { - KrustConnectionSecurityType::SASL_PLAINTEXT => { - ClientConfig::new() - .set("bootstrap.servers", self.config.brokers_list.clone()) - .set("group.id", GROUP_ID) - .set("enable.partition.eof", "false") - .set("session.timeout.ms", "6000") - .set("enable.auto.commit", "false") - //.set("statistics.interval.ms", "30000") - .set("auto.offset.reset", "earliest") - .set("security.protocol", self.config.security_type.to_string()) - .set( - "sasl.mechanisms", - self.config.sasl_mechanism.clone().unwrap_or_default(), - ) - .set( - "sasl.username", - self.config.sasl_username.clone().unwrap_or_default(), - ) - .set( - "sasl.password", - self.config.sasl_password.clone().unwrap_or_default(), - ) - //.set("sasl.jaas.config", self.config.jaas_config.clone().unwrap_or_default()) - //.set("debug", "all") - .set_log_level(RDKafkaLogLevel::Debug) - .create_with_context::(context) - } - _ => { - ClientConfig::new() - .set("bootstrap.servers", self.config.brokers_list.clone()) - .set("group.id", GROUP_ID) - .set("enable.partition.eof", "false") - .set("session.timeout.ms", "6000") - .set("enable.auto.commit", "false") - //.set("statistics.interval.ms", "30000") - .set("auto.offset.reset", "earliest") - .create_with_context::(context) - } - } + self.create_config().create_with_context(context) + } + fn create_admin_client(&self) -> Result, KafkaError> { + self.create_config().create() + //.expect("admin client creation failed") + } + + pub async fn create_topic(self, request: &CreateTopicRequest) -> Result { + let admin_client = self.create_admin_client()?; + let opts = AdminOptions::new().operation_timeout(Some(self.timeout())); + let topic = NewTopic::new( + &request.name, + request.partition_count as i32, + TopicReplication::Fixed(request.replica_count as i32), + ); + admin_client.create_topics(vec![&topic], &opts).await?; + Ok(true) + } + + pub async fn delete_topic(self, topic_name: String) -> Result { + let admin_client = self.create_admin_client()?; + let opts = AdminOptions::new().operation_timeout(Some(self.timeout())); + admin_client + .delete_topics(&[topic_name.as_str()], &opts) + .await?; + Ok(true) } pub async fn list_topics(&self) -> Result, ExternalError> { diff --git a/src/backend/repository.rs b/src/backend/repository.rs index 3806e4d..520da37 100644 --- a/src/backend/repository.rs +++ b/src/backend/repository.rs @@ -681,7 +681,13 @@ impl Repository { "DELETE FROM kr_connection WHERE id = :cid", )?; - + let mut topics_stmt_by_id = self.conn.prepare_cached( + "DELETE FROM kr_topic + WHERE connection_id = :cid", + )?; + topics_stmt_by_id + .execute(named_params! { ":cid": &conn_id,}) + .map_err(ExternalError::DatabaseError)?; stmt_by_id .execute(named_params! { ":cid": &conn_id,}) .map_err(ExternalError::DatabaseError) diff --git a/src/component/connection_list.rs b/src/component/connection_list.rs index 63cb747..32c8d4a 100644 --- a/src/component/connection_list.rs +++ b/src/component/connection_list.rs @@ -1,12 +1,13 @@ -use gtk::prelude::*; +use adw::{prelude::*, AlertDialog}; use relm4::{ factory::{DynamicIndex, FactoryComponent}, - FactorySender, + main_application, FactorySender, }; use tracing::info; use crate::{ backend::repository::{KrustConnection, KrustConnectionSecurityType}, + modals::utils::build_confirmation_alert, Repository, }; @@ -16,6 +17,7 @@ pub enum KrustConnectionMsg { Disconnect, Edit(DynamicIndex), Remove(DynamicIndex), + //ConfirmRemove, Refresh, } @@ -39,6 +41,8 @@ pub struct ConnectionListModel { pub color: Option, pub timeout: Option, pub is_connected: bool, + pub confirm_delete_alert: AlertDialog, + pub selected: Option, } impl From<&mut ConnectionListModel> for KrustConnection { @@ -108,7 +112,22 @@ impl FactoryComponent for ConnectionListModel { } } - fn init_model(conn: Self::Init, _index: &DynamicIndex, _sender: FactorySender) -> Self { + fn init_model(conn: Self::Init, index: &DynamicIndex, sender: FactorySender) -> Self { + let confirm_delete_alert = build_confirmation_alert( + "Delete".to_string(), + "Are you sure you want to delete the connection?".to_string(), + ); + let snd = sender.clone(); + let index = index.clone(); + let connection = conn.clone(); + confirm_delete_alert.connect_response(Some("confirm"), move |_, _| { + snd.output(KrustConnectionOutput::Remove( + index.clone(), + connection.clone(), + )) + .unwrap(); + //snd.input(KrustConnectionMsg::ConfirmRemove); + }); Self { id: conn.id, name: conn.name, @@ -120,6 +139,8 @@ impl FactoryComponent for ConnectionListModel { color: conn.color, timeout: conn.timeout, is_connected: false, + confirm_delete_alert, + selected: None, } } fn post_view(&self, widgets: &mut Self::Widgets) {} @@ -171,10 +192,9 @@ impl FactoryComponent for ConnectionListModel { .unwrap(); } KrustConnectionMsg::Remove(index) => { - info!("Edit request for {}", self.name); - sender - .output(KrustConnectionOutput::Remove(index, self.into())) - .unwrap(); + info!("Delete request for {}", self.name); + let main_window = main_application().active_window().unwrap(); + self.confirm_delete_alert.present(&main_window); } KrustConnectionMsg::Refresh => { widgets.label.set_label(&self.name); diff --git a/src/component/topics/create_dialog.rs b/src/component/topics/create_dialog.rs new file mode 100644 index 0000000..2ffd5a7 --- /dev/null +++ b/src/component/topics/create_dialog.rs @@ -0,0 +1,236 @@ +use crate::backend::{ + kafka::{CreateTopicRequest, KafkaBackend}, + repository::KrustConnection, +}; +use adw::prelude::*; +use gtk::Adjustment; +use relm4::*; + +use tracing::*; + +#[derive(Debug)] +pub struct CreateTopicDialogModel { + pub connection: Option, + pub partition_count: Option, + pub replica_count: Option, +} + +#[derive(Debug)] +pub enum CreateTopicDialogMsg { + Show, + Create, + Cancel, + Close, + SetPartitionCount, + SetReplicaCount, +} + +#[derive(Debug)] +pub enum CreateTopicDialogOutput { + RefreshTopics, +} + +#[derive(Debug)] +pub enum AsyncCommandOutput { + CreateResult, +} + +#[relm4::component(pub)] +impl Component for CreateTopicDialogModel { + type Init = Option; + type Input = CreateTopicDialogMsg; + type Output = CreateTopicDialogOutput; + type CommandOutput = AsyncCommandOutput; + + view! { + #[root] + main_dialog = adw::Dialog { + set_title: "Create topic", + set_content_width: 400, + // set_content_height: dialog_height, + #[wrap(Some)] + set_child = >k::Box { + set_orientation: gtk::Orientation::Vertical, + adw::HeaderBar {}, + set_valign: gtk::Align::Fill, + gtk::Box { + set_orientation: gtk::Orientation::Vertical, + set_valign: gtk::Align::Fill, + set_margin_all: 10, + #[name(create_topic_settings_group)] + adw::PreferencesGroup { + set_title: "Settings", + set_margin_top: 10, + set_vexpand: false, + set_hexpand: true, + #[name(name)] + adw::EntryRow { + set_title: "Name", + }, + #[name(partition_count)] + adw::SpinRow { + set_title: "Partition count", + set_subtitle: "Number of partitions", + set_snap_to_ticks: true, + set_numeric: true, + set_wrap: false, + set_update_policy: gtk::SpinButtonUpdatePolicy::IfValid, + connect_value_notify => CreateTopicDialogMsg::SetPartitionCount, + }, + #[name(replica_count)] + adw::SpinRow { + set_title: "Replica count", + set_subtitle: "Number of replicas", + set_snap_to_ticks: true, + set_numeric: true, + set_wrap: false, + set_update_policy: gtk::SpinButtonUpdatePolicy::IfValid, + connect_value_notify => CreateTopicDialogMsg::SetReplicaCount, + }, + }, + gtk::Box { + set_margin_top: 10, + set_margin_bottom: 10, + set_orientation: gtk::Orientation::Horizontal, + set_halign: gtk::Align::End, + #[name(single_message_send)] + gtk::Button { + set_label: "Create", + add_css_class: "destructive-action", + connect_clicked[sender] => move |_| { + sender.input(CreateTopicDialogMsg::Create); + }, + }, + #[name(single_message_cancel)] + gtk::Button { + set_label: "Cancel", + set_margin_start: 10, + connect_clicked[sender] => move |_| { + sender.input(CreateTopicDialogMsg::Cancel); + }, + }, + } + }, + }, + connect_closed[sender] => move |_this| { + sender.input(CreateTopicDialogMsg::Close); + }, + } + } + + fn init( + current_connection: Self::Init, + root: Self::Root, + sender: ComponentSender, + ) -> ComponentParts { + let connection = current_connection.clone(); + + let model = CreateTopicDialogModel { + connection, + partition_count: None, + replica_count: None, + }; + + let widgets = view_output!(); + let adjustment_partition_count = Adjustment::builder() + .lower(1.0) + .upper(1000.0) + .page_size(0.0) + .step_increment(1.0) + .value(1.0) + .build(); + widgets + .partition_count + .set_adjustment(Some(&adjustment_partition_count)); + let adjustment_replica_count = Adjustment::builder() + .lower(1.0) + .upper(50.0) + .page_size(0.0) + .step_increment(1.0) + .value(1.0) + .build(); + widgets + .replica_count + .set_adjustment(Some(&adjustment_replica_count)); + ComponentParts { model, widgets } + } + + fn update_with_view( + &mut self, + widgets: &mut Self::Widgets, + msg: CreateTopicDialogMsg, + sender: ComponentSender, + root: &Self::Root, + ) { + debug!("received message: {:?}", msg); + + match msg { + CreateTopicDialogMsg::Show => { + let parent = &relm4::main_application().active_window().unwrap(); + root.queue_allocate(); + root.present(parent); + } + CreateTopicDialogMsg::Cancel => { + root.close(); + } + CreateTopicDialogMsg::Create => { + info!("create"); + let name: String = widgets.name.text().into(); + let partition_count = self.partition_count.unwrap_or(1); + let replica_count = self.replica_count.unwrap_or(1); + let connection = self.connection.clone().unwrap(); + sender.oneshot_command(async move { + let kafka = KafkaBackend::new(&connection); + let result = kafka + .create_topic(&CreateTopicRequest { + name: name.clone(), + partition_count, + replica_count, + }) + .await; + match result { + Err(e) => { + error!("problem creating topic {}: {}", &name, e) + } + Ok(_) => { + info!("topic {} created", &name) + } + }; + AsyncCommandOutput::CreateResult + }) + } + CreateTopicDialogMsg::Close => { + info!("close"); + } + CreateTopicDialogMsg::SetPartitionCount => { + let value = widgets.partition_count.value(); + self.partition_count = Some(value as u16); + } + CreateTopicDialogMsg::SetReplicaCount => { + let value = widgets.replica_count.value(); + self.replica_count = Some(value as u8); + } + }; + + self.update_view(widgets, sender); + } + + fn update_cmd_with_view( + &mut self, + widgets: &mut Self::Widgets, + message: Self::CommandOutput, + sender: ComponentSender, + root: &Self::Root, + ) { + match message { + AsyncCommandOutput::CreateResult => { + info!("CreateResult"); + widgets.name.set_text(""); + sender + .output(CreateTopicDialogOutput::RefreshTopics) + .expect("should send refresh to output"); + root.close(); + } + } + } +} diff --git a/src/component/topics/mod.rs b/src/component/topics/mod.rs index 151de81..fe009a2 100644 --- a/src/component/topics/mod.rs +++ b/src/component/topics/mod.rs @@ -1,2 +1,3 @@ +pub(crate) mod create_dialog; pub(crate) mod topics_page; pub(crate) mod topics_tab; diff --git a/src/component/topics/topics_tab.rs b/src/component/topics/topics_tab.rs index 56bdfde..c7fbb44 100644 --- a/src/component/topics/topics_tab.rs +++ b/src/component/topics/topics_tab.rs @@ -1,5 +1,7 @@ use std::{cell::RefCell, cmp::Ordering, collections::HashMap}; +use crate::modals::utils::build_confirmation_alert; +use crate::AppMsg; use crate::{ backend::{ kafka::KafkaBackend, @@ -8,15 +10,19 @@ use crate::{ component::status_bar::{StatusBarMsg, STATUS_BROKER}, config::ExternalError, modals::utils::show_error_alert, - Repository, + Repository, TOASTER_BROKER, }; -use gtk::{glib::SignalHandlerId, prelude::*}; +use adw::{prelude::*, AlertDialog}; +use gtk::glib::SignalHandlerId; use relm4::{ factory::{DynamicIndex, FactoryComponent}, typed_view::column::{LabelColumn, RelmColumn, TypedColumnView}, *, }; use tracing::{debug, error, info}; +use uuid::Uuid; + +use super::create_dialog::{CreateTopicDialogModel, CreateTopicDialogMsg, CreateTopicDialogOutput}; relm4::new_action_group!(pub(super) TopicListActionGroup, "topic-list"); relm4::new_stateless_action!(pub(super) FavouriteAction, TopicListActionGroup, "toggle-favourite"); @@ -165,6 +171,9 @@ pub struct TopicsTabModel { pub topics_wrapper: TypedColumnView, pub is_loading: bool, pub search_text: String, + pub create_topic: Controller, + pub confirmation_alert: AlertDialog, + pub selected_topic_name: Option, } #[derive(Debug)] @@ -175,6 +184,11 @@ pub enum TopicsTabMsg { FavouriteToggled { topic_name: String, is_active: bool }, ToggleFavouritesFilter(bool), RefreshTopics, + CreateTopic, + DeleteTopic, + ConfirmDeleteTopic, + Ignore, + SelectTopic(u32), } #[derive(Debug)] @@ -185,9 +199,9 @@ pub enum TopicsTabOutput { #[derive(Debug)] pub enum CommandMsg { - // Data(Vec), ListFinished(Vec), ShowError(ExternalError), + DeleteTopicResult, } impl TopicsTabModel { @@ -249,6 +263,26 @@ impl FactoryComponent for TopicsTabModel { #[wrap(Some)] set_end_widget = >k::Box { set_orientation: gtk::Orientation::Horizontal, + #[name(btn_create_topic)] + gtk::Button { + set_tooltip_text: Some("Create topic"), + set_icon_name: "list-add-symbolic", + set_margin_start: 5, + connect_clicked[sender] => move |_| { + info!("Create topic"); + sender.input(TopicsTabMsg::CreateTopic); + }, + }, + #[name(btn_delete_topic)] + gtk::Button { + set_tooltip_text: Some("Delete selected topic"), + set_icon_name: "edit-delete-symbolic", + set_margin_start: 5, + add_css_class: "krust-destroy", + connect_clicked[sender] => move |_| { + sender.input(TopicsTabMsg::DeleteTopic); + }, + }, #[name(btn_refresh)] gtk::Button { set_icon_name: "media-playlist-repeat-symbolic", @@ -286,11 +320,32 @@ impl FactoryComponent for TopicsTabModel { view_wrapper.set_filter_status(0, false); let connection = current.connection.clone(); + let create_topic = CreateTopicDialogModel::builder() + .launch(Some(connection.clone())) + .forward(sender.input_sender(), |msg| match msg { + CreateTopicDialogOutput::RefreshTopics => TopicsTabMsg::RefreshTopics, + }); + + let confirmation_alert = build_confirmation_alert( + "Delete".to_string(), + "Are you sure you want to delete the topic?".to_string(), + ); + let snd: FactorySender = sender.clone(); + confirmation_alert.connect_response(Some("cancel"), move |_, _| { + snd.input(TopicsTabMsg::Ignore); + }); + let snd: FactorySender = sender.clone(); + confirmation_alert.connect_response(Some("confirm"), move |_, _| { + snd.input(TopicsTabMsg::ConfirmDeleteTopic); + }); let model = TopicsTabModel { current: Some(connection), topics_wrapper: view_wrapper, is_loading: false, search_text: String::default(), + create_topic, + confirmation_alert, + selected_topic_name: None, }; let topics_view = &model.topics_wrapper.view; @@ -298,6 +353,18 @@ impl FactoryComponent for TopicsTabModel { topics_view.connect_activate(move |_view, idx| { snd.input(TopicsTabMsg::OpenTopic(idx)); }); + let snd: FactorySender = sender.clone(); + topics_view + .model() + .unwrap() + .connect_selection_changed(move |selection_model, _i, _j| { + let size = selection_model.selection().size(); + if size == 1 { + let selected = selection_model.selection().minimum(); + info!("messages_view::selection_changed[{}]", selected); + snd.input(TopicsTabMsg::SelectTopic(selected)); + } + }); sender.input(TopicsTabMsg::RefreshTopics); model } @@ -309,11 +376,37 @@ impl FactoryComponent for TopicsTabModel { sender: FactorySender, ) { match msg { + TopicsTabMsg::Ignore => {} + TopicsTabMsg::SelectTopic(idx) => { + let item = self.topics_wrapper.get_visible(idx).unwrap(); + let topic_name = item.borrow().name.clone(); + self.selected_topic_name = Some(topic_name); + } TopicsTabMsg::RefreshTopics => { if let Some(connection) = self.current.clone() { sender.input(TopicsTabMsg::List(connection)); } } + TopicsTabMsg::CreateTopic => { + self.create_topic.emit(CreateTopicDialogMsg::Show); + } + TopicsTabMsg::ConfirmDeleteTopic => { + info!("deleting topic {:?}", self.selected_topic_name.clone()); + let connection = self.current.clone().unwrap(); + if let Some(topic_name) = self.selected_topic_name.clone() { + sender.oneshot_command(async move { + let kafka = KafkaBackend::new(&connection); + let result = kafka.delete_topic(topic_name).await; + match result { + Err(e) => CommandMsg::ShowError(e), + Ok(_) => CommandMsg::DeleteTopicResult, + } + }) + } + } + TopicsTabMsg::DeleteTopic => { + self.confirmation_alert.present(&widgets.root); + } TopicsTabMsg::Search(term) => { self.topics_wrapper.clear_filters(); let search_term = term.clone(); @@ -322,6 +415,11 @@ impl FactoryComponent for TopicsTabModel { } TopicsTabMsg::List(conn) => { STATUS_BROKER.send(StatusBarMsg::Start); + let id = Uuid::new_v4(); + TOASTER_BROKER.send(AppMsg::ShowToast( + id.to_string(), + "Connecting...".to_string(), + )); self.topics_wrapper.clear(); self.current = Some(conn.clone()); let result_topics_map = self.fetch_persited_topics(); @@ -341,16 +439,30 @@ impl FactoryComponent for TopicsTabModel { topic.cached = t.cached; } } + TOASTER_BROKER.send(AppMsg::HideToast(id.to_string())); CommandMsg::ListFinished(topics) } Err(error) => { + TOASTER_BROKER.send(AppMsg::HideToast(id.to_string())); output.emit(TopicsTabOutput::HandleError(conn.clone(), true)); - CommandMsg::ShowError(error) + let display_error = if let ExternalError::KafkaUnexpectedError( + rdkafka::error::KafkaError::MetadataFetch(_), + ) = &error + { + ExternalError::DisplayError( + "connecting".to_string(), + "kafka broker unreachable".to_string(), + ) + } else { + error + }; + CommandMsg::ShowError(display_error) } } }); } Err(err) => { + TOASTER_BROKER.send(AppMsg::HideToast(id.to_string())); sender .output_sender() .emit(TopicsTabOutput::HandleError(conn.clone(), true)); @@ -452,10 +564,13 @@ impl FactoryComponent for TopicsTabModel { }); } CommandMsg::ShowError(error) => { - let error_message = format!("Error connecting: {}", error); + let error_message = format!("{}", error); error!(error_message); show_error_alert(&widgets.root, error_message); } + CommandMsg::DeleteTopicResult => { + sender.input(TopicsTabMsg::RefreshTopics); + } } } } diff --git a/src/config.rs b/src/config.rs index 9028a43..e1938a2 100644 --- a/src/config.rs +++ b/src/config.rs @@ -26,6 +26,8 @@ pub enum ExternalError { ConfigurationError(String), #[error("error caching messages for topic {0}, duration: {1}")] CachingError(String, String), + #[error("Error {0}: {1}")] + DisplayError(String, String), } /// Application state that is not intended to be directly configurable by the user. The state is diff --git a/src/modals/utils.rs b/src/modals/utils.rs index 77523cd..8c35e16 100644 --- a/src/modals/utils.rs +++ b/src/modals/utils.rs @@ -1,18 +1,37 @@ use adw::prelude::*; - pub(crate) fn show_error_alert(parent: &impl IsA, message: String) { let alert = adw::AlertDialog::builder() - .heading_use_markup(true) - .heading("Error") - .title("Error") - .body(message.as_str()) - .close_response("close") - .default_response("close") - .can_close(true) - .receives_default(true) - .build(); - alert.add_response("close", "Ok"); - alert.set_response_appearance("close", adw::ResponseAppearance::Destructive); - alert.present(parent); + .heading_use_markup(true) + .heading("Error") + .title("Error") + .body(message.as_str()) + .close_response("close") + .default_response("close") + .can_close(true) + .receives_default(true) + .build(); + alert.add_response("close", "Ok"); + alert.set_response_appearance("close", adw::ResponseAppearance::Destructive); + alert.present(parent); +} + +pub(crate) fn build_confirmation_alert( + confirmation_label: String, + message: String, +) -> adw::AlertDialog { + let confirmation_alert = adw::AlertDialog::builder() + .heading_use_markup(true) + .heading("Confirmation") + .title("Warning") + .body(message.as_str()) + .close_response("confirm") + .default_response("cancel") + .can_close(true) + .receives_default(true) + .build(); + confirmation_alert.add_response("cancel", "Cancel"); + confirmation_alert.add_response("confirm", confirmation_label.as_str()); + confirmation_alert.set_response_appearance("confirm", adw::ResponseAppearance::Destructive); + confirmation_alert }