Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/app/mqtt' into bsp/phytium_pi
Browse files Browse the repository at this point in the history
  • Loading branch information
ZR233 committed Jan 7, 2025
2 parents 0b1f95a + 32a80f6 commit 0fc4765
Show file tree
Hide file tree
Showing 14 changed files with 1,202 additions and 0 deletions.
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ members = [
"examples/httpserver",
"examples/httpserver",
"examples/shell",
"examples/mqttclient", "examples/mqtt-lite",
]

[workspace.package]
Expand Down
16 changes: 16 additions & 0 deletions examples/mqtt-lite/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "mqtt-lite"
edition = "2021"
version.workspace = true
authors.workspace = true
license.workspace = true
homepage.workspace = true
documentation.workspace = true
repository.workspace = true
keywords.workspace = true
categories.workspace = true


[dependencies]
bitflags = "2.6"
bytes = { version = "1.9", default-features = false }
57 changes: 57 additions & 0 deletions examples/mqtt-lite/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#![cfg_attr(not(test), no_std)]

extern crate alloc;

mod packet;

use core::slice::Iter;

pub use packet::{connack::ConnAck, connect::Connect, Packet, publish::Publish, publish::Payload};

pub use packet::{Reader, ToBytes};

#[derive(Debug)]
pub enum MqttError {
Disconnected,
Packet(packet::PacketError),
EOF,
Unspecified,
MalformedPacket,
Protocol,
ImplementationSpecific,
UnsupportedProtocolVersion,
ClientIdentifierNotValid,
BadUsernameOrPassword,
NotAuthorized,
ServerUnavailable,
ServerBusy,
Banned,
BadAuthenticationMethod,
TopicNameInvalid,
PacketTooLarge,
QuotaExceeded,
PayloadFormatInvalid,
RetainNotSupported,
QosNotSupported,
UseAnotherServer,
ServerMoved,
ConnectionRateExceeded,
}

pub trait BufRead {
fn read_exact(&mut self, buff: &mut [u8]) -> Result<(), MqttError>;
fn next(&mut self) -> Result<u8, MqttError> {
let mut buf = [0u8];
self.read_exact(&mut buf)?;
Ok(buf[0])
}
}

impl BufRead for Iter<'_, u8> {
fn read_exact(&mut self, buff: &mut [u8]) -> Result<(), MqttError> {
for byte in buff.iter_mut() {
*byte = *(Iterator::next(self).ok_or(MqttError::EOF)?);
}
Ok(())
}
}
74 changes: 74 additions & 0 deletions examples/mqtt-lite/src/packet/connack.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use alloc::vec::Vec;

use crate::MqttError;

use super::{data::Bits, PacketError, Reader, ToBytes};

#[derive(Default, Debug)]
pub struct ConnAck {
pub session_present: bool,
pub err: Option<MqttError>,
}

impl Reader for ConnAck {
fn read(&mut self, buff: &mut impl crate::BufRead) -> Result<(), crate::MqttError> {
let mut flags = Bits::default();
flags.read(buff)?;

self.session_present = flags.raw() & 0b1 != 0;

let mut connect_reason = Bits::default();
connect_reason.read(buff)?;
let connect_reason_code_raw = connect_reason.raw();
if connect_reason_code_raw != 0 {
self.err = Some(connect_reason_code_raw.try_into()?);
}

Ok(())
}
}

impl ToBytes for ConnAck {
fn to_bytes(&self) -> Vec<u8> {
let mut bytes = Vec::new();

bytes
}
}

