Skip to content

Commit

Permalink
Messages cache fix.
Browse files Browse the repository at this point in the history
  - Refactoring `refresh` message counter.
  • Loading branch information
Miguel Aranha Baldi Horlle committed Apr 21, 2024
1 parent 6cc7c11 commit f836538
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 44 deletions.
82 changes: 71 additions & 11 deletions src/backend/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use rdkafka::message::Headers;
use rdkafka::topic_partition_list::TopicPartitionList;
use rdkafka::{Message, Offset};

use std::borrow::Borrow;
use std::collections::HashMap;
use std::time::{Duration, Instant};
use tracing::{debug, info, trace, warn};

Expand Down Expand Up @@ -134,13 +136,14 @@ impl KafkaBackend {
name: topic.name().to_string(),
cached: None,
partitions,
total: None,
});
}
topics
}

pub async fn topic_message_count(&self, topic: &String) -> usize {
info!("couting messages for topic {}", topic);
pub async fn fetch_partitions(&self, topic: &String) -> Vec<Partition> {
info!("fetching partitions from topic {}", topic);
let context = CustomContext;
let consumer: LoggingConsumer = self.consumer(context).expect("Consumer creation failed");

Expand All @@ -150,7 +153,7 @@ impl KafkaBackend {
.fetch_metadata(Some(topic.as_str()), TIMEOUT)
.expect("Failed to fetch metadata");

let mut message_count: usize = 0;
let mut partitions = vec![];
match metadata.topics().first() {
Some(t) => {
for partition in t.partitions() {
Expand All @@ -163,13 +166,69 @@ impl KafkaBackend {
high,
high - low
);
message_count += usize::try_from(high).unwrap() - usize::try_from(low).unwrap();
let part = Partition {
id: partition.id(),
offset_low: Some(low),
offset_high: Some(high),
};
partitions.push(part);
}
}
None => warn!(""),
}
partitions
}

/// Counts the messages available in `topic` and returns a `KrustTopic`
/// carrying the up-to-date partition list and the total message count.
///
/// When `current_partitions` (previously cached offsets) is provided, only
/// messages *newer* than each cached partition's high offset are counted,
/// so the total reflects what still needs to be fetched; partitions not
/// seen before are added to the returned list with no high offset.
/// With no cache, the full `high - low` range of every partition is counted.
pub async fn topic_message_count(
    &self,
    topic: &String,
    current_partitions: Option<Vec<Partition>>,
) -> KrustTopic {
    info!("counting messages for topic {}", topic);

    let mut message_count: usize = 0;
    // Latest per-partition offsets as reported by the broker.
    let partitions = &self.fetch_partitions(topic).await;
    // Cached partitions (empty when no cache exists); `result` accumulates
    // the partition list returned in the cached case.
    let cpartitions = current_partitions.unwrap_or_default();
    let mut result = cpartitions.clone();

    // Index cached partitions by id for O(1) lookup per broker partition.
    let part_map = cpartitions
        .iter()
        .map(|p| (p.id, p.clone()))
        .collect::<HashMap<_, _>>();

    for p in partitions {
        if !cpartitions.is_empty() {
            // Resume from the cached high offset when the partition was
            // seen before; otherwise count from its low offset and record
            // the new partition (high left unset until it is cached).
            let low = match part_map.get(&p.id) {
                Some(part) => part.offset_high.unwrap_or(p.offset_low.unwrap()),
                None => {
                    result.push(Partition {
                        id: p.id,
                        offset_low: p.offset_low,
                        offset_high: None,
                    });
                    p.offset_low.unwrap()
                }
            };
            message_count += usize::try_from(p.offset_high.unwrap_or_default()).unwrap()
                - usize::try_from(low).unwrap();
        } else {
            // No cache: count the whole low..high range.
            message_count += usize::try_from(p.offset_high.unwrap_or_default()).unwrap()
                - usize::try_from(p.offset_low.unwrap_or_default()).unwrap();
        };
    }

    info!("topic {} has {} messages", topic, message_count);
    KrustTopic {
        connection_id: None,
        name: topic.clone(),
        cached: None,
        // With a cache, return the merged cached+new partition list;
        // otherwise return the freshly fetched broker partitions.
        partitions: if !cpartitions.is_empty() {
            result
        } else {
            partitions.clone()
        },
        total: Some(message_count),
    }
}
pub async fn cache_messages_for_topic(
&self,
Expand All @@ -194,10 +253,10 @@ impl KafkaBackend {
let mut partition_list = TopicPartitionList::with_capacity(partitions.capacity());
for p in partitions.iter() {
let offset = match fetch {
KafkaFetch::Newest => {
let latest_offset = p.offset_high.unwrap_or_default() + 1;
Offset::from_raw(latest_offset)
}
KafkaFetch::Newest => p
.offset_high
.map(|oh| Offset::from_raw(oh + 1))
.unwrap_or(Offset::Beginning),
KafkaFetch::Oldest => Offset::Beginning,
};
partition_list
Expand Down Expand Up @@ -230,7 +289,7 @@ impl KafkaBackend {
}
};
trace!("key: '{:?}', payload: '{}', topic: {}, partition: {}, offset: {}, timestamp: {:?}",
m.key(), payload, m.topic(), m.partition(), m.offset(), m.timestamp());
m.key(), payload, m.topic(), m.partition(), m.offset(), m.timestamp());
let headers = if let Some(headers) = m.headers() {
let mut header_list: Vec<KrustHeader> = vec![];
for header in headers.iter() {
Expand All @@ -254,6 +313,7 @@ impl KafkaBackend {
value: payload.to_string(),
headers,
};
trace!("saving message {}", &message.offset);
mrepo.save_message(&conn, &message)?;
counter += 1;
}
Expand Down Expand Up @@ -297,7 +357,7 @@ impl KafkaBackend {
}
};
trace!("key: '{:?}', payload: '{}', topic: {}, partition: {}, offset: {}, timestamp: {:?}",
m.key(), payload, m.topic(), m.partition(), m.offset(), m.timestamp());
m.key(), payload, m.topic(), m.partition(), m.offset(), m.timestamp());
let headers = if let Some(headers) = m.headers() {
let mut header_list: Vec<KrustHeader> = vec![];
for header in headers.iter() {
Expand Down
3 changes: 3 additions & 0 deletions src/backend/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub struct KrustTopic {
pub name: String,
pub cached: Option<i64>,
pub partitions: Vec<Partition>,
pub total: Option<usize>,
}

impl Display for KrustTopic {
Expand Down Expand Up @@ -510,6 +511,7 @@ impl Repository {
name: topic.name.clone(),
cached,
partitions: vec![],
total: None,
})
};

Expand Down Expand Up @@ -549,6 +551,7 @@ impl Repository {
name: row.get(1)?,
cached: row.get(2)?,
partitions: vec![],
total: None,
})
},
)
Expand Down
85 changes: 52 additions & 33 deletions src/backend/worker.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashMap;

