Skip to content

Commit

Permalink
feat: handled client, update script helper
Browse files Browse the repository at this point in the history
  • Loading branch information
dieriba committed Feb 20, 2025
1 parent 2fa0d17 commit ae886f3
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 88 deletions.
207 changes: 124 additions & 83 deletions backend/windmill-api/src/mqtt_triggers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ use rumqttc::{
mqttbytes::{
v5::{
ConnectProperties, Filter, LastWill as V5LastWill,
LastWillProperties as V5LastWillProperties, Publish as V5Publish,
LastWillProperties as V5LastWillProperties, PublishProperties,
},
QoS as V5QoS,
},
AsyncClient as V5AsyncClient, Event as V5Event, EventLoop as V5EventLoop,
Incoming as V5Incoming, MqttOptions as V5MqttOptions,
},
AsyncClient as V3AsyncClient, Event as V3Event, EventLoop as V3EventLoop,
Incoming as V3Incoming, MqttOptions as V3MqttOptions, Outgoing as V3Outgoing,
Publish as V3Publish, QoS as V3QoS, SubscribeFilter, TlsConfiguration,
Incoming as V3Incoming, MqttOptions as V3MqttOptions, QoS as V3QoS, SubscribeFilter,
TlsConfiguration,
};
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
Expand All @@ -49,10 +49,7 @@ use windmill_common::{
};

use rand::seq::SliceRandom;
use serde_json::{
value::{RawValue, Value},
Map,
};
use serde_json::value::RawValue;
use sqlx::types::Json as SqlxJson;

use windmill_queue::PushArgsOwned;
Expand Down Expand Up @@ -827,7 +824,7 @@ async fn get_mqtt_async_client(
&mqtt_resource.host,
mqtt_resource.port,
);

match (
mqtt_resource.username.as_deref(),
mqtt_resource.password.as_deref(),
Expand Down Expand Up @@ -1164,30 +1161,45 @@ impl MqttTrigger {
}
}

enum MqttPublishPacket {
V3(V3Publish),
V5(V5Publish),
struct PublishData {
topic: String,
retain: bool,
pkid: u16,
v5: Option<PublishProperties>,
}

/*async fn handle_publish_packet(db: &DB, mqtt: &MqttConfig, publish: MqttPublishPacket) {
let payload;
let extra_value = Map::new();
extra_value.insert("kind".to_string(), Value::String("mqtt".to_string()));
match &publish {
MqttPublishPacket::V3(publish) => {
payload = publish.payload.as_ref();
extra_value.insert("topic".to_string(), publish.topic.clone());
}
MqttPublishPacket::V5(publish) => {
payload = publish.payload.as_ref();
topic = String::from_utf8(publish.topic.as_ref().to_vec()).unwrap_or("".to_string());
}
impl PublishData {
fn new(topic: String, retain: bool, pkid: u16, v5: Option<PublishProperties>) -> PublishData {
PublishData { topic, retain, pkid, v5 }
}
}

let args = HashMap::from([("payload".to_string(), to_raw_value(&payload))]);
let extra = Some(HashMap::from([("wm_trigger".to_string(), extra_value)]));
async fn handle_publish_packet(db: &DB, mqtt: &MqttConfig, payload: Bytes, publish: PublishData) {
let args = HashMap::from([("payload".to_string(), to_raw_value(&payload.as_ref()))]);
let extra = Some(HashMap::from([(
"wm_trigger".to_string(),
to_raw_value(&serde_json::json!({
"kind": "mqtt",
"mqtt": {
"topic": publish.topic,
"retain": publish.retain,
"pkid": publish.pkid,
"v5": publish.v5.map(|properties| {
serde_json::json!({
"payload_format_indicator": properties.payload_format_indicator,
"topic_alias": properties.topic_alias,
"response_topic": properties.response_topic,
"correlation_data": properties.correlation_data.as_deref(),
"user_properties": properties.user_properties,
"subscription_identifiers": properties.subscription_identifiers,
"content_type": properties.content_type,
})
})
}
})),
)]));
mqtt.handle(&db, Some(args), extra).await;
}*/
}

