Skip to content

Commit a39d60a

Browse files
StormStakemladedavStephenWakelypront
authored
feat(new source): Initial MQTT Source, #19931 (#22752)
* wip * add mqtt source cue reference * add changelog * Update src/sources/mqtt/config.rs Co-authored-by: Stephen Wakely <stephen@lisp.space> * rustfmt * honor shutdown signa * git rm .vscode/settings.json * add deny warnings back * use new_maybe_logs * fix type, changelog, rename bad creds * add fowllow up github issue * common config WIP * Consolidated MQTT error types * More deduplication in MQTT error types * Deduplicated MQTT connector type * Formatting fixes for tests * Fixed docs tests * Fixed metrics and integration tests for MQTT * Added to changelog authorship * Fixed cue docs formating * Formatting fix on errors * Added topic metadata --------- Co-authored-by: David Mládek <david.mladek.cz@gmail.com> Co-authored-by: Stephen Wakely <stephen@lisp.space> Co-authored-by: Pavlos Rontidis <pavlos.rontidis@gmail.com>
1 parent 9129d95 commit a39d60a

File tree

18 files changed

+1296
-103
lines changed

18 files changed

+1296
-103
lines changed

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,7 @@ sources-logs = [
580580
"sources-kafka",
581581
"sources-kubernetes_logs",
582582
"sources-logstash",
583+
"sources-mqtt",
583584
"sources-nats",
584585
"sources-opentelemetry",
585586
"sources-pulsar",
@@ -635,6 +636,7 @@ sources-kafka = ["dep:rdkafka"]
635636
sources-kubernetes_logs = ["vector-lib/file-source", "kubernetes", "transforms-reduce"]
636637
sources-logstash = ["sources-utils-net-tcp", "tokio-util/net"]
637638
sources-mongodb_metrics = ["dep:mongodb"]
639+
sources-mqtt = ["dep:rumqttc"]
638640
sources-nats = ["dep:async-nats", "dep:nkeys"]
639641
sources-nginx_metrics = ["dep:nom"]
640642
sources-opentelemetry = ["dep:hex", "vector-lib/opentelemetry", "dep:prost", "dep:prost-types", "sources-http_server", "sources-utils-http", "sources-utils-http-headers", "sources-vector"]
@@ -934,7 +936,7 @@ kafka-integration-tests = ["sinks-kafka", "sources-kafka"]
934936
logstash-integration-tests = ["docker", "sources-logstash"]
935937
loki-integration-tests = ["sinks-loki"]
936938
mongodb_metrics-integration-tests = ["sources-mongodb_metrics"]
937-
mqtt-integration-tests = ["sinks-mqtt"]
939+
mqtt-integration-tests = ["sinks-mqtt", "sources-mqtt"]
938940
nats-integration-tests = ["sinks-nats", "sources-nats"]
939941
nginx-integration-tests = ["sources-nginx_metrics"]
940942
opentelemetry-integration-tests = ["sources-opentelemetry", "dep:prost"]
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Adds a new `mqtt` source enabling Vector to receive logs from a MQTT broker.
2+
3+
authors: mladedav pront StormStake

scripts/integration/mqtt/test.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ paths:
1010
- "src/internal_events/mqtt.rs"
1111
- "src/sinks/mqtt/**"
1212
- "src/sinks/util/**"
13+
- "src/sources/mqtt/**"

src/common/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ pub(crate) mod sqs;
1818
#[cfg(any(feature = "sources-aws_s3", feature = "sinks-aws_s3"))]
1919
pub(crate) mod s3;
2020

21+
#[cfg(any(feature = "sources-mqtt", feature = "sinks-mqtt",))]
22+
/// Common MQTT configuration shared by MQTT components.
23+
pub mod mqtt;
24+
2125
#[cfg(any(feature = "transforms-log_to_metric", feature = "sinks-loki"))]
2226
pub(crate) mod expansion;
2327

src/common/mqtt.rs

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
use rumqttc::{AsyncClient, EventLoop, MqttOptions};
2+
use snafu::Snafu;
3+
use vector_config_macros::configurable_component;
4+
use vector_lib::tls::{TlsEnableableConfig, TlsError};
5+
6+
use crate::template::TemplateParseError;
7+
8+
/// Shared MQTT configuration for sources and sinks.
9+
#[configurable_component]
10+
#[derive(Clone, Debug, Derivative)]
11+
#[derivative(Default)]
12+
#[serde(deny_unknown_fields)]
13+
pub struct MqttCommonConfig {
14+
/// MQTT server address (The broker’s domain name or IP address).
15+
#[configurable(metadata(docs::examples = "mqtt.example.com", docs::examples = "127.0.0.1"))]
16+
pub host: String,
17+
18+
/// TCP port of the MQTT server to connect to.
19+
#[configurable(derived)]
20+
#[serde(default = "default_port")]
21+
#[derivative(Default(value = "default_port()"))]
22+
pub port: u16,
23+
24+
/// MQTT username.
25+
#[serde(default)]
26+
#[configurable(derived)]
27+
pub user: Option<String>,
28+
29+
/// MQTT password.
30+
#[serde(default)]
31+
#[configurable(derived)]
32+
pub password: Option<String>,
33+
34+
/// MQTT client ID.
35+
#[serde(default)]
36+
#[configurable(derived)]
37+
pub client_id: Option<String>,
38+
39+
/// Connection keep-alive interval.
40+
#[serde(default = "default_keep_alive")]
41+
#[derivative(Default(value = "default_keep_alive()"))]
42+
pub keep_alive: u16,
43+
44+
/// TLS configuration.
45+
#[configurable(derived)]
46+
pub tls: Option<TlsEnableableConfig>,
47+
}
48+
49+
const fn default_port() -> u16 {
50+
1883
51+
}
52+
53+
const fn default_keep_alive() -> u16 {
54+
60
55+
}
56+
57+
/// MQTT Error Types
58+
#[derive(Debug, Snafu)]
59+
#[snafu(visibility(pub))]
60+
pub enum MqttError {
61+
/// Topic template parsing failed
62+
#[snafu(display("invalid topic template: {source}"))]
63+
TopicTemplate {
64+
/// Source of error
65+
source: TemplateParseError,
66+
},
67+
/// TLS error
68+
#[snafu(display("TLS error: {source}"))]
69+
Tls {
70+
/// Source of error
71+
source: TlsError,
72+
},
73+
/// Configuration error
74+
#[snafu(display("MQTT configuration error: {source}"))]
75+
Configuration {
76+
/// Source of error
77+
source: ConfigurationError,
78+
},
79+
}
80+
81+
/// MQTT Configuration error types
82+
#[derive(Clone, Debug, Eq, PartialEq, Snafu)]
83+
pub enum ConfigurationError {
84+
/// Empty client ID error
85+
#[snafu(display("Client ID is not allowed to be empty."))]
86+
EmptyClientId,
87+
/// Invalid credentials provided error
88+
#[snafu(display("Username and password must be either both provided or both missing."))]
89+
InvalidCredentials,
90+
/// Invalid client ID provied error
91+
#[snafu(display(
92+
"Client ID must be 1-23 characters long and must consist of only alphanumeric characters."
93+
))]
94+
InvalidClientId,
95+
/// Credentials provided were incomplete
96+
#[snafu(display("Username and password must be either both or neither provided."))]
97+
IncompleteCredentials,
98+
}
99+
100+
#[derive(Clone)]
101+
/// Mqtt connector wrapper
102+
pub struct MqttConnector {
103+
/// Mqtt connection options
104+
pub options: MqttOptions,
105+
}
106+
107+
impl MqttConnector {
108+
/// Creates a new MqttConnector
109+
pub const fn new(options: MqttOptions) -> Self {
110+
Self { options }
111+
}
112+
113+
/// Connects the connector and generates a client and eventloop
114+
pub fn connect(&self) -> (AsyncClient, EventLoop) {
115+
let (client, eventloop) = AsyncClient::new(self.options.clone(), 1024);
116+
(client, eventloop)
117+
}
118+
119+
/// TODO: Right now there is no way to implement the healthcheck properly: <https://github.com/bytebeamio/rumqtt/issues/562>
120+
pub async fn healthcheck(&self) -> crate::Result<()> {
121+
Ok(())
122+
}
123+
}