use chrono::Utc;
use tokio::select;
use tokio_util::sync::CancellationToken;
Expand All @@ -7,7 +9,7 @@ use crate::{config::ExternalError, Repository};

use super::{
kafka::{KafkaBackend, KafkaFetch},
repository::{KrustConnection, KrustMessage, KrustTopic, MessagesRepository},
repository::{KrustConnection, KrustMessage, KrustTopic, MessagesRepository, Partition},
};

#[derive(Debug, Clone, Copy, PartialEq, Default, strum::EnumString, strum::Display)]
Expand Down Expand Up @@ -113,6 +115,9 @@ impl MessagesWorker {
MessagesMode::Cached { refresh: _ } => self.get_messages_cached(request).await,
}
}



async fn get_messages_cached(
self,
request: &MessagesRequest,
Expand All @@ -134,6 +139,7 @@ impl MessagesWorker {
name: request.topic.name.clone(),
cached,
partitions: vec![],
total: None,
};
let topic = repo.save_topic(
topic.connection_id.expect("should have connection id"),
Expand All @@ -144,37 +150,46 @@ impl MessagesWorker {
let total = match request.topic.cached {
Some(_) => {
if refresh {
let cached_total = mrepo.count_messages(None).unwrap_or_default();
let total = kafka.topic_message_count(&topic.name).await - cached_total;
let partitions = mrepo.find_offsets().ok();
let topic = kafka.topic_message_count(&topic.name, partitions.clone()).await;
let partitions = topic.partitions.clone();
let total = topic.total.unwrap_or_default();
kafka
.cache_messages_for_topic(
topic_name,
total,
&mut mrepo,
partitions,
Some(KafkaFetch::Newest),
)
.await
.unwrap();
}
mrepo.count_messages(request.search.clone()).unwrap_or_default()
}
None => {
let total = kafka.topic_message_count(&topic.name).await;
mrepo.init().unwrap();
kafka
.cache_messages_for_topic(
topic_name,
total,
&mut mrepo,
None,
Some(KafkaFetch::Oldest),
Some(partitions),
Some(KafkaFetch::Newest),
)
.await
.unwrap();
}
mrepo
.count_messages(request.search.clone())
.unwrap_or_default()
}
None => {
let total = kafka
.topic_message_count(&topic.name, None)
.await
.total
.unwrap_or_default();
mrepo.init().unwrap();
kafka
.cache_messages_for_topic(
topic_name,
total,
&mut mrepo,
None,
Some(KafkaFetch::Oldest),
)
.await
.unwrap();
let total = if request.search.clone().is_some() {
mrepo.count_messages(request.search.clone()).unwrap_or_default()
mrepo
.count_messages(request.search.clone())
.unwrap_or_default()
} else {
total
};
Expand All @@ -184,19 +199,19 @@ impl MessagesWorker {
let messages = match request.page_operation {
PageOp::Next => match request.offset_partition {
(0, 0) => mrepo
.find_messages(request.clone().page_size, request.clone().search)
.unwrap(),
.find_messages(request.clone().page_size, request.clone().search)
.unwrap(),
offset_partition => mrepo
.find_next_messages(request.page_size, offset_partition, request.clone().search)
.unwrap(),
.find_next_messages(request.page_size, offset_partition, request.clone().search)
.unwrap(),
},
PageOp::Prev => mrepo
.find_prev_messages(
request.page_size,
request.offset_partition,
request.clone().search,
)
.unwrap(),
.find_prev_messages(
request.page_size,
request.offset_partition,
request.clone().search,
)
.unwrap(),
};
Ok(MessagesResponse {
total,
Expand All @@ -215,7 +230,11 @@ impl MessagesWorker {
let kafka = KafkaBackend::new(&request.connection);
let topic = &request.topic.name;
// Run async background task
let total = kafka.topic_message_count(topic).await;
let total = kafka
.topic_message_count(topic, None)
.await
.total
.unwrap_or_default();
let messages = kafka.list_messages_for_topic(topic, total).await?;
Ok(MessagesResponse {
total,
Expand Down
1 change: 1 addition & 0 deletions src/component/messages/messages_tab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ impl FactoryComponent for MessagesTabModel {
connection_id: cloned_topic.connection_id,
cached: None,
partitions: vec![],
total: None,
};
let conn = self.connection.clone().unwrap();
MessagesWorker::new().cleanup_messages(&MessagesCleanupRequest {
Expand Down
1 change: 1 addition & 0 deletions src/component/topics_page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ impl Component for TopicsPageModel {
name: item.borrow().name.clone(),
cached: None,
partitions: vec![],
total: None,
};
sender
.output(TopicsPageOutput::OpenMessagesPage(
Expand Down

0 comments on commit f836538

Please sign in to comment.