
Publish

Messages are published with a topic. The MQTT broker needs the topic to route the message to subscribers. Hence the topic is mandatory for a Publish message (it is the only required property).

    • The blocking API directly returns a Mqtt5PublishResult if publishing was successful.

      Mqtt5PublishResult publishResult = client.publishWith().topic("test/topic").send();
      
    • The asynchronous API returns a CompletableFuture which completes with a Mqtt5PublishResult message if publishing was successful.

      CompletableFuture<Mqtt5PublishResult> publishResultFuture = 
              client.publishWith().topic("test/topic").send();
      
    • The reactive API does not publish a single message, but an asynchronous stream of messages: a Flowable<Mqtt5Publish>. Hence it also returns an asynchronous stream of results: Flowable<Mqtt5PublishResult>. Each published message will cause a result in the returned stream. As the Flowable is a reactive type, the following line does not publish immediately but only after you subscribe to it (in terms of Reactive Streams).

      Flowable<Mqtt5PublishResult> publishResultFlowable = client.publish(
              Flowable.range(0, 100).map(i -> Mqtt5Publish.builder().topic("test/topic" + i).build()));
      

    Depending on the Quality of Service (QoS) of the Publish message the result can be:

    Mqtt5PublishResult  QoS 0  returned when the message is written to the transport
    Mqtt5Qos1Result     QoS 1  returned when the message is acknowledged, contains the Mqtt5PubAck message
    Mqtt5Qos2Result     QoS 2  returned when the message is acknowledged, contains the Mqtt5PubRec message
    • If publishing was not successful, the blocking API throws:

      Mqtt5PubAckException         for QoS 1 if the PubAck message contained an error code (the PubAck message is contained in the exception)
      Mqtt5PubRecException         for QoS 2 if the PubRec message contained an error code (the PubRec message is contained in the exception)
      ConnectionClosedException    for QoS 0 if the connection was closed during writing the message to the transport
      MqttSessionExpiredException  if the session expired before the message has been acknowledged completely
      MqttEncodeException          if the maximum packet size was exceeded
      MqttClientStateException     if the client is not connected and also not reconnecting
    • If publishing was not successful, the CompletableFuture completes exceptionally with:

      Mqtt5PubAckException         for QoS 1 if the PubAck message contained an error code (the PubAck message is contained in the exception)
      Mqtt5PubRecException         for QoS 2 if the PubRec message contained an error code (the PubRec message is contained in the exception)
      ConnectionClosedException    for QoS 0 if the connection was closed during writing the message to the transport
      MqttSessionExpiredException  if the session expired before the message has been acknowledged completely
      MqttEncodeException          if the maximum packet size was exceeded
      MqttClientStateException     if the client is not connected and also not reconnecting
    • If publishing a single message was not successful, the corresponding publish result contains an error:

      Mqtt5PubAckException         for QoS 1 if the PubAck message contained an error code (the PubAck message is contained in the exception)
      Mqtt5PubRecException         for QoS 2 if the PubRec message contained an error code (the PubRec message is contained in the exception)
      ConnectionClosedException    for QoS 0 if the connection was closed during writing the message to the transport
      MqttSessionExpiredException  if the session expired before the message has been acknowledged completely
      MqttEncodeException          if the maximum packet size was exceeded

      The result stream will always emit a publish result for every message in the input stream. This means that the result stream only errors completely if the provided message stream errors itself and after all publish results have been received. Similarly, the result stream only completes normally if the provided message stream completes normally itself and after all publish results have been received.

    • The blocking API returns nothing if publishing was successful.

      client.publishWith().topic("test/topic").send();
      

      If publishing was not successful, it throws:

      ConnectionClosedException    for QoS 0 if the connection was closed during writing the message to the transport
      MqttSessionExpiredException  if the session expired before the message has been acknowledged completely
      MqttEncodeException          if the maximum packet size was exceeded
      MqttClientStateException     if the client is not connected and also not reconnecting
    • The asynchronous API returns a CompletableFuture which completes with the Mqtt3Publish message (as context) if publishing was successful.

      CompletableFuture<Mqtt3Publish> publishResultFuture = 
              client.publishWith().topic("test/topic").send();
      

      If publishing was not successful, the CompletableFuture completes exceptionally with:

      ConnectionClosedException    for QoS 0 if the connection was closed during writing the message to the transport
      MqttSessionExpiredException  if the session expired before the message has been acknowledged completely
      MqttEncodeException          if the maximum packet size was exceeded
      MqttClientStateException     if the client is not connected and also not reconnecting
    • The reactive API does not publish a single message, but an asynchronous stream of messages: a Flowable<Mqtt3Publish>. Hence it also returns an asynchronous stream of results: Flowable<Mqtt3PublishResult>. Each published message will cause a result in the returned stream. As the Flowable is a reactive type, the following line does not publish immediately but only after you subscribe to it (in terms of Reactive Streams).

      Flowable<Mqtt3PublishResult> publishResultFlowable = client.publish(
              Flowable.range(0, 100).map(i -> Mqtt3Publish.builder().topic("test/topic" + i).build()));
      

      If publishing a single message was not successful, the corresponding publish result contains an error:

      ConnectionClosedException    for QoS 0 if the connection was closed during writing the message to the transport
      MqttSessionExpiredException  if the session expired before the message has been acknowledged completely
      MqttEncodeException          if the maximum packet size was exceeded

      The result stream will always emit a publish result for every message in the input stream. This means that the result stream only errors completely if the provided message stream errors itself and after all publish results have been received. Similarly, the result stream only completes normally if the provided message stream completes normally itself and after all publish results have been received.
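For the asynchronous API described above, an exceptionally completed CompletableFuture can be inspected with the standard `handle` callback. The following is a minimal sketch using only the JDK: `fakeSend` is a hypothetical stand-in for an asynchronous send, and `String` and `IllegalStateException` stand in for the client's result and exception types, so nothing here is the library's own API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Illustrative only: shows how to branch on normal vs. exceptional completion. */
final class AsyncResultHandling {

    /** Hypothetical stand-in for an asynchronous send. */
    static CompletableFuture<String> fakeSend(boolean succeed) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (succeed) {
            future.complete("published");
        } else {
            future.completeExceptionally(new IllegalStateException("not connected"));
        }
        return future;
    }

    /** Maps the outcome to a text line; dependent stages may wrap the cause in CompletionException. */
    static String describe(CompletableFuture<String> future) {
        return future
                .handle((result, failure) -> {
                    if (failure == null) {
                        return "ok: " + result;
                    }
                    Throwable cause = (failure instanceof CompletionException && failure.getCause() != null)
                            ? failure.getCause() : failure;
                    return "failed: " + cause.getClass().getSimpleName() + ": " + cause.getMessage();
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(describe(fakeSend(true)));   // ok: published
        System.out.println(describe(fakeSend(false)));  // failed: IllegalStateException: not connected
    }
}
```

With the real client, the same pattern would apply to the future returned by `send()`, with the exception types listed above in place of the stand-in.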


  • The rest of this section describes all possible properties of a Mqtt5Publish message. They can be set via a fluent builder API.

      • Mqtt5PublishResult publishResult = client.publishWith()
                ... // here you can specify multiple properties which are described below
                .send();
        
      • Mqtt5Publish publishMessage = Mqtt5Publish.builder()
                ... // here you can specify multiple properties which are described below
                .build();
        
        Mqtt5PublishResult publishResult = client.publish(publishMessage);
        
      • CompletableFuture<Mqtt5PublishResult> publishResultFuture = client.publishWith()
                ... // here you can specify multiple properties described below
                .send();
        
      • Mqtt5Publish publishMessage = Mqtt5Publish.builder()
                ... // here you can specify multiple properties described below
                .build();
        
        CompletableFuture<Mqtt5PublishResult> publishResultFuture = client.publish(publishMessage);
        
      Flowable<Mqtt5PublishResult> publishResultFlowable = client.publish(
              Flowable.range(0, 100).map(i -> 
                      Mqtt5Publish.builder()
                              ... // here you can specify multiple properties described below
                              .build()
              ));
      
  • The rest of this section describes all possible properties of a Mqtt3Publish message. They can be set via a fluent builder API.

      • client.publishWith()
                ... // here you can specify multiple properties which are described below
                .send();
        
      • Mqtt3Publish publishMessage = Mqtt3Publish.builder()
                ... // here you can specify multiple properties which are described below
                .build();
        
        client.publish(publishMessage);
        
      • CompletableFuture<Mqtt3Publish> publishResultFuture = client.publishWith()
                ... // here you can specify multiple properties described below
                .send();
        
      • Mqtt3Publish publishMessage = Mqtt3Publish.builder()
                ... // here you can specify multiple properties described below
                .build();
        
        CompletableFuture<Mqtt3Publish> publishResultFuture = client.publish(publishMessage);
        
      Flowable<Mqtt3PublishResult> publishResultFlowable = client.publish(
              Flowable.range(0, 100).map(i -> 
                      Mqtt3Publish.builder()
                              ... // here you can specify multiple properties described below
                              .build()
              ));
      

Topic

Messages are published with a topic. The MQTT broker needs the topic to route the message to subscribers. Hence the topic is mandatory for a Publish message (it is the only required property). A topic can be hierarchically structured in multiple topic levels (divided by /) enabling easier filtering for subscribers.

  • Property  Values            Default    MQTT Specification
    topic     String/MqttTopic  mandatory  3.3.2.1
    • client.publishWith().topic("test/topic")...;
      
    • Mqtt5Publish publishMessage = Mqtt5Publish.builder().topic("test/topic")...build();
      
  • Property  Values            Default    MQTT Specification
    topic     String/MqttTopic  mandatory  3.3.2.1
    • client.publishWith().topic("test/topic")...;
      
    • Mqtt3Publish publishMessage = Mqtt3Publish.builder().topic("test/topic")...build();
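
To illustrate how topic levels enable filtering, here is a minimal sketch that splits a topic at `/` and matches it against a subscription filter using the MQTT wildcards `+` (exactly one level) and `#` (all remaining levels). `TopicLevels` is a hypothetical helper and not part of the client API; it skips validation and the special handling of topics starting with `$`.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative helper, not part of the HiveMQ client API. */
final class TopicLevels {

    /** Splits a topic into its levels; the -1 limit keeps empty trailing levels. */
    static List<String> levels(String topic) {
        return Arrays.asList(topic.split("/", -1));
    }

    /** Returns true if the topic matches the filter ("+" = one level, "#" = all remaining levels). */
    static boolean matches(String filter, String topic) {
        List<String> f = levels(filter);
        List<String> t = levels(topic);
        for (int i = 0; i < f.size(); i++) {
            String level = f.get(i);
            if (level.equals("#")) {
                return true; // multi-level wildcard also matches the parent level itself
            }
            if (i >= t.size()) {
                return false; // the filter has more levels than the topic
            }
            if (!level.equals("+") && !level.equals(t.get(i))) {
                return false; // literal level differs
            }
        }
        return f.size() == t.size();
    }

    public static void main(String[] args) {
        System.out.println(levels("test/topic"));             // [test, topic]
        System.out.println(matches("test/+", "test/topic"));  // true
        System.out.println(matches("test/#", "test/a/b"));    // true
        System.out.println(matches("test/+", "test/a/b"));    // false
    }
}
```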
      

Payload

The payload of a Publish message carries the actual application data. MQTT is data-agnostic so you can use any format for the payload.

  • Property  Values             Default  MQTT Specification
    payload   byte[]/ByteBuffer  -        3.3.3
    • client.publishWith()
              .topic("test/topic")
              .payload("hello world".getBytes())
              ...;
      
    • Mqtt5Publish publishMessage = Mqtt5Publish.builder()
              .topic("test/topic")
              .payload("hello world".getBytes())
              ...
              .build();
      
  • Property  Values             Default  MQTT Specification
    payload   byte[]/ByteBuffer  -        3.3.3
    • client.publishWith()
              .topic("test/topic")
              .payload("hello world".getBytes())
              ...;
      
    • Mqtt3Publish publishMessage = Mqtt3Publish.builder()
              .topic("test/topic")
              .payload("hello world".getBytes())
              ...
              .build();
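
Because the payload is just bytes, text has to be encoded before publishing and decoded on receipt. The snippets above call `getBytes()` without arguments, which uses the platform default charset; on Java versions before 18 that default is not guaranteed to be UTF-8. A minimal JDK-only sketch of explicit UTF-8 encoding and decoding follows (`PayloadEncoding` is a hypothetical helper name):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Illustrative helper for converting between text and payload bytes. */
final class PayloadEncoding {

    /** Encodes text as UTF-8 regardless of the platform default charset. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes via a read-only view so the position of the caller's buffer is not changed. */
    static String decode(ByteBuffer payload) {
        return StandardCharsets.UTF_8.decode(payload.asReadOnlyBuffer()).toString();
    }

    public static void main(String[] args) {
        byte[] bytes = encode("hello world");
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        System.out.println(bytes.length);    // 11
        System.out.println(decode(buffer));  // hello world
    }
}
```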
      

Quality of Service (QoS)

The QoS levels ensure different message delivery guarantees in case of connection failures. The QoS level should be chosen based on the use case.

QoS 0  AT MOST ONCE   Messages are not redelivered after a failure. Some messages may be lost.
QoS 1  AT LEAST ONCE  Messages are redelivered after a failure if they were not acknowledged by the broker. Some messages may be delivered more than once (initial delivery attempt + redelivery attempt(s)).
QoS 2  EXACTLY ONCE   Messages are redelivered after a failure if they were not acknowledged by the broker. The broker additionally filters duplicate messages based on message ids.

The trade-off between the QoS levels is that stronger delivery guarantees come with higher latency and more state that has to be stored on sender and receiver.

Keep in mind that the MQTT QoS levels cover guarantees between the client and the broker, not directly between the publisher and the subscribers, because MQTT is an asynchronous protocol. This is an advantage: it decouples publishers and subscribers and makes the system more robust and scalable. Different brokers might provide different guarantees for end-to-end communication (especially if they are clustered).
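
The difference between QoS 1 and QoS 2 can be sketched with a toy model: the sender resends a message when an acknowledgement is lost, and the receiver either accepts every delivery or filters duplicates by message id. This is a simplification for illustration only. `QosSimulation` and its members are hypothetical names, and the real QoS 2 flow uses a multi-step handshake that is not modeled here.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of redelivery; not how the client or a broker is implemented. */
final class QosSimulation {

    /** Receiver that accepts every delivery (QoS 1 style) or filters by message id (QoS 2 style). */
    static final class Receiver {
        final boolean filterDuplicates;
        final Set<Integer> seenIds = new HashSet<>();
        final List<String> delivered = new ArrayList<>();

        Receiver(boolean filterDuplicates) {
            this.filterDuplicates = filterDuplicates;
        }

        void receive(int messageId, String payload) {
            // Set.add returns false if the id was already seen
            if (filterDuplicates && !seenIds.add(messageId)) {
                return;
            }
            delivered.add(payload);
        }
    }

    /** Sends one message; the acknowledgements of the first lostAcks attempts are lost, forcing resends. */
    static List<String> send(Receiver receiver, int lostAcks) {
        for (int attempt = 0; attempt <= lostAcks; attempt++) {
            receiver.receive(1, "hello");
        }
        return receiver.delivered;
    }

    public static void main(String[] args) {
        System.out.println(send(new Receiver(false), 1)); // [hello, hello]
        System.out.println(send(new Receiver(true), 1));  // [hello]
    }
}
```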

  • Property  Values         Default       MQTT Specification
    qos       AT_MOST_ONCE   AT_MOST_ONCE  3.3.1.2
              AT_LEAST_ONCE
              EXACTLY_ONCE
    • client.publishWith()
              .topic("test/topic")
              .qos(MqttQos.AT_LEAST_ONCE)
              ...;
      
    • Mqtt5Publish publishMessage = Mqtt5Publish.builder()
              .topic("test/topic")
              .qos(MqttQos.AT_LEAST_ONCE)
              ...
              .build();
      
  • Property  Values         Default       MQTT Specification
    qos       AT_MOST_ONCE   AT_MOST_ONCE  3.3.1.2
              AT_LEAST_ONCE
              EXACTLY_ONCE
    • client.publishWith()
              .topic("test/topic")
              .qos(MqttQos.AT_LEAST_ONCE)
              ...;
      
    • Mqtt3Publish publishMessage = Mqtt3Publish.builder()
              .topic("test/topic")
              .qos(MqttQos.AT_LEAST_ONCE)
              ...
              .build();
      

Retain

The retain flag indicates that the message should be stored at the broker for its topic. New subscribers then get the last retained message on that topic even if they were not connected when it was published.
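
The behavior described above can be modeled with a map from topic to the last retained payload. This is a minimal sketch of the idea, not broker code; `RetainedStore` and its methods are hypothetical names, and wildcard subscriptions are not covered.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Toy model of a broker's retained-message store (exact topics only). */
final class RetainedStore {

    private final Map<String, String> retained = new HashMap<>();

    /** Stores the payload for the topic only when the retain flag is set. */
    void publish(String topic, String payload, boolean retain) {
        if (retain) {
            retained.put(topic, payload); // replaces any earlier retained message on this topic
        }
    }

    /** Returns what a new subscriber to this topic would receive immediately, if anything. */
    Optional<String> onSubscribe(String topic) {
        return Optional.ofNullable(retained.get(topic));
    }

    public static void main(String[] args) {
        RetainedStore store = new RetainedStore();
        store.publish("test/topic", "first", true);
        store.publish("test/topic", "second", true);
        store.publish("test/topic", "third", false); // not retained
        System.out.println(store.onSubscribe("test/topic").orElse("none"));  // second
        System.out.println(store.onSubscribe("other/topic").orElse("none")); // none
    }
}
```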

  • Property  Values      Default  MQTT Specification
    retain    true/false  false    3.3.1.3
    • client.publishWith()
              .topic("test/topic")
              .retain(true)
              ...;
      
    • Mqtt5Publish publishMessage = Mqtt5Publish.builder()
              .topic("test/topic")
              .retain(true)
              ...
              .build();
      
  • Property  Values      Default  MQTT Specification
    retain    true/false  false    3.3.1.3
    • client.publishWith()
              .topic("test/topic")
              .retain(true)
              ...;
      
    • Mqtt3Publish publishMessage = Mqtt3Publish.builder()
              .topic("test/topic")
              .retain(true)
              ...
              .build();
      

  • Message Expiry Interval

    The message expiry interval is the time interval (in seconds) the message will be queued for subscribers.

    Property               Values                 Default  MQTT Specification
    messageExpiryInterval  [0 - 4_294_967_295]    -        3.3.2.3.3
    • client.publishWith()
              .topic("test/topic")
              .messageExpiryInterval(100)
              ...;
      
    • Mqtt5Publish publishMessage = Mqtt5Publish.builder()
              .topic("test/topic")
              .messageExpiryInterval(100)
              ...
              .build();
      

    Message expiry can be disabled (the default) by using the method noMessageExpiry.

    • client.publishWith()
              .topic("test/topic")
              .noMessageExpiry()
              ...;
      
    • Mqtt5Publish publishMessage = Mqtt5Publish.builder()
              .topic("test/topic")
              .noMessageExpiry()
              ...
              .build();
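
The value range of the message expiry interval is that of an unsigned 32-bit integer, so it does not fit into a Java `int` and is handled as a `long` in the sketch below. The sketch checks the range and computes whether a message would be expired at a given time. `MessageExpiry` and its methods are hypothetical names for illustration and not part of the client API.

```java
import java.time.Instant;

/** Illustrative range check and expiry computation for the interval in seconds. */
final class MessageExpiry {

    static final long MAX_INTERVAL_SECONDS = 4_294_967_295L; // 2^32 - 1

    /** Returns the interval unchanged if it lies within [0, 4_294_967_295]. */
    static long checkInterval(long seconds) {
        if (seconds < 0 || seconds > MAX_INTERVAL_SECONDS) {
            throw new IllegalArgumentException("message expiry interval out of range: " + seconds);
        }
        return seconds;
    }

    /** True once at least intervalSeconds have passed since publishedAt. */
    static boolean isExpired(Instant publishedAt, long intervalSeconds, Instant now) {
        Instant expiresAt = publishedAt.plusSeconds(checkInterval(intervalSeconds));
        return !now.isBefore(expiresAt);
    }

    public static void main(String[] args) {
        Instant publishedAt = Instant.EPOCH;
        System.out.println(isExpired(publishedAt, 100, publishedAt.plusSeconds(99)));  // false
        System.out.println(isExpired(publishedAt, 100, publishedAt.plusSeconds(100))); // true
    }
}
```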
      

  • Payload Format Indicator

    To indicate whether the payload is text or binary data, you can use the payload format indicator. Used in conjunction with the content type, it allows the payload encoding to be described precisely.

    Property                Values       Default  MQTT Specification
    payloadFormatIndicator  UNSPECIFIED  -        3.3.2.3.2
                            UTF_8
    • client.publishWith()
              .topic("test/topic")
              .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8)
              ...;
      
    • Mqtt5Publish publishMessage = Mqtt5Publish.builder()
              .topic("test/topic")
              .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8)
              ...
              .build();
      

  • Content Type

    The content type describes the encoding of the payload. It can be any string, but it is recommended to use a MIME type to ensure interoperability.

    Property     Values                 Default  MQTT Specification
    contentType  String/MqttUtf8String  -        3.3.2.3.9
    • client.publishWith()
              .topic("test/topic")
              .contentType("text/plain")
              ...;
      
    • Mqtt5Publish publishMessage = Mqtt5Publish.builder()
              .topic("test/topic")
              .contentType("text/plain")
              ...
              .build();
      

  • Response Topic

    Although MQTT is a publish/subscribe protocol, it can be used with a request/response pattern. MQTT's request/response differs from synchronous request/response (like HTTP) as it still has all MQTT characteristics, such as asynchronism, decoupling of sender and receiver, and 1-to-many communication. Requesting is done by subscribing to a response topic and then publishing to a request topic. The publish includes the response topic so a responder knows to which topic it should publish the response. To correlate request and response (as they are asynchronous, multiple responses from different clients are possible), you can use the correlation data.

    Property       Values            Default  MQTT Specification
    responseTopic  String/MqttTopic  -        3.3.2.3.5
    • client.publishWith()
              .topic("request/topic")
              .responseTopic("response/topic")
              ...;
      
    • Mqtt5Publish publishMessage = Mqtt5Publish.builder()
              .topic("request/topic")
              .responseTopic("response/topic")
              ...
              .build();
      

  • Correlation Data

    Correlation data can be used to correlate a request to its response. If it is part of the request message then the responder copies it to the response message.

    Property         Values             Default  MQTT Specification
    correlationData  byte[]/ByteBuffer  -        3.3.2.3.6
    • client.publishWith()
              .topic("request/topic")
              .responseTopic("response/topic")
              .correlationData("1234".getBytes())
              ...;
      
    • Mqtt5Publish publishMessage = Mqtt5Publish.builder()
              .topic("request/topic")
              .responseTopic("response/topic")
              .correlationData("1234".getBytes())
              ...
              .build();
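
On the requesting side, correlation data is typically used to look up which pending request an incoming response belongs to. The following is a minimal JDK-only sketch of such bookkeeping. `PendingRequests` and its methods are hypothetical names; wiring it to the client (publishing the request, receiving the response on the response topic) is left out.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative requester-side registry that matches responses to requests. */
final class PendingRequests {

    // ByteBuffer compares by content in equals/hashCode, so it works as a map key (byte[] does not)
    private final Map<ByteBuffer, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Call before publishing the request; the returned future completes with the response payload. */
    CompletableFuture<String> register(byte[] correlationData) {
        CompletableFuture<String> response = new CompletableFuture<>();
        pending.put(ByteBuffer.wrap(correlationData.clone()), response);
        return response;
    }

    /** Call for each message received on the response topic; returns false for unknown correlation data. */
    boolean onResponse(byte[] correlationData, String payload) {
        CompletableFuture<String> response = pending.remove(ByteBuffer.wrap(correlationData));
        return response != null && response.complete(payload);
    }

    public static void main(String[] args) {
        PendingRequests requests = new PendingRequests();
        CompletableFuture<String> first = requests.register("1234".getBytes(StandardCharsets.UTF_8));
        CompletableFuture<String> second = requests.register("5678".getBytes(StandardCharsets.UTF_8));
        requests.onResponse("5678".getBytes(StandardCharsets.UTF_8), "response B");
        System.out.println(first.isDone());  // false
        System.out.println(second.join());   // response B
    }
}
```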
      

  • User Properties

    User properties are user-defined name and value pairs which are sent with the Mqtt5Publish message.

    Method              Values                          MQTT Specification
    userProperties.add  String, String                  3.3.2.3.7
                        MqttUtf8String, MqttUtf8String
                        Mqtt5UserProperty
    • client.publishWith()
              .topic("test/topic")
              .userProperties()
                  .add("name1", "value1")
                  .add(Mqtt5UserProperty.of("name2", "value2"))
                  .applyUserProperties()
              ...;
      
    • Mqtt5Publish publishMessage = Mqtt5Publish.builder()
              .topic("test/topic")
              .userProperties()
                  .add("name1", "value1")
                  .add(Mqtt5UserProperty.of("name2", "value2"))
                  .applyUserProperties()
              ...
              .build();
      

      You can also prebuild the Mqtt5UserProperties.

      Mqtt5UserProperties userProperties = Mqtt5UserProperties.builder()
              .add("name1", "value1")
              .add(Mqtt5UserProperty.of("name2", "value2"))
              .build();
      
      Mqtt5Publish publishMessage = Mqtt5Publish.builder()
              .topic("test/topic")
              .userProperties(userProperties)
              ...
              .build();