Publish
Messages are published with a topic. The MQTT broker needs the topic to route the message to subscribers. Hence the topic is mandatory for a Publish message (it is the only required property).
With an MQTT 5 client, the blocking API directly returns a Mqtt5PublishResult if publishing was successful.

    Mqtt5PublishResult publishResult = client.publishWith().topic("test/topic").send();

The asynchronous API returns a CompletableFuture which completes with a Mqtt5PublishResult message if publishing was successful.

    CompletableFuture<Mqtt5PublishResult> publishResultFuture = client.publishWith().topic("test/topic").send();

The reactive API does not publish a single message, but an asynchronous stream of messages: a Flowable<Mqtt5Publish>. Hence it also returns an asynchronous stream of results: Flowable<Mqtt5PublishResult>. Each published message will cause a result in the returned stream. As the Flowable is a reactive type, the following line does not publish immediately but only after you subscribe to it (in terms of Reactive Streams).

    Flowable<Mqtt5PublishResult> publishResultFlowable = client.publish(
            Flowable.range(0, 100).map(i -> Mqtt5Publish.builder().topic("test/topic" + i).build()));
Depending on the Quality of Service (QoS) of the Publish message, the result can be:

| Mqtt5PublishResult | QoS 0 | returned when the message is written to the transport |
| Mqtt5Qos1Result | QoS 1 | returned when the message is acknowledged, contains the Mqtt5PubAck message |
| Mqtt5Qos2Result | QoS 2 | returned when the message is acknowledged, contains the Mqtt5PubRec message |

If publishing was not successful, the blocking API throws:

- Mqtt5PubAckException for QoS 1 if the PubAck message contained an error code (the PubAck message is contained in the exception)
- Mqtt5PubRecException for QoS 2 if the PubRec message contained an error code (the PubRec message is contained in the exception)
- ConnectionClosedException for QoS 0 if the connection was closed while writing the message to the transport
- MqttSessionExpiredException if the session expired before the message was completely acknowledged
- MqttEncodeException if the maximum packet size was exceeded
- MqttClientStateException if the client is not connected and also not reconnecting

If publishing was not successful, the CompletableFuture returned by the asynchronous API completes exceptionally with:

- Mqtt5PubAckException for QoS 1 if the PubAck message contained an error code (the PubAck message is contained in the exception)
- Mqtt5PubRecException for QoS 2 if the PubRec message contained an error code (the PubRec message is contained in the exception)
- ConnectionClosedException for QoS 0 if the connection was closed while writing the message to the transport
- MqttSessionExpiredException if the session expired before the message was completely acknowledged
- MqttEncodeException if the maximum packet size was exceeded
- MqttClientStateException if the client is not connected and also not reconnecting

With the reactive API, if publishing a single message was not successful, the corresponding publish result contains an error:

- Mqtt5PubAckException for QoS 1 if the PubAck message contained an error code (the PubAck message is contained in the exception)
- Mqtt5PubRecException for QoS 2 if the PubRec message contained an error code (the PubRec message is contained in the exception)
- ConnectionClosedException for QoS 0 if the connection was closed while writing the message to the transport
- MqttSessionExpiredException if the session expired before the message was completely acknowledged
- MqttEncodeException if the maximum packet size was exceeded

The result stream will always emit a publish result for every message in the input stream. This means that the result stream only errors completely if the provided message stream errors itself, and only after all publish results have been received. Similarly, the result stream only completes normally if the provided message stream completes normally itself, and only after all publish results have been received.
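The "one result per message" behaviour of the result stream can be pictured with a small model. The following is only a sketch that uses plain JDK types instead of the client library: the Outcome class, the publishAll method and the send function are hypothetical names introduced for illustration, and a List stands in for the reactive stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

public class PerMessageResults {

    /** Hypothetical stand-in for a publish result: the topic plus an optional error. */
    public static final class Outcome {
        public final String topic;
        public final Optional<Throwable> error;

        Outcome(String topic, Optional<Throwable> error) {
            this.topic = topic;
            this.error = error;
        }
    }

    /** Produces exactly one outcome per input topic; a failure is recorded, not thrown. */
    public static List<Outcome> publishAll(List<String> topics, Function<String, Optional<Throwable>> send) {
        List<Outcome> outcomes = new ArrayList<>();
        for (String topic : topics) {
            outcomes.add(new Outcome(topic, send.apply(topic)));
        }
        return outcomes;
    }

    public static void main(String[] args) {
        List<Outcome> outcomes = publishAll(
                List.of("test/topic0", "test/topic1", "test/topic2"),
                // pretend that only the second message is rejected
                topic -> topic.endsWith("1")
                        ? Optional.<Throwable>of(new IllegalStateException("rejected"))
                        : Optional.<Throwable>empty());
        for (Outcome outcome : outcomes) {
            System.out.println(outcome.topic + " -> " + outcome.error.map(Throwable::getMessage).orElse("ok"));
        }
    }
}
```

The point of the model is that a failed message does not terminate processing of the remaining messages; each failure is attached to its own result.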
With an MQTT 3 client, the blocking API returns nothing if publishing was successful.

    client.publishWith().topic("test/topic").send();

If publishing was not successful, it throws:

- ConnectionClosedException for QoS 0 if the connection was closed while writing the message to the transport
- MqttSessionExpiredException if the session expired before the message was completely acknowledged
- MqttEncodeException if the maximum packet size was exceeded
- MqttClientStateException if the client is not connected and also not reconnecting

The asynchronous API returns a CompletableFuture which completes with the Mqtt3Publish message (as context) if publishing was successful.

    CompletableFuture<Mqtt3Publish> publishResultFuture = client.publishWith().topic("test/topic").send();

If publishing was not successful, the CompletableFuture completes exceptionally with:

- ConnectionClosedException for QoS 0 if the connection was closed while writing the message to the transport
- MqttSessionExpiredException if the session expired before the message was completely acknowledged
- MqttEncodeException if the maximum packet size was exceeded
- MqttClientStateException if the client is not connected and also not reconnecting

The reactive API does not publish a single message, but an asynchronous stream of messages: a Flowable<Mqtt3Publish>. Hence it also returns an asynchronous stream of results: Flowable<Mqtt3PublishResult>. Each published message will cause a result in the returned stream. As the Flowable is a reactive type, the following line does not publish immediately but only after you subscribe to it (in terms of Reactive Streams).

    Flowable<Mqtt3PublishResult> publishResultFlowable = client.publish(
            Flowable.range(0, 100).map(i -> Mqtt3Publish.builder().topic("test/topic" + i).build()));

If publishing a single message was not successful, the corresponding publish result contains an error:

- ConnectionClosedException for QoS 0 if the connection was closed while writing the message to the transport
- MqttSessionExpiredException if the session expired before the message was completely acknowledged
- MqttEncodeException if the maximum packet size was exceeded

The result stream will always emit a publish result for every message in the input stream. This means that the result stream only errors completely if the provided message stream errors itself, and only after all publish results have been received. Similarly, the result stream only completes normally if the provided message stream completes normally itself, and only after all publish results have been received.
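For the asynchronous API, both outcomes arrive through the same CompletableFuture. The sketch below shows one way to handle success and exceptional completion using only JDK types. It assumes a CompletableFuture<String> as a stand-in for the future returned by the client, and an IllegalStateException as a stand-in for an exception such as ConnectionClosedException; the describe method is a hypothetical helper.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class AsyncPublishHandling {

    /** Turns the outcome of a publish future into a short status text. */
    public static String describe(CompletableFuture<String> publishFuture) {
        return publishFuture
                .handle((published, throwable) -> {
                    if (throwable == null) {
                        return "published " + published;
                    }
                    // dependent stages may wrap the original cause in a CompletionException
                    Throwable cause = (throwable instanceof CompletionException && throwable.getCause() != null)
                            ? throwable.getCause()
                            : throwable;
                    return "failed: " + cause.getMessage();
                })
                .join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("test/topic");

        CompletableFuture<String> failed = new CompletableFuture<>();
        // stand-in for an exception such as ConnectionClosedException
        failed.completeExceptionally(new IllegalStateException("connection closed"));

        System.out.println(describe(ok));     // published test/topic
        System.out.println(describe(failed)); // failed: connection closed
    }
}
```

In real code you would usually not call join() but continue asynchronously from the handle stage; join() is used here only to keep the example self-contained.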
The rest of this section describes all possible properties of a Mqtt5Publish message. They can be set via a fluent builder API.

Blocking API:

    Mqtt5PublishResult publishResult = client.publishWith()
            ... // here you can specify multiple properties which are described below
            .send();

    Mqtt5Publish publishMessage = Mqtt5Publish.builder()
            ... // here you can specify multiple properties which are described below
            .build();
    Mqtt5PublishResult publishResult = client.publish(publishMessage);

Asynchronous API:

    CompletableFuture<Mqtt5PublishResult> publishResultFuture = client.publishWith()
            ... // here you can specify multiple properties described below
            .send();

    Mqtt5Publish publishMessage = Mqtt5Publish.builder()
            ... // here you can specify multiple properties described below
            .build();
    CompletableFuture<Mqtt5PublishResult> publishResultFuture = client.publish(publishMessage);

Reactive API:

    Flowable<Mqtt5PublishResult> publishResultFlowable = client.publish(
            Flowable.range(0, 100).map(i -> Mqtt5Publish.builder()
                    ... // here you can specify multiple properties described below
                    .build()
            ));
The rest of this section describes all possible properties of a Mqtt3Publish message. They can be set via a fluent builder API.

Blocking API:

    client.publishWith()
            ... // here you can specify multiple properties which are described below
            .send();

    Mqtt3Publish publishMessage = Mqtt3Publish.builder()
            ... // here you can specify multiple properties which are described below
            .build();
    client.publish(publishMessage);

Asynchronous API:

    CompletableFuture<Mqtt3Publish> publishResultFuture = client.publishWith()
            ... // here you can specify multiple properties described below
            .send();

    Mqtt3Publish publishMessage = Mqtt3Publish.builder()
            ... // here you can specify multiple properties described below
            .build();
    CompletableFuture<Mqtt3Publish> publishResultFuture = client.publish(publishMessage);

Reactive API:

    Flowable<Mqtt3PublishResult> publishResultFlowable = client.publish(
            Flowable.range(0, 100).map(i -> Mqtt3Publish.builder()
                    ... // here you can specify multiple properties described below
                    .build()
            ));
Topic
Messages are published with a topic. The MQTT broker needs the topic to route the message to subscribers. Hence the topic is mandatory for a Publish message (it is the only required property). A topic can be hierarchically structured in multiple topic levels (divided by /) enabling easier filtering for subscribers.
| Property | Values | Default | MQTT Specification |
| --- | --- | --- | --- |
| topic | String / MqttTopic | mandatory | 3.3.2.1 |

MQTT 5:

    client.publishWith().topic("test/topic")...;

    Mqtt5Publish publishMessage = Mqtt5Publish.builder().topic("test/topic")...build();

MQTT 3:

    client.publishWith().topic("test/topic")...;

    Mqtt3Publish publishMessage = Mqtt3Publish.builder().topic("test/topic")...build();
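To make the idea of topic levels concrete, here is a minimal sketch that splits a topic string at the / separator and joins levels back together. It uses only the JDK; the TopicLevels class and its methods are hypothetical helpers for illustration and are not part of the client library, which has its own MqttTopic type.

```java
import java.util.Arrays;
import java.util.List;

public class TopicLevels {

    /** Splits a topic into its levels; the -1 limit keeps empty levels such as in "a//b". */
    public static List<String> levels(String topic) {
        return Arrays.asList(topic.split("/", -1));
    }

    /** Joins individual levels into a topic string. */
    public static String topic(String... levels) {
        return String.join("/", levels);
    }

    public static void main(String[] args) {
        System.out.println(levels("home/kitchen/temperature")); // [home, kitchen, temperature]
        System.out.println(topic("home", "kitchen", "humidity")); // home/kitchen/humidity
    }
}
```

Building topics from levels in one place makes it easier to keep a consistent hierarchy that subscribers can filter on.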
Payload
The payload of a Publish message carries the actual application data. MQTT is data-agnostic so you can use any format for the payload.
| Property | Values | Default | MQTT Specification |
| --- | --- | --- | --- |
| payload | byte[] / ByteBuffer | - | 3.3.3 |

MQTT 5:

    client.publishWith()
            .topic("test/topic")
            .payload("hello world".getBytes())
            ...;

    Mqtt5Publish publishMessage = Mqtt5Publish.builder()
            .topic("test/topic")
            .payload("hello world".getBytes())
            ...
            .build();

MQTT 3:

    client.publishWith()
            .topic("test/topic")
            .payload("hello world".getBytes())
            ...;

    Mqtt3Publish publishMessage = Mqtt3Publish.builder()
            .topic("test/topic")
            .payload("hello world".getBytes())
            ...
            .build();
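Because the payload is just bytes, sender and receiver have to agree on the encoding. The following sketch shows two common options using only the JDK: text encoded with an explicit charset (String.getBytes() without an argument uses the platform default charset), and a small binary layout written with a ByteBuffer. The PayloadEncoding class and the reading layout (timestamp followed by value) are assumptions made for this example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class PayloadEncoding {

    /** Encodes text with an explicit charset instead of the platform default. */
    public static byte[] encodeText(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a text payload that was encoded as UTF-8. */
    public static String decodeText(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    /** Encodes a hypothetical sensor reading as 8 bytes timestamp + 8 bytes value. */
    public static ByteBuffer encodeReading(long timestampMillis, double value) {
        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + Double.BYTES);
        buffer.putLong(timestampMillis).putDouble(value);
        buffer.flip(); // switch from writing to reading so the content can be consumed
        return buffer;
    }

    public static void main(String[] args) {
        byte[] text = encodeText("hello world");
        System.out.println(text.length + " bytes: " + decodeText(text));

        ByteBuffer reading = encodeReading(1_700_000_000_000L, 21.5);
        System.out.println(reading.remaining() + " bytes binary payload");
    }
}
```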
Quality of Service (QoS)
The QoS levels ensure different message delivery guarantees in case of connection failures. The QoS level should be chosen based on the use case.
| QoS 0 | AT MOST ONCE | Messages are not redelivered after a failure. Some messages may be lost. |
| QoS 1 | AT LEAST ONCE | Messages are redelivered after a failure if they were not acknowledged by the broker. Some messages may be delivered more than once (initial delivery attempt + redelivery attempt(s)). |
| QoS 2 | EXACTLY ONCE | Messages are redelivered after a failure if they were not acknowledged by the broker. The broker additionally filters duplicate messages based on message ids. |
The trade-off between the QoS levels is latency and state: a higher QoS level gives a stronger delivery guarantee, but increases latency and the amount of state that has to be stored on sender and receiver.
Keep in mind that the MQTT QoS levels cover guarantees between the client and the broker (not directly the subscribers) as MQTT is an asynchronous protocol (which is an advantage because it decouples publishers and subscribers and makes the system more robust and scalable). Different brokers might provide different guarantees for end-to-end communication (especially if they are clustered).
| Property | Values | Default | MQTT Specification |
| --- | --- | --- | --- |
| qos | AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE | AT_MOST_ONCE | 3.3.1.2 |

MQTT 5:

    client.publishWith()
            .topic("test/topic")
            .qos(MqttQos.AT_LEAST_ONCE)
            ...;

    Mqtt5Publish publishMessage = Mqtt5Publish.builder()
            .topic("test/topic")
            .qos(MqttQos.AT_LEAST_ONCE)
            ...
            .build();

MQTT 3:

    client.publishWith()
            .topic("test/topic")
            .qos(MqttQos.AT_LEAST_ONCE)
            ...;

    Mqtt3Publish publishMessage = Mqtt3Publish.builder()
            .topic("test/topic")
            .qos(MqttQos.AT_LEAST_ONCE)
            ...
            .build();
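The difference between AT LEAST ONCE and EXACTLY ONCE can be illustrated with a simplified receiver model. In this sketch a list of message ids represents the copies that arrive at the receiver, where a redelivery repeats the id. The class and method names are hypothetical, only the JDK is used, and the real protocol is more involved (for example, ids are released and reused after the QoS 2 handshake completes).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class QosDeliverySketch {

    /** At least once: every arriving copy is handed on, so duplicates are possible. */
    public static List<Integer> atLeastOnce(List<Integer> arrivingIds) {
        return new ArrayList<>(arrivingIds);
    }

    /** Exactly once: copies with an already seen message id are filtered out. */
    public static List<Integer> exactlyOnce(List<Integer> arrivingIds) {
        Set<Integer> seen = new HashSet<>();
        List<Integer> delivered = new ArrayList<>();
        for (Integer id : arrivingIds) {
            if (seen.add(id)) { // add returns false if the id was already present
                delivered.add(id);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        // message 2 was redelivered after a connection failure
        List<Integer> arriving = List.of(1, 2, 2, 3);
        System.out.println(atLeastOnce(arriving)); // [1, 2, 2, 3]
        System.out.println(exactlyOnce(arriving)); // [1, 2, 3]
    }
}
```

The set of seen ids is the extra state that the higher QoS level requires on the receiving side.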
Retain
The retain flag indicates that the message should be stored at the broker for its topic. New subscribers then get the last retained message on that topic even if they were not connected when it was published.
| Property | Values | Default | MQTT Specification |
| --- | --- | --- | --- |
| retain | true / false | false | 3.3.1.3 |

MQTT 5:

    client.publishWith()
            .topic("test/topic")
            .retain(true)
            ...;

    Mqtt5Publish publishMessage = Mqtt5Publish.builder()
            .topic("test/topic")
            .retain(true)
            ...
            .build();

MQTT 3:

    client.publishWith()
            .topic("test/topic")
            .retain(true)
            ...;

    Mqtt3Publish publishMessage = Mqtt3Publish.builder()
            .topic("test/topic")
            .retain(true)
            ...
            .build();
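What the retain flag means on the broker side can be sketched with a map that keeps the last retained message per topic. This is a simplified model under stated assumptions, not how any particular broker is implemented: the RetainedStoreSketch class is hypothetical, payloads are plain strings, and topic filters with wildcards are ignored.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class RetainedStoreSketch {

    private final Map<String, String> retained = new HashMap<>();

    /** Stores the message for its topic only if the retain flag is set. */
    public void publish(String topic, String payload, boolean retain) {
        if (!retain) {
            return; // non-retained messages do not change the stored message
        }
        if (payload.isEmpty()) {
            retained.remove(topic); // a retained message with an empty payload clears the topic
        } else {
            retained.put(topic, payload);
        }
    }

    /** Returns what a new subscriber to exactly this topic would receive immediately. */
    public Optional<String> subscribe(String topic) {
        return Optional.ofNullable(retained.get(topic));
    }

    public static void main(String[] args) {
        RetainedStoreSketch broker = new RetainedStoreSketch();
        broker.publish("test/topic", "21.5", true);
        broker.publish("test/topic", "22.0", true);
        broker.publish("test/topic", "99.9", false);
        System.out.println(broker.subscribe("test/topic")); // Optional[22.0]
    }
}
```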
Message Expiry Interval
The message expiry interval is the time interval (in seconds) the message will be queued for subscribers.
| Property | Values | Default | MQTT Specification |
| --- | --- | --- | --- |
| messageExpiryInterval | 0 - 4_294_967_295 | - | 3.3.2.3.3 |

    client.publishWith()
            .topic("test/topic")
            .messageExpiryInterval(100)
            ...;

    Mqtt5Publish publishMessage = Mqtt5Publish.builder()
            .topic("test/topic")
            .messageExpiryInterval(100)
            ...
            .build();

Message expiry can be disabled (the default) by using the method noMessageExpiry.

    client.publishWith()
            .topic("test/topic")
            .noMessageExpiry()
            ...;

    Mqtt5Publish publishMessage = Mqtt5Publish.builder()
            .topic("test/topic")
            .noMessageExpiry()
            ...
            .build();
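Since the interval is given in seconds and limited to the range shown in the table, it can help to convert from a java.time.Duration and validate the range in one place. The helper below is a hypothetical sketch using only the JDK; its result is meant to be passed on as the number of seconds, on the assumption that the builder method accepts a value of this range.

```java
import java.time.Duration;

public class MessageExpirySketch {

    /** Upper bound of the message expiry interval in seconds. */
    public static final long MAX_EXPIRY_SECONDS = 4_294_967_295L;

    /** Converts a duration to whole seconds and rejects values outside the allowed range. */
    public static long toExpirySeconds(Duration timeToLive) {
        long seconds = timeToLive.getSeconds(); // fractions of a second are dropped
        if (timeToLive.isNegative() || seconds > MAX_EXPIRY_SECONDS) {
            throw new IllegalArgumentException("message expiry out of range: " + timeToLive);
        }
        return seconds;
    }

    public static void main(String[] args) {
        System.out.println(toExpirySeconds(Duration.ofMinutes(5))); // 300
        System.out.println(toExpirySeconds(Duration.ofHours(24)));  // 86400
    }
}
```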
Payload Format Indicator
To indicate whether the payload is text or binary data, you can use the payload format indicator. Used in conjunction with the content type, the payload encoding can be described precisely.
| Property | Values | Default | MQTT Specification |
| --- | --- | --- | --- |
| payloadFormatIndicator | UNSPECIFIED, UTF_8 | - | 3.3.2.3.2 |

    client.publishWith()
            .topic("test/topic")
            .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8)
            ...;

    Mqtt5Publish publishMessage = Mqtt5Publish.builder()
            .topic("test/topic")
            .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8)
            ...
            .build();
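If you mark a payload as UTF_8, the bytes should actually be well-formed UTF-8. A possible check with the JDK's strict charset decoder is sketched below. The PayloadFormatSketch class is a hypothetical helper; whether and where such a check is worthwhile depends on your application.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

public class PayloadFormatSketch {

    /** Returns true if the payload is well-formed UTF-8, false otherwise. */
    public static boolean isValidUtf8(byte[] payload) {
        try {
            StandardCharsets.UTF_8.newDecoder()
                    .onMalformedInput(CodingErrorAction.REPORT)      // fail instead of replacing
                    .onUnmappableCharacter(CodingErrorAction.REPORT)
                    .decode(ByteBuffer.wrap(payload));
            return true;
        } catch (CharacterCodingException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValidUtf8("hello world".getBytes(StandardCharsets.UTF_8))); // true
        System.out.println(isValidUtf8(new byte[] {(byte) 0xFF}));                       // false
    }
}
```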
Content Type
The content type describes the encoding of the payload. It can be any string, but it is recommended to use a MIME type to ensure interoperability.
| Property | Values | Default | MQTT Specification |
| --- | --- | --- | --- |
| contentType | String / MqttUtf8String | - | 3.3.2.3.9 |

    client.publishWith()
            .topic("test/topic")
            .contentType("text/plain")
            ...;

    Mqtt5Publish publishMessage = Mqtt5Publish.builder()
            .topic("test/topic")
            .contentType("text/plain")
            ...
            .build();
Response Topic
Although MQTT is a publish/subscribe protocol, it can be used with a request/response pattern. MQTT’s request/response differs from synchronous request/response (like HTTP) as it still has all MQTT characteristics such as asynchronism, decoupling of sender and receiver, and 1-to-many communication. Requesting is done by subscribing to a response topic and then publishing to a request topic. The publish includes the response topic so a responder knows to which topic it should publish the response. To correlate request and response (as they are asynchronous, multiple responses from different clients are possible), you can use the correlation data.
| Property | Values | Default | MQTT Specification |
| --- | --- | --- | --- |
| responseTopic | String / MqttTopic | - | 3.3.2.3.5 |

    client.publishWith()
            .topic("request/topic")
            .responseTopic("response/topic")
            ...;

    Mqtt5Publish publishMessage = Mqtt5Publish.builder()
            .topic("request/topic")
            .responseTopic("response/topic")
            ...
            .build();
Correlation Data
Correlation data can be used to correlate a request to its response. If it is part of the request message then the responder copies it to the response message.
| Property | Values | Default | MQTT Specification |
| --- | --- | --- | --- |
| correlationData | byte[] / ByteBuffer | - | 3.3.2.3.6 |

    client.publishWith()
            .topic("request/topic")
            .responseTopic("response/topic")
            .correlationData("1234".getBytes())
            ...;

    Mqtt5Publish publishMessage = Mqtt5Publish.builder()
            .topic("request/topic")
            .responseTopic("response/topic")
            .correlationData("1234".getBytes())
            ...
            .build();
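On the requester side, correlation data is typically used to look up which pending request a response belongs to. The sketch below shows one possible bookkeeping with JDK types only: the RequestCorrelationSketch class, the register and onResponse methods, and the use of String response payloads are all assumptions for illustration. In a real application, onResponse would be called from the callback that receives messages on the response topic.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class RequestCorrelationSketch {

    // byte[] compares by identity, so the key wraps the bytes in a ByteBuffer (compares by content)
    private final Map<ByteBuffer, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a request and returns the future that the matching response completes. */
    public CompletableFuture<String> register(byte[] correlationData) {
        CompletableFuture<String> response = new CompletableFuture<>();
        pending.put(ByteBuffer.wrap(correlationData.clone()), response);
        return response;
    }

    /** Completes the matching request; returns false if the correlation data is unknown. */
    public boolean onResponse(byte[] correlationData, String payload) {
        CompletableFuture<String> response = pending.remove(ByteBuffer.wrap(correlationData));
        return response != null && response.complete(payload);
    }

    public static void main(String[] args) {
        RequestCorrelationSketch requests = new RequestCorrelationSketch();
        CompletableFuture<String> first = requests.register("1234".getBytes(StandardCharsets.UTF_8));
        CompletableFuture<String> second = requests.register("5678".getBytes(StandardCharsets.UTF_8));

        // responses may arrive in a different order than the requests were sent
        requests.onResponse("5678".getBytes(StandardCharsets.UTF_8), "response to second");
        requests.onResponse("1234".getBytes(StandardCharsets.UTF_8), "response to first");

        System.out.println(first.join());
        System.out.println(second.join());
    }
}
```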
User Properties
User Properties are user-defined name and value pairs which are sent with the Mqtt5Publish message.

| Method | Values | MQTT Specification |
| --- | --- | --- |
| userProperties.add | String, String / MqttUtf8String, MqttUtf8String / Mqtt5UserProperty | 3.3.2.3.7 |

    client.publishWith()
            .topic("test/topic")
            .userProperties()
                .add("name1", "value1")
                .add(Mqtt5UserProperty.of("name2", "value2"))
                .applyUserProperties()
            ...;

    Mqtt5Publish publishMessage = Mqtt5Publish.builder()
            .topic("test/topic")
            .userProperties()
                .add("name1", "value1")
                .add(Mqtt5UserProperty.of("name2", "value2"))
                .applyUserProperties()
            ...
            .build();

You can also prebuild the Mqtt5UserProperties.

    Mqtt5UserProperties userProperties = Mqtt5UserProperties.builder()
            .add("name1", "value1")
            .add(Mqtt5UserProperty.of("name2", "value2"))
            .build();

    Mqtt5Publish publishMessage = Mqtt5Publish.builder()
            .topic("test/topic")
            .userProperties(userProperties)
            ...
            .build();
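User properties behave like an ordered list of pairs rather than a map: in MQTT 5 the same name may appear more than once. The following sketch models this with JDK collections to show why a plain Map<String, String> would lose information. The UserPropertiesSketch class and its methods are hypothetical and independent of the library's Mqtt5UserProperties type.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class UserPropertiesSketch {

    // a list keeps both the insertion order and repeated names
    private final List<Map.Entry<String, String>> properties = new ArrayList<>();

    /** Appends a name and value pair; an existing pair with the same name is kept. */
    public UserPropertiesSketch add(String name, String value) {
        properties.add(new SimpleImmutableEntry<>(name, value));
        return this;
    }

    /** Returns all values stored under the given name, in insertion order. */
    public List<String> valuesOf(String name) {
        return properties.stream()
                .filter(property -> property.getKey().equals(name))
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        UserPropertiesSketch userProperties = new UserPropertiesSketch()
                .add("name1", "value1")
                .add("name2", "value2")
                .add("name1", "value3");
        System.out.println(userProperties.valuesOf("name1")); // [value1, value3]
    }
}
```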