async fn listen_to_messages(
mqtt: MqttConfig,
Expand All @@ -1196,95 +1208,124 @@ async fn listen_to_messages(
) {
tokio::select! {
biased;

_ = killpill_rx.recv() => {
return;
}

_ = loop_ping(&db, &mqtt, Some("Connecting...")) => {
return;
}

result = mqtt.start_consuming_messages(&db) => {
tokio::select! {
biased;

_ = killpill_rx.recv() => {
return;
}

_ = loop_ping(&db, &mqtt, None) => {
return;
}
_ = {
async {
match result {
Ok(connection) => {
match connection {
MqttClientResult::V5((_, mut event_loop)) => {
loop {
match event_loop.poll().await {
Ok(notification) => {
match notification {
V5Event::Incoming(packet) => {
match packet {
V5Incoming::Publish(publish) => {
todo!()
//handle_publish_packet(&db, &mqtt, MqttPublishPacket::V5(publish)).await;
}
V5Incoming::Disconnect(disconnect) => {
let err_message = disconnect.properties.map(|properties| properties.reason_string).flatten();
let reason_code = disconnect.reason_code as u8;
mqtt.disable_with_error(&db, format!("Disconnected by the server, reason code: {}, {}", reason_code, err_message.map(|err| format!("message: {}", err)).unwrap_or("".to_string()))).await;
return;
}
packet => {
tracing::debug!("Received = {:?}", packet);
}

_ = async {
match result {
Ok(connection) => {
match connection {
MqttClientResult::V5((_, mut event_loop)) => {
loop {
match event_loop.poll().await {
Ok(notification) => {
match notification {
V5Event::Incoming(packet) => {
match packet {
V5Incoming::Publish(publish) => {
let publish_data = PublishData::new(
String::from_utf8(publish.topic.as_ref().to_vec())
.unwrap_or("".to_string()),
publish.retain,
publish.pkid,
publish.properties
);
handle_publish_packet(&db, &mqtt, publish.payload, publish_data).await;
}
V5Incoming::Disconnect(disconnect) => {
let err_message = disconnect.properties
.map(|properties| properties.reason_string)
.flatten();
let reason_code = disconnect.reason_code as u8;
mqtt.disable_with_error(
&db,
format!(
"Disconnected by the broker, reason code: {}, {}",
reason_code,
err_message
.map(|err| format!("message: {}", err))
.unwrap_or("".to_string())
)
).await;
return;
}
packet => {
tracing::debug!("Received = {:#?}", packet);
}
}
V5Event::Outgoing(packet) => {
tracing::debug!("Outgoing Received = {:?}", packet);
}
}
V5Event::Outgoing(packet) => {
tracing::debug!("Outgoing Received = {:#?}", packet);
}
}
Err(err) => {
mqtt.disable_with_error(&db, err.to_string()).await
}
}
Err(err) => {
mqtt.disable_with_error(&db, err.to_string()).await
}
}
}
MqttClientResult::V3((_, mut event_loop)) => {
loop {
match event_loop.poll().await {
Ok(notification) => {
match notification {
V3Event::Incoming(packet) => {
match packet {
V3Incoming::Publish(publish) => {
todo!()
}
packet => {
tracing::debug!("Received = {:?}", packet);
}
}
MqttClientResult::V3((_, mut event_loop)) => {
loop {
match event_loop.poll().await {
Ok(notification) => {
match notification {
V3Event::Incoming(packet) => {
match packet {
V3Incoming::Publish(publish) => {
let publish_data = PublishData::new(
publish.topic,
publish.retain,
publish.pkid,
None
);
handle_publish_packet(&db, &mqtt, publish.payload, publish_data).await;
}
packet => {
tracing::debug!("Received = {:?}", packet);
}
}
V3Event::Outgoing(packet) => {
tracing::debug!("Outgoing Received = {:?}", packet);
}
}
V3Event::Outgoing(packet) => {
tracing::debug!("Outgoing Received = {:?}", packet);
}
}
Err(err) => {
mqtt.disable_with_error(&db, err.to_string()).await
}
}
Err(err) => {
mqtt.disable_with_error(&db, err.to_string()).await
}
}
}
}
}
Err(err) => {
tracing::error!("Mqtt trigger error while trying to start listening to notifications: {}", &err);
mqtt.disable_with_error(&db, err.to_string()).await
}
}
Err(err) => {
tracing::error!(
"Mqtt trigger error while trying to start listening to notifications: {}",
&err
);
mqtt.disable_with_error(&db, err.to_string()).await
}
}
} => {
}
} => {}
}
}
}
Expand Down
45 changes: 40 additions & 5 deletions frontend/src/lib/script_helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ export async function main(approver?: string) {
export const BUN_PREPROCESSOR_MODULE_CODE = `
export async function preprocessor(
wm_trigger: {
kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs', 'mqtt',
kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt',
http?: {
route: string // The route path, e.g. "/users/:id"
path: string // The actual path called, e.g. "/users/123"
Expand Down Expand Up @@ -616,7 +616,18 @@ export async function preprocessor(
}>
},
mqtt?: {
topic: string
topic: string,
retain: boolean,
pkid: number,
v5?: {
payload_format_indicator?: number,
topic_alias?: number,
response_topic?: string,
correlation_data?: Array<number>,
user_properties?: Array<[string, string]>,
subscription_identifiers?: Array<number>,
content_type?: string
}
}
},
/* your other args */
Expand All @@ -630,7 +641,7 @@ export async function preprocessor(
const DENO_PREPROCESSOR_MODULE_CODE = `
export async function preprocessor(
wm_trigger: {
kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs', 'mqtt',
kind: 'http' | 'email' | 'webhook' | 'websocket' | 'kafka' | 'nats' | 'postgres' | 'sqs' | 'mqtt',
http?: {
route: string // The route path, e.g. "/users/:id"
path: string // The actual path called, e.g. "/users/123"
Expand Down Expand Up @@ -666,7 +677,18 @@ export async function preprocessor(
}>
},
mqtt?: {
topic: string
topic: string,
retain: boolean,
pkid: number,
v5?: {
payload_format_indicator?: number,
topic_alias?: number,
response_topic?: string,
correlation_data?: Array<number>,
user_properties?: Array<[string, string]>,
subscription_identifiers?: Array<number>,
content_type?: string
}
}
},
/* your other args */
Expand Down Expand Up @@ -740,8 +762,21 @@ class Sqs(TypedDict):
attributes: dict[str, str]
message_attributes: dict[str, MessageAttribute] | None
class V5Properties:
payload_format_indicator: int | None
topic_alias: int | None
response_topic: str | None
correlation_data: list[int] | None
user_properties: list[tuple[str, str]] | None
subscription_identifiers: list[int] | None
content_type: str | None
class Mqtt(TypeDict):
topic: str
topic: str
retain: bool
pkid: int
v5: V5Properties | None
class WmTrigger(TypedDict):
kind: Literal["http", "email", "webhook", "websocket", "kafka", "nats", "postgres", "sqs", "mqtt"]
Expand Down

0 comments on commit ae886f3

Please sign in to comment.