src/sinks/mqtt/config.rs

Lines changed: 18 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -2,53 +2,32 @@ use std::time::Duration;
22

33
use rand::Rng;
44
use rumqttc::{MqttOptions, QoS, TlsConfiguration, Transport};
5-
use snafu::{ResultExt, Snafu};
5+
use snafu::ResultExt;
66
use vector_lib::codecs::JsonSerializerConfig;
77

88
use crate::template::Template;
99
use crate::{
1010
codecs::EncodingConfig,
11-
config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
12-
sinks::{
13-
mqtt::sink::{ConfigurationSnafu, MqttConnector, MqttError, MqttSink, TlsSnafu},
14-
prelude::*,
15-
Healthcheck, VectorSink,
11+
common::mqtt::{
12+
ConfigurationError, ConfigurationSnafu, MqttCommonConfig, MqttConnector, MqttError,
13+
TlsSnafu,
1614
},
17-
tls::{MaybeTlsSettings, TlsEnableableConfig},
15+
config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
16+
sinks::{mqtt::sink::MqttSink, prelude::*, Healthcheck, VectorSink},
17+
tls::MaybeTlsSettings,
1818
};
1919

2020
/// Configuration for the `mqtt` sink
2121
#[configurable_component(sink("mqtt"))]
2222
#[derive(Clone, Debug)]
2323
pub struct MqttSinkConfig {
24-
/// MQTT server address (The broker’s domain name or IP address).
25-
#[configurable(metadata(docs::examples = "mqtt.example.com", docs::examples = "127.0.0.1"))]
26-
pub host: String,
27-
28-
/// TCP port of the MQTT server to connect to.
29-
#[serde(default = "default_port")]
30-
pub port: u16,
31-
32-
/// MQTT username.
33-
pub user: Option<String>,
34-
35-
/// MQTT password.
36-
pub password: Option<String>,
37-
38-
/// MQTT client ID.
39-
pub client_id: Option<String>,
40-
41-
/// Connection keep-alive interval.
42-
#[serde(default = "default_keep_alive")]
43-
pub keep_alive: u16,
24+
#[serde(flatten)]
25+
pub common: MqttCommonConfig,
4426

4527
/// If set to true, the MQTT session is cleaned on login.
4628
#[serde(default = "default_clean_session")]
4729
pub clean_session: bool,
4830

49-
#[configurable(derived)]
50-
pub tls: Option<TlsEnableableConfig>,
51-
5231
/// MQTT publish topic (templates allowed)
5332
pub topic: Template,
5433

@@ -100,14 +79,6 @@ impl From<MqttQoS> for QoS {
10079
}
10180
}
10281

103-
const fn default_port() -> u16 {
104-
1883
105-
}
106-
107-
const fn default_keep_alive() -> u16 {
108-
60
109-
}
110-
11182
const fn default_clean_session() -> bool {
11283
false
11384
}
@@ -123,14 +94,9 @@ const fn default_retain() -> bool {
12394
impl Default for MqttSinkConfig {
12495
fn default() -> Self {
12596
Self {
126-
host: "localhost".into(),
127-
port: default_port(),
128-
user: None,
129-
password: None,
130-
client_id: None,
131-
keep_alive: default_keep_alive(),
97+
common: MqttCommonConfig::default(),
13298
clean_session: default_clean_session(),
133-
tls: None,
99+
134100
topic: Template::try_from("vector").expect("Cannot parse as a template"),
135101
retain: default_retain(),
136102
encoding: JsonSerializerConfig::default().into(),
@@ -164,17 +130,9 @@ impl SinkConfig for MqttSinkConfig {
164130
}
165131
}
166132

167-
#[derive(Clone, Debug, Eq, PartialEq, Snafu)]
168-
pub enum ConfigurationError {
169-
#[snafu(display("Client ID is not allowed to be empty."))]
170-
EmptyClientId,
171-
#[snafu(display("Username and password must be either both provided or both missing."))]
172-
InvalidCredentials,
173-
}
174-
175133
impl MqttSinkConfig {
176134
fn build_connector(&self) -> Result<MqttConnector, MqttError> {
177-
let client_id = self.client_id.clone().unwrap_or_else(|| {
135+
let client_id = self.common.client_id.clone().unwrap_or_else(|| {
178136
let hash = rand::rng()
179137
.sample_iter(&rand_distr::Alphanumeric)
180138
.take(6)
@@ -186,11 +144,12 @@ impl MqttSinkConfig {
186144
if client_id.is_empty() {
187145
return Err(ConfigurationError::EmptyClientId).context(ConfigurationSnafu);
188146
}
189-
let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), false).context(TlsSnafu)?;
190-
let mut options = MqttOptions::new(&client_id, &self.host, self.port);
191-
options.set_keep_alive(Duration::from_secs(self.keep_alive.into()));
147+
let tls =
148+
MaybeTlsSettings::from_config(self.common.tls.as_ref(), false).context(TlsSnafu)?;
149+
let mut options = MqttOptions::new(&client_id, &self.common.host, self.common.port);
150+
options.set_keep_alive(Duration::from_secs(self.common.keep_alive.into()));
192151
options.set_clean_session(self.clean_session);
193-
match (&self.user, &self.password) {
152+
match (&self.common.user, &self.common.password) {
194153
(Some(user), Some(password)) => {
195154
options.set_credentials(user, password);
196155
}
@@ -211,7 +170,7 @@ impl MqttSinkConfig {
211170
alpn,
212171
}));
213172
}
214-
MqttConnector::new(options, self.topic.to_string())
173+
Ok(MqttConnector::new(options))
215174
}
216175
}
217176

src/sinks/mqtt/integration_tests.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::common::mqtt::MqttCommonConfig;
12
use crate::config::{SinkConfig, SinkContext};
23
use crate::sinks::mqtt::config::MqttQoS;
34
use crate::sinks::mqtt::MqttSinkConfig;
@@ -25,16 +26,21 @@ async fn mqtt_happy() {
2526
trace_init();
2627

2728
let topic = "test";
28-
let cnf = MqttSinkConfig {
29+
let common = MqttCommonConfig {
2930
host: mqtt_broker_address(),
3031
port: mqtt_broker_port(),
32+
..Default::default()
33+
};
34+
35+
let config = MqttSinkConfig {
36+
common,
3137
topic: Template::try_from(topic).expect("Cannot parse the topic template"),
3238
quality_of_service: MqttQoS::AtLeastOnce,
3339
..Default::default()
3440
};
3541

3642
let cx = SinkContext::default();
37-
let (sink, healthcheck) = cnf.build(cx).await.expect("Cannot build the sink");
43+
let (sink, healthcheck) = config.build(cx).await.expect("Cannot build the sink");
3844
healthcheck.await.expect("Health check failed");
3945

4046
// prepare consumer

0 commit comments

Comments
 (0)