Skip to content

Commit

Permalink
Messages page improvements.
Browse files Browse the repository at this point in the history
  - Messages search: Live and Cached.
  - Minor layout improvements.
  • Loading branch information
Miguel Aranha Baldi Horlle committed Apr 20, 2024
1 parent 276f1da commit 6cc7c11
Show file tree
Hide file tree
Showing 8 changed files with 263 additions and 99 deletions.
185 changes: 128 additions & 57 deletions src/backend/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,28 +123,41 @@ impl MessagesRepository {
//let conn = self.get_connection();
let mut stmt_by_id = conn.prepare_cached(
"INSERT INTO kr_message(partition, offset, value, timestamp, headers)
VALUES (:p, :o, :v, :t, :h)",
VALUES (:p, :o, :v, :t, :h)",
)?;
let headers =
ron::ser::to_string::<Vec<KrustHeader>>(message.headers.as_ref()).unwrap_or_default();
let maybe_message = stmt_by_id
.execute(
named_params! { ":p": message.partition, ":o": message.offset, ":v": message.value, ":t": message.timestamp, ":h": headers},
)
.map_err(ExternalError::DatabaseError);
.execute(
named_params! { ":p": message.partition, ":o": message.offset, ":v": message.value, ":t": message.timestamp, ":h": headers},
)
.map_err(ExternalError::DatabaseError);

match maybe_message {
Ok(_) => Ok(message.to_owned()),
Err(e) => Err(e),
}
}

