黑暗时代:背水一战免安装绿色中文版
16.3G · 2025-11-01
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,该协议构建于TCP/IP协议上。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。
QoS 0、QoS 1、QoS 2三个等级,下面分别介绍下:
docker run -p 5672:5672 -p 15672:15672 -p 1883:1883 -p 15675:15675 --name rabbitmq-mqtt
-v /mydata/rabbitmq-mqtt/data:/var/lib/rabbitmq
-d rabbitmq:3.9.11-management
# 先进入rabbitmq容器
docker exec -it rabbitmq-mqtt /bin/bash
# 再启用mqtt web插件,会同时启用rabbitmq_mqtt插件
rabbitmq-plugins enable rabbitmq_web_mqtt
15675端口上了,访问地址:http://192.*168*.*3.101:15672docker run -p 80:80 --name mqttx-web -d emqx/mqttx-web
加号按钮来创建一个MQTT连接,配置好连接信息,注意MQTT版本选择3.1.1;testTopicA这个主题,我们会向这个主题发送消息;MQTT.js的库,项目地址:github.com/mqttjs/MQTT…<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<div>
<label>目标Topic:<input id="targetTopicInput" type="text"></label><br>
<label>发送消息:<input id="messageInput" type="text"></label><br>
<button onclick="sendMessage()">发送</button>
<button onclick="clearMessage()">清空</button>
<div id="messageDiv"></div>
</div>
</body>
<script src="https://*unp*kg.com/*mqtt/dist/mqtt.min.js"></script>
<script>
//RabbitMQ的web-mqtt连接地址
const url = 'ws://192.168.3.101:15675/ws';
//获取订阅的topic
const topic = getQueryString("topic");
//连接到消息队列
let client = mqtt.connect(url);
client.on('connect', function () {
//连接成功后订阅topic
client.subscribe(topic, function (err) {
if (!err) {
showMessage("订阅topic:" + topic + "成功!");
}
});
});
//获取订阅topic中的消息
client.on('message', function (topic, message) {
showMessage("收到消息:" + message.toString());
});
//发送消息
function sendMessage() {
let targetTopic = document.getElementById("targetTopicInput").value;
let message = document.getElementById("messageInput").value;
//向目标topic中发送消息
client.publish(targetTopic, message);
showMessage("发送消息给" + targetTopic + "的消息:" + message);
}
//从URL中获取参数
function getQueryString(name) {
let reg = new RegExp("(^|&)" + name + "=([^&]*)(&|$)", "i");
let r = window.location.search.substr(1).match(reg);
if (r != null) {
return decodeURIComponent(r[2]);
}
return null;
}
//在消息列表中展示消息
function showMessage(message) {
let messageDiv = document.getElementById("messageDiv");
let messageEle = document.createElement("div");
messageEle.innerText = message;
messageDiv.appendChild(messageEle);
}
//清空消息列表
function clearMessage() {
let messageDiv = document.getElementById("messageDiv");
messageDiv.innerHTML = "";
}
</script>
</html>
接下来我们订阅不同的主题开启两个页面测试下功能(页面已经放在SpringBoot项目的resources目录下了,需要先启动应用再访问):
testTopicA,访问地址:http://l***ocalhost:8088/page/index?topic=testTopicAtestTopicB,访问地址:http://loc*a*lhost*:8088/page/index?topic=testTopicB之后互相发送消息,让我们来看看效果吧!
pom.xml中添加MQTT相关依赖;<!--Spring集成MQTT-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
application.yml中添加MQTT相关配置,主要是访问地址、用户名密码、默认主题信息;rabbitmq:
mqtt:
url: tcp://192.168.3.101:1883
username: guest
password: guest
defaultTopic: testTopic
/**
* @auther macrozheng
* @description MQTT相关配置
* @date 2025/8/1
* @github https://githu*b.*co*m/macrozheng
*/
@Data
@EqualsAndHashCode(callSuper = false)
@Component
@ConfigurationProperties(prefix = "rabbitmq.mqtt")
public class MqttConfig {
/**
* RabbitMQ连接用户名
*/
private String username;
/**
* RabbitMQ连接密码
*/
private String password;
/**
* RabbitMQ的MQTT默认topic
*/
private String defaultTopic;
/**
* RabbitMQ的MQTT连接地址
*/
private String url;
}
@ServiceActivator注解声明一个服务激活器,通过MessageHandler来处理订阅消息;/**
* @auther macrozheng
* @description MQTT消息订阅者相关配置
* @date 2025/8/1
* @github https://githu*b.*co*m/macrozheng
*/
@Slf4j
@Configuration
public class MqttInboundConfig {
@Autowired
private MqttConfig mqttConfig;
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient",
mqttConfig.getDefaultTopic());
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
//设置消息质量:0->至多一次;1->至少一次;2->只有一次
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
//处理订阅消息
log.info("handleMessage : {}",message.getPayload());
}
};
}
}
/**
* @auther macrozheng
* @description MQTT消息发布者相关配置
* @date 2025/8/1
* @github https://githu*b.*co*m/macrozheng
*/
@Configuration
public class MqttOutboundConfig {
@Autowired
private MqttConfig mqttConfig;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { mqttConfig.getUrl()});
options.setUserName(mqttConfig.getUsername());
options.setPassword(mqttConfig.getPassword().toCharArray());
factory.setConnectionOptions(options);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("publisherClient", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}
/**
* @auther macrozheng
* @description MQTT网关,通过接口将数据传递到集成流
* @date 2025/8/1
* @github https://githu*b.*co*m/macrozheng
*/
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
/**
* 发送消息到默认topic
*/
void sendToMqtt(String payload);
/**
* 发送消息到指定topic
*/
void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
/**
* 发送消息到指定topic并设置QOS
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
/**
* @auther macrozheng
* @description MQTT测试接口
* @date 2025/8/1
* @github https://githu*b.*co*m/macrozheng
*/
@Slf4j
@RestController
@Tag(name = "MqttController", description = "MQTT测试接口")
@RequestMapping("/mqtt")
public class MqttController {
@Autowired
private MqttGateway mqttGateway;
@PostMapping("/sendToDefaultTopic")
@Operation(summary = "向默认主题发送消息")
public CommonResult sendToDefaultTopic(String payload) {
mqttGateway.sendToMqtt(payload);
return CommonResult.success(null);
}
@PostMapping("/sendToTopic")
@Operation(summary = "向指定主题发送消息")
public CommonResult sendToTopic(String payload, String topic) {
mqttGateway.sendToMqtt(payload, topic);
return CommonResult.success(null);
}
}
2025-08-01T16:25:48.503+08:00 INFO 35516 --- [spring-mqtt] [ubscriberClient] c.m.blog.mqtt.config.MqttInboundConfig : handleMessage : 来自网页上的消息
2025-08-01T16:25:49.329+08:00 INFO 35516 --- [spring-mqtt] [ubscriberClient] c.m.blog.mqtt.config.MqttInboundConfig : handleMessage : 来自网页上的消息
2025-08-01T16:25:50.134+08:00 INFO 35516 --- [spring-mqtt] [ubscriberClient] c.m.blog.mqtt.config.MqttInboundConfig : handleMessage : 来自网页上的消息
消息中间件应用越来越广泛,不仅可以实现可靠的异步通信,还可以实现即时通讯,掌握一个消息中间件还是很有必要的。如果没有特殊业务需求,客户端或者前端直接使用MQTT对接消息中间件即可实现即时通讯,有特殊需求的时候也可以使用SpringBoot集成MQTT的方式来实现,总之消息中间件是实现即时通讯的一个好选择!
github.com/macrozheng/…
16.3G · 2025-11-01
16.1G · 2025-11-01
205M · 2025-11-01