编辑
2024-05-07
学习记录
00
请注意,本文编写于 731 天前,最后修改于 1 天前,其中某些信息可能已经过时。

目录

什么是 MQTT?
核心组件与架构
部署mqtt服务
两种主流集成方案
两种方案对比

什么是 MQTT?

MQTT 是一种基于发布/订阅模式的轻量级消息协议。它专为受限网络环境和低带宽、高延迟或不可靠的网络而设计。其主要特点包括:

轻量与高效:协议开销小,非常适合物联网(IoT)、移动应用等场景。

发布/订阅模式:消息的发送者(发布者)和接收者(订阅者)通过 Topic(主题)解耦,不需要知道彼此的存在。

多级 QoS:支持最多一次(0)、至少一次(1)、刚好一次(2)三种消息服务质量等级,可按需保证消息的可靠传输。

双向通信:设备可通过 MQTT 上传数据(如传感器读数),服务端也可反向发布指令(如控制开关)。

核心组件与架构

在集成之前,你需要了解以下几个核心概念:

  • MQTT 代理 (Broker):消息的中枢服务器,负责接收所有客户端的消息,并根据主题将其推送给订阅的客户端。常见的有 EMQX、HiveMQ、Mosquitto 等。
  • MQTT 客户端 (Client):你的 Java/Spring Boot 应用,它可以充当发布者、订阅者或两者皆是。
  • 主题 (Topic):消息的“地址”,例如 device/sensor/temperature,客户端通过指定主题来收发消息。
  • QoS (服务质量):定义了消息传递的可靠性级别。

部署mqtt服务

EMQX:定位是 云端的核心数据中枢。它追求极致的扩展性(集群可达亿万级连接)和高可用性,并提供强大的数据集成与处理能力。

NanoMQ:定位是 边缘侧的轻量级消息总线。它追求极致的轻量(占用资源极低)和高性能,专注于在资源受限的边缘环境中高效收集和处理数据。

nonamq官网

docker run -d --name nanomq -p 1883:1883 -p 8083:8083 -p 8883:8883 emqx/nanomq:latest

emqx官网

docker run -d --name emqx-enterprise \ --hostname node1.emqx.com \ -e "EMQX_NODE_NAME=emqx@node1.emqx.com" \ -p 1883:1883 -p 8083:8083 \ -p 8084:8084 -p 8883:8883 \ -p 18083:18083 \ -v $PWD/data:/opt/emqx/data \ -v $PWD/log:/opt/emqx/log \ emqx/emqx-enterprise:6.2.0

两种主流集成方案

在 Java/Spring Boot 项目中,主要有两种集成 MQTT 的方式,你可以根据项目的复杂度和需求进行选择。

方案一:使用 Eclipse Paho 客户端库 (最直接的方式) Eclipse Paho 是 Java 领域最成熟、最常用的 MQTT 客户端库。这种方式让你能对 MQTT 连接的细节有更精细的控制。

  1. 添加依赖

在你的 pom.xml 中添加 Paho 客户端库依赖:

xml
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.5</version> </dependency>
  1. 配置连接参数

在 application.yml 或 application.properties 中配置连接信息:

yaml
mqtt: broker-url: tcp://localhost:1883 # Broker地址,SSL使用ssl://... client-id: your-mqtt-client-id # 客户端ID,需唯一 username: your-username # 用户名(可选) password: your-password # 密码(可选) timeout: 30 # 连接超时(秒) keep-alive: 60 # 心跳间隔(秒)
  1. 编写 MQTT 配置类

创建一个配置类,用于初始化 Paho 的 MqttClient,并设置连接选项和回调函数。

