Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .github/workflows/broker-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,22 @@ jobs:
wolfmqtt_opts: "--enable-broker --enable-tls --enable-websocket"
extra_deps: "libwebsockets-dev"
wolfssl_opts: "--enable-opensslcoexist --enable-enckeys"
# Maximum-QoS matrix. WOLFMQTT_MAX_QOS=2 is the default and runs
# the full broker.test. =1 and =0 caps compile out the QoS 2
# state machine; broker.test exercises QoS 2 pub/sub (tests 3
# and 11) which is intentionally rejected on capped builds, so
# those entries are build-only.
- name: "Broker MAX_QOS=2 (default, full QoS)"
cflags: ""
wolfmqtt_opts: "--enable-v5 --enable-broker --enable-max-qos=2"
- name: "Broker MAX_QOS=1 (build only)"
cflags: ""
wolfmqtt_opts: "--enable-v5 --enable-broker --enable-max-qos=1"
skip_broker_test: "yes"
Comment on lines +58 to +61
- name: "Broker MAX_QOS=0 (build only)"
cflags: ""
wolfmqtt_opts: "--enable-v5 --enable-broker --enable-max-qos=0"
skip_broker_test: "yes"

steps:
- name: Install dependencies
Expand Down Expand Up @@ -82,6 +98,7 @@ jobs:
run: make

- name: "Run broker tests (${{ matrix.name }})"
if: matrix.skip_broker_test != 'yes'
run: ./scripts/broker.test

- name: Show logs on failure
Expand Down
20 changes: 18 additions & 2 deletions .github/workflows/cmake-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,22 @@ jobs:

runs-on: ubuntu-22.04

strategy:
fail-fast: false
matrix:
# Smoke-build the CMake project under each WOLFMQTT_MAX_QOS value
# to keep the build-option plumbing exercised. "" means leave the
# cache value at its 2 default.
Comment on lines +17 to +19
include:
- name: "CMake default (MAX_QOS unset)"
cmake_opts: ""
- name: "CMake MAX_QOS=2"
cmake_opts: "-DWOLFMQTT_V5=yes -DWOLFMQTT_BROKER=yes -DWOLFMQTT_MAX_QOS=2"
- name: "CMake MAX_QOS=1"
cmake_opts: "-DWOLFMQTT_V5=yes -DWOLFMQTT_BROKER=yes -DWOLFMQTT_MAX_QOS=1"
- name: "CMake MAX_QOS=0"
cmake_opts: "-DWOLFMQTT_V5=yes -DWOLFMQTT_BROKER=yes -DWOLFMQTT_MAX_QOS=0"

steps:
# Install cmake
- name: Install cmake
Expand All @@ -36,9 +52,9 @@ jobs:
- uses: actions/checkout@master

#build wolfMQTT
- name: Build wolfMQTT
- name: "Build wolfMQTT (${{ matrix.name }})"
run: |
mkdir build
cd build
cmake ..
cmake ${{ matrix.cmake_opts }} ..
cmake --build .
13 changes: 13 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,19 @@ endif()
add_option(WOLFMQTT_DISCB
"Enable disconnect callback"
"yes" "yes;no")

# Maximum QoS supported (compile-time cap for both client and broker).
# Default 2 = full QoS support. 1 or 0 compiles out the broker's QoS 2
# state machine and the client's initial max_qos clamp, and causes the
# broker to advertise v5 MQTT_PROP_MAX_QOS in CONNACK.
set(WOLFMQTT_MAX_QOS "2" CACHE STRING
"Maximum QoS supported by client and broker (0, 1, or 2)")
set_property(CACHE WOLFMQTT_MAX_QOS PROPERTY STRINGS "0;1;2")
if (NOT WOLFMQTT_MAX_QOS MATCHES "^[012]$")
message(SEND_ERROR
"WOLFMQTT_MAX_QOS must be 0, 1, or 2 (got: ${WOLFMQTT_MAX_QOS})")
endif()
list(APPEND WOLFMQTT_DEFINITIONS "-DWOLFMQTT_MAX_QOS=${WOLFMQTT_MAX_QOS}")
if (WOLFMQTT_DISCB)
list(APPEND WOLFMQTT_DEFINITIONS "-DWOLFMQTT_DISCONNECT_CB")
endif()
Expand Down
21 changes: 21 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,26 @@ then
AM_CFLAGS="$AM_CFLAGS -DWOLFMQTT_DISCONNECT_CB"
fi

