Skip to content

Commit

Permalink
Minor fixes and improvements.
Browse files Browse the repository at this point in the history
  - Resend selected messages, with headers.
  - Headers support when sending messages to Kafka.
  • Loading branch information
miguelbaldi committed Sep 17, 2024
1 parent 930625d commit 1d59358
Show file tree
Hide file tree
Showing 8 changed files with 249 additions and 28 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/build_and_publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ jobs:
if: startsWith(github.ref, 'refs/tags/')
with:
prerelease: false
draft: true
generate_release_notes: true
append_body: true
files: |
**/*.AppImage
**/*.zip
Expand Down
130 changes: 125 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 14 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[package]
name = "krust"
rust-version = "1.77"
version = "0.0.1"
edition = "2021"
resolver = "2"
Expand All @@ -15,36 +16,40 @@ anyhow = "1.0.51"
gtk = { version = "0.8.1", package = "gtk4", features = ["v4_14"] }
adw = { version = "0.6.0", package = "libadwaita", features = ["v1_5"] }
relm4 = { version = "0.8.1", features = ["libadwaita", "gnome_46"] }
relm4-components = { version = "0.8.1", features = ["libadwaita"]}
relm4-components = { version = "0.8.1", features = ["libadwaita"] }
tokio = { version = "1.37.0", features = ["full"] }
tokio-util = "0.7.10"
rusqlite = { version = "0.31.0", features = ["bundled", "hooks"] }
sourceview5 = { version = "0.8.0", features = ["v5_4"] }
directories = "4.0.1"
futures = { version = "0.3.25", default-features = false }
serde = { version = "1.0.136", features = ["derive"] }
serde_json = { version= "1.0.79", features = ["preserve_order"] }
serde_json = { version = "1.0.79", features = ["preserve_order"] }
ron = "0.8"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tracing-tree = "0.3.0"
thiserror = "1.0.58"
chrono = { version = "0.4.37", features = ["serde"] }
chrono-tz = { version = "0.9.0", features = [ "filter-by-regex" ] }
chrono-tz = { version = "0.9.0", features = ["filter-by-regex"] }
strum = { version = "0.26.2", features = ["derive"] }
rdkafka = { version = "0.36.2", features = [ "cmake-build", "gssapi", "ssl" ] }
rdkafka = { version = "0.36.2", features = ["cmake-build", "gssapi", "ssl"] }
csv = "1.3.0"
uuid = { version = "1.8.0", features = ["v4", "fast-rng", "macro-diagnostics"]}
humansize = "2.0.0"
uuid = { version = "1.8.0", features = ["v4", "fast-rng", "macro-diagnostics"] }
humansize = "2.1.3"
copypasta = { version = "0.10.1", default-features = true }
sysinfo = "0.31.4"
fs_extra = "1.3.0"

[target.'cfg(target_os = "windows")'.dependencies]
sasl2-sys = { version = "0.1.20",features = ["openssl-vendored"]}
sasl2-sys = { version = "0.1.20", features = ["openssl-vendored"] }

[build-dependencies]
anyhow = "1.0.51"
openssl-src = { version = "300", default-features = false, features = ["force-engine"] }
vergen = { version = "8.3.1", features = ["build","git","gitcl","si"] }
openssl-src = { version = "300", default-features = false, features = [
"force-engine",
] }
vergen = { version = "8.3.1", features = ["build", "git", "gitcl", "si"] }
#glib-build-tools = "0.19.0"

[package.metadata.appimage]
Expand Down
18 changes: 11 additions & 7 deletions src/backend/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use rdkafka::config::{ClientConfig, FromClientConfigAndContext, RDKafkaLogLevel}
use rdkafka::consumer::BaseConsumer;
use rdkafka::consumer::{Consumer, ConsumerContext};
use rdkafka::error::{KafkaError, KafkaResult};
use rdkafka::message::Headers;
use rdkafka::message::{Header, Headers, OwnedHeaders};

use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::topic_partition_list::TopicPartitionList;
Expand Down Expand Up @@ -252,22 +252,26 @@ impl KafkaBackend {
.map(|message| async move {
// The send operation on the topic returns a future, which will be
// completed once the result or failure from Kafka is received.
let mut kheaders = OwnedHeaders::new();
for h in message.headers.clone().iter() {
let value = h.value.as_ref();
let key = h.key.as_str();
let header = Header { key, value };
kheaders = kheaders.insert(header);
}
let delivery_status = producer
.send(
FutureRecord::to(topic)
.partition(message.partition)
.payload(&message.value)
.key(&message.key.clone().unwrap_or_default()),
// .headers(OwnedHeaders::new().insert(Header {
// key: "header_key",
// value: Some("header_value"),
// })),
.key(&message.key.clone().unwrap_or_default())
.headers(kheaders),
Duration::from_secs(0),
)
.await;

// This will be executed when the result is received.
info!("Delivery status for message {:?} received", message);
trace!("Delivery status for message {:?} received", message);
delivery_status
})
.collect::<Vec<_>>();
Expand Down
2 changes: 1 addition & 1 deletion src/component/connection_page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl Component for ConnectionPageModel {
#[root]
adw::PreferencesDialog {
set_title: "Connection",
set_height_request: 500,
set_height_request: 570,
add = &adw::PreferencesPage {
add = &adw::PreferencesGroup {
#[name = "name_entry" ]
Expand Down
Loading

0 comments on commit 1d59358

Please sign in to comment.