QoS 2 messages loss on new instance · Issue #883 · eclipse-paho/paho.mqtt.python

Open
@rubenbaer

Bug Description

Current Situation

When the client receives a QoS 2 PUBLISH, it stores the message internally and sends a PUBREC. Once the broker receives the PUBREC, ownership of the message is transferred to the client.
When the client then receives the PUBREL, the on_message(..) callback is executed. If on_message(..) completes successfully, or if the message is acknowledged manually, a PUBCOMP is sent.
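A simplified model of this receive flow may make the ordering easier to follow. CurrentQos2Receiver and its methods are hypothetical names; the model follows the behavior described in this issue and visible in the reproduction log below, not paho's actual source. The second part of the usage shows what a fresh instance does when the only packet it gets is the resent PUBREL.

```python
from typing import Callable, Dict, List


class CurrentQos2Receiver:
    """Hypothetical model of the current QoS 2 receive flow (not paho's source)."""

    def __init__(self, on_message: Callable[[int, bytes], None]) -> None:
        self.on_message = on_message
        self.in_messages: Dict[int, bytes] = {}  # stored PUBLISH payloads keyed by packet id
        self.sent: List[str] = []                # packets "sent" to the broker, in order

    def handle_publish(self, mid: int, payload: bytes) -> None:
        # Store the message and answer with PUBREC. From now on the broker
        # will not resend this PUBLISH, only the PUBREL.
        self.in_messages[mid] = payload
        self.sent.append(f"PUBREC {mid}")

    def handle_pubrel(self, mid: int) -> None:
        # The message is delivered to the application only when PUBREL arrives.
        payload = self.in_messages.pop(mid, None)
        if payload is not None:
            self.on_message(mid, payload)  # if this raises, PUBCOMP is never sent
        # An unknown packet id is still completed, as in the reproduction log.
        self.sent.append(f"PUBCOMP {mid}")


delivered = []
rx = CurrentQos2Receiver(lambda mid, payload: delivered.append(payload))
rx.handle_publish(1, b"msg")
rx.handle_pubrel(1)
print(rx.sent)     # ['PUBREC 1', 'PUBCOMP 1']
print(delivered)   # [b'msg']

# A new instance has nothing stored, so the resent PUBREL completes the
# flow without delivering anything: the message is lost.
fresh = CurrentQos2Receiver(lambda mid, payload: delivered.append(payload))
fresh.handle_pubrel(1)
print(fresh.sent)  # ['PUBCOMP 1']
```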

Problem

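If the client disconnects and reconnects, the broker has already received the PUBREC and therefore cannot resend the PUBLISH; it resends only the PUBREL. If a new client instance is created instead, the broker likewise cannot resend the PUBLISH. The new instance has no stored copy of the message, so it answers the PUBREL with a PUBCOMP without ever calling on_message(..), and the message is lost.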

For the broker, a reconnection and a new client instance are indistinguishable. The client, however, can tell them apart based on its session state, so the two scenarios should be handled separately.
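The broker's side of this can be sketched from the retry rules in MQTT v5.0 (section 4.4): when a session is resumed, the server resends each unacknowledged QoS 2 PUBLISH with the DUP flag set, and a PUBREL for each message whose PUBREC it has already received. OutboundQos2 and resend_on_reconnect are hypothetical names for this illustration. The point is that the decision uses only the broker's own per-message state, so the connecting client instance makes no difference.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class OutboundQos2:
    """Hypothetical broker-side record of one QoS 2 message sent to a subscriber."""

    mid: int
    pubrec_received: bool


def resend_on_reconnect(pending: List[OutboundQos2]) -> List[str]:
    """Packets the broker resends when a session is resumed.

    Nothing about the client instance is an input here, which is why a
    reconnect and a brand-new instance receive exactly the same packets.
    """
    packets = []
    for message in pending:
        if message.pubrec_received:
            # Ownership already moved to the client: only the release is repeated.
            packets.append(f"PUBREL {message.mid}")
        else:
            # The client never confirmed receipt: redeliver with DUP set.
            packets.append(f"PUBLISH {message.mid} dup=1")
    return packets


print(resend_on_reconnect([OutboundQos2(1, pubrec_received=True),
                           OutboundQos2(2, pubrec_received=False)]))
# ['PUBREL 1', 'PUBLISH 2 dup=1']
```

The first entry corresponds to the reproduction log, where the second instance receives only "PUBREL (Mid: 1)" after reconnecting.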

Expected Behavior

The client should follow the message delivery retry rules of the MQTT specification (section 4.4, "Message delivery retry", in MQTT v5.0). This requires retaining the following information after a connection loss:

  • Unacknowledged PUBLISH: If a PUBLISH has been received and passed to the handler but has not yet been acknowledged, it must be reprocessed if the client restarts.
  • Outgoing PUBREC packets: If a PUBREC has been sent, the message has already been acknowledged and must not be reprocessed; reprocessing it would violate the exactly-once guarantee.
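These two rules can be sketched as a small state machine. This is one reading of the proposal, assuming that PUBREC is sent only after the handler acknowledges the message, so the broker keeps redelivering the PUBLISH until then. ProposedQos2State, on_publish, on_ack, and on_pubrel are hypothetical names, and the returned strings stand in for packets or actions.

```python
from dataclasses import dataclass, field
from typing import Set


@dataclass
class ProposedQos2State:
    """Hypothetical receiver state for the behavior requested in this issue.

    Assumption: PUBREC is sent only after the handler acknowledges the
    message, so the broker keeps resending the PUBLISH until then.
    """

    unacked: Set[int] = field(default_factory=set)      # dispatched, not yet acknowledged
    pubrec_sent: Set[int] = field(default_factory=set)  # acknowledged, waiting for PUBREL

    def on_publish(self, mid: int) -> str:
        if mid in self.pubrec_sent:
            # Already acknowledged: repeat PUBREC but never reprocess (exactly once).
            return "send PUBREC"
        if mid in self.unacked:
            # Redelivered while the handler still owns it: do not dispatch twice.
            return "wait"
        self.unacked.add(mid)
        return "dispatch"

    def on_ack(self, mid: int) -> str:
        # The handler is done; only now does ownership move to the client.
        self.unacked.discard(mid)
        self.pubrec_sent.add(mid)
        return "send PUBREC"

    def on_pubrel(self, mid: int) -> str:
        # A point where the application could commit its processed data.
        self.pubrec_sent.discard(mid)
        return "send PUBCOMP"


state = ProposedQos2State()
steps = [
    state.on_publish(1),  # first delivery
    state.on_publish(1),  # DUP redelivery after a reconnect, handler still busy
    state.on_ack(1),
    state.on_publish(1),  # PUBLISH resent because the PUBREC was lost in transit
    state.on_pubrel(1),
]
print(steps)  # ['dispatch', 'wait', 'send PUBREC', 'send PUBREC', 'send PUBCOMP']
```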

Advantages

This approach lets the application react to PUBREL, for example by committing processed data.
Messages are retransmitted as the specification intends, and with a commit mechanism or PUBREL handling in place, no messages should be lost.

Additionally, clients can distinguish between:

  • Recoverable errors: Resolvable by reconnection. The message should not be reprocessed.
  • Application errors: Require a new client instance. The message should be reprocessed.

Note

Under this logic, the PUBLISH message itself is not part of the session state. Instead, handle_publish should dispatch the message to the handler and retain only the message ID.
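A sketch of this note, also covering the reconnect versus new-instance distinction above. Receiver, handle_publish, ack, and save are hypothetical names, and the JSON string is an assumed stand-in for whatever storage a real client would use. The assumption is that only the set of sent PUBRECs is persisted, as packet identifiers, while the set of unacknowledged messages lives in memory; a new instance therefore reprocesses a redelivered PUBLISH, while a reconnect of the same instance does not.

```python
import json
from typing import Callable, Optional, Set

Handler = Callable[[int, bytes], None]


class Receiver:
    """Hypothetical receiver whose session state contains packet ids only."""

    def __init__(self, handler: Handler, saved: Optional[str] = None) -> None:
        self.handler = handler
        # In memory only: a new instance starts without it, so a redelivered
        # PUBLISH is processed again (the application error case).
        self.unacked: Set[int] = set()
        # Restored from storage, so acknowledged messages stay acknowledged.
        self.pubrec_sent: Set[int] = set(json.loads(saved)) if saved is not None else set()

    def handle_publish(self, mid: int, payload: bytes) -> None:
        if mid in self.pubrec_sent or mid in self.unacked:
            # Acknowledged already, or still with the handler: no redispatch.
            # (A real client would also repeat the PUBREC for acknowledged ids.)
            return
        self.unacked.add(mid)
        self.handler(mid, payload)  # the payload is handed over, not stored

    def ack(self, mid: int) -> None:
        self.unacked.discard(mid)
        self.pubrec_sent.add(mid)  # a PUBREC would be sent at this point

    def save(self) -> str:
        return json.dumps(sorted(self.pubrec_sent))


seen = []
old = Receiver(lambda mid, payload: seen.append(mid))
old.handle_publish(1, b"a")
old.ack(1)                   # processed and acknowledged
old.handle_publish(2, b"b")  # dispatched, but the handler fails before acking
old.handle_publish(2, b"b")  # reconnect, same instance: not dispatched again

new = Receiver(lambda mid, payload: seen.append(mid), saved=old.save())
new.handle_publish(2, b"b")  # new instance: message 2 is reprocessed
print(seen)  # [1, 2, 2]
```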

Reproduction

This is the log:

```
Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=b'd11461cb-2fd4-40fd-8cba-e51fda4334a1' properties=[SessionExpiryInterval : 120]
Received CONNACK (0, Success) properties=[ReceiveMaximum : 20, TopicAliasMaximum : 10]
Sending SUBSCRIBE (d0, m1) [(b'236a73a0-e388-48c1-bb7e-e221189d8b3a', {QoS=2, noLocal=False, retainAsPublished=False, retainHandling=0})]
Received SUBACK
Received PUBLISH (d0, q2, r0, m1), '236a73a0-e388-48c1-bb7e-e221189d8b3a', properties=[], ...  (3 bytes)
Sending PUBREC (Mid: 1)
Received PUBREL (Mid: 1)
Caught exception in on_message: Unable to ack: Processing failed, msg = b'msg', qos = 2, topic = 236a73a0-e388-48c1-bb7e-e221189d8b3a
Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=b'd11461cb-2fd4-40fd-8cba-e51fda4334a1' properties=[SessionExpiryInterval : 120]
Received CONNACK (1, Success) properties=[ReceiveMaximum : 20, TopicAliasMaximum : 10]
Received PUBREL (Mid: 1)
Sending PUBCOMP (Mid: 1)
```

It was generated by this example:

```python
import paho.mqtt.client as mqtt
from paho.mqtt import publish
import threading
import uuid
import time

host = "127.0.0.1"
client_id = str(uuid.uuid4())  # the same client id is reused by both instances
topic = str(uuid.uuid4())
qos = 2

def new_client():
    def on_message(client, userdata, msg: mqtt.MQTTMessage):
        # Simulate an application error: the message is never acknowledged.
        raise Exception(f"Unable to ack: Processing failed, msg = {msg.payload}, qos = {msg.qos}, topic = {msg.topic}")

    def on_connect(client, userdata, flags, reason_code, properties):
        # Subscribe only when the broker has no stored session for this client id.
        if not flags.session_present:
            client.subscribe(topic, qos=qos)

    client = mqtt.Client(protocol=mqtt.MQTTv5, callback_api_version=mqtt.CallbackAPIVersion.VERSION2, client_id=client_id)
    client.on_message = on_message
    client.on_connect = on_connect
    client.on_log = lambda a, b, c, msg: print(msg)

    # Keep the session on the broker for 120 s so the second instance resumes it.
    properties = mqtt.Properties(mqtt.PacketTypes.CONNECT)
    properties.SessionExpiryInterval = 120

    client.connect(host=host, clean_start=False, properties=properties)

    return client

def publish_msg():
    time.sleep(1)  # give the first client time to subscribe
    publish.single(topic, "msg", qos=qos, hostname=host)

# First instance: the exception raised in on_message ends loop_forever().
try:
    threading.Thread(target=publish_msg).start()
    new_client().loop_forever()
except Exception:
    ...

# Second instance with the same client id: the broker only resends the
# PUBREL, so on_message is never called again.
try:
    new_client().loop_forever()
except Exception:
    print("THIS WILL NEVER HAPPEN")
```

The docker-compose.yml:

```yaml
services:
  mosquitto:
    image: eclipse-mosquitto
    ports:
      - "1883:1883"
    volumes:
      - ./mosquitto.conf:/mosquitto/config/mosquitto.conf
```

The mosquitto.conf:

```
persistence false
allow_anonymous true
connection_messages true
log_type all
listener 1883
```

Environment

  • Python version: 3.11, 3.13
  • Library version: 2.1.0
  • Operating system (including version): Linux
  • MQTT server (name, version, configuration, hosting details): mosquitto:latest


