Skip to content

Commit

Permalink
Minor fixes and improvements.
Browse files Browse the repository at this point in the history
  - #13 Delete connection confirmation.
  - #21 Topic creation and deletion with confirmation.
  • Loading branch information
miguelbaldi committed Sep 14, 2024
1 parent 7066a29 commit f82d630
Show file tree
Hide file tree
Showing 8 changed files with 470 additions and 75 deletions.
94 changes: 45 additions & 49 deletions src/backend/kafka.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use futures::future;
use rdkafka::client::ClientContext;
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::{ClientContext, DefaultClientContext};
use rdkafka::config::{ClientConfig, FromClientConfigAndContext, RDKafkaLogLevel};
use rdkafka::consumer::BaseConsumer;
use rdkafka::consumer::{Consumer, ConsumerContext};
Expand Down Expand Up @@ -60,6 +61,13 @@ impl KafkaFetch {
pub const VALUES: [Self; 2] = [Self::Newest, Self::Oldest];
}

#[derive(Debug, Clone)]
pub struct CreateTopicRequest {
pub name: String,
pub partition_count: u16,
pub replica_count: u8,
}

#[derive(Debug, Clone)]
pub struct KafkaBackend {
pub config: KrustConnection,
Expand All @@ -84,10 +92,11 @@ impl KafkaBackend {
info!("kafka::connection::timeout: {:?}", timeout);
timeout
}
fn producer(&self) -> Result<FutureProducer, KafkaError> {
let producer: Result<FutureProducer, KafkaError> = match self.config.security_type {
fn create_config(&self) -> ClientConfig {
let mut config = ClientConfig::new();
match self.config.security_type {
KrustConnectionSecurityType::SASL_PLAINTEXT => {
ClientConfig::new()
config
.set("bootstrap.servers", self.config.brokers_list.clone())
.set("group.id", GROUP_ID)
.set("enable.partition.eof", "false")
Expand All @@ -111,10 +120,9 @@ impl KafkaBackend {
)
//.set("sasl.jaas.config", self.config.jaas_config.clone().unwrap_or_default())
.set_log_level(RDKafkaLogLevel::Debug)
.create()
}
_ => {
ClientConfig::new()
config
.set("bootstrap.servers", self.config.brokers_list.clone())
.set("group.id", GROUP_ID)
.set("enable.partition.eof", "false")
Expand All @@ -123,56 +131,44 @@ impl KafkaBackend {
.set("message.timeout.ms", "5000")
//.set("statistics.interval.ms", "30000")
.set("auto.offset.reset", "earliest")
.create()
}
};
producer
}
.to_owned()
}
fn producer(&self) -> Result<FutureProducer, KafkaError> {
self.create_config().create()
}
fn consumer<C, T>(&self, context: C) -> KafkaResult<T>
where
C: ClientContext,
T: FromClientConfigAndContext<C>,
{
match self.config.security_type {
KrustConnectionSecurityType::SASL_PLAINTEXT => {
ClientConfig::new()
.set("bootstrap.servers", self.config.brokers_list.clone())
.set("group.id", GROUP_ID)
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "false")
//.set("statistics.interval.ms", "30000")
.set("auto.offset.reset", "earliest")
.set("security.protocol", self.config.security_type.to_string())
.set(
"sasl.mechanisms",
self.config.sasl_mechanism.clone().unwrap_or_default(),
)
.set(
"sasl.username",
self.config.sasl_username.clone().unwrap_or_default(),
)
.set(
"sasl.password",
self.config.sasl_password.clone().unwrap_or_default(),
)
//.set("sasl.jaas.config", self.config.jaas_config.clone().unwrap_or_default())
//.set("debug", "all")
.set_log_level(RDKafkaLogLevel::Debug)
.create_with_context::<C, T>(context)
}
_ => {
ClientConfig::new()
.set("bootstrap.servers", self.config.brokers_list.clone())
.set("group.id", GROUP_ID)
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "false")
//.set("statistics.interval.ms", "30000")
.set("auto.offset.reset", "earliest")
.create_with_context::<C, T>(context)
}
}
self.create_config().create_with_context(context)
}
fn create_admin_client(&self) -> Result<AdminClient<DefaultClientContext>, KafkaError> {
self.create_config().create()
//.expect("admin client creation failed")
}

pub async fn create_topic(self, request: &CreateTopicRequest) -> Result<bool, ExternalError> {
let admin_client = self.create_admin_client()?;
let opts = AdminOptions::new().operation_timeout(Some(self.timeout()));
let topic = NewTopic::new(
&request.name,
request.partition_count as i32,
TopicReplication::Fixed(request.replica_count as i32),
);
admin_client.create_topics(vec![&topic], &opts).await?;
Ok(true)
}

pub async fn delete_topic(self, topic_name: String) -> Result<bool, ExternalError> {
let admin_client = self.create_admin_client()?;
let opts = AdminOptions::new().operation_timeout(Some(self.timeout()));
admin_client
.delete_topics(&[topic_name.as_str()], &opts)
.await?;
Ok(true)
}

pub async fn list_topics(&self) -> Result<Vec<KrustTopic>, ExternalError> {
Expand Down
8 changes: 7 additions & 1 deletion src/backend/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,13 @@ impl Repository {
"DELETE FROM kr_connection
WHERE id = :cid",
)?;

let mut topics_stmt_by_id = self.conn.prepare_cached(
"DELETE FROM kr_topic
WHERE connection_id = :cid",
)?;
topics_stmt_by_id
.execute(named_params! { ":cid": &conn_id,})
.map_err(ExternalError::DatabaseError)?;
stmt_by_id
.execute(named_params! { ":cid": &conn_id,})
.map_err(ExternalError::DatabaseError)
Expand Down
34 changes: 27 additions & 7 deletions src/component/connection_list.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use gtk::prelude::*;
use adw::{prelude::*, AlertDialog};
use relm4::{
factory::{DynamicIndex, FactoryComponent},
FactorySender,
main_application, FactorySender,
};
use tracing::info;

use crate::{
backend::repository::{KrustConnection, KrustConnectionSecurityType},
modals::utils::build_confirmation_alert,
Repository,
};

Expand All @@ -16,6 +17,7 @@ pub enum KrustConnectionMsg {
Disconnect,
Edit(DynamicIndex),
Remove(DynamicIndex),
//ConfirmRemove,
Refresh,
}

Expand All @@ -39,6 +41,8 @@ pub struct ConnectionListModel {
pub color: Option<String>,
pub timeout: Option<usize>,
pub is_connected: bool,
pub confirm_delete_alert: AlertDialog,
pub selected: Option<DynamicIndex>,
}

impl From<&mut ConnectionListModel> for KrustConnection {
Expand Down Expand Up @@ -108,7 +112,22 @@ impl FactoryComponent for ConnectionListModel {
}
}

fn init_model(conn: Self::Init, _index: &DynamicIndex, _sender: FactorySender<Self>) -> Self {
fn init_model(conn: Self::Init, index: &DynamicIndex, sender: FactorySender<Self>) -> Self {
let confirm_delete_alert = build_confirmation_alert(
"Delete".to_string(),
"Are you sure you want to delete the connection?".to_string(),
);
let snd = sender.clone();
let index = index.clone();
let connection = conn.clone();
confirm_delete_alert.connect_response(Some("confirm"), move |_, _| {
snd.output(KrustConnectionOutput::Remove(
index.clone(),
connection.clone(),
))
.unwrap();
//snd.input(KrustConnectionMsg::ConfirmRemove);
});
Self {
id: conn.id,
name: conn.name,
Expand All @@ -120,6 +139,8 @@ impl FactoryComponent for ConnectionListModel {
color: conn.color,
timeout: conn.timeout,
is_connected: false,
confirm_delete_alert,
selected: None,
}
}
fn post_view(&self, widgets: &mut Self::Widgets) {}
Expand Down Expand Up @@ -171,10 +192,9 @@ impl FactoryComponent for ConnectionListModel {
.unwrap();
}
KrustConnectionMsg::Remove(index) => {
info!("Edit request for {}", self.name);
sender
.output(KrustConnectionOutput::Remove(index, self.into()))
.unwrap();
info!("Delete request for {}", self.name);
let main_window = main_application().active_window().unwrap();
self.confirm_delete_alert.present(&main_window);
}
KrustConnectionMsg::Refresh => {
widgets.label.set_label(&self.name);
Expand Down
Loading

0 comments on commit f82d630

Please sign in to comment.