java
import org.eclipse.paho.client.mqttv3.*; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MqttConfig { @Value("${mqtt.broker-url}") private String brokerUrl; @Value("${mqtt.client-id}") private String clientId; // ... 其他配置属性 @Bean public MqttClient mqttClient() throws MqttException { // 1. 创建客户端 MqttClient client = new MqttClient(brokerUrl, clientId, new MemoryPersistence()); // 2. 配置连接选项 MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(username); options.setPassword(password.toCharArray()); options.setAutomaticReconnect(true); // 开启自动重连 options.setCleanSession(false); // 启用持久会话 options.setConnectionTimeout(timeout); options.setKeepAliveInterval(keepAlive); // 3. 设置回调函数,处理连接断开和消息到达 client.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { System.err.println("连接已断开,原因:" + cause.getMessage()); // 可在此处添加自定义的异常告警逻辑 } @Override public void messageArrived(String topic, MqttMessage message) { System.out.println("收到消息 - Topic: " + topic + ", Payload: " + new String(message.getPayload())); // 在这里处理业务逻辑,如解析JSON并存入数据库等 } @Override public void deliveryComplete(IMqttDeliveryToken token) {} }); // 4. 连接到Broker client.connect(options); System.out.println("MQTT 连接成功"); return client; } }
  1. 实现发布与订阅

封装一个 MqttService 或直接在 Controller 中使用 mqttClient 的实例:

发布消息 (Publish):通过 REST API 触发消息发送。

订阅主题 (Subscribe):通常在应用启动后或特定条件下调用。

java
@Service public class MqttService { @Autowired private MqttClient mqttClient; // 发布消息 public void publish(String topic, String payload, int qos) throws MqttException { MqttMessage message = new MqttMessage(payload.getBytes()); message.setQos(qos); mqttClient.publish(topic, message); } // 订阅主题(可在 @PostConstruct 方法中调用) @PostConstruct public void subscribe() throws MqttException { mqttClient.subscribe("sensor/+/data", (topic, msg) -> { // 直接在回调中处理,或在messageArrived回调中处理 }); } }

方案二:基于 Spring Integration 的消息驱动方案

如果你的系统基于 Spring,并且希望使用声明式、消息驱动的方式与 MQTT 交互,那么 Spring Integration 的 MQTT 模块是更优雅的选择。它提供了入站和出站适配器,能无缝融入 Spring 的消息通道(MessageChannel)体系。

  1. 添加依赖

除了 Paho 依赖,还需要引入 spring-integration-mqtt。

xml
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.5</version> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
  1. 配置 MQTT 通道适配器

你可以通过 XML 配置或 Java 配置来定义消息通道和适配器。配置中的 MqttPahoMessageDrivenChannelAdapter 用于接收消息,而 MqttPahoMessageHandler 用于发送消息。

java
@Configuration public class MqttIntegrationConfig { @Bean public DefaultMqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(new String[]{"tcp://localhost:1883"}); options.setAutomaticReconnect(true); // ... 其他配置 factory.setConnectionOptions(options); return factory; } // 入站适配器:接收MQTT消息,发送到 inputChannel @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } @Bean public MqttPahoMessageDrivenChannelAdapter inboundAdapter() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("clientIdIn", mqttClientFactory(), "topic1", "topic2"); adapter.setOutputChannel(mqttInputChannel()); adapter.setQos(1); return adapter; } // 出站适配器:从 outputChannel 接收消息,发布到MQTT @Bean public MessageChannel mqttOutputChannel() { return new DirectChannel(); } @Bean public MqttPahoMessageHandler outboundAdapter() { MqttPahoMessageHandler handler = new MqttPahoMessageHandler("clientIdOut", mqttClientFactory()); handler.setAsync(true); handler.setDefaultTopic("default-topic"); handler.setOutputChannel(mqttOutputChannel()); return handler; } }
  1. 处理业务逻辑

定义一个 @ServiceActivator 来监听 mqttInputChannel,处理接收到的 MQTT 消息,并将结果发送到 mqttOutputChannel 以发布新的 MQTT 消息。

java
@Component public class MqttMessageProcessor { @ServiceActivator(inputChannel = "mqttInputChannel", outputChannel = "mqttOutputChannel") public Message<?> handleMqttMessage(Message<?> message) { String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC); String payload = new String((byte[]) message.getPayload()); System.out.println("收到并处理消息: " + payload); // 业务逻辑处理,例如解析数据、数据库存储等 // 如果需要回复,可以构造一个新的消息发送到 outputChannel return MessageBuilder.withPayload("{\"response\":\"OK\"}") .setHeader(MqttHeaders.TOPIC, "response/topic") .build(); } }

两种方案对比

Spring Boot + Paho 客户端库

复杂度:低/中

核心依赖:org.eclipse.paho.client.mqttv3

适用场景:追求快速开发、直接控制连接和回调

特点:代码直观,控制粒度细,配置相对简单

Spring Integration MQTT

复杂度:中/高

核心依赖:spring-integration-mqtt

适用场景:复杂消息路由、需要声明式集成Spring生态

特点:基于消息通道,与Spring Messaging无缝集成,重连和会话管理更健壮、支持MQTT v5及主题模式匹配

本文作者:Weee

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!