Description
Bug Description
Current Situation
When a QoS 2 PUBLISH is received, the message is stored internally and a PUBREC is sent. Once the broker receives the PUBREC, ownership of the message is transferred to the client. When the client then receives the corresponding PUBREL, the on_message(..) callback is executed. If on_message(..) completes successfully or a manual acknowledgment is performed, a PUBCOMP is sent.
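A minimal in-memory model of the receive flow described above may help pin down when each packet is sent. The class CurrentQos2Receiver and its methods are hypothetical and are not paho-mqtt's internals; packets are represented as plain strings. The handling of an unknown packet identifier follows the last two lines of the reproduction log.

```python
from typing import Callable, Dict, List


class CurrentQos2Receiver:
    """Hypothetical model of the current QoS 2 receive flow (not paho-mqtt code)."""

    def __init__(self, on_message: Callable[[bytes], None]) -> None:
        self.on_message = on_message
        # In-memory only: this state is gone once the client instance is discarded.
        self.in_messages: Dict[int, bytes] = {}

    def handle_publish(self, mid: int, payload: bytes) -> List[str]:
        # The message is stored and PUBREC is sent before the application sees it.
        self.in_messages[mid] = payload
        return [f"PUBREC {mid}"]

    def handle_pubrel(self, mid: int) -> List[str]:
        payload = self.in_messages.pop(mid, None)
        if payload is not None:
            # The callback runs only once PUBREL arrives; if it raises,
            # the exception propagates and no PUBCOMP is returned.
            self.on_message(payload)
        # An unknown mid is still completed, so the broker drops its state.
        return [f"PUBCOMP {mid}"]
```

Calling handle_pubrel(1) on a freshly constructed instance returns ["PUBCOMP 1"] without ever invoking on_message, which is the message loss described in the next section.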
Problem
If the client disconnects and reconnects, the broker has already received the PUBREC and therefore cannot resend the PUBLISH. If a new client instance is created instead, the broker still cannot resend the PUBLISH, and the message is lost.
For the broker, a reconnection and a new client instance are indistinguishable. However, the client can differentiate between them based on session state. These scenarios should be handled separately.
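One way a client could tell the two cases apart is to compare the session-present flag from CONNACK (available as flags.session_present in a VERSION2 on_connect callback, as the reproduction below uses) with whether the client still holds its own session state. The function and enum below are a hypothetical sketch of that rule, not an existing paho-mqtt API.

```python
from enum import Enum


class ConnectKind(Enum):
    NEW_SESSION = "new session"    # broker holds no session for this client_id
    RECONNECT = "reconnect"        # same client instance, local state still present
    NEW_INSTANCE = "new instance"  # broker kept the session, local state was lost


def classify_connect(session_present: bool, has_local_state: bool) -> ConnectKind:
    """Hypothetical rule for distinguishing a reconnect from a new client instance."""
    if not session_present:
        return ConnectKind.NEW_SESSION
    if has_local_state:
        return ConnectKind.RECONNECT
    return ConnectKind.NEW_INSTANCE
```

In the reproduction, the second new_client() call would fall into NEW_INSTANCE: the log shows CONNACK with session present set to 1 while the new object holds no state.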
Expected Behavior
The client should follow the message delivery retry rules of the MQTT specification. This requires retaining the following information after a connection loss:
- Unacknowledged PUBLISH packets: If a PUBLISH has been received and passed to the handler but not yet acknowledged, it must be reprocessed if the client restarts.
- Outgoing PUBREC packets: If a PUBREC has been sent, the message was already acknowledged and must not be reprocessed. Otherwise, the exactly-once guarantee would be violated.
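The two rules above could be sketched as a receiver that dispatches on PUBLISH, sends PUBREC only after the handler has succeeded, and remembers the packet identifiers for which PUBREC has already gone out. This is a hypothetical design, not paho-mqtt code. It assumes the set of identifiers is restored after a restart (here it is simply passed in) and relies on the broker resending an unacknowledged PUBLISH after the session is resumed.

```python
from typing import Callable, List, Set


class ProposedQos2Receiver:
    """Hypothetical receiver: PUBREC only after the handler succeeds (not paho-mqtt code)."""

    def __init__(self, handler: Callable[[bytes], None], pubrec_sent: Set[int]) -> None:
        self.handler = handler
        # Packet identifiers whose PUBREC has been sent; must survive restarts.
        self.pubrec_sent = pubrec_sent

    def handle_publish(self, mid: int, payload: bytes) -> List[str]:
        if mid in self.pubrec_sent:
            # Already acknowledged: answer again, but never redeliver (exactly once).
            return [f"PUBREC {mid}"]
        try:
            self.handler(payload)
        except Exception:
            # Not acknowledged: send nothing, so the broker resends the PUBLISH
            # after the session is resumed and the message is processed again.
            return []
        self.pubrec_sent.add(mid)
        return [f"PUBREC {mid}"]

    def handle_pubrel(self, mid: int) -> List[str]:
        # The broker has released the identifier: forget it and complete the flow.
        self.pubrec_sent.discard(mid)
        return [f"PUBCOMP {mid}"]
```

Passing the same set to a second instance stands in for state restored after a restart; how that set is persisted is left open.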
Advantages
This approach lets the application react to PUBREL appropriately, for example by committing processed data.
Messages will be retransmitted correctly, and with a commit mechanism or PUBREL handling, no messages should be lost.
Additionally, clients can distinguish between:
- Recoverable errors: Resolvable by reconnection. The message should not be reprocessed.
- Application errors: Require a new client instance. The message should be reprocessed.
Note
Under this logic, the PUBLISH message itself is not part of the session state. Instead, handle_publish should dispatch the message to the handler and retain only the message ID.
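Because only packet identifiers would need to survive a restart, the persisted session state can stay very small. A minimal sketch, assuming a JSON file at a path chosen by the application; the MidStore name and the file layout are hypothetical and not part of paho-mqtt.

```python
import json
import os
import tempfile
from typing import Set


class MidStore:
    """Hypothetical on-disk store holding only QoS 2 packet identifiers."""

    def __init__(self, path: str) -> None:
        self.path = path

    def load(self) -> Set[int]:
        # No file yet means no stored session state: start with an empty set.
        if not os.path.exists(self.path):
            return set()
        with open(self.path, "r", encoding="utf-8") as f:
            return set(json.load(f)["pubrec_sent"])

    def save(self, mids: Set[int]) -> None:
        # Write to a temporary file in the same directory, then swap it in with
        # os.replace, so a process crash mid-write leaves the old file intact.
        directory = os.path.dirname(os.path.abspath(self.path))
        fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump({"pubrec_sent": sorted(mids)}, f)
        os.replace(tmp_path, self.path)
```

No payloads are stored: a message that was never acknowledged is expected to arrive again as a resent PUBLISH.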
Reproduction
This is the log:
Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=b'd11461cb-2fd4-40fd-8cba-e51fda4334a1' properties=[SessionExpiryInterval : 120]
Received CONNACK (0, Success) properties=[ReceiveMaximum : 20, TopicAliasMaximum : 10]
Sending SUBSCRIBE (d0, m1) [(b'236a73a0-e388-48c1-bb7e-e221189d8b3a', {QoS=2, noLocal=False, retainAsPublished=False, retainHandling=0})]
Received SUBACK
Received PUBLISH (d0, q2, r0, m1), '236a73a0-e388-48c1-bb7e-e221189d8b3a', properties=[], ... (3 bytes)
Sending PUBREC (Mid: 1)
Received PUBREL (Mid: 1)
Caught exception in on_message: Unable to ack: Processing failed, msg = b'msg', qos = 2, topic = 236a73a0-e388-48c1-bb7e-e221189d8b3a
Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=b'd11461cb-2fd4-40fd-8cba-e51fda4334a1' properties=[SessionExpiryInterval : 120]
Received CONNACK (1, Success) properties=[ReceiveMaximum : 20, TopicAliasMaximum : 10]
Received PUBREL (Mid: 1)
Sending PUBCOMP (Mid: 1)
It was generated by this example:
import paho.mqtt.client as mqtt
from paho.mqtt import publish
import threading
import uuid
import time

host = "127.0.0.1"
client_id = str(uuid.uuid4())
topic = str(uuid.uuid4())
qos = 2


def new_client():
    def on_message(client, userdata, msg: mqtt.MQTTMessage):
        # Simulate an application error: the message is never acknowledged.
        raise Exception(f"Unable to ack: Processing failed, msg = {msg.payload}, qos = {msg.qos}, topic = {msg.topic}")

    def on_connect(client, userdata, flags, reason_code, properties):
        # Subscribe only if the broker has no stored session for this client_id.
        if not flags.session_present:
            client.subscribe(topic, qos=qos)

    client = mqtt.Client(protocol=mqtt.MQTTv5, callback_api_version=mqtt.CallbackAPIVersion.VERSION2, client_id=client_id)
    client.on_message = on_message
    client.on_connect = on_connect
    client.on_log = lambda a, b, c, msg: print(msg)
    properties = mqtt.Properties(mqtt.PacketTypes.CONNECT)
    properties.SessionExpiryInterval = 120
    # clean_start=False keeps the session (and its QoS 2 state) on the broker.
    client.connect(host=host, clean_start=False, properties=properties)
    return client


def publish_msg():
    time.sleep(1)
    publish.single(topic, "msg", qos=qos, hostname=host)


try:
    threading.Thread(target=publish_msg).start()
    # First instance: on_message raises and the exception escapes loop_forever().
    new_client().loop_forever()
except Exception:
    ...

try:
    # Second instance with the same client_id: the broker resends only PUBREL.
    new_client().loop_forever()
except Exception:
    print("THIS WILL NEVER HAPPEN")
The docker-compose.yml:
services:
  mosquitto:
    image: eclipse-mosquitto
    ports:
      - "1883:1883"
    volumes:
      - ./mosquitto.conf:/mosquitto/config/mosquitto.conf
The mosquitto.conf:
persistence false
allow_anonymous true
connection_messages true
log_type all
listener 1883
Environment
- Python version: 3.11, 3.13
- Library version: 2.1.0
- Operating system (including version): Linux
- MQTT server (name, version, configuration, hosting details): mosquitto:latest