Connect
You can connect a MQTT client without the need to provide arguments. This will use the default parameters as defined in the MQTT specification or reasonable defaults if not defined there.
client.connect();
The return type depends on the used MQTT version and API flavour.
The blocking API directly returns a
Mqtt5ConnAck
message if connecting was successful.Mqtt5ConnAck connAckMessage = client.connect();
If connecting was not successful, it throws:
The asynchronous API returns a
CompletableFuture
which completes with aMqtt5ConnAck
message if connecting was successful.CompletableFuture<Mqtt5ConnAck> connAckFuture = client.connect();
If connecting was not successful, the
CompletableFuture
completes exceptionally with:The reactive API returns a
Single
which succeeds with aMqtt5ConnAck
message if connecting was successful. As theSingle
is a reactive type, the following line does not connect immediately but only after you subscribe to it (in terms of Reactive Streams).Single<Mqtt5ConnAck> connAckSingle = client.connect();
If connecting was not successful, the
Single
errors with:
ConnectionFailedException
if an error occurs before the Connect message could be sent ConnectionClosedException
if the connection is closed after the Connect message has been sent but before a ConnAck message has been received Mqtt5ConnAckException
if the ConnAck message contained an error code (the ConnAck message is contained in the exception) MqttClientStateException
if the client is already connecting or connected The blocking API directly returns a
Mqtt3ConnAck
message if connecting was successful.Mqtt3ConnAck connAckMessage = client.connect();
If connecting was not successful, it throws:
The asynchronous API returns a
CompletableFuture
which succeeds with aMqtt3ConnAck
message if connecting was successful.CompletableFuture<Mqtt3ConnAck> connAckFuture = client.connect();
If connecting was not successful, the
CompletableFuture
completes exceptionally with:The reactive API returns a
Single
which succeeds with aMqtt3ConnAck
message if connecting was successful. As theSingle
is a reactive type the following line does not connect immediately but only after you subscribe to it (in terms of Reactive Streams).Single<Mqtt3ConnAck> connAckSingle = client.connect();
If connecting was not successful, the
Single
errors with:
ConnectionFailedException
if an error occurs before the Connect message could be sent ConnectionClosedException
if the connection is closed after the Connect message has been sent but before a ConnAck message has been received Mqtt3ConnAckException
if the ConnAck message contained an error code (the ConnAck message is contained in the exception) MqttClientStateException
if the client is already connecting or connected
The rest of this section describes all possible properties of a
Mqtt5Connect
message. They can be set via a fluent builder API.Mqtt5ConnAck connAckMessage = client.connectWith() ... // here you can specify multiple properties which are described below .send();
Mqtt5Connect connectMessage = Mqtt5Connect.builder() ... // here you can specify multiple properties which are described below .build(); Mqtt5ConnAck connAckMessage = client.connect(connectMessage);
CompletableFuture<Mqtt5ConnAck> connAckFuture = client.connectWith() ... // here you can specify multiple properties described below .send();
Mqtt5Connect connectMessage = Mqtt5Connect.builder() ... // here you can specify multiple properties described below .build(); CompletableFuture<Mqtt5ConnAck> connAckFuture = client.connect(connectMessage);
Single<Mqtt5ConnAck> connAckSingle = client.connectWith() ... // here you can specify multiple properties described below .applyConnect();
Mqtt5Connect connectMessage = Mqtt5Connect.builder() ... // here you can specify multiple properties described below .build(); Single<Mqtt5ConnAck> connAckSingle = client.connect(connectMessage);
The rest of this section describes all possible properties of a
Mqtt3Connect
message. They can be set via a fluent builder API.Mqtt3ConnAck connAckMessage = client.connectWith() ... // here you can specify multiple properties which are described below .send();
Mqtt3Connect connectMessage = Mqtt3Connect.builder() ... // here you can specify multiple properties which are described below .build(); Mqtt3ConnAck connAckMessage = client.connect(connectMessage);
CompletableFuture<Mqtt3ConnAck> connAckFuture = client.connectWith() ... // here you can specify multiple properties described below .send();
Mqtt3Connect connectMessage = Mqtt3Connect.builder() ... // here you can specify multiple properties described below .build(); CompletableFuture<Mqtt3ConnAck> connAckFuture = client.connect(connectMessage);
Single<Mqtt3ConnAck> connAckSingle = client.connectWith() ... // here you can specify multiple properties described below .applyConnect();
Mqtt3Connect connectMessage = Mqtt3Connect.builder() ... // here you can specify multiple properties described below .build(); Single<Mqtt3ConnAck> connAckSingle = client.connect(connectMessage);
Clean Start
Clean start determines if the client wants to start a new “clean” session (
true
) or wants to resume a previous session if present (false
).Property Values Default MQTT Specification cleanStart
true
/false
true
3.1.2.4 client.connectWith().cleanStart(false)...;
Mqtt5Connect connectMessage = Mqtt5Connect.builder().cleanStart(false)...build();
Session Expiry Interval
The session expiry interval is the time interval (in seconds) the session will persist when the client is disconnected.
Property Values Default MQTT Specification sessionExpiryInterval
[ 0
-4_294_967_295
]0
3.1.2.11.2 client.connectWith().sessionExpiryInterval(100)...;
Mqtt5Connect connectMessage = Mqtt5Connect.builder().sessionExpiryInterval(100)...build();
Session expiry can be disabled by setting it to
4_294_967_295
or using the methodnoSessionExpiry
.client.connectWith().noSessionExpiry()...;
Mqtt5Connect connectMessage = Mqtt5Connect.builder().noSessionExpiry()...build();
Additional ResourcesClean Session
Clean session determines if the client wants to start a new “clean” session (
true
) or wants to resume a previous session which will persist when the client is disconnected (false
).Property Values Default MQTT Specification cleanSession
true
/false
true
3.1.2.4 client.connectWith().cleanSession(false)...;
Mqtt3Connect connectMessage = Mqtt3Connect.builder().cleanSession(false)...build();
Keep Alive
The keep alive is the time interval (in seconds) in which the client sends a ping to the broker if no other MQTT packets are sent during this period of time. It is used to determine if the connection is still up.
Property Values Default MQTT Specification keepAlive
[ 0
-65_535
]60
3.1.2.10 client.connectWith().keepAlive(30)...;
Mqtt5Connect connectMessage = Mqtt5Connect.builder().keepAlive(30)...build();
Keep alive can be disabled by setting it to
0
or using the methodnoKeepAlive
.client.connectWith().noKeepAlive()...;
Mqtt5Connect connectMessage = Mqtt5Connect.builder().noKeepAlive()...build();
Property Values Default MQTT Specification keepAlive
[ 0
-65_535
]60
3.1.2.10 client.connectWith().keepAlive(30)...;
Mqtt3Connect connectMessage = Mqtt3Connect.builder().keepAlive(30)...build();
Keep alive can be disabled by setting it to
0
or using the methodnoKeepAlive
.client.connectWith().noKeepAlive()...;
Mqtt3Connect connectMessage = Mqtt3Connect.builder().noKeepAlive()...build();
Simple Auth (username & password)
Property Values Default MQTT Specification simpleAuth.username
String
/MqttUtf8String
- 3.1.3.5 simpleAuth.password
byte[]
/ByteBuffer
- 3.1.3.6 client.connectWith() .simpleAuth() .username("username") .password("password".getBytes()) .applySimpleAuth() ...;
Mqtt5Connect connectMessage = Mqtt5Connect.builder() .simpleAuth() .username("username") .password("password".getBytes()) .applySimpleAuth() ... .build();
You can also prebuild the
Mqtt5SimpleAuth
.Mqtt5SimpleAuth simpleAuth = Mqtt5SimpleAuth.builder() .username("username") .password("password".getBytes()) .build(); Mqtt5Connect connectMessage = Mqtt5Connect.builder() .simpleAuth(simpleAuth) ... .build();
Property Values Default MQTT Specification simpleAuth.username
String
/MqttUtf8String
- 3.1.3.4 simpleAuth.password
byte[]
/ByteBuffer
- 3.1.3.5 client.connectWith() .simpleAuth() .username("username") .password("password".getBytes()) .applySimpleAuth() ...;
Mqtt3Connect connectMessage = Mqtt3Connect.builder() .simpleAuth() .username("username") .password("password".getBytes()) .applySimpleAuth() ... .build();
You can also prebuild the
Mqtt3SimpleAuth
.Mqtt3SimpleAuth simpleAuth = Mqtt3SimpleAuth.builder() .username("username") .password("password".getBytes()) .build(); Mqtt3Connect connectMessage = Mqtt3Connect.builder() .simpleAuth(simpleAuth) ... .build();
Enhanced Auth
You need to implement an
Mqtt5EnhancedAuthMechanism
for enhanced auth.Simple and enhanced auth can be used both at the same time.
Property Values Default MQTT Specification enhancedAuth
Mqtt5EnhancedAuthMechanism
- 3.1.2.11.9/10 Mqtt5EnhancedAuthMechanism myEnhancedAuthMechanism = ... client.connectWith() .enhancedAuth(myEnhancedAuthMechanism) ...;
Mqtt5EnhancedAuthMechanism myEnhancedAuthMechanism = ... Mqtt5Connect connectMessage = Mqtt5Connect.builder() .enhancedAuth(myEnhancedAuthMechanism) ... .build();
Will
The Will publish message is also known as Last Will and Testament (LWT). It is the message that is published by the broker if the client disconnected ungracefully or with the reason code
DISCONNECT_WITH_WILL_MESSAGE
.topic
is the only mandatory property for a Will publish message, all others have defaults or are optional.Property Values Default MQTT Specification willPublish.topic
String
/MqttTopic
mandatory 3.1.3.3 willPublish.qos
MqttQos
AT_MOST_ONCE
3.1.2.6 willPublish.payload
byte[]
/ByteBuffer
- 3.1.3.4 willPublish.retain
true
/false
false
3.1.2.7 willPublish.messageExpiryInterval
[ 0
-4_294_967_295
]- 3.1.3.2.4 willPublish.delayInterval
[ 0
-4_294_967_295
]0
3.1.3.2.2 willPublish.payloadFormatIndicator
Mqtt5PayloadFormatIndicator
- 3.1.3.2.3 willPublish.contentType
String
/MqttUtf8String
- 3.1.3.2.5 willPublish.responseTopic
String
/MqttTopic
- 3.1.3.2.6 willPublish.correlationData
byte[]
/ByteBuffer
- 3.1.3.2.7 willPublish.userProperties
Mqtt5UserProperties
- 3.1.3.2.8 client.connectWith() .willPublish() .topic("test/topic") .qos(MqttQos.AT_LEAST_ONCE) .payload("payload".getBytes()) .retain(true) .messageExpiryInterval(100) .delayInterval(10) .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8) .contentType("text/plain") .responseTopic("response/topic") .correlationData("correlationData".getBytes()) .userProperties() .add("key1", "value1") .add("key2", "value2") .applyUserProperties() .applyWillPublish() ...
Mqtt5Connect connectMessage = Mqtt5Connect.builder() .willPublish() .topic("test/topic") .qos(MqttQos.AT_LEAST_ONCE) .payload("payload".getBytes()) .retain(true) .messageExpiryInterval(100) .delayInterval(10) .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8) .contentType("text/plain") .responseTopic("response/topic") .correlationData("correlationData".getBytes()) .userProperties() .add("key1", "value1") .add("key2", "value2") .applyUserProperties() .applyWillPublish() ... .build();
You can also prebuild the
Mqtt5WillPublish
.Mqtt5WillPublish willPublishMessage = Mqtt5WillPublish.builder() .topic("test/topic") .qos(MqttQos.AT_LEAST_ONCE) .payload("payload".getBytes()) .retain(true) .messageExpiryInterval(100) .delayInterval(10) .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8) .contentType("text/plain") .responseTopic("response/topic") .correlationData("correlationData".getBytes()) .userProperties() .add("key1", "value1") .add("key2", "value2") .applyUserProperties() .build() Mqtt5Connect connectMessage = Mqtt5Connect.builder() .willPublish(willPublishMessage) ... .build();
Message expiry can be disabled (the default) by using the method
noMessageExpiry
.All properties of a Will publish message are the same as of a normal
Mqtt5Publish
message with the addition of thedelayInterval
.The Will publish message is also known as Last Will and Testament (LWT). It is the message that is published by the broker if the client disconnected ungracefully.
topic
is the only mandatory property for a Will publish message, all others have defaults or are optional.Property Values Default MQTT Specification willPublish.topic
String
/MqttTopic
mandatory 3.1.3.2 willPublish.qos
MqttQos
AT_MOST_ONCE
3.1.2.6 willPublish.payload
byte[]
/ByteBuffer
- 3.1.3.3 willPublish.retain
true
/false
false
3.1.2.7 client.connectWith() .willPublish() .topic("test/topic") .qos(MqttQos.AT_LEAST_ONCE) .payload("payload".getBytes()) .retain(true) .applyWillPublish() ...
Mqtt3Connect connectMessage = Mqtt3Connect.builder() .willPublish() .topic("test/topic") .qos(MqttQos.AT_LEAST_ONCE) .payload("payload".getBytes()) .retain(true) .applyWillPublish() ... .build();
You can also prebuild the Will publish message.
Mqtt3Publish willPublishMessage = Mqtt3Publish.builder() .topic("test/topic") .qos(MqttQos.AT_LEAST_ONCE) .payload("payload".getBytes()) .retain(true) .build() Mqtt3Connect connectMessage = Mqtt3Connect.builder() .willPublish(willPublishMessage) ... .build();
All properties of a Will publish message are the same as of a normal
Mqtt3Publish
message.
Restrictions
You can specify broker side and client side restrictions. The ones for messages received from the broker are sent with the
Mqtt5Connect
message. The others for messages the client sends itself are used in conjunction with the restrictions the broker specifies in theMqtt5ConnAck
message to determine the actual client side restrictions.Property Values Default MQTT Specification restrictions.receiveMaximum
[ 1
-65_535
]65_535
3.1.2.11.3 restrictions.sendMaximum
[ 1
-65_535
]65_535
- restrictions.maximumPacketSize
[ 1
-268_435_460
]268_435_460
3.1.2.11.4 restrictions.sendMaximumPacketSize
[ 1
-268_435_460
]268_435_460
- restrictions.topicAliasMaximum
[ 0
-65_535
]0
3.1.2.11.5 restrictions.sendTopicAliasMaximum
[ 0
-65_535
]16
- restrictions.requestProblemInformation
true
/false
true
3.1.2.11.7 restrictions.requestResponseInformation
true
/false
false
3.1.2.11.6 client.connectWith() .restrictions() .receiveMaximum(16) .sendMaximum(32) .maximumPacketSize(2048) .sendMaximumPacketSize(1024) .topicAliasMaximum(16) .sendTopicAliasMaximum(8) .requestProblemInformation(false) .requestResponseInformation(true) .applyRestrictions() ...
Mqtt5Connect connectMessage = Mqtt5Connect.builder() .restrictions() .receiveMaximum(16) .sendMaximum(32) .maximumPacketSize(2048) .sendMaximumPacketSize(1024) .topicAliasMaximum(16) .sendTopicAliasMaximum(8) .requestProblemInformation(false) .requestResponseInformation(true) .applyRestrictions() ... .build();
You can also prebuild the
Mqtt5ConnectRestrictions
.Mqtt5ConnectRestrictions restrictions = Mqtt5ConnectRestrictions.builder() .receiveMaximum(16) .sendMaximum(32) .maximumPacketSize(2048) .sendMaximumPacketSize(1024) .topicAliasMaximum(16) .sendTopicAliasMaximum(8) .requestProblemInformation(false) .requestResponseInformation(true) .build(); Mqtt5Connect connectMessage = Mqtt5Connect.builder() .restrictions(restrictions) ... .build();
User Properties
User Properties are user defined name and value pairs which are sent with the
Mqtt5Connect
message.Method Values MQTT Specification userProperties.add
String, String
MqttUtf8String, MqttUtf8String
Mqtt5UserProperty
3.1.2.11.8 client.connectWith() .userProperties() .add("name1", "value1") .add(Mqtt5UserProperty.of("name2", "value2")) .applyUserProperties() ...
Mqtt5Connect connectMessage = Mqtt5Connect.builder() .userProperties() .add("name1", "value1") .add(Mqtt5UserProperty.of("name2", "value2")) .applyUserProperties() ... .build();
You can also prebuild the
Mqtt5UserProperties
.Mqtt5UserProperties connectUserProperties = Mqtt5UserProperties.builder() .add("name1", "value1") .add(Mqtt5UserProperty.of("name2", "value2")) .build(); Mqtt5Connect connectMessage = Mqtt5Connect.builder() .userProperties(connectUserProperties) ... .build();
Additional Resources