Messages page improvements.
  - Live: support fetch types: `Newest|Oldest`.
  - Live: support a maximum number of messages per partition.
  - UI improvements.
  - Add the message key to the list and cache.
  - Message copy: popover menu with copy commands, including CSV format.
Miguel Aranha Baldi Horlle committed Apr 24, 2024
1 parent 9f88c0a commit c1d405b
Showing 10 changed files with 491 additions and 119 deletions.
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -33,6 +33,7 @@ chrono = { version = "0.4.37", features = ["serde"] }
chrono-tz = { version = "0.9.0", features = [ "filter-by-regex" ] }
strum = { version = "0.26.2", features = ["derive"] }
rdkafka = { version = "0.36.2", features = [ "cmake-build", "gssapi", "ssl" ] }
csv = "1.3.0"

[target.'cfg(target_os = "windows")'.dependencies]
sasl2-sys = { version = "0.1.20",features = ["openssl-vendored"]}
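
The csv dependency added above backs the new CSV copy format from the commit message; the popover code that uses it lives in a UI file not shown in this excerpt. A minimal sketch, not part of this commit, of how message rows might be serialized with the crate (the rows_to_csv helper, its column set, and the tuple row shape are assumptions for illustration):

use csv::WriterBuilder;

// Serialize (partition, offset, key, value) rows into one CSV string, header row first.
fn rows_to_csv(rows: &[(i32, i64, String, String)]) -> Result<String, Box<dyn std::error::Error>> {
    let mut wtr = WriterBuilder::new().from_writer(vec![]);
    wtr.write_record(["PARTITION", "OFFSET", "KEY", "VALUE"])?;
    for (partition, offset, key, value) in rows {
        wtr.write_record([
            partition.to_string(),
            offset.to_string(),
            key.clone(),
            value.clone(),
        ])?;
    }
    let bytes = wtr.into_inner().expect("CSV writer flush failed");
    Ok(String::from_utf8(bytes)?)
}
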
201 changes: 154 additions & 47 deletions src/backend/kafka.rs
@@ -7,7 +7,6 @@ use rdkafka::message::Headers;
use rdkafka::topic_partition_list::TopicPartitionList;
use rdkafka::{Message, Offset};

use std::borrow::Borrow;
use std::collections::HashMap;
use std::time::{Duration, Instant};
use tracing::{debug, info, trace, warn};
@@ -41,13 +40,17 @@ type LoggingConsumer = StreamConsumer<CustomContext>;

// rdkafka: end

#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone, Default, strum::EnumString, strum::Display)]
pub enum KafkaFetch {
#[default]
Newest,
Oldest,
}

impl KafkaFetch {
pub const VALUES: [Self; 2] = [Self::Newest, Self::Oldest];
}
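
Not part of the diff: a small sketch of what the added strum derives provide, assuming the KafkaFetch enum above is in scope. Display renders each variant's name (handy for the fetch selector's labels) and EnumString derives FromStr to turn a selected label back into a variant.

use std::str::FromStr;

fn fetch_selector_demo() {
    // VALUES can back a dropdown; Display prints "Newest" / "Oldest".
    for fetch in KafkaFetch::VALUES {
        println!("{fetch}");
    }
    // FromStr parses the label; unknown labels fall back to the default (Newest).
    let chosen = KafkaFetch::from_str("Oldest").unwrap_or_default();
    assert!(matches!(chosen, KafkaFetch::Oldest));
}
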

#[derive(Debug, Clone)]
pub struct KafkaBackend {
pub config: KrustConnection,
@@ -182,14 +185,21 @@ impl KafkaBackend {
pub async fn topic_message_count(
&self,
topic: &String,
fetch: Option<KafkaFetch>,
max_messages: Option<i64>,
current_partitions: Option<Vec<Partition>>,
) -> KrustTopic {
info!("counting messages for topic {}", topic);
info!(
    "counting messages for topic {}, fetch {:?}, max messages {:?}",
    topic, fetch, max_messages
);

let mut message_count: usize = 0;
let partitions = &self.fetch_partitions(topic).await;
let mut result = current_partitions.clone().unwrap_or_default();
let cpartitions = &current_partitions.unwrap_or_default().clone();
let fetch = fetch.unwrap_or_default();
let max_messages: i64 = max_messages.unwrap_or_default();

let part_map = cpartitions
.into_iter()
@@ -212,8 +222,56 @@ impl KafkaBackend {
message_count += usize::try_from(p.offset_high.unwrap_or_default()).unwrap()
- usize::try_from(low).unwrap();
} else {
message_count += usize::try_from(p.offset_high.unwrap_or_default()).unwrap()
- usize::try_from(p.offset_low.unwrap_or_default()).unwrap();
let (low, high) = match fetch {
KafkaFetch::Newest => {
let low = p.offset_high.unwrap_or_default() - max_messages;
debug!(
"Newest::[low={},new_low={},high={},max={}]",
p.offset_low.unwrap_or_default(),
low,
p.offset_high.unwrap_or_default(),
max_messages
);
if max_messages > 0
&& p.offset_high.unwrap_or_default() >= max_messages
&& low >= p.offset_low.unwrap_or_default()
{
(low, p.offset_high.unwrap_or_default())
} else {
(
p.offset_low.unwrap_or_default(),
p.offset_high.unwrap_or_default(),
)
}
}
KafkaFetch::Oldest => {
let high = p.offset_low.unwrap_or_default() + max_messages;
debug!(
"Oldest::[low={},high={},new_high={},max={}]",
p.offset_low.unwrap_or_default(),
p.offset_high.unwrap_or_default(),
high,
max_messages
);
if max_messages > 0
&& p.offset_low.unwrap_or_default() < high
&& high <= p.offset_high.unwrap_or_default()
{
(p.offset_low.unwrap_or_default(), high)
} else {
(
p.offset_low.unwrap_or_default(),
p.offset_high.unwrap_or_default(),
)
}
}
};
result.push(Partition {
id: p.id,
offset_low: Some(low),
offset_high: Some(high),
});
message_count += usize::try_from(high).unwrap() - usize::try_from(low).unwrap();
};
}
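
The branch above clamps each partition to a window of at most max_messages offsets taken from either the newest or the oldest end, and falls back to the full range when the limit is unset or would step outside the partition's bounds. A standalone sketch of the same calculation (not the backend's exact code), with a worked example:

fn offset_window(low: i64, high: i64, max: i64, fetch: &KafkaFetch) -> (i64, i64) {
    match fetch {
        // Keep the newest `max` offsets: [high - max, high), when that stays in range.
        KafkaFetch::Newest => {
            let new_low = high - max;
            if max > 0 && high >= max && new_low >= low {
                (new_low, high)
            } else {
                (low, high)
            }
        }
        // Keep the oldest `max` offsets: [low, low + max), when that stays in range.
        KafkaFetch::Oldest => {
            let new_high = low + max;
            if max > 0 && low < new_high && new_high <= high {
                (low, new_high)
            } else {
                (low, high)
            }
        }
    }
}

// With low = 0, high = 1_000, max = 100:
//   Newest -> (900, 1_000), Oldest -> (0, 100).
// With max = 0 (no limit set), both modes return the full (0, 1_000) range.
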

@@ -222,7 +280,7 @@ impl KafkaBackend {
connection_id: None,
name: topic.clone(),
cached: None,
partitions: if !cpartitions.is_empty() {
partitions: if !result.is_empty() {
result
} else {
partitions.clone()
@@ -288,8 +346,16 @@ impl KafkaBackend {
""
}
};
let key = match m.key_view::<str>() {
None => "",
Some(Ok(s)) => s,
Some(Err(e)) => {
warn!("Error while deserializing message key: {:?}", e);
""
}
};
trace!("key: '{:?}', payload: '{}', topic: {}, partition: {}, offset: {}, timestamp: {:?}",
m.key(), payload, m.topic(), m.partition(), m.offset(), m.timestamp());
key, payload, m.topic(), m.partition(), m.offset(), m.timestamp());
let headers = if let Some(headers) = m.headers() {
let mut header_list: Vec<KrustHeader> = vec![];
for header in headers.iter() {
@@ -309,13 +375,18 @@ impl KafkaBackend {
topic: m.topic().to_string(),
partition: m.partition(),
offset: m.offset(),
key: key.to_string(),
timestamp: m.timestamp().to_millis(),
value: payload.to_string(),
headers,
};
match mrepo.save_message(&conn, &message) {
Ok(_) => trace!("message with offset {} saved", &message.offset),
Err(err) => warn!("unable to save message with offset {}: {}", &message.offset, err.to_string()),
Err(err) => warn!(
"unable to save message with offset {}: {}",
&message.offset,
err.to_string()
),
};
counter += 1;
}
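
The key captured above also lands on the message model, which is defined outside this excerpt; a hypothetical sketch of its shape after this commit (only the fields used in this diff are shown, and their types are assumed from the rdkafka calls):

#[derive(Debug, Clone)]
pub struct KrustMessage {
    pub topic: String,
    pub partition: i32,
    pub offset: i64,
    pub key: String, // new: message key surfaced in the list and stored in the cache
    pub timestamp: Option<i64>,
    pub value: String,
    pub headers: Vec<KrustHeader>,
}
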
@@ -331,7 +402,8 @@ impl KafkaBackend {
pub async fn list_messages_for_topic(
&self,
topic: &String,
total: usize,
fetch: Option<KafkaFetch>,
max_messages: Option<i64>,
) -> Result<Vec<KrustMessage>, ExternalError> {
let start_mark = Instant::now();
info!("starting listing messages for topic {}", topic);
@@ -341,51 +413,86 @@ impl KafkaBackend {

let mut counter = 0;

info!("consumer created");
let topic = self
.topic_message_count(topic, fetch.clone(), max_messages, None)
.await;
let total = topic.total.clone().unwrap_or_default();
let partitions = topic.partitions.clone();

let max_offset_map = partitions
.clone()
.into_iter()
.map(|p| (p.id, p.offset_high.clone().unwrap_or_default()))
.collect::<HashMap<_, _>>();

let mut partition_list = TopicPartitionList::with_capacity(partitions.capacity());
for p in partitions.iter() {
let offset = Offset::from_raw(p.offset_low.unwrap_or_default());
partition_list
.add_partition_offset(topic_name, p.id, offset)
.unwrap();
}
info!("seeking partitions\n{:?}", partition_list);
consumer
.subscribe(&[topic_name])
.expect("Can't subscribe to specified topics");
.assign(&partition_list)
.expect("Can't assign partition list");

let mut messages: Vec<KrustMessage> = Vec::with_capacity(total);
while counter < total {
match consumer.recv().await {
Err(e) => warn!("Kafka error: {}", e),
Ok(m) => {
let payload = match m.payload_view::<str>() {
None => "",
Some(Ok(s)) => s,
Some(Err(e)) => {
warn!("Error while deserializing message payload: {:?}", e);
""
}
let max_offset = match max_offset_map.get(&m.partition()) {
Some(max) => *max,
None => 0,
};
trace!("key: '{:?}', payload: '{}', topic: {}, partition: {}, offset: {}, timestamp: {:?}",
m.key(), payload, m.topic(), m.partition(), m.offset(), m.timestamp());
let headers = if let Some(headers) = m.headers() {
let mut header_list: Vec<KrustHeader> = vec![];
for header in headers.iter() {
let h = KrustHeader {
key: header.key.to_string(),
value: header
.value
.map(|v| String::from_utf8(v.to_vec()).unwrap_or_default()),
};
header_list.push(h);
}
header_list
} else {
vec![]
};
let message = KrustMessage {
topic: m.topic().to_string(),
partition: m.partition(),
offset: m.offset(),
timestamp: m.timestamp().to_millis(),
value: payload.to_string(),
headers,
};

messages.push(message);
counter += 1;
if m.offset() <= max_offset {
let payload = match m.payload_view::<str>() {
None => "",
Some(Ok(s)) => s,
Some(Err(e)) => {
warn!("Error while deserializing message payload: {:?}", e);
""
}
};
let key = match m.key_view::<str>() {
None => "",
Some(Ok(s)) => s,
Some(Err(e)) => {
warn!("Error while deserializing message key: {:?}", e);
""
}
};
trace!("key: '{:?}', payload: '{}', topic: {}, partition: {}, offset: {}, timestamp: {:?}",
key, payload, m.topic(), m.partition(), m.offset(), m.timestamp());
let headers = if let Some(headers) = m.headers() {
let mut header_list: Vec<KrustHeader> = vec![];
for header in headers.iter() {
let h = KrustHeader {
key: header.key.to_string(),
value: header
.value
.map(|v| String::from_utf8(v.to_vec()).unwrap_or_default()),
};
header_list.push(h);
}
header_list
} else {
vec![]
};
let message = KrustMessage {
topic: m.topic().to_string(),
partition: m.partition(),
offset: m.offset(),
key: key.to_string(),
timestamp: m.timestamp().to_millis(),
value: payload.to_string(),
headers,
};

messages.push(message);
counter += 1;
}
}
};
}
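
A caller of the reworked signature is not shown in this excerpt; a hypothetical call site, assuming a configured KafkaBackend, that fetches at most 500 of the oldest messages per partition:

async fn preview_oldest(backend: &KafkaBackend) -> Result<Vec<KrustMessage>, ExternalError> {
    backend
        .list_messages_for_topic(&"my-topic".to_string(), Some(KafkaFetch::Oldest), Some(500))
        .await
}
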
(Diffs for the remaining changed files are not shown here.)