# Maximum QoS supported (compile-time cap for both client and broker).
# Values: 0, 1, 2. Default 2 (full QoS support). Capping at 1 (or 0)
# compiles out the broker's QoS 2 state machine and the client's initial
# max_qos clamp, saves ~2.5 KB of broker .text, and makes the broker
# advertise the v5 MQTT_PROP_MAX_QOS property in CONNACK.
AC_ARG_ENABLE([max-qos],
[AS_HELP_STRING([--enable-max-qos@<:@=0|1|2@:>@],
[Maximum QoS supported by client and broker (default: 2)])],
[ ENABLED_MAX_QOS=$enableval ],
[ ENABLED_MAX_QOS=2 ]
)
case "x$ENABLED_MAX_QOS" in
"x0"|"x1"|"x2")
AM_CFLAGS="$AM_CFLAGS -DWOLFMQTT_MAX_QOS=$ENABLED_MAX_QOS"
;;
*)
AC_MSG_ERROR([--enable-max-qos must be 0, 1, or 2 (got: $ENABLED_MAX_QOS)])
;;
esac

# Multithread support
AC_ARG_ENABLE([mt],
[AS_HELP_STRING([--enable-mt],[Enable multiple thread support (default: disabled)])],
Expand Down Expand Up @@ -611,6 +631,7 @@ echo " * Linker Flags: $LDFLAGS"
echo " * LIB Flags: $LIB"

echo " * Disconnect Callback: $ENABLED_DISCB"
echo " * Maximum QoS: $ENABLED_MAX_QOS"
echo " * Error Strings: $ENABLED_ERROR_STRINGS"
echo " * Enable MQTT-SN: $ENABLED_SN"
echo " * Enable MQTT v5.0: $ENABLED_MQTTV50"
Expand Down
7 changes: 5 additions & 2 deletions scripts/broker.test
Original file line number Diff line number Diff line change
Expand Up @@ -603,10 +603,13 @@ if [ "$has_v5" = "yes" ]; then
>"${TMP_DIR}/t12.log" 2>&1
T12_RC=$?

# 12b: Verify CONNACK server properties were received
# 12b: Verify CONNACK server properties were received.
# Type 37 = Retain Available, Type 40 = Wildcard Subscription Available.
# Maximum QoS (Type 36) is intentionally omitted per [MQTT-3.2.2.3.4]
# (absence signals QoS 2 support; emitting Max QoS=2 is a Protocol Error).
T12_PROPS=yes
grep -q "Property CB: Type 37" "${TMP_DIR}/t12.log" 2>/dev/null || T12_PROPS=no
grep -q "Property CB: Type 36" "${TMP_DIR}/t12.log" 2>/dev/null || T12_PROPS=no
grep -q "Property CB: Type 40" "${TMP_DIR}/t12.log" 2>/dev/null || T12_PROPS=no

# 12c: v5 pub/sub with separate clients (property forwarding)
start_broker
Expand Down
71 changes: 63 additions & 8 deletions src/mqtt_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
#include <config.h>
#endif

#include "wolfmqtt/mqtt_types.h"
#include "wolfmqtt/mqtt_broker.h"
#include "wolfmqtt/mqtt_types.h"
Comment on lines 27 to +28
#include "wolfmqtt/mqtt_client.h"
#include "wolfmqtt/mqtt_packet.h"
#include "wolfmqtt/mqtt_socket.h"
Expand Down Expand Up @@ -1278,8 +1278,13 @@ static int BrokerNetDisconnect(void* context)
/* must NOT be re-delivered to subscribers. The state is per-client and is */
/* cleared on disconnect; surviving across reconnect would require the */
/* broader session-state work (see #485/#489/#494). */
/* */
/* The entire QoS 2 inbound state and PUBREL/PUBREC/PUBCOMP handling is */
/* compiled out when WOLFMQTT_MAX_QOS < 2. Subscribe-grant capping and */
/* inbound-publish QoS rejection cover the corresponding wire paths. */
/* -------------------------------------------------------------------------- */

#if WOLFMQTT_MAX_QOS >= 2
/* Returns 1 if packet_id is currently awaiting PUBREL, 0 otherwise. */
static int BrokerInboundQos2_Contains(BrokerClient* bc, word16 packet_id)
{
Expand Down Expand Up @@ -1425,13 +1430,16 @@ static void BrokerInboundQos2_Clear(BrokerClient* bc)
bc->qos2_pending_count = 0;
#endif
}
#endif /* WOLFMQTT_MAX_QOS >= 2 */

static void BrokerClient_Free(BrokerClient* bc)
{
if (bc == NULL) {
return;
}
#if WOLFMQTT_MAX_QOS >= 2
BrokerInboundQos2_Clear(bc);
#endif

#ifdef ENABLE_MQTT_WEBSOCKET
if (bc->ws_ctx != NULL) {
Expand Down Expand Up @@ -3264,7 +3272,12 @@ static int BrokerHandle_Connect(BrokerClient* bc, int rx_len,
#endif
bc->will_payload_len = wp_len;
}
bc->will_qos = mc.lwt_msg->qos;
/* Clamp will QoS to this build's Maximum QoS. A v5 client that
* sent Will QoS > advertised Max QoS would already be in
* Protocol Error territory, but for v3.1.1 (no advertisement)
* we silently downgrade rather than rejecting CONNECT. */
Comment on lines +3276 to +3278
bc->will_qos = (mc.lwt_msg->qos > WOLFMQTT_MAX_QOS) ?
(MqttQoS)WOLFMQTT_MAX_QOS : mc.lwt_msg->qos;
bc->will_retain = mc.lwt_msg->retain;
bc->will_delay_sec = 0;
#ifdef WOLFMQTT_V5
Expand Down Expand Up @@ -3445,11 +3458,18 @@ static int BrokerHandle_Connect(BrokerClient* bc, int rx_len,
prop->data_byte = 0;
#endif
}
/* [MQTT-3.2.2.3.4] Maximum QoS property MUST be 0 or 1. Absence
* of the property signals server supports Maximum QoS 2. Emitting
* Maximum QoS = 2 is a Protocol Error and strict v5 clients (e.g.
* mosquitto) will disconnect on receipt. Emit the property only
* when this build caps below QoS 2 via WOLFMQTT_MAX_QOS. */
#if WOLFMQTT_MAX_QOS < 2
prop = MqttProps_Add(&ack.props);
if (prop != NULL) {
prop->type = MQTT_PROP_MAX_QOS;
prop->data_byte = MQTT_QOS_2;
prop->data_byte = (byte)WOLFMQTT_MAX_QOS;
}
#endif
Comment on lines +3461 to +3472
}
#endif

Expand Down Expand Up @@ -3517,9 +3537,10 @@ static int BrokerHandle_Subscribe(BrokerClient* bc, int rx_len,
MqttQoS topic_qos = sub.topics[i].qos;
MqttQoS granted_qos;

/* Cap at QoS 2 */
if (topic_qos > MQTT_QOS_2) {
topic_qos = MQTT_QOS_2;
/* [MQTT-3.8.4-7] / [MQTT-3.9.3]: subscribe grant capped at the
* build's Maximum QoS. Default is QoS 2. */
if (topic_qos > WOLFMQTT_MAX_QOS) {
topic_qos = (MqttQoS)WOLFMQTT_MAX_QOS;
}
granted_qos = topic_qos;

Expand Down Expand Up @@ -3662,7 +3683,9 @@ static int BrokerHandle_Publish(BrokerClient* bc, int rx_len,
MqttPublishResp resp;
byte* payload = NULL;
char* topic = NULL;
#if WOLFMQTT_MAX_QOS >= 2
int qos2_duplicate = 0;
#endif
#ifdef WOLFMQTT_STATIC_MEMORY
char topic_buf[BROKER_MAX_TOPIC_LEN];
#endif
Expand All @@ -3684,6 +3707,26 @@ static int BrokerHandle_Publish(BrokerClient* bc, int rx_len,
* MALFORMED_DATA before reaching this handler. The broker no longer
* needs a per-handler scan. */

#if WOLFMQTT_MAX_QOS < 2
/* [MQTT-3.2.2.3.4] / [MQTT-3.3.4]: this build advertised Maximum QoS
* below 2. A client publishing at QoS > our cap is a Protocol Error;
* v5 spec wants reason 0x9B QoS Not Supported. v3 has no reason code
* field, so we just abnormally close. */
if (pub.qos > WOLFMQTT_MAX_QOS) {
WBLOG_ERR(broker,
"broker: PUBLISH QoS %d exceeds WOLFMQTT_MAX_QOS=%d sock=%d",
pub.qos, WOLFMQTT_MAX_QOS, (int)bc->sock);
#ifdef WOLFMQTT_V5
if (bc->protocol_level >= MQTT_CONNECT_PROTOCOL_LEVEL_5) {
(void)BrokerSend_Disconnect(bc, MQTT_REASON_QOS_NOT_SUPPORTED);
}
#endif
rc = MQTT_CODE_ERROR_MALFORMED_DATA;
goto publish_cleanup;
}
Comment on lines +3715 to +3726
#endif /* WOLFMQTT_MAX_QOS < 2 */

#if WOLFMQTT_MAX_QOS >= 2
/* [MQTT-4.3.3] QoS 2 duplicate detection. If we already PUBREC'd this
* packet_id and are still waiting for PUBREL, treat the inbound PUBLISH
* as a retransmission: send another PUBREC but DO NOT re-deliver the
Expand Down Expand Up @@ -3722,6 +3765,7 @@ static int BrokerHandle_Publish(BrokerClient* bc, int rx_len,
}
}
}
#endif /* WOLFMQTT_MAX_QOS >= 2 */

/* Create null-terminated topic copy for matching/logging */
if (pub.topic_name && pub.topic_name_len > 0) {
Expand Down Expand Up @@ -3749,7 +3793,11 @@ static int BrokerHandle_Publish(BrokerClient* bc, int rx_len,
#ifdef WOLFMQTT_BROKER_RETAINED
/* Handle retained messages - skipped for QoS 2 duplicates: the original
* PUBLISH already updated the retained store. */
if (!qos2_duplicate && topic != NULL && pub.retain) {
if (
#if WOLFMQTT_MAX_QOS >= 2
!qos2_duplicate &&
#endif
topic != NULL && pub.retain) {
Comment on lines +3796 to +3800
if (pub.total_len == 0) {
BrokerRetained_Delete(broker, topic);
}
Expand Down Expand Up @@ -3778,7 +3826,10 @@ static int BrokerHandle_Publish(BrokerClient* bc, int rx_len,

/* Fan-out is skipped for QoS 2 duplicates: subscribers already received
* the application message from the original PUBLISH ([MQTT-4.3.3]). */
if (!qos2_duplicate &&
if (
#if WOLFMQTT_MAX_QOS >= 2
!qos2_duplicate &&
#endif
topic != NULL && (payload != NULL || pub.total_len == 0)) {
#ifdef WOLFMQTT_STATIC_MEMORY
int i;
Expand Down Expand Up @@ -3883,6 +3934,7 @@ static int BrokerHandle_Publish(BrokerClient* bc, int rx_len,
return rc;
}

#if WOLFMQTT_MAX_QOS >= 2
static int BrokerHandle_PublishRel(BrokerClient* bc, int rx_len)
{
int rc;
Expand Down Expand Up @@ -3957,6 +4009,7 @@ static int BrokerHandle_PublishRec(BrokerClient* bc, int rx_len)
}
return rc;
}
#endif /* WOLFMQTT_MAX_QOS >= 2 */

/* [MQTT-2.2.2-2] / [MQTT-3.8.1-1] etc.: a malformed packet MUST cause the
* server to close the network connection. Mirrors the read-failure close
Expand Down Expand Up @@ -4129,6 +4182,7 @@ static int BrokerClient_Process(MqttBroker* broker, BrokerClient* bc)
case MQTT_PACKET_TYPE_PUBLISH_ACK:
/* QoS 1 ack from subscriber - delivery complete */
break;
#if WOLFMQTT_MAX_QOS >= 2
case MQTT_PACKET_TYPE_PUBLISH_REC:
{
/* QoS 2 step 2: subscriber sends PUBREC, broker
Expand All @@ -4155,6 +4209,7 @@ static int BrokerClient_Process(MqttBroker* broker, BrokerClient* bc)
/* QoS 2 step 4: subscriber sends PUBCOMP - delivery
* complete */
break;
#endif /* WOLFMQTT_MAX_QOS >= 2 */
case MQTT_PACKET_TYPE_SUBSCRIBE:
{
int s_rc = BrokerHandle_Subscribe(bc, rc, broker);
Expand Down
19 changes: 14 additions & 5 deletions src/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -575,10 +575,15 @@ static void Handle_ConnectAck_Props(MqttClient* client, MqttProp* props)
for (prop = props; prop != NULL; prop = prop->next) {
if (prop->type == MQTT_PROP_MAX_QOS) {
/* MQTT v5 [3.1.2.11.6]: only 0 or 1 are legal. Clamp a
* non-conforming broker value so client-side publish guards
* non-conforming broker value, then narrow against this
* build's WOLFMQTT_MAX_QOS so client-side publish guards
* remain meaningful. */
client->max_qos = (prop->data_byte <= MQTT_QOS_1) ?
byte adv = (prop->data_byte <= MQTT_QOS_1) ?
prop->data_byte : MQTT_QOS_1;
if (adv > WOLFMQTT_MAX_QOS) {
adv = (byte)WOLFMQTT_MAX_QOS;
}
client->max_qos = adv;
}
else if (prop->type == MQTT_PROP_RETAIN_AVAIL) {
/* MQTT v5 [3.1.2.11.5]: only 0 or 1 are legal. */
Expand Down Expand Up @@ -1647,7 +1652,9 @@ int MqttClient_Init(MqttClient *client, MqttNet* net,
client->rx_buf_len = rx_buf_len;
client->cmd_timeout_ms = cmd_timeout_ms;
#ifdef WOLFMQTT_V5
client->max_qos = MQTT_QOS_2;
/* Initialize to this build's Maximum QoS. Handle_Props will narrow
* this if the server advertises a lower MQTT_PROP_MAX_QOS. */
client->max_qos = (MqttQoS)WOLFMQTT_MAX_QOS;
client->retain_avail = 1;
client->protocol_level = MQTT_CONNECT_PROTOCOL_LEVEL;
rc = MqttProps_Init();
Expand Down Expand Up @@ -1757,8 +1764,10 @@ int MqttClient_Connect(MqttClient *client, MqttConnect *mc_connect)

/* Reset server-supplied session limits so stale values from a
* prior broker do not leak across reconnects. An accepted CONNACK
* will repopulate these in Handle_ConnectAck_Props. */
client->max_qos = MQTT_QOS_2;
* will repopulate these in Handle_ConnectAck_Props. Initialize to
* this build's Maximum QoS so the runtime guard in MqttPublishMsg
* caps publishes even before CONNACK is processed. */
client->max_qos = (MqttQoS)WOLFMQTT_MAX_QOS;
Comment on lines +1767 to +1770
client->retain_avail = 1;
client->packet_sz_max = 0;
#endif
Expand Down
Loading
Loading