@@ -115,39 +115,51 @@ def ping(self):
115
115
self .sock .write (b"\xc0 \0 " )
116
116
117
117
def publish (self , topic , msg , retain = False , qos = 0 ):
118
- pkt = bytearray (b"\x30 \0 \0 \0 " )
119
- pkt [0 ] |= qos << 1 | retain
118
+ print (
119
+ f"Preparing to publish: topic={ topic } , msg={ msg } , retain={ retain } , qos={ qos } "
120
+ )
121
+
122
+ # Encode the topic and message in UTF-8
123
+ topic = topic .encode ("utf-8" )
124
+ msg = msg .encode ("utf-8" )
125
+
126
+ # Calculate the size of the message
120
127
sz = 2 + len (topic ) + len (msg )
121
128
if qos > 0 :
122
129
sz += 2
123
- assert sz < 2097152
130
+
131
+ assert sz < 2097152 # MQTT supports a maximum of 2MB messages
132
+ print (f"Calculated message size: { sz } " )
133
+
134
+ # Create the packet header
135
+ pkt = bytearray (5 ) # Header can be up to 5 bytes
136
+ pkt [0 ] = 0x30 | (qos << 1 ) | retain # Message type (PUBLISH)
124
137
i = 1
125
- while sz > 0x7F :
138
+ while sz > 0x7F : # Multi-byte length encoding
126
139
pkt [i ] = (sz & 0x7F ) | 0x80
127
140
sz >>= 7
128
141
i += 1
129
142
pkt [i ] = sz
130
- # print(hex(len(pkt)), hexlify(pkt, ":"))
131
- self .sock .write (pkt , i + 1 )
132
- self ._send_str (topic )
133
- if qos > 0 :
134
- self .pid += 1
135
- pid = self .pid
136
- struct .pack_into ("!H" , pkt , 0 , pid )
137
- self .sock .write (pkt , 2 )
138
- self .sock .write (msg )
143
+
144
+ # Send the header and data
145
+ self .sock .write (pkt [: i + 1 ]) # Header
146
+ self ._send_str (topic ) # Topic
147
+ self .sock .write (msg ) # Message
148
+ print (f"Message sent: { msg .decode ('utf-8' )} " )
149
+
150
+ # QoS handling
139
151
if qos == 1 :
140
- while 1 :
152
+ while True :
141
153
op = self .wait_msg ()
142
154
if op == 0x40 :
143
155
sz = self .sock .read (1 )
144
156
assert sz == b"\x02 "
145
157
rcv_pid = self .sock .read (2 )
146
158
rcv_pid = rcv_pid [0 ] << 8 | rcv_pid [1 ]
147
- if pid == rcv_pid :
159
+ if self . pid == rcv_pid :
148
160
return
149
161
elif qos == 2 :
150
- assert 0
162
+ raise NotImplementedError ( "QoS level 2 not implemented" )
151
163
152
164
def subscribe (self , topic , qos = 0 ):
153
165
assert self .cb is not None , "Subscribe callback is not set"
0 commit comments