impl TryFrom<u8> for MqttError {
type Error = MqttError;

fn try_from(value: u8) -> Result<Self, Self::Error> {
let v = match value {
0x80 => MqttError::Unspecified,
0x81 => MqttError::MalformedPacket,
0x82 => MqttError::Protocol,
0x83 => MqttError::ImplementationSpecific,
0x84 => MqttError::UnsupportedProtocolVersion,
0x85 => MqttError::ClientIdentifierNotValid,
0x86 => MqttError::BadUsernameOrPassword,
0x87 => MqttError::NotAuthorized,
0x88 => MqttError::ServerUnavailable,
0x89 => MqttError::ServerBusy,
0x8a => MqttError::Banned,
0x8c => MqttError::BadAuthenticationMethod,
0x90 => MqttError::TopicNameInvalid,
0x95 => MqttError::PacketTooLarge,
0x97 => MqttError::QuotaExceeded,
0x99 => MqttError::PayloadFormatInvalid,
0x9a => MqttError::RetainNotSupported,
0x9b => MqttError::QosNotSupported,
0x9c => MqttError::UseAnotherServer,
0x9d => MqttError::ServerMoved,
0x9f => MqttError::ConnectionRateExceeded,
_ => {
return Err(MqttError::Packet(PacketError::Read(
"unknown connack reason code",
)))
}
};

Ok(v)
}
}
134 changes: 134 additions & 0 deletions examples/mqtt-lite/src/packet/connect.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
use core::time::Duration;

use alloc::{string::String, vec::Vec};

use super::{
header::{ControlPakcetType, FixHeader}, property::Property, Reader, ToBytes
};

pub struct Connect {
pub protocol_level: u8,
pub client_id: String,
pub clean_start: bool,
pub user_name: Option<String>,
pub password: Option<String>,
pub keep_alive: Duration,
pub session_expiry_interval_sec: u32,
}
impl Default for Connect {
fn default() -> Self {
Self {
protocol_level: 5,
clean_start: Default::default(),
user_name: Default::default(),
password: Default::default(),
keep_alive: Default::default(),
session_expiry_interval_sec: Default::default(),
client_id: Default::default(),
}
}
}

impl Connect {
pub fn new(id: impl Into<String>) -> Self {
Self {
client_id: id.into(),
protocol_level: 5,
..Default::default()
}
}
}

impl ToBytes for Connect {
fn to_bytes(&self) -> Vec<u8> {
let mut buf = Vec::new();
buf.extend(ProtocolName::to_bytes());
buf.push(self.protocol_level);

let mut flag = ConnectFlags::empty();

if self.clean_start {
flag |= ConnectFlags::CLEAN_START;
}
if self.user_name.is_some() {
flag |= ConnectFlags::USER_NAME;
}
if self.password.is_some() {
flag |= ConnectFlags::PASSWORD;
}

buf.push(flag.bits());

let keep_alive = self.keep_alive.as_secs() as u16;
buf.extend(keep_alive.to_be_bytes());

let properties = alloc::vec![Property::SessionExpiryInterval(
self.session_expiry_interval_sec
),];

buf.extend((&properties[..]).to_bytes());

// Payload

let client_id = self.client_id.to_bytes();
buf.extend(client_id);

//TODO: will properties

//TODO: will topic

//TODO: will payload

if let Some(user_name) = self.user_name.as_ref() {
buf.extend(user_name.to_bytes());
}

if let Some(password) = self.password.as_ref() {
buf.extend(password.to_bytes());
}

buf
}
}


impl Reader for Connect {
fn read(&mut self, buff: &mut impl crate::BufRead) -> Result<(), crate::MqttError> {
todo!()
}
}

struct ProtocolName {}

impl ProtocolName {
fn to_bytes() -> [u8; 6] {
[0, 4, b'M', b'Q', b'T', b'T']
}
}

bitflags::bitflags! {
pub struct ConnectFlags: u8 {
const USER_NAME = 1 << 7;
const PASSWORD = 1 << 6;
const WILL_RETAIN = 1 << 5;
const WILL_QOS = 1 << 3;
const WILL_FLAG = 1 << 2;
const CLEAN_START = 1 << 1;
const RESERVED = 1;
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn test_connect() {
let mut connect = Connect::default();
connect.clean_start = true;
connect.keep_alive = Duration::from_secs(10);
connect.session_expiry_interval_sec = 10;
let bytes = connect.to_bytes();
println!("{:?}", bytes);
}
}
Loading

0 comments on commit 0fc4765

Please sign in to comment.