spring-integration连接MQTT全过程

Malina ·
更新时间:2024-11-10
· 549 次阅读

目录

首先需要引入spring-integration-mqt的包

MQTT的配置比较简单

其中ChanneName是一个常量类

数据发送网关只是一个接口

MQTT服务器有数据下发时

最后是参数配置文件

总结

MQTT一种物联网数据传输协议,构建在TCP之上,采用发布与订阅的模式进行数据交互,发布与订阅是两个独立的连接通道,这里采用spring-integration-mqt来实现发布与订阅MQTT,与直接采用MQTT的SDK相对要简单许多,服务端采用ActiveMQ来支持MQTT的消息服务并实现消息转发。

首先需要引入spring-integration-mqt的包

这里只需要引入这一个包即可。

<dependency>      <groupId>org.springframework.integration</groupId>      <artifactId>spring-integration-mqtt</artifactId>      <version>5.3.1.RELEASE</version> </dependency> MQTT的配置比较简单

和spring-integration集成一样,需要配置相对应的入站、出站就可以了

具体配置如下:

package org.noka.serialservice.config; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.noka.serialservice.service.MsgSendService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationListener; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.endpoint.MessageProducerSupport; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.event.MqttSubscribedEvent; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.AbstractMqttMessageHandler; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.integration.support.MessageBuilder; /**--------------------------------------------------------------  * MQTT 数据转发服务  * mqtt.services MQTT服务地址不配置时,不会启用该服务  * 检测mqtt.services这个参数是否配置,以确定是否启用MQTT服务  * @author  xiefangjian@163.com  * @version 1.0.0  **------------------------------------------------------------*/ @EnableIntegration @Configuration @ConditionalOnProperty("mqtt.services") public class MQTTConfig implements ApplicationListener<ApplicationEvent> {     private static Logger logger = LoggerFactory.getLogger(MQTTConfig.class);     private final MsgSendService msgSendService;//发布消息到消息中间件接口     @Value("${mqtt.appid:mqtt_id}")     private String appid;//客户端ID     @Value("${mqtt.input.topic:mqtt_input_topic}")     private String[] inputTopic;//订阅主题,可以是多个主题     @Value("${mqtt.out.topic:mqtt_out_topic}")     private String[] outTopic;//发布主题,可以是多个主题     @Value("${mqtt.services:#{null}}")     private String[] mqttServices;//服务器地址以及端口     @Value("${mqtt.user:#{null}}")     private String user;//用户名     @Value("${mqtt.password:#{null}}")     private String password;//密码     @Value("${mqtt.KeepAliveInterval:300}")     private Integer KeepAliveInterval;//心跳时间,默认为5分钟     @Value("${mqtt.CleanSession:false}")     private Boolean CleanSession;//是否不保持session,默认为session保持     @Value("${mqtt.AutomaticReconnect:true}")     private Boolean AutomaticReconnect;//是否自动重联,默认为开启自动重联     @Value("${mqtt.CompletionTimeout:30000}")     private Long CompletionTimeout;//连接超时,默认为30秒     @Value("${mqtt.Qos:1}")     private Integer Qos;//通信质量,详见MQTT协议     public MQTTConfig(MsgSendService msgSendService) {         this.msgSendService = msgSendService;     }     /**      * MQTT连接配置      * @return 连接工厂      */     @Bean     public MqttPahoClientFactory mqttClientFactory() {         DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();//连接工厂类         MqttConnectOptions options = new MqttConnectOptions();//连接参数         options.setServerURIs(mqttServices);//连接地址         if(null!=user) {             options.setUserName(user);//用户名         }         if(null!=password) {             options.setPassword(password.toCharArray());//密码         }         options.setKeepAliveInterval(KeepAliveInterval);//心跳时间         options.setAutomaticReconnect(AutomaticReconnect);//断开是否自动重联         options.setCleanSession(CleanSession);//保持session         factory.setConnectionOptions(options);         return factory;     }     /**      * 入站管道      * @param mqttPahoClientFactory      * @return      */     @Bean     public MessageProducerSupport mqttInput(MqttPahoClientFactory mqttPahoClientFactory){         MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(appid, mqttPahoClientFactory, inputTopic);//建立订阅连接         DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();         converter.setPayloadAsBytes(true);//bytes类型接收         adapter.setCompletionTimeout(CompletionTimeout);//连接超时的时间         adapter.setConverter(converter);         adapter.setQos(Qos);//消息质量         adapter.setOutputChannelName(ChannelName.INPUT_DATA);//输入管道名称         return adapter;     }     /**      * 向服务器发送数据管道绑定      * @param connectionFactory tcp连接工厂类      * @return 消息管道对象      */     @Bean     @ServiceActivator(inputChannel = ChannelName.OUTPUT_DATA_MQTT)     public AbstractMqttMessageHandler MQTTOutAdapter(MqttPahoClientFactory connectionFactory) {         //创建一个新的出站管道,由于MQTT的发布与订阅是两个独立的连接,因此客户端的ID(即APPID)不能与订阅时所使用的ID一样,否则在服务端会认为是同一个客户端,而造成连接失败         MqttPahoMessageHandler outGate = new MqttPahoMessageHandler(appid + "_put", connectionFactory);         DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();         converter.setPayloadAsBytes(true);//bytes类型接收         outGate.setAsync(true);         outGate.setCompletionTimeout(CompletionTimeout);//设置连接超时时时         outGate.setDefaultQos(Qos);//设置通信质量         outGate.setConverter(converter);         return outGate;     }     /**      * MQTT连接时调用的方法      * @param event      */     @Override     public void onApplicationEvent(ApplicationEvent event) {         if (event instanceof MqttSubscribedEvent) {             String msg = "OK";             /**------------------连接时需要发送起始消息,写在这里-------------**/             msgSendService.send(MessageBuilder.withPayload(msg.getBytes()).build());         }     } } 其中ChanneName是一个常量类

来标识入站、出站管道的名称,以便在其它需要的地方使用,实现方法如下:

/** -----------------------------------------  * 管道名称常量类  * @author  xiefangjian@163.com  * @version 1.0.0  ** ---------------------------------------**/ public class ChannelName {     public final static String INPUT_DATA="input_data";//入站管道     public final static String OUTPUT_DATA_TCP="output_data_TCP";//TCP出站管道     public final static String OUTPUT_DATA_MQTT="output_data_MQTT";//mqtt出站管道名称 }

此时所有配置完成,接下来需要做的就是处理接收到的数据和发布数据,以上配置完成以后,接收和发送数据都是通过数据管道来完成,配置的是数据管道名称。

数据发送网关只是一个接口

用于向指定的数据管道里面发送数据,实现如下:

package org.noka.serialservice.service; import org.noka.serialservice.config.ChannelName; import org.springframework.integration.annotation.Gateway; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; /**----------------------------------------------------------------  * 发送消息网关,其它需要发向服务器发送消息时,调用该接口  * @author  xiefangjian@163.com  * @version  1.0.0  **--------------------------------------------------------------**/ @MessagingGateway @Component public interface MsgGateway {     /**      * MQTT 发送网关      * @param a 主题,可以指定不同的数据发布主题,在消息中间件里面体现为不同的消息队列      * @param out 消息内容      */     @Gateway(requestChannel = ChannelName.OUTPUT_DATA_MQTT)     void send(@Header(MqttHeaders.TOPIC) String a, Message<byte[]> out); }

在需要的地方,可以向下面这样调用这个接口,向MQTT服务器发送消息

//topic为主题名称,out为消息内容 msgGateway.send(topic, out); MQTT服务器有数据下发时

会自动调将数据放入配置的入站数据管道中,在需要接收数据的地方,向下面这样配置即可

    /**      * 服务器有数据下发      * 用ServiceActivator配置需要接收的数据管道名称,当该管道里面的数据时,会自动调用该方法      * @param in 服务器有数据下发时,序列化后的对象,这里使用byte数组      */     @ServiceActivator(inputChannel = ChannelName.INPUT_DATA)     public void upCase(Message<byte[]> in) {         logger.info("[net service data]========================================");         logger.info("[net dow data]"+new String(in.getPayload()));//字符串方式打印服务器下发的数据         logger.info("[net dow hex]"+ Hex.encodeHexString(in.getPayload(),false));//16进制方式打印服务器下发的数据         serialService.send(in.getPayload());//将服务器下发的数据转发给串口     } 最后是参数配置文件 #--------MQTT--------------------------- #设备ID,唯一标识 mqtt.appid=mqtt_id #订阅主题,多个主题用逗号分隔 mqtt.input.topic=mqtt_input_topic #发布主题 mqtt.out.topic=mqtt_out_topic,aac #MQTT服务器地址,可以是多个地址 mqtt.services=tcp://47.244.191.41:1883 #mqtt用户名,默认无 #mqtt.user=guest #mqtt密码,默认无 #mqtt.password=guest #心跳间隔时间,默认3000 #mqtt.KeepAliveInterval=3000 #是否不保持session,默认false #mqtt.CleanSession=false #是否自动连接,默认true #mqtt.AutomaticReconnect=true #连接超时,默认30000 #mqtt.CompletionTimeout=30000 #传输质量,默认1 #mqtt.Qos=1 总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持软件开发网。



mqtt spring

需要 登录 后方可回复, 如果你还没有账号请 注册新账号