添加Maven依赖
在 pom.xml 文件中添加如下依赖：
<!-- https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3 -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.1.1</version>
</dependency>
模拟一个客户端接收消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
| package app.mqtt;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
 * Minimal MQTT client built on the blocking Eclipse Paho {@code MqttClient}.
 *
 * <p>{@link #start()} connects to the broker at {@link #HOST}, registers a retained
 * "offline" last-will message on {@code client/will/<clientid>}, publishes a retained
 * "online" message on that same topic and subscribes to {@code client/#}.
 *
 * <p>NOTE(review): the broker address, client id and credentials are hard-coded in this
 * class; move them to external configuration before using this outside a demo.
 */
public class ClientMQTT {

    /** Broker URI used by {@link #start()}. */
    public static final String HOST = "tcp://192.168.2.153:1883";

    /** Client identifier; also used as the last segment of the status topic. */
    private static final String clientid = "client@@@By7yGnhVX0e9XEpY";

    private MqttClient client;
    private MqttConnectOptions options;
    private String userName = "By7yGnhCC0e9XEpY$%4";
    private String passWord = "WTSUbbb2jRsVH7";

    /**
     * Creates the client, connects, announces "online" and subscribes to {@code client/#}.
     *
     * <p>Any exception raised while doing so is printed to standard error and not
     * rethrown, so callers are not told about a failed start.
     */
    public void start() {
        try {
            client = new MqttClient(HOST, clientid, new MemoryPersistence());

            options = new MqttConnectOptions();
            options.setCleanSession(false);
            options.setUserName(userName);
            options.setPassword(passWord.toCharArray());
            options.setConnectionTimeout(10);
            options.setAutomaticReconnect(true);
            options.setKeepAliveInterval(20);

            client.setCallback(new PushCallback(ClientMQTT.this));

            // Status topic: the will and the "online" announcement share it.
            MqttTopic topic = client.getTopic("client/will/" + clientid);

            // Encode the will explicitly as UTF-8 so it matches the "online" payload
            // below instead of depending on the platform default charset.
            options.setWill(
                    topic,
                    "offline".getBytes(java.nio.charset.StandardCharsets.UTF_8),
                    2,
                    true);

            client.connect(options);

            if (client.isConnected()) {
                MqttMessage mqttMessage = new MqttMessage();
                mqttMessage.setQos(1);
                mqttMessage.setRetained(true);
                mqttMessage.setPayload("online".getBytes(java.nio.charset.StandardCharsets.UTF_8));
                topic.publish(mqttMessage);

                int[] Qos = {1};
                String[] topics = {"client/#"};
                client.subscribe(topics, Qos);
            }
        } catch (Exception e) {
            // Best-effort demo behaviour kept from the original: report and continue.
            e.printStackTrace();
        }
    }

    /**
     * Reconnects using the options prepared by {@link #start()}.
     *
     * <p>Does nothing when {@link #start()} has not created a client yet, or when the
     * client is already connected. The second check matters because automatic reconnect
     * is enabled in {@link #start()}: calling {@code connect} on an already connected
     * Paho client raises an exception.
     *
     * @throws Exception if the underlying {@code connect} call fails
     */
    public void reConnect() throws Exception {
        if (client != null && !client.isConnected()) {
            client.connect(options);
        }
    }
}
|
消息回调类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| package app.mqtt;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage;
public class PushCallback implements MqttCallback {
private ClientMQTT conn;
PushCallback(ClientMQTT clientMQTT){ conn = clientMQTT; }
public void connectionLost(Throwable cause) { System.out.println("连接断开"); try { System.out.println("重新连接中"); conn.reConnect(); } catch (Exception e) { e.printStackTrace(); } }
public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("消息交付完成:" + token.isComplete()); }
public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println("接收消息主题 : " + topic); System.out.println("接收消息Qos : " + message.getQos()); System.out.println("接收消息内容 : " + new String(message.getPayload())); } }
|
启动入口
1 2 3 4 5 6 7 8 9 10 11 12
| import app.mqtt.ClientMQTT;
public class Main { public static void main(String[] args) throws Exception { ClientMQTT client = new ClientMQTT(); client.start(); } }
|