pub fn count_messages(&mut self) -> Result<usize, ExternalError> {
pub fn count_messages(&mut self, search: Option<String>) -> Result<usize, ExternalError> {
let conn = self.get_connection();
let mut stmt_by_id = conn.prepare_cached("SELECT COUNT(1) FROM kr_message")?;

stmt_by_id
.query_row(params![], move |row| row.get(0))
let mut stmt_count = match search {
Some(_) => {
conn.prepare_cached("SELECT COUNT(1) FROM kr_message WHERE value LIKE :search")?
}
None => conn.prepare_cached("SELECT COUNT(1) FROM kr_message")?,
};
let params_with_search =
named_params! { ":search": format!("%{}%", search.clone().unwrap_or_default()) };
stmt_count
.query_row(
if search.is_some() {
params_with_search
} else {
named_params![]
},
move |row| row.get(0),
)
.map_err(ExternalError::DatabaseError)
}

Expand All @@ -154,12 +167,12 @@ impl MessagesRepository {
let conn = self.get_connection();
let mut stmt_by_id = conn.prepare_cached(
"SELECT high.partition partition, offset_low, offset_high
FROM (SELECT partition, MAX(offset) offset_high
from kr_message
GROUP BY partition) high
JOIN (SELECT partition, MIN(offset) offset_low
from kr_message
GROUP BY partition) low ON high.partition = low.partition",
FROM (SELECT partition, MAX(offset) offset_high
from kr_message
GROUP BY partition) high
JOIN (SELECT partition, MIN(offset) offset_low
from kr_message
GROUP BY partition) low ON high.partition = low.partition",
)?;

let row_to_model = move |row: &Row<'_>| {
Expand All @@ -179,14 +192,27 @@ impl MessagesRepository {
Ok(messages)
}

pub fn find_messages(&mut self, page_size: u16) -> Result<Vec<KrustMessage>, ExternalError> {
pub fn find_messages(
&mut self,
page_size: u16,
search: Option<String>,
) -> Result<Vec<KrustMessage>, ExternalError> {
let conn = self.get_connection();
let mut stmt_by_id = conn.prepare_cached(
"SELECT partition, offset, value, timestamp, headers
FROM kr_message
ORDER BY offset, partition
LIMIT :ps",
)?;
let mut stmt_query = match search {
Some(_) => conn.prepare_cached(
"SELECT partition, offset, value, timestamp, headers
FROM kr_message
WHERE value LIKE :search
ORDER BY offset, partition
LIMIT :ps",
)?,
None => conn.prepare_cached(
"SELECT partition, offset, value, timestamp, headers
FROM kr_message
ORDER BY offset, partition
LIMIT :ps",
)?,
};
let string_to_headers = move |sheaders: String| {
let headers: Result<Vec<KrustHeader>, rusqlite::Error> = ron::from_str(&sheaders)
.map_err(|e| rusqlite::Error::InvalidColumnName(e.to_string()));
Expand All @@ -203,8 +229,17 @@ impl MessagesRepository {
topic: topic_name.clone(),
})
};
let rows = stmt_by_id
.query_map(named_params! {":ps": page_size}, row_to_model)
let params_with_search = named_params! { ":search": format!("%{}%", search.clone().unwrap_or_default()), ":ps": page_size };
let params = named_params! { ":ps": page_size, };
let rows = stmt_query
.query_map(
if search.is_some() {
params_with_search
} else {
params
},
row_to_model,
)
.map_err(ExternalError::DatabaseError)?;
let mut messages = Vec::new();
for row in rows {
Expand All @@ -217,16 +252,27 @@ impl MessagesRepository {
&mut self,
page_size: u16,
last: (usize, usize),
search: Option<String>,
) -> Result<Vec<KrustMessage>, ExternalError> {
let (offset, partition) = last;
let conn = self.get_connection();
let mut stmt_by_id = conn.prepare_cached(
"SELECT partition, offset, value, timestamp, headers
FROM kr_message
WHERE (offset, partition) > (:o, :p)
ORDER BY offset, partition
LIMIT :ps",
)?;
let mut stmt_query = match search {
Some(_) => conn.prepare_cached(
"SELECT partition, offset, value, timestamp, headers
FROM kr_message
WHERE (offset, partition) > (:o, :p)
AND value LIKE :search
ORDER BY offset, partition
LIMIT :ps",
)?,
None => conn.prepare_cached(
"SELECT partition, offset, value, timestamp, headers
FROM kr_message
WHERE (offset, partition) > (:o, :p)
ORDER BY offset, partition
LIMIT :ps",
)?,
};
let string_to_headers = move |sheaders: String| {
let headers: Result<Vec<KrustHeader>, rusqlite::Error> = ron::from_str(&sheaders)
.map_err(|e| rusqlite::Error::InvalidColumnName(e.to_string()));
Expand All @@ -243,9 +289,15 @@ impl MessagesRepository {
topic: topic_name.clone(),
})
};
let rows = stmt_by_id
let params_with_search = named_params! {":ps": page_size, ":o": offset, ":p": partition, ":search": format!("%{}%", search.clone().unwrap_or_default())};
let params = named_params! {":ps": page_size, ":o": offset, ":p": partition,};
let rows = stmt_query
.query_map(
named_params! {":ps": page_size, ":o": offset, ":p": partition},
if search.is_some() {
params_with_search
} else {
params
},
row_to_model,
)
.map_err(ExternalError::DatabaseError)?;
Expand All @@ -259,18 +311,31 @@ impl MessagesRepository {
&mut self,
page_size: u16,
first: (usize, usize),
search: Option<String>,
) -> Result<Vec<KrustMessage>, ExternalError> {
let (offset, partition) = first;
let conn = self.get_connection();
let mut stmt_by_id = conn.prepare_cached(
"SELECT partition, offset, value, timestamp, headers FROM (
SELECT partition, offset, value, timestamp, headers
FROM kr_message
WHERE (offset, partition) < (:o, :p)
ORDER BY offset DESC, partition DESC
LIMIT :ps
) ORDER BY offset ASC, partition ASC",
)?;
let mut stmt_query = match search {
Some(_) => conn.prepare_cached(
"SELECT partition, offset, value, timestamp, headers FROM (
SELECT partition, offset, value, timestamp, headers
FROM kr_message
WHERE (offset, partition) < (:o, :p)
AND value LIKE :search
ORDER BY offset DESC, partition DESC
LIMIT :ps
) ORDER BY offset ASC, partition ASC",
)?,
None => conn.prepare_cached(
"SELECT partition, offset, value, timestamp, headers FROM (
SELECT partition, offset, value, timestamp, headers
FROM kr_message
WHERE (offset, partition) < (:o, :p)
ORDER BY offset DESC, partition DESC
LIMIT :ps
) ORDER BY offset ASC, partition ASC",
)?,
};
let string_to_headers = move |sheaders: String| {
let headers: Result<Vec<KrustHeader>, rusqlite::Error> = ron::from_str(&sheaders)
.map_err(|e| rusqlite::Error::InvalidColumnName(e.to_string()));
Expand All @@ -287,9 +352,15 @@ impl MessagesRepository {
topic: topic_name.clone(),
})
};
let rows = stmt_by_id
let params_with_search = named_params! {":ps": page_size, ":o": offset, ":p": partition, ":search": format!("%{}%", search.clone().unwrap_or_default()) };
let params = named_params! {":ps": page_size, ":o": offset, ":p": partition };
let rows = stmt_query
.query_map(
named_params! {":ps": page_size, ":o": offset, ":p": partition},
if search.is_some() {
params_with_search
} else {
params
},
row_to_model,
)
.map_err(ExternalError::DatabaseError)?;
Expand All @@ -315,9 +386,9 @@ impl Repository {

pub fn init(&mut self) -> Result<(), ExternalError> {
self.conn.execute_batch("
CREATE TABLE IF NOT EXISTS kr_connection(id INTEGER PRIMARY KEY, name TEXT UNIQUE, brokersList TEXT, securityType TEXT, saslMechanism TEXT, saslUsername TEXT, saslPassword TEXT);
CREATE TABLE IF NOT EXISTS kr_topic(connection_id INTEGER, name TEXT, cached INTEGER, PRIMARY KEY (connection_id, name), FOREIGN KEY (connection_id) REFERENCES kr_connection(id));
").map_err(ExternalError::DatabaseError)
CREATE TABLE IF NOT EXISTS kr_connection(id INTEGER PRIMARY KEY, name TEXT UNIQUE, brokersList TEXT, securityType TEXT, saslMechanism TEXT, saslUsername TEXT, saslPassword TEXT);
CREATE TABLE IF NOT EXISTS kr_topic(connection_id INTEGER, name TEXT, cached INTEGER, PRIMARY KEY (connection_id, name), FOREIGN KEY (connection_id) REFERENCES kr_connection(id));
").map_err(ExternalError::DatabaseError)
}

pub fn list_all_connections(&mut self) -> Result<Vec<KrustConnection>, ExternalError> {
Expand Down Expand Up @@ -385,9 +456,9 @@ impl Repository {
Ok(konn_to_update) => {
let mut up_stmt = self.conn.prepare_cached("UPDATE kr_connection SET name = :name, brokersList = :brokers, securityType = :security, saslMechanism = :sasl, saslUsername = :sasl_u, saslPassword = :sasl_p WHERE id = :id")?;
up_stmt
.execute(named_params! { ":id": &konn_to_update.id.unwrap(), ":name": &name, ":brokers": &brokers, ":security": security.to_string(), ":sasl": &sasl, ":sasl_u": &sasl_username, ":sasl_p": &sasl_password })
.map_err(ExternalError::DatabaseError)
.map( |_| {KrustConnection { id: konn_to_update.id, name, brokers_list: brokers, security_type: security, sasl_mechanism: sasl, sasl_username, sasl_password }})
.execute(named_params! { ":id": &konn_to_update.id.unwrap(), ":name": &name, ":brokers": &brokers, ":security": security.to_string(), ":sasl": &sasl, ":sasl_u": &sasl_username, ":sasl_p": &sasl_password })
.map_err(ExternalError::DatabaseError)
.map( |_| {KrustConnection { id: konn_to_update.id, name, brokers_list: brokers, security_type: security, sasl_mechanism: sasl, sasl_username, sasl_password }})
}
Err(_) => {
let mut ins_stmt = self.conn.prepare_cached("INSERT INTO kr_connection (id, name, brokersList, securityType, saslMechanism, saslUsername, saslPassword) VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id")?;
Expand Down Expand Up @@ -429,9 +500,9 @@ impl Repository {
let cached = topic.cached;
let mut stmt_by_id = self.conn.prepare_cached(
"INSERT INTO kr_topic(connection_id, name, cached)
VALUES (:cid, :topic, :cached)
ON CONFLICT(connection_id, name)
DO UPDATE SET cached=excluded.cached",
VALUES (:cid, :topic, :cached)
ON CONFLICT(connection_id, name)
DO UPDATE SET cached=excluded.cached",
)?;
let row_to_model = move |_| {
Ok(KrustTopic {
Expand All @@ -457,8 +528,8 @@ impl Repository {
let name = topic.name.clone();
let mut stmt_by_id = self.conn.prepare_cached(
"DELETE FROM kr_topic
WHERE connection_id = :cid
AND name = :topic",
WHERE connection_id = :cid
AND name = :topic",
)?;

stmt_by_id
Expand All @@ -468,7 +539,7 @@ impl Repository {

pub fn find_topic(&mut self, conn_id: usize, topic_name: &String) -> Option<KrustTopic> {
let stmt = self.conn
.prepare_cached("SELECT connection_id, name, cached FROM kr_topic WHERE connection_id = :cid AND name = :topic");
.prepare_cached("SELECT connection_id, name, cached FROM kr_topic WHERE connection_id = :cid AND name = :topic");
stmt.ok()?
.query_row(
named_params! {":cid": &conn_id, ":topic": &topic_name },
Expand Down
29 changes: 22 additions & 7 deletions src/backend/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub struct MessagesRequest {
pub page_operation: PageOp,
pub page_size: u16,
pub offset_partition: (usize, usize),
pub search: Option<String>,
}

#[derive(Debug, Clone)]
Expand All @@ -42,6 +43,7 @@ pub struct MessagesResponse {
pub total: usize,
pub messages: Vec<KrustMessage>,
pub topic: Option<KrustTopic>,
pub search: Option<String>,
}

pub struct MessagesCleanupRequest {
Expand Down Expand Up @@ -92,7 +94,7 @@ impl MessagesWorker {
_ = token.cancelled() => {
info!("request {:?} cancelled", &req);
// The token was cancelled
Ok(MessagesResponse { total: 0, messages: Vec::new(), topic: Some(req.topic), page_operation: req.page_operation, page_size: req.page_size})
Ok(MessagesResponse { total: 0, messages: Vec::new(), topic: Some(req.topic), page_operation: req.page_operation, page_size: req.page_size, search: req.search})
}
messages = self.get_messages_by_mode(&req) => {
messages
Expand Down Expand Up @@ -142,7 +144,7 @@ impl MessagesWorker {
let total = match request.topic.cached {
Some(_) => {
if refresh {
let cached_total = mrepo.count_messages().unwrap_or_default();
let cached_total = mrepo.count_messages(None).unwrap_or_default();
let total = kafka.topic_message_count(&topic.name).await - cached_total;
let partitions = mrepo.find_offsets().ok();
kafka
Expand All @@ -156,7 +158,7 @@ impl MessagesWorker {
.await
.unwrap();
}
mrepo.count_messages().unwrap_or_default()
mrepo.count_messages(request.search.clone()).unwrap_or_default()
}
None => {
let total = kafka.topic_message_count(&topic.name).await;
Expand All @@ -171,18 +173,29 @@ impl MessagesWorker {
)
.await
.unwrap();
let total = if request.search.clone().is_some() {
mrepo.count_messages(request.search.clone()).unwrap_or_default()
} else {
total
};
total
}
};
let messages = match request.page_operation {
PageOp::Next => match request.offset_partition {
(0, 0) => mrepo.find_messages(request.page_size).unwrap(),
(0, 0) => mrepo
.find_messages(request.clone().page_size, request.clone().search)
.unwrap(),
offset_partition => mrepo
.find_next_messages(request.page_size, offset_partition)
.find_next_messages(request.page_size, offset_partition, request.clone().search)
.unwrap(),
},
PageOp::Prev => mrepo
.find_prev_messages(request.page_size, request.offset_partition)
.find_prev_messages(
request.page_size,
request.offset_partition,
request.clone().search,
)
.unwrap(),
};
Ok(MessagesResponse {
Expand All @@ -191,6 +204,7 @@ impl MessagesWorker {
topic: Some(topic),
page_operation: request.page_operation,
page_size: request.page_size,
search: request.search.clone(),
})
}

Expand All @@ -202,13 +216,14 @@ impl MessagesWorker {
let topic = &request.topic.name;
// Run async background task
let total = kafka.topic_message_count(topic).await;
let messages = kafka.list_messages_for_topic(topic, total, ).await?;
let messages = kafka.list_messages_for_topic(topic, total).await?;
Ok(MessagesResponse {
total,
messages: messages,
topic: Some(request.topic.clone()),
page_operation: request.page_operation,
page_size: request.page_size,
search: request.search.clone(),
})
}
}
Loading

0 comments on commit 6cc7c11

Please sign in to comment.