MQTT 是一种基于发布/订阅模式的轻量级消息协议。它专为受限网络环境和低带宽、高延迟或不可靠的网络而设计。其主要特点包括:
轻量与高效:协议开销小,非常适合物联网(IoT)、移动应用等场景。
发布/订阅模式:消息的发送者(发布者)和接收者(订阅者)通过 Topic(主题)解耦,不需要知道彼此的存在。
多级 QoS:支持最多一次(0)、至少一次(1)、刚好一次(2)三种消息服务质量等级,可按需保证消息的可靠传输。
双向通信:设备可通过 MQTT 上传数据(如传感器读数),服务端也可反向发布指令(如控制开关)。
在集成之前,你需要了解以下几个核心概念:
EMQX:定位是 云端的核心数据中枢。它追求极致的扩展性(集群可达亿万级连接)和高可用性,并提供强大的数据集成与处理能力。
NanoMQ:定位是 边缘侧的轻量级消息总线。它追求极致的轻量(占用资源极低)和高性能,专注于在资源受限的边缘环境中高效收集和处理数据。
docker run -d --name nanomq -p 1883:1883 -p 8083:8083 -p 8883:8883 emqx/nanomq:latest
docker run -d --name emqx-enterprise \ --hostname node1.emqx.com \ -e "EMQX_NODE_NAME=emqx@node1.emqx.com" \ -p 1883:1883 -p 8083:8083 \ -p 8084:8084 -p 8883:8883 \ -p 18083:18083 \ -v $PWD/data:/opt/emqx/data \ -v $PWD/log:/opt/emqx/log \ emqx/emqx-enterprise:6.2.0
在 Java/Spring Boot 项目中,主要有两种集成 MQTT 的方式,你可以根据项目的复杂度和需求进行选择。
方案一:使用 Eclipse Paho 客户端库 (最直接的方式) Eclipse Paho 是 Java 领域最成熟、最常用的 MQTT 客户端库。这种方式让你能对 MQTT 连接的细节有更精细的控制。
在你的 pom.xml 中添加 Paho 客户端库依赖:
xml<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
在 application.yml 或 application.properties 中配置连接信息:
yamlmqtt:
broker-url: tcp://localhost:1883 # Broker地址,SSL使用ssl://...
client-id: your-mqtt-client-id # 客户端ID,需唯一
username: your-username # 用户名(可选)
password: your-password # 密码(可选)
timeout: 30 # 连接超时(秒)
keep-alive: 60 # 心跳间隔(秒)
创建一个配置类,用于初始化 Paho 的 MqttClient,并设置连接选项和回调函数。
javaimport org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MqttConfig {
@Value("${mqtt.broker-url}")
private String brokerUrl;
@Value("${mqtt.client-id}")
private String clientId;
// ... 其他配置属性
@Bean
public MqttClient mqttClient() throws MqttException {
// 1. 创建客户端
MqttClient client = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
// 2. 配置连接选项
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setAutomaticReconnect(true); // 开启自动重连
options.setCleanSession(false); // 启用持久会话
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepAlive);
// 3. 设置回调函数,处理连接断开和消息到达
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.err.println("连接已断开,原因:" + cause.getMessage());
// 可在此处添加自定义的异常告警逻辑
}
@Override
public void messageArrived(String topic, MqttMessage message) {
System.out.println("收到消息 - Topic: " + topic + ", Payload: " + new String(message.getPayload()));
// 在这里处理业务逻辑,如解析JSON并存入数据库等
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {}
});
// 4. 连接到Broker
client.connect(options);
System.out.println("MQTT 连接成功");
return client;
}
}
封装一个 MqttService 或直接在 Controller 中使用 mqttClient 的实例:
发布消息 (Publish):通过 REST API 触发消息发送。
订阅主题 (Subscribe):通常在应用启动后或特定条件下调用。
java@Service
public class MqttService {
@Autowired
private MqttClient mqttClient;
// 发布消息
public void publish(String topic, String payload, int qos) throws MqttException {
MqttMessage message = new MqttMessage(payload.getBytes());
message.setQos(qos);
mqttClient.publish(topic, message);
}
// 订阅主题(可在 @PostConstruct 方法中调用)
@PostConstruct
public void subscribe() throws MqttException {
mqttClient.subscribe("sensor/+/data", (topic, msg) -> {
// 直接在回调中处理,或在messageArrived回调中处理
});
}
}
方案二:基于 Spring Integration 的消息驱动方案
如果你的系统基于 Spring,并且希望使用声明式、消息驱动的方式与 MQTT 交互,那么 Spring Integration 的 MQTT 模块是更优雅的选择。它提供了入站和出站适配器,能无缝融入 Spring 的消息通道(MessageChannel)体系。
除了 Paho 依赖,还需要引入 spring-integration-mqtt。
xml<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
你可以通过 XML 配置或 Java 配置来定义消息通道和适配器。配置中的 MqttPahoMessageDrivenChannelAdapter 用于接收消息,而 MqttPahoMessageHandler 用于发送消息。
java@Configuration
public class MqttIntegrationConfig {
@Bean
public DefaultMqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{"tcp://localhost:1883"});
options.setAutomaticReconnect(true);
// ... 其他配置
factory.setConnectionOptions(options);
return factory;
}
// 入站适配器:接收MQTT消息,发送到 inputChannel
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MqttPahoMessageDrivenChannelAdapter inboundAdapter() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("clientIdIn", mqttClientFactory(), "topic1", "topic2");
adapter.setOutputChannel(mqttInputChannel());
adapter.setQos(1);
return adapter;
}
// 出站适配器:从 outputChannel 接收消息,发布到MQTT
@Bean
public MessageChannel mqttOutputChannel() {
return new DirectChannel();
}
@Bean
public MqttPahoMessageHandler outboundAdapter() {
MqttPahoMessageHandler handler = new MqttPahoMessageHandler("clientIdOut", mqttClientFactory());
handler.setAsync(true);
handler.setDefaultTopic("default-topic");
handler.setOutputChannel(mqttOutputChannel());
return handler;
}
}
定义一个 @ServiceActivator 来监听 mqttInputChannel,处理接收到的 MQTT 消息,并将结果发送到 mqttOutputChannel 以发布新的 MQTT 消息。
java@Component
public class MqttMessageProcessor {
@ServiceActivator(inputChannel = "mqttInputChannel", outputChannel = "mqttOutputChannel")
public Message<?> handleMqttMessage(Message<?> message) {
String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
String payload = new String((byte[]) message.getPayload());
System.out.println("收到并处理消息: " + payload);
// 业务逻辑处理,例如解析数据、数据库存储等
// 如果需要回复,可以构造一个新的消息发送到 outputChannel
return MessageBuilder.withPayload("{\"response\":\"OK\"}")
.setHeader(MqttHeaders.TOPIC, "response/topic")
.build();
}
}
Spring Boot + Paho 客户端库
复杂度:低/中
核心依赖:org.eclipse.paho.client.mqttv3
适用场景:追求快速开发、直接控制连接和回调
特点:代码直观,控制粒度细,配置相对简单
Spring Integration MQTT
复杂度:中/高
核心依赖:spring-integration-mqtt
适用场景:复杂消息路由、需要声明式集成Spring生态
特点:基于消息通道,与Spring Messaging无缝集成,重连和会话管理更健壮、支持MQTT v5及主题模式匹配
本文作者:Weee
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!