Skip to content

Commit 045fd61

Browse files
Copilotithewei
andauthored
Add basic MQTT 5.0 protocol support (#805)
* Initial plan * Add basic MQTT 5.0 protocol support for publish/subscribe - Add empty properties in CONNECT, PUBLISH, SUBSCRIBE, UNSUBSCRIBE packets - Add will properties support in CONNECT packet for MQTT v5 - Skip v5 properties in CONNACK, PUBLISH, SUBACK, UNSUBACK responses - Add mqtt_v5_skip_properties() helper for parsing v5 property sections - Add AUTH packet type (type 15) to mqtt_type_e enum - Add setProtocolVersion() method to C++ MqttClient class - Include hmath.h for varint_decode used in property skipping Co-authored-by: ithewei <26049660+ithewei@users.noreply.github.com> * Rename variable for clarity in mqtt_v5_skip_properties Co-authored-by: ithewei <26049660+ithewei@users.noreply.github.com> * Fix CONNECT comment: will_properties uses varint prefix, not 2-byte Co-authored-by: ithewei <26049660+ithewei@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: ithewei <26049660+ithewei@users.noreply.github.com>
1 parent be92325 commit 045fd61

3 files changed

Lines changed: 92 additions & 4 deletions

File tree

mqtt/mqtt_client.c

Lines changed: 86 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "herr.h"
55
#include "hendian.h"
66
#include "hsocket.h"
7+
#include "hmath.h"
78

89
static unsigned short mqtt_next_mid() {
910
static unsigned short s_mid = 0;
@@ -64,10 +65,27 @@ static void mqtt_send_disconnect(hio_t* io) {
6465
mqtt_send_head(io, MQTT_TYPE_DISCONNECT, 0);
6566
}
6667

68+
// Skip MQTT v5 properties section
69+
// Returns 1 on success, 0 on malformed data
70+
static int mqtt_v5_skip_properties(unsigned char** pp, unsigned char* end) {
71+
unsigned char* p = *pp;
72+
int bytes = end - p;
73+
if (bytes <= 0) return 0;
74+
int prop_len = (int)varint_decode(p, &bytes);
75+
if (bytes <= 0) return 0;
76+
p += bytes; // skip varint bytes
77+
if (p + prop_len > end) return 0;
78+
p += prop_len; // skip properties data
79+
*pp = p;
80+
return 1;
81+
}
82+
6783
/*
6884
* MQTT_TYPE_CONNECT
69-
* 2 + protocol_name + 1 protocol_version + 1 conn_flags + 2 keepalive + 2 + [client_id] +
70-
* [2 + will_topic + 2 + will_payload] +
85+
* 2 + protocol_name + 1 protocol_version + 1 conn_flags + 2 keepalive +
86+
* [v5: properties] +
87+
* 2 + [client_id] +
88+
* [v5: varint + will_properties] + [2 + will_topic + 2 + will_payload] +
7189
* [2 + username] + [2 + password]
7290
*/
7391
static int mqtt_client_login(mqtt_client_t* cli) {
@@ -81,6 +99,10 @@ static int mqtt_client_login(mqtt_client_t* cli) {
8199

82100
// protocol_name_len
83101
len += cli->protocol_version == MQTT_PROTOCOL_V31 ? 6 : 4;
102+
// MQTT v5: connect properties (empty, 1 byte for property length = 0)
103+
if (cli->protocol_version == MQTT_PROTOCOL_V5) {
104+
len += 1;
105+
}
84106
if (*cli->client_id) {
85107
cid_len = strlen(cli->client_id);
86108
} else {
@@ -102,6 +124,10 @@ static int mqtt_client_login(mqtt_client_t* cli) {
102124
if (cli->will->retain) {
103125
conn_flags |= MQTT_CONN_WILL_RETAIN;
104126
}
127+
// MQTT v5: will properties (empty, 1 byte for property length = 0)
128+
if (cli->protocol_version == MQTT_PROTOCOL_V5) {
129+
len += 1;
130+
}
105131
len += 2 + will_topic_len;
106132
len += 2 + will_payload_len;
107133
}
@@ -131,7 +157,6 @@ static int mqtt_client_login(mqtt_client_t* cli) {
131157
unsigned char* p = buf;
132158
int headlen = mqtt_head_pack(&head, p);
133159
p += headlen;
134-
// TODO: Not implement MQTT_PROTOCOL_V5
135160
if (cli->protocol_version == MQTT_PROTOCOL_V31) {
136161
PUSH16(p, 6);
137162
PUSH_N(p, MQTT_PROTOCOL_NAME_v31, 6);
@@ -142,11 +167,19 @@ static int mqtt_client_login(mqtt_client_t* cli) {
142167
PUSH8(p, cli->protocol_version);
143168
PUSH8(p, conn_flags);
144169
PUSH16(p, cli->keepalive);
170+
// MQTT v5: connect properties (empty)
171+
if (cli->protocol_version == MQTT_PROTOCOL_V5) {
172+
PUSH8(p, 0);
173+
}
145174
PUSH16(p, cid_len);
146175
if (cid_len > 0) {
147176
PUSH_N(p, cli->client_id, cid_len);
148177
}
149178
if (conn_flags & MQTT_CONN_HAS_WILL) {
179+
// MQTT v5: will properties (empty)
180+
if (cli->protocol_version == MQTT_PROTOCOL_V5) {
181+
PUSH8(p, 0);
182+
}
150183
PUSH16(p, will_topic_len);
151184
PUSH_N(p, cli->will->topic, will_topic_len);
152185
PUSH16(p, will_payload_len);
@@ -233,6 +266,14 @@ static void on_packet(hio_t* io, void* buf, int len) {
233266
hio_close(io);
234267
return;
235268
}
269+
// MQTT v5: skip connack properties
270+
if (cli->protocol_version == MQTT_PROTOCOL_V5 && p < end) {
271+
if (!mqtt_v5_skip_properties(&p, end)) {
272+
hloge("MQTT CONNACK v5 properties malformed!");
273+
hio_close(io);
274+
return;
275+
}
276+
}
236277
cli->connected = 1;
237278
if (cli->timer) {
238279
htimer_del(cli->timer);
@@ -269,6 +310,14 @@ static void on_packet(hio_t* io, void* buf, int len) {
269310
}
270311
POP16(p, cli->mid);
271312
}
313+
// MQTT v5: skip publish properties
314+
if (cli->protocol_version == MQTT_PROTOCOL_V5 && p < end) {
315+
if (!mqtt_v5_skip_properties(&p, end)) {
316+
hloge("MQTT PUBLISH v5 properties malformed!");
317+
hio_close(io);
318+
return;
319+
}
320+
}
272321
cli->message.payload_len = end - p;
273322
if (cli->message.payload_len > 0) {
274323
// NOTE: Not deep copy
@@ -312,6 +361,14 @@ static void on_packet(hio_t* io, void* buf, int len) {
312361
return;
313362
}
314363
POP16(p, cli->mid);
364+
// MQTT v5: skip suback properties
365+
if (cli->protocol_version == MQTT_PROTOCOL_V5 && p < end) {
366+
if (!mqtt_v5_skip_properties(&p, end)) {
367+
hloge("MQTT SUBACK v5 properties malformed!");
368+
hio_close(io);
369+
return;
370+
}
371+
}
315372
}
316373
break;
317374
// case MQTT_TYPE_UNSUBSCRIBE:
@@ -324,6 +381,14 @@ static void on_packet(hio_t* io, void* buf, int len) {
324381
return;
325382
}
326383
POP16(p, cli->mid);
384+
// MQTT v5: skip unsuback properties
385+
if (cli->protocol_version == MQTT_PROTOCOL_V5 && p < end) {
386+
if (!mqtt_v5_skip_properties(&p, end)) {
387+
hloge("MQTT UNSUBACK v5 properties malformed!");
388+
hio_close(io);
389+
return;
390+
}
391+
}
327392
}
328393
break;
329394
case MQTT_TYPE_PINGREQ:
@@ -541,6 +606,8 @@ int mqtt_client_publish(mqtt_client_t* cli, mqtt_message_t* msg) {
541606
int payload_len = msg->payload_len ? msg->payload_len : strlen(msg->payload);
542607
int len = 2 + topic_len + payload_len;
543608
if (msg->qos > 0) len += 2; // mid
609+
// MQTT v5: publish properties (empty)
610+
if (cli->protocol_version == MQTT_PROTOCOL_V5) len += 1;
544611
unsigned short mid = 0;
545612

546613
mqtt_head_t head;
@@ -563,6 +630,10 @@ int mqtt_client_publish(mqtt_client_t* cli, mqtt_message_t* msg) {
563630
mid = mqtt_next_mid();
564631
PUSH16(p, mid);
565632
}
633+
// MQTT v5: publish properties (empty)
634+
if (cli->protocol_version == MQTT_PROTOCOL_V5) {
635+
PUSH8(p, 0);
636+
}
566637

567638
hmutex_lock(&cli->mutex_);
568639
// send head + topic + mid
@@ -585,6 +656,8 @@ int mqtt_client_subscribe(mqtt_client_t* cli, const char* topic, int qos) {
585656
if (!cli->connected) return -2;
586657
int topic_len = strlen(topic);
587658
int len = 2 + 2 + topic_len + 1;
659+
// MQTT v5: subscribe properties (empty)
660+
if (cli->protocol_version == MQTT_PROTOCOL_V5) len += 1;
588661

589662
mqtt_head_t head;
590663
memset(&head, 0, sizeof(head));
@@ -599,6 +672,10 @@ int mqtt_client_subscribe(mqtt_client_t* cli, const char* topic, int qos) {
599672
p += headlen;
600673
unsigned short mid = mqtt_next_mid();
601674
PUSH16(p, mid);
675+
// MQTT v5: subscribe properties (empty)
676+
if (cli->protocol_version == MQTT_PROTOCOL_V5) {
677+
PUSH8(p, 0);
678+
}
602679
PUSH16(p, topic_len);
603680
PUSH_N(p, topic, topic_len);
604681
PUSH8(p, qos & 3);
@@ -613,6 +690,8 @@ int mqtt_client_unsubscribe(mqtt_client_t* cli, const char* topic) {
613690
if (!cli->connected) return -2;
614691
int topic_len = strlen(topic);
615692
int len = 2 + 2 + topic_len;
693+
// MQTT v5: unsubscribe properties (empty)
694+
if (cli->protocol_version == MQTT_PROTOCOL_V5) len += 1;
616695

617696
mqtt_head_t head;
618697
memset(&head, 0, sizeof(head));
@@ -627,6 +706,10 @@ int mqtt_client_unsubscribe(mqtt_client_t* cli, const char* topic) {
627706
p += headlen;
628707
unsigned short mid = mqtt_next_mid();
629708
PUSH16(p, mid);
709+
// MQTT v5: unsubscribe properties (empty)
710+
if (cli->protocol_version == MQTT_PROTOCOL_V5) {
711+
PUSH8(p, 0);
712+
}
630713
PUSH16(p, topic_len);
631714
PUSH_N(p, topic, topic_len);
632715
// send head + mid + topic

mqtt/mqtt_client.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,10 @@ class MqttClient {
172172
mqtt_client_set_id(client, id);
173173
}
174174

175+
void setProtocolVersion(unsigned char version) {
176+
client->protocol_version = version;
177+
}
178+
175179
void setWill(mqtt_message_t* will) {
176180
mqtt_client_set_will(client, will);
177181
}

mqtt/mqtt_protocol.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
#define MQTT_PROTOCOL_V31 3
99
#define MQTT_PROTOCOL_V311 4
10-
#define MQTT_PROTOCOL_V5 5 // Not yet supproted
10+
#define MQTT_PROTOCOL_V5 5
1111

1212
#define MQTT_PROTOCOL_NAME "MQTT"
1313
#define MQTT_PROTOCOL_NAME_v31 "MQIsdp"
@@ -38,6 +38,7 @@ typedef enum {
3838
MQTT_TYPE_PINGREQ = 12,
3939
MQTT_TYPE_PINGRESP = 13,
4040
MQTT_TYPE_DISCONNECT = 14,
41+
MQTT_TYPE_AUTH = 15, // MQTT_PROTOCOL_V5
4142
} mqtt_type_e;
4243

4344
typedef enum {

0 commit comments

Comments
 (0)