From c1d405b0150f17f6522c8b055dd1c8847419f961 Mon Sep 17 00:00:00 2001 From: Miguel Aranha Baldi Horlle Date: Wed, 24 Apr 2024 00:02:30 -0300 Subject: [PATCH] Messages page improvements. - Live: supporting fetch types: `Newest|Oldest`. - Live: supporting max messages per partition. - UI improvements. - Adding message key to the list and cache. - Messages copy: popover menu with copy commands, including CSV format. --- Cargo.lock | 22 ++ Cargo.toml | 1 + src/backend/kafka.rs | 201 ++++++++++++++----- src/backend/repository.rs | 63 +++--- src/backend/worker.rs | 29 +-- src/component/app.rs | 1 - src/component/messages/lists.rs | 35 +++- src/component/messages/messages_page.rs | 1 - src/component/messages/messages_tab.rs | 254 +++++++++++++++++++++--- src/main.rs | 3 +- 10 files changed, 491 insertions(+), 119 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 26ee1c6..0bb09cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -218,6 +218,27 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" +[[package]] +name = "csv" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70" +dependencies = [ + "memchr", +] + [[package]] name = "directories" version = "4.0.1" @@ -748,6 +769,7 @@ dependencies = [ "anyhow", "chrono", "chrono-tz", + "csv", "directories", "futures", "gtk4", diff --git a/Cargo.toml b/Cargo.toml index cfd0f40..14a15f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ chrono = { version = "0.4.37", features = ["serde"] } chrono-tz = { version = "0.9.0", features = [ 
"filter-by-regex" ] } strum = { version = "0.26.2", features = ["derive"] } rdkafka = { version = "0.36.2", features = [ "cmake-build", "gssapi", "ssl" ] } +csv = "1.3.0" [target.'cfg(target_os = "windows")'.dependencies] sasl2-sys = { version = "0.1.20",features = ["openssl-vendored"]} diff --git a/src/backend/kafka.rs b/src/backend/kafka.rs index 7e00594..9961cb5 100644 --- a/src/backend/kafka.rs +++ b/src/backend/kafka.rs @@ -7,7 +7,6 @@ use rdkafka::message::Headers; use rdkafka::topic_partition_list::TopicPartitionList; use rdkafka::{Message, Offset}; -use std::borrow::Borrow; use std::collections::HashMap; use std::time::{Duration, Instant}; use tracing::{debug, info, trace, warn}; @@ -41,13 +40,17 @@ type LoggingConsumer = StreamConsumer; // rdkafka: end -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, strum::EnumString, strum::Display)] pub enum KafkaFetch { #[default] Newest, Oldest, } +impl KafkaFetch { + pub const VALUES: [Self; 2] = [Self::Newest, Self::Oldest]; +} + #[derive(Debug, Clone)] pub struct KafkaBackend { pub config: KrustConnection, @@ -182,14 +185,21 @@ impl KafkaBackend { pub async fn topic_message_count( &self, topic: &String, + fetch: Option<KafkaFetch>, + max_messages: Option<i64>, current_partitions: Option<Vec<Partition>>, ) -> KrustTopic { - info!("couting messages for topic {}", topic); + info!( + "counting messages for topic {}, fetch {:?}, max messages {:?}", + topic, fetch, max_messages + ); let mut message_count: usize = 0; let partitions = &self.fetch_partitions(topic).await; let mut result = current_partitions.clone().unwrap_or_default(); let cpartitions = &current_partitions.unwrap_or_default().clone(); + let fetch = fetch.unwrap_or_default(); + let max_messages: i64 = max_messages.unwrap_or_default(); let part_map = cpartitions .into_iter() @@ -212,8 +222,56 @@ impl KafkaBackend { message_count += usize::try_from(p.offset_high.unwrap_or_default()).unwrap() - usize::try_from(low).unwrap(); } else { - message_count +=
usize::try_from(p.offset_high.unwrap_or_default()).unwrap() - - usize::try_from(p.offset_low.unwrap_or_default()).unwrap(); + let (low, high) = match fetch { + KafkaFetch::Newest => { + let low = p.offset_high.unwrap_or_default() - max_messages; + debug!( + "Newest::[low={},new_low={},high={},max={}]", + p.offset_low.unwrap_or_default(), + low, + p.offset_high.unwrap_or_default(), + max_messages + ); + if max_messages > 0 + && p.offset_high.unwrap_or_default() >= max_messages + && low >= p.offset_low.unwrap_or_default() + { + (low, p.offset_high.unwrap_or_default()) + } else { + ( + p.offset_low.unwrap_or_default(), + p.offset_high.unwrap_or_default(), + ) + } + } + KafkaFetch::Oldest => { + let high = p.offset_low.unwrap_or_default() + max_messages; + debug!( + "Oldest::[low={},high={},new_high={},max={}]", + p.offset_low.unwrap_or_default(), + p.offset_high.unwrap_or_default(), + high, + max_messages + ); + if max_messages > 0 + && p.offset_low.unwrap_or_default() < high + && high <= p.offset_high.unwrap_or_default() + { + (p.offset_low.unwrap_or_default(), high) + } else { + ( + p.offset_low.unwrap_or_default(), + p.offset_high.unwrap_or_default(), + ) + } + } + }; + result.push(Partition { + id: p.id, + offset_low: Some(low), + offset_high: Some(high), + }); + message_count += usize::try_from(high).unwrap() - usize::try_from(low).unwrap(); }; } @@ -222,7 +280,7 @@ impl KafkaBackend { connection_id: None, name: topic.clone(), cached: None, - partitions: if !cpartitions.is_empty() { + partitions: if !result.is_empty() { result } else { partitions.clone() @@ -288,8 +346,16 @@ impl KafkaBackend { "" } }; + let key = match m.key_view::() { + None => "", + Some(Ok(s)) => s, + Some(Err(e)) => { + warn!("Error while deserializing message key: {:?}", e); + "" + } + }; trace!("key: '{:?}', payload: '{}', topic: {}, partition: {}, offset: {}, timestamp: {:?}", - m.key(), payload, m.topic(), m.partition(), m.offset(), m.timestamp()); + key, payload, m.topic(), 
m.partition(), m.offset(), m.timestamp()); let headers = if let Some(headers) = m.headers() { let mut header_list: Vec = vec![]; for header in headers.iter() { @@ -309,13 +375,18 @@ impl KafkaBackend { topic: m.topic().to_string(), partition: m.partition(), offset: m.offset(), + key: key.to_string(), timestamp: m.timestamp().to_millis(), value: payload.to_string(), headers, }; match mrepo.save_message(&conn, &message) { Ok(_) => trace!("message with offset {} saved", &message.offset), - Err(err) => warn!("unable to save message with offset {}: {}", &message.offset, err.to_string()), + Err(err) => warn!( + "unable to save message with offset {}: {}", + &message.offset, + err.to_string() + ), }; counter += 1; } @@ -331,7 +402,8 @@ impl KafkaBackend { pub async fn list_messages_for_topic( &self, topic: &String, - total: usize, + fetch: Option, + max_messages: Option, ) -> Result, ExternalError> { let start_mark = Instant::now(); info!("starting listing messages for topic {}", topic); @@ -341,51 +413,86 @@ impl KafkaBackend { let mut counter = 0; - info!("consumer created"); + let topic = self + .topic_message_count(topic, fetch.clone(), max_messages, None) + .await; + let total = topic.total.clone().unwrap_or_default(); + let partitions = topic.partitions.clone(); + + let max_offset_map = partitions + .clone() + .into_iter() + .map(|p| (p.id, p.offset_high.clone().unwrap_or_default())) + .collect::>(); + + let mut partition_list = TopicPartitionList::with_capacity(partitions.capacity()); + for p in partitions.iter() { + let offset = Offset::from_raw(p.offset_low.unwrap_or_default()); + partition_list + .add_partition_offset(topic_name, p.id, offset) + .unwrap(); + } + info!("seeking partitions\n{:?}", partition_list); consumer - .subscribe(&[topic_name]) - .expect("Can't subscribe to specified topics"); + .assign(&partition_list) + .expect("Can't subscribe to partition list"); + let mut messages: Vec = Vec::with_capacity(total); while counter < total { match 
consumer.recv().await { Err(e) => warn!("Kafka error: {}", e), Ok(m) => { - let payload = match m.payload_view::() { - None => "", - Some(Ok(s)) => s, - Some(Err(e)) => { - warn!("Error while deserializing message payload: {:?}", e); - "" - } + let max_offset = match max_offset_map.get(&m.partition()) { + Some(max) => *max, + None => 0, }; - trace!("key: '{:?}', payload: '{}', topic: {}, partition: {}, offset: {}, timestamp: {:?}", - m.key(), payload, m.topic(), m.partition(), m.offset(), m.timestamp()); - let headers = if let Some(headers) = m.headers() { - let mut header_list: Vec = vec![]; - for header in headers.iter() { - let h = KrustHeader { - key: header.key.to_string(), - value: header - .value - .map(|v| String::from_utf8(v.to_vec()).unwrap_or_default()), - }; - header_list.push(h); - } - header_list - } else { - vec![] - }; - let message = KrustMessage { - topic: m.topic().to_string(), - partition: m.partition(), - offset: m.offset(), - timestamp: m.timestamp().to_millis(), - value: payload.to_string(), - headers, - }; - - messages.push(message); - counter += 1; + if m.offset() <= max_offset { + let payload = match m.payload_view::() { + None => "", + Some(Ok(s)) => s, + Some(Err(e)) => { + warn!("Error while deserializing message payload: {:?}", e); + "" + } + }; + let key = match m.key_view::() { + None => "", + Some(Ok(s)) => s, + Some(Err(e)) => { + warn!("Error while deserializing message key: {:?}", e); + "" + } + }; + trace!("key: '{:?}', payload: '{}', topic: {}, partition: {}, offset: {}, timestamp: {:?}", + key, payload, m.topic(), m.partition(), m.offset(), m.timestamp()); + let headers = if let Some(headers) = m.headers() { + let mut header_list: Vec = vec![]; + for header in headers.iter() { + let h = KrustHeader { + key: header.key.to_string(), + value: header + .value + .map(|v| String::from_utf8(v.to_vec()).unwrap_or_default()), + }; + header_list.push(h); + } + header_list + } else { + vec![] + }; + let message = KrustMessage { + topic: 
m.topic().to_string(), + partition: m.partition(), + offset: m.offset(), + key: key.to_string(), + timestamp: m.timestamp().to_millis(), + value: payload.to_string(), + headers, + }; + + messages.push(message); + counter += 1; + } } }; } diff --git a/src/backend/repository.rs b/src/backend/repository.rs index fae7358..c73663e 100644 --- a/src/backend/repository.rs +++ b/src/backend/repository.rs @@ -63,6 +63,7 @@ pub struct KrustMessage { pub topic: String, pub partition: i32, pub offset: i64, + pub key: String, pub value: String, pub timestamp: Option, pub headers: Vec, @@ -106,10 +107,19 @@ impl MessagesRepository { .unwrap(); conn } + pub fn get_init_connection(&mut self) -> Connection { + let conn = database_connection_with_name(&self.path, &self.database_name) + .expect("problem acquiring database connection"); + conn + } pub fn init(&mut self) -> Result<(), ExternalError> { - self.get_connection().execute_batch( - "CREATE TABLE IF NOT EXISTS kr_message (partition INTEGER, offset INTEGER, value TEXT, timestamp INTEGER, headers TEXT, PRIMARY KEY (partition, offset));" - ).map_err(ExternalError::DatabaseError) + let result = self.get_init_connection().execute_batch( + "CREATE TABLE IF NOT EXISTS kr_message (partition INTEGER, offset INTEGER, key TEXT, value TEXT, timestamp INTEGER, headers TEXT, PRIMARY KEY (partition, offset));" + ).map_err(ExternalError::DatabaseError); + let _ = self.get_init_connection().execute_batch( + "ALTER TABLE kr_message ADD COLUMN key TEXT;" + ).ok(); + result } pub fn destroy(&mut self) -> Result<(), ExternalError> { @@ -123,14 +133,14 @@ impl MessagesRepository { ) -> Result { //let conn = self.get_connection(); let mut stmt_by_id = conn.prepare_cached( - "INSERT INTO kr_message(partition, offset, value, timestamp, headers) - VALUES (:p, :o, :v, :t, :h)", + "INSERT INTO kr_message(partition, offset, key, value, timestamp, headers) + VALUES (:p, :o, :k, :v, :t, :h)", )?; let headers = 
ron::ser::to_string::>(message.headers.as_ref()).unwrap_or_default(); let maybe_message = stmt_by_id .execute( - named_params! { ":p": message.partition, ":o": message.offset, ":v": message.value, ":t": message.timestamp, ":h": headers}, + named_params! { ":p": message.partition, ":o": message.offset, ":k": message.key, ":v": message.value, ":t": message.timestamp, ":h": headers}, ) .map_err(ExternalError::DatabaseError); @@ -201,14 +211,14 @@ impl MessagesRepository { let conn = self.get_connection(); let mut stmt_query = match search { Some(_) => conn.prepare_cached( - "SELECT partition, offset, value, timestamp, headers + "SELECT partition, offset, key, value, timestamp, headers FROM kr_message WHERE value LIKE :search ORDER BY offset, partition LIMIT :ps", )?, None => conn.prepare_cached( - "SELECT partition, offset, value, timestamp, headers + "SELECT partition, offset, key, value, timestamp, headers FROM kr_message ORDER BY offset, partition LIMIT :ps", @@ -224,9 +234,10 @@ impl MessagesRepository { Ok(KrustMessage { partition: row.get(0)?, offset: row.get(1)?, - value: row.get(2)?, - timestamp: Some(row.get(3)?), - headers: string_to_headers(row.get(4)?)?, + key: row.get(2)?, + value: row.get(3)?, + timestamp: Some(row.get(4)?), + headers: string_to_headers(row.get(5)?)?, topic: topic_name.clone(), }) }; @@ -259,7 +270,7 @@ impl MessagesRepository { let conn = self.get_connection(); let mut stmt_query = match search { Some(_) => conn.prepare_cached( - "SELECT partition, offset, value, timestamp, headers + "SELECT partition, offset, key, value, timestamp, headers FROM kr_message WHERE (offset, partition) > (:o, :p) AND value LIKE :search @@ -267,7 +278,7 @@ impl MessagesRepository { LIMIT :ps", )?, None => conn.prepare_cached( - "SELECT partition, offset, value, timestamp, headers + "SELECT partition, offset, key, value, timestamp, headers FROM kr_message WHERE (offset, partition) > (:o, :p) ORDER BY offset, partition @@ -284,9 +295,10 @@ impl 
MessagesRepository { Ok(KrustMessage { partition: row.get(0)?, offset: row.get(1)?, - value: row.get(2)?, - timestamp: Some(row.get(3)?), - headers: string_to_headers(row.get(4)?)?, + key: row.get(2)?, + value: row.get(3)?, + timestamp: Some(row.get(4)?), + headers: string_to_headers(row.get(5)?)?, topic: topic_name.clone(), }) }; @@ -318,8 +330,8 @@ impl MessagesRepository { let conn = self.get_connection(); let mut stmt_query = match search { Some(_) => conn.prepare_cached( - "SELECT partition, offset, value, timestamp, headers FROM ( - SELECT partition, offset, value, timestamp, headers + "SELECT partition, offset, key, value, timestamp, headers FROM ( + SELECT partition, offset, key, value, timestamp, headers FROM kr_message WHERE (offset, partition) < (:o, :p) AND value LIKE :search @@ -328,8 +340,8 @@ impl MessagesRepository { ) ORDER BY offset ASC, partition ASC", )?, None => conn.prepare_cached( - "SELECT partition, offset, value, timestamp, headers FROM ( - SELECT partition, offset, value, timestamp, headers + "SELECT partition, offset, key, value, timestamp, headers FROM ( + SELECT partition, offset, key, value, timestamp, headers FROM kr_message WHERE (offset, partition) < (:o, :p) ORDER BY offset DESC, partition DESC @@ -347,9 +359,10 @@ impl MessagesRepository { Ok(KrustMessage { partition: row.get(0)?, offset: row.get(1)?, - value: row.get(2)?, - timestamp: Some(row.get(3)?), - headers: string_to_headers(row.get(4)?)?, + key: row.get(2)?, + value: row.get(3)?, + timestamp: Some(row.get(4)?), + headers: string_to_headers(row.get(5)?)?, topic: topic_name.clone(), }) }; @@ -577,17 +590,19 @@ impl Repository { topic, partition, offset, + key, value, timestamp, headers, } = message; - let mut insert_stmt = self.conn.prepare_cached("INSERT INTO kr_message (connection, topic, partition, offset, value, timestamp) VALUES (?, ?, ?, ?, ?, ?) 
RETURNING id")?; + let mut insert_stmt = self.conn.prepare_cached("INSERT INTO kr_message (connection, topic, partition, offset, key, value, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id")?; let result = insert_stmt .query_row(params![], |_row| { Ok(KrustMessage { topic, partition, offset, + key, value, timestamp, headers, diff --git a/src/backend/worker.rs b/src/backend/worker.rs index 425b995..6f0163b 100644 --- a/src/backend/worker.rs +++ b/src/backend/worker.rs @@ -1,5 +1,3 @@ -use std::collections::HashMap; - use chrono::Utc; use tokio::select; use tokio_util::sync::CancellationToken; @@ -9,7 +7,7 @@ use crate::{config::ExternalError, Repository}; use super::{ kafka::{KafkaBackend, KafkaFetch}, - repository::{KrustConnection, KrustMessage, KrustTopic, MessagesRepository, Partition}, + repository::{KrustConnection, KrustMessage, KrustTopic, MessagesRepository}, }; #[derive(Debug, Clone, Copy, PartialEq, Default, strum::EnumString, strum::Display)] @@ -36,6 +34,8 @@ pub struct MessagesRequest { pub page_size: u16, pub offset_partition: (usize, usize), pub search: Option, + pub fetch: KafkaFetch, + pub max_messages: i64, } #[derive(Debug, Clone)] @@ -116,8 +116,6 @@ impl MessagesWorker { } } - - async fn get_messages_cached( self, request: &MessagesRequest, @@ -151,7 +149,9 @@ impl MessagesWorker { Some(_) => { if refresh { let partitions = mrepo.find_offsets().ok(); - let topic = kafka.topic_message_count(&topic.name, partitions.clone()).await; + let topic = kafka + .topic_message_count(&topic.name, None, None, partitions.clone()) + .await; let partitions = topic.partitions.clone(); let total = topic.total.unwrap_or_default(); kafka @@ -171,7 +171,7 @@ impl MessagesWorker { } None => { let total = kafka - .topic_message_count(&topic.name, None) + .topic_message_count(&topic.name, None, None, None) .await .total .unwrap_or_default(); @@ -230,14 +230,15 @@ impl MessagesWorker { let kafka = KafkaBackend::new(&request.connection); let topic = 
&request.topic.name; // Run async background task - let total = kafka - .topic_message_count(topic, None) - .await - .total - .unwrap_or_default(); - let messages = kafka.list_messages_for_topic(topic, total).await?; + let messages = kafka + .list_messages_for_topic( + topic, + Some(request.fetch.clone()), + Some(request.max_messages), + ) + .await?; Ok(MessagesResponse { - total, + total: messages.len(), messages: messages, topic: Some(request.topic.clone()), page_operation: request.page_operation, diff --git a/src/component/app.rs b/src/component/app.rs index 2b9c078..2abb0e7 100644 --- a/src/component/app.rs +++ b/src/component/app.rs @@ -45,7 +45,6 @@ pub enum AppMsg { SavedSettings, } -#[derive(Debug)] pub struct AppModel { //state: State, _status_bar: Controller, diff --git a/src/component/messages/lists.rs b/src/component/messages/lists.rs index aa7ff09..53660da 100644 --- a/src/component/messages/lists.rs +++ b/src/component/messages/lists.rs @@ -113,7 +113,7 @@ impl MessageListItem { Self { offset: value.offset, partition: value.partition, - key: String::default(), + key: value.key, value: value.value, timestamp: value.timestamp, headers: value.headers, @@ -203,12 +203,12 @@ impl LabelColumn for MessageValueColumn { } fn format_cell_value(value: &Self::Value) -> String { - if value.len() >= 150 { + if value.len() >= 100 { format!( "{}...", value .replace('\n', " ") - .get(0..150) + .get(0..100) .unwrap_or("") ) } else { @@ -216,5 +216,34 @@ impl LabelColumn for MessageValueColumn { } } } +pub struct MessageKeyColumn; + +impl LabelColumn for MessageKeyColumn { + type Item = MessageListItem; + type Value = String; + + const COLUMN_NAME: &'static str = "Key"; + const ENABLE_RESIZE: bool = true; + const ENABLE_EXPAND: bool = true; + const ENABLE_SORT: bool = false; + + fn get_cell_value(item: &Self::Item) -> Self::Value { + item.key.clone() + } + + fn format_cell_value(value: &Self::Value) -> String { + if value.len() >= 40 { + format!( + "{}...", + value + 
.replace('\n', " ") + .get(0..40) + .unwrap_or("") + ) + } else { + format!("{}", value) + } + } +} // Table messages: end diff --git a/src/component/messages/messages_page.rs b/src/component/messages/messages_page.rs index af639f8..4557cc6 100644 --- a/src/component/messages/messages_page.rs +++ b/src/component/messages/messages_page.rs @@ -11,7 +11,6 @@ use sourceview5 as sourceview; use super::messages_tab::{MessagesTabInit, MessagesTabModel}; -#[derive(Debug)] pub struct MessagesPageModel { topic: Option, connection: Option, diff --git a/src/component/messages/messages_tab.rs b/src/component/messages/messages_tab.rs index 12221d5..da376a6 100644 --- a/src/component/messages/messages_tab.rs +++ b/src/component/messages/messages_tab.rs @@ -2,9 +2,16 @@ // See: https://gitlab.gnome.org/GNOME/gtk/-/issues/5644 use chrono::{TimeZone, Utc}; use chrono_tz::America; -use gtk::{gdk::DisplayManager, ColumnViewSorter}; +use csv::StringRecord; +use gtk::{ + gdk::{DisplayManager, Rectangle}, + ColumnViewSorter, +}; use relm4::{ - actions::{RelmAction, RelmActionGroup}, factory::{DynamicIndex, FactoryComponent}, typed_view::column::TypedColumnView, * + actions::{RelmAction, RelmActionGroup}, + factory::{DynamicIndex, FactoryComponent}, + typed_view::column::TypedColumnView, + *, }; use relm4_components::simple_combo_box::SimpleComboBox; use sourceview::prelude::*; @@ -14,6 +21,7 @@ use tracing::{debug, info, trace}; use crate::{ backend::{ + kafka::KafkaFetch, repository::{KrustConnection, KrustMessage, KrustTopic}, worker::{ MessagesCleanupRequest, MessagesMode, MessagesRequest, MessagesResponse, @@ -31,11 +39,17 @@ use crate::{ Repository, DATE_TIME_FORMAT, }; +use super::lists::MessageKeyColumn; + // page actions relm4::new_action_group!(pub MessagesPageActionGroup, "messages_page"); relm4::new_stateless_action!(pub MessagesSearchAction, MessagesPageActionGroup, "search"); -#[derive(Debug)] +relm4::new_action_group!(pub(super) MessagesListActionGroup, "messages-list"); 
+relm4::new_stateless_action!(pub(super) CopyMessagesAsCsv, MessagesListActionGroup, "copy-messages-as-csv"); +relm4::new_stateless_action!(pub(super) CopyMessagesValue, MessagesListActionGroup, "copy-messages-value"); +relm4::new_stateless_action!(pub(super) CopyMessagesKey, MessagesListActionGroup, "copy-messages-key"); + pub struct MessagesTabModel { token: CancellationToken, pub topic: Option, @@ -45,12 +59,22 @@ pub struct MessagesTabModel { headers_wrapper: TypedColumnView, page_size_combo: Controller>, page_size: u16, + fetch_type_combo: Controller>, + fetch_type: KafkaFetch, + max_messages: f64, + messages_menu_popover: gtk::PopoverMenu, } pub struct MessagesTabInit { pub topic: KrustTopic, pub connection: KrustConnection, } +#[derive(Debug)] +pub enum Copy { + AllAsCsv, + Value, + Key, +} #[derive(Debug)] pub enum MessagesTabMsg { @@ -66,7 +90,10 @@ pub enum MessagesTabMsg { LiveSearchMessages(String), Selection(u32), PageSizeChanged(usize), + FetchTypeChanged(usize), ToggleMode(bool), + DigitsOnly(f64), + CopyMessages(Copy), } #[derive(Debug)] @@ -84,6 +111,16 @@ impl FactoryComponent for MessagesTabModel { type CommandOutput = CommandMsg; type ParentWidget = adw::TabView; + menu! { + messages_menu: { + section! { + "_Copy as CSV" => CopyMessagesAsCsv, + "_Copy value" => CopyMessagesValue, + "_Copy key" => CopyMessagesKey, + } + } + } + view! 
{ #[root] gtk::Paned { @@ -94,6 +131,9 @@ impl FactoryComponent for MessagesTabModel { set_orientation: gtk::Orientation::Vertical, set_hexpand: true, set_vexpand: true, + container_add = &self.messages_menu_popover.clone() { + set_menu_model: Some(&messages_menu), + }, gtk::CenterBox { set_orientation: gtk::Orientation::Horizontal, set_halign: gtk::Align::Fill, @@ -159,13 +199,8 @@ impl FactoryComponent for MessagesTabModel { sender.input(MessagesTabMsg::SearchMessages); }, }, - #[name(messages_search_btn)] - gtk::Button { - set_icon_name: "edit-find-symbolic", + self.fetch_type_combo.widget() -> >k::ComboBoxText { set_margin_start: 5, - connect_clicked[sender] => move |_| { - sender.input(MessagesTabMsg::SearchMessages); - }, }, }, }, @@ -180,7 +215,7 @@ impl FactoryComponent for MessagesTabModel { set_show_column_separators: true, set_single_click_activate: false, set_enable_rubberband: true, - } + }, }, }, #[wrap(Some)] @@ -345,19 +380,27 @@ impl FactoryComponent for MessagesTabModel { set_halign: gtk::Align::Start, set_hexpand: true, gtk::Label { - set_label: "Max messages", + set_label: "Max messages (per partition)", set_margin_start: 5, }, #[name(max_messages)] - gtk::Entry { + gtk::SpinButton { set_margin_start: 5, set_width_chars: 10, + set_numeric: true, + set_increments: (1000.0, 10000.0), + set_range: (1.0, 100000.0), + set_value: self.max_messages, + set_digits: 0, + connect_value_changed[sender] => move |sbtn| { + sender.input(MessagesTabMsg::DigitsOnly(sbtn.value())); + }, }, }, }, }, }, - } + }, } @@ -366,6 +409,7 @@ impl FactoryComponent for MessagesTabModel { let mut messages_wrapper = TypedColumnView::::new(); messages_wrapper.append_column::(); messages_wrapper.append_column::(); + messages_wrapper.append_column::(); messages_wrapper.append_column::(); messages_wrapper.append_column::(); @@ -381,6 +425,37 @@ impl FactoryComponent for MessagesTabModel { }) .forward(sender.input_sender(), MessagesTabMsg::PageSizeChanged); 
page_size_combo.widget().queue_allocate(); + let fetch_type_combo = SimpleComboBox::builder() + .launch(SimpleComboBox { + variants: KafkaFetch::VALUES.to_vec(), + active_index: Some(default_idx), + }) + .forward(sender.input_sender(), MessagesTabMsg::FetchTypeChanged); + + let messages_popover_menu = gtk::PopoverMenu::builder().build(); + let mut messages_actions = RelmActionGroup::::new(); + let messages_menu_sender = sender.input_sender().clone(); + let menu_copy_all_csv_action = RelmAction::::new_stateless(move |_| { + messages_menu_sender + .send(MessagesTabMsg::CopyMessages(Copy::AllAsCsv)) + .unwrap(); + }); + let messages_menu_sender = sender.input_sender().clone(); + let menu_copy_value_action = RelmAction::::new_stateless(move |_| { + messages_menu_sender + .send(MessagesTabMsg::CopyMessages(Copy::Value)) + .unwrap(); + }); + let messages_menu_sender = sender.input_sender().clone(); + let menu_copy_key_action = RelmAction::::new_stateless(move |_| { + messages_menu_sender + .send(MessagesTabMsg::CopyMessages(Copy::Key)) + .unwrap(); + }); + messages_actions.add_action(menu_copy_all_csv_action); + messages_actions.add_action(menu_copy_value_action); + messages_actions.add_action(menu_copy_key_action); + messages_actions.register_for_widget(&messages_popover_menu); let model = MessagesTabModel { token: CancellationToken::new(), mode: MessagesMode::Live, @@ -390,15 +465,19 @@ impl FactoryComponent for MessagesTabModel { headers_wrapper, page_size_combo, page_size: AVAILABLE_PAGE_SIZES[0], + fetch_type_combo, + fetch_type: KafkaFetch::default(), + max_messages: 1000.0, + messages_menu_popover: messages_popover_menu, }; let messages_view = &model.messages_wrapper.view; let _headers_view = &model.headers_wrapper.view; - let sender_for_selection = sender.clone(); + let _sender_for_selection = sender.clone(); messages_view .model() .unwrap() - .connect_selection_changed(move |selection_model, _, _| { - 
sender_for_selection.input(MessagesTabMsg::Selection(selection_model.n_items())); + .connect_selection_changed(move |_selection_model, _, _| { + //sender_for_selection.input(MessagesTabMsg::Selection(selection_model.n_items())); }); let sender_for_activate = sender.clone(); messages_view.connect_activate(move |_view, idx| { @@ -441,20 +520,38 @@ impl FactoryComponent for MessagesTabModel { } let language = sourceview::LanguageManager::default().language("json"); buffer.set_language(language.as_ref()); - + widgets.max_messages.set_increments(1000.0, 10000.0); // Shortcuts - let mut actions = RelmActionGroup::::new(); + // let mut actions = RelmActionGroup::::new(); - let messages_search_entry = widgets.messages_search_entry.clone(); - let search_action = { - let messages_search_btn = widgets.messages_search_btn.clone(); - RelmAction::::new_stateless(move |_| { - messages_search_btn.emit_clicked(); - }) - }; - actions.add_action(search_action); - actions.register_for_widget(messages_search_entry); + // let messages_search_entry = widgets.messages_search_entry.clone(); + // let search_action = { + // let messages_search_btn = widgets.messages_search_btn.clone(); + // RelmAction::::new_stateless(move |_| { + // messages_search_btn.emit_clicked(); + // }) + // }; + // actions.add_action(search_action); + // actions.register_for_widget(messages_search_entry); + + //self.messages_menu_popover.set_menu_model(widgets.menu) + // Create a click gesture + let gesture = gtk::GestureClick::new(); + // Set the gestures button to the right mouse button (=3) + gesture.set_button(gtk::gdk::ffi::GDK_BUTTON_SECONDARY as u32); + + // Assign your handler to an event of the gesture (e.g. 
the `pressed` event) + let messages_menu = self.messages_menu_popover.clone(); + gesture.connect_pressed(move |gesture, _n, x, y| { + gesture.set_state(gtk::EventSequenceState::Claimed); + let x = x as i32; + let y = y as i32; + info!("ColumnView: Right mouse button pressed [x={},y={}]", x, y); + messages_menu.set_pointing_to(Some(&Rectangle::new(x, y + 55, 1, 1))); + messages_menu.popup(); + }); + self.messages_wrapper.view.add_controller(gesture); } fn update_with_view( @@ -464,6 +561,10 @@ impl FactoryComponent for MessagesTabModel { sender: FactorySender, ) { match msg { + MessagesTabMsg::DigitsOnly(value) => { + self.max_messages = value; + info!("Max messages:{}", self.max_messages); + } MessagesTabMsg::ToggleMode(toggle) => { self.mode = if toggle { widgets.cached_controls.set_visible(true); @@ -501,6 +602,14 @@ impl FactoryComponent for MessagesTabModel { self.page_size = page_size; self.page_size_combo.widget().queue_allocate(); } + MessagesTabMsg::FetchTypeChanged(_idx) => { + let fetch_type = match self.fetch_type_combo.model().get_active_elem() { + Some(ps) => ps.clone(), + None => KafkaFetch::default(), + }; + self.fetch_type = fetch_type; + self.fetch_type_combo.widget().queue_allocate(); + } MessagesTabMsg::Selection(size) => { let mut copy_content = String::from("PARTITION;OFFSET;VALUE;TIMESTAMP"); let min_length = copy_content.len(); @@ -535,6 +644,22 @@ impl FactoryComponent for MessagesTabModel { .set_text(copy_content.as_str()); } } + MessagesTabMsg::CopyMessages(copy) => { + info!("copy selected messages"); + + let data = match copy { + Copy::AllAsCsv => copy_all_as_csv(self), + Copy::Value => copy_value(self), + Copy::Key => copy_key(self), + }; + if let Ok(data) = data { + DisplayManager::get() + .default_display() + .unwrap() + .clipboard() + .set_text(data.as_str()); + } + } MessagesTabMsg::Open(connection, topic) => { let conn_id = &connection.id.unwrap(); let topic_name = &topic.name.clone(); @@ -597,6 +722,8 @@ impl FactoryComponent for 
MessagesTabModel { let page_size = self.page_size; let token = self.token.clone(); let search = get_search_term(widgets); + let fetch = self.fetch_type.clone(); + let max_messages: i64 = self.max_messages as i64; widgets.pag_current_entry.set_text("0"); sender.oneshot_command(async move { // Run async background task @@ -612,6 +739,8 @@ impl FactoryComponent for MessagesTabModel { page_size, offset_partition: (0, 0), search: search, + fetch, + max_messages, }, ) .await @@ -647,6 +776,8 @@ impl FactoryComponent for MessagesTabModel { ); let search = get_search_term(widgets); let token = self.token.clone(); + let fetch = self.fetch_type.clone(); + let max_messages: i64 = self.max_messages as i64; info!( "getting next messages [page_size={}, last_offset={}, last_partition={}]", page_size, offset, partition @@ -665,6 +796,8 @@ impl FactoryComponent for MessagesTabModel { page_size, offset_partition: (offset, partition), search: search, + fetch, + max_messages, }, ) .await @@ -700,6 +833,8 @@ impl FactoryComponent for MessagesTabModel { ); let token = self.token.clone(); let search = get_search_term(widgets); + let fetch = self.fetch_type.clone(); + let max_messages: i64 = self.max_messages as i64; sender.oneshot_command(async move { // Run async background task let messages_worker = MessagesWorker::new(); @@ -714,6 +849,8 @@ impl FactoryComponent for MessagesTabModel { page_size, offset_partition: (offset, partition), search: search, + fetch, + max_messages, }, ) .await @@ -837,6 +974,7 @@ fn on_loading(widgets: &mut MessagesTabModelWidgets, enabled: bool) { widgets.btn_get_messages.set_sensitive(enabled); widgets.btn_cache_refresh.set_sensitive(enabled); widgets.btn_cache_toggle.set_sensitive(enabled); + widgets.max_messages.set_sensitive(enabled); } fn fill_pagination( @@ -900,3 +1038,65 @@ fn fill_pagination( } } } + +fn copy_all_as_csv(model: &mut MessagesTabModel) -> Result { + let mut wtr = csv::WriterBuilder::new() + .delimiter(b';') + 
.quote_style(csv::QuoteStyle::NonNumeric) + .from_writer(vec![]); + let _ = wtr.write_record(&["PARTITION", "OFFSET", "KEY", "VALUE", "TIMESTAMP"]); + for i in 0..model.messages_wrapper.selection_model.n_items() { + if model.messages_wrapper.selection_model.is_selected(i) { + let item = model.messages_wrapper.get_visible(i).unwrap(); + let partition = item.borrow().partition; + let offset = item.borrow().offset; + let key = item.borrow().key.clone(); + let value = item.borrow().value.clone(); + let clean_value = match serde_json::from_str::<serde_json::Value>(value.as_str()) { + Ok(json) => json.to_string(), + Err(_) => value.replace('\n', ""), + }; + let timestamp = item.borrow().timestamp; + let record = StringRecord::from(vec![ + partition.to_string(), + offset.to_string(), + key, + clean_value, + timestamp.unwrap_or_default().to_string(), + ]); + let _ = wtr.write_record(&record); + } + } + let data = String::from_utf8(wtr.into_inner().unwrap_or_default()); + data +} +fn copy_value(model: &mut MessagesTabModel) -> Result<String, std::string::FromUtf8Error> { + let mut copy_content = String::default(); + for i in 0..model.messages_wrapper.selection_model.n_items() { + if model.messages_wrapper.selection_model.is_selected(i) { + let item = model.messages_wrapper.get_visible(i).unwrap(); + let value = item.borrow().value.clone(); + let clean_value = match serde_json::from_str::<serde_json::Value>(value.as_str()) { + Ok(json) => json.to_string(), + Err(_) => value.replace('\n', ""), + }; + let copy_text = format!( + "{}\n", clean_value); + copy_content.push_str(&copy_text.as_str()); + } + } + Ok(copy_content) +} +fn copy_key(model: &mut MessagesTabModel) -> Result<String, std::string::FromUtf8Error> { + let mut copy_content = String::default(); + for i in 0..model.messages_wrapper.selection_model.n_items() { + if model.messages_wrapper.selection_model.is_selected(i) { + let item = model.messages_wrapper.get_visible(i).unwrap(); + let key = item.borrow().key.clone(); + let copy_text = format!( + "{}\n", key); + copy_content.push_str(&copy_text.as_str()); + } + } + Ok(copy_content)
+} diff --git a/src/main.rs b/src/main.rs index 0f01b79..336f381 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,6 @@ use gtk::gdk; //#![windows_subsystem = "windows"] use gtk::prelude::ApplicationExt; use gtk::gio; -use krust::MessagesSearchAction; use tracing::*; use tracing_subscriber::filter; use tracing_subscriber::prelude::*; @@ -34,7 +33,7 @@ fn main() -> Result<(), ()> { // Enable the `INFO` level for anything in `my_crate` .with_target("relm4", Level::WARN) // Enable the `DEBUG` level for a specific module. - .with_target("krust", Level::DEBUG); + .with_target("krust", Level::TRACE); tracing_subscriber::registry() .with(HierarchicalLayer::new(2)) .with(EnvFilter::from_default_env())