Spring integration交互逻辑
1、maven依赖
2、yaml配置文件
3、mqtt生产者消费者配置类
4、消息处理类
5、mqtt发送接口
6、mqtt事件监听类
7、接口测试
总结
Spring integration交互逻辑对于发布者:
1.消息通过消息网关发送出去,由 MessageChannel
的实例 DirectChannel
处理发送的细节。
2.DirectChannel
收到消息后,内部通过 MessageHandler
的实例 MqttPahoMessageHandler
发送到指定的 Topic。
对于订阅者:
1.通过注入 MessageProducerSupport
的实例 MqttPahoMessageDrivenChannelAdapter
,实现订阅 Topic 和绑定消息消费的 MessageChannel
。
2.同样由 MessageChannel
的实例 DirectChannel
处理消费细节。
Channel 消息后会发送给我们自定义的 MqttInboundMessageHandler
实例进行消费。
可以看到整个处理的流程和前面将的基本一致。Spring Integration 就是抽象出了这么一套消息通信的机制,具体的通信细节由它集成的中间件来决定
1、maven依赖<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-integration -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
<version>2.5.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-stream -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
<version>5.5.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-mqtt -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.5.5</version>
</dependency>
2、yaml配置文件
#mqtt配置
mqtt:
username: 123
password: 123
#MQTT-服务器连接地址,如果有多个,用逗号隔开
url: tcp://127.0.0.1:1883
#MQTT-连接服务器默认客户端ID
client:
id: ${random.value}
default:
#MQTT-默认的消息推送主题,实际可在调用接口时指定
topic: topic,mqtt/test/#
#连接超时
completionTimeout: 3000
3、mqtt生产者消费者配置类
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import java.util.Arrays;
import java.util.List;
/**
* mqtt 推送and接收 消息类
**/
@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttSenderAndReceiveConfig {
private static final byte[] WILL_DATA;
static {
WILL_DATA = "offline".getBytes();
}
@Autowired
private MqttReceiveHandle mqttReceiveHandle;
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.url}")
private String hostUrl;
@Value("${mqtt.client.id}")
private String clientId;
@Value("${mqtt.default.topic}")
private String defaultTopic;
@Value("${mqtt.completionTimeout}")
private int completionTimeout; //连接超时
/**
* MQTT连接器选项
**/
@Bean(value = "getMqttConnectOptions")
public MqttConnectOptions getMqttConnectOptions1() {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
mqttConnectOptions.setCleanSession(true);
// 设置超时时间 单位为秒
mqttConnectOptions.setConnectionTimeout(10);
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());
mqttConnectOptions.setServerURIs(new String[]{hostUrl});
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
mqttConnectOptions.setKeepAliveInterval(10);
// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
//mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false);
return mqttConnectOptions;
}
/**
* MQTT工厂
**/
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions1());
return factory;
}
/**
* MQTT信息通道(生产者)
**/
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
/**
* MQTT消息处理器(生产者)
**/
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "_producer", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic);
messageHandler.setAsyncEvents(true); // 消息发送和传输完成会有异步的通知回调
//设置转换器 发送bytes数据
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
converter.setPayloadAsBytes(true);
return messageHandler;
}
/**
* 配置client,监听的topic
* MQTT消息订阅绑定(消费者)
**/
@Bean
public MessageProducer inbound() {
List<String> topicList = Arrays.asList(defaultTopic.trim().split(","));
String[] topics = new String[topicList.size()];
topicList.toArray(topics);
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(clientId + "_consumer", mqttClientFactory(), topics);
adapter.setCompletionTimeout(completionTimeout);
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
converter.setPayloadAsBytes(true);
adapter.setConverter(converter);
adapter.setQos(2);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
/**
* MQTT信息通道(消费者)
**/
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
/**
* MQTT消息处理器(消费者)
**/
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
//处理接收消息
mqttReceiveHandle.handle(message);
}
};
}
}
4、消息处理类
/**
* mqtt客户端消息处理类
**/
@Slf4j
@Component
public class MqttReceiveHandle {
public void handle(Message<?> message) {
log.info("收到订阅消息: {}", message);
String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
log.info("消息主题:{}", topic);
Object payLoad = message.getPayload();
byte[] data = (byte[]) payLoad;
Packet packet = Packet.parse(data);
log.info("发送的Packet数据{}", JSON.toJSONString(packet));
}
}
5、mqtt发送接口
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
/**
* mqtt发送消息
* (defaultRequestChannel = "mqttOutboundChannel" 对应config配置)
* **/
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
/**
* 发送信息到MQTT服务器
*
* @param
*/
void sendToMqttObject(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);
/**
* 发送信息到MQTT服务器
*
* @param topic 主题
* @param payload 消息主体
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
/**
* 发送信息到MQTT服务器
*
* @param topic 主题
* @param qos 对消息处理的几种机制。
* 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
* 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
* 2 多了一次去重的动作,确保订阅者收到的消息有一次。
* @param payload 消息主体
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
/**
* 发送信息到MQTT服务器
*
* @param topic 主题
* @param payload 消息主体
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, Object payload);
/**
* 发送信息到MQTT服务器
*
* @param topic 主题
* @param payload 消息主体
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);
}
6、mqtt事件监听类
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MqttListener {
/**
* 连接失败的事件通知
* @param mqttConnectionFailedEvent
*/
@EventListener(classes = MqttConnectionFailedEvent.class)
public void listenerAction(MqttConnectionFailedEvent mqttConnectionFailedEvent) {
log.info("连接失败的事件通知");
}
/**
* 已发送的事件通知
* @param mqttMessageSentEvent
*/
@EventListener(classes = MqttMessageSentEvent.class)
public void listenerAction(MqttMessageSentEvent mqttMessageSentEvent) {
log.info("已发送的事件通知");
}
/**
* 已传输完成的事件通知
* 1.QOS == 0,发送消息后会即可进行此事件回调,因为不需要等待回执
* 2.QOS == 1,发送消息后会等待ACK回执,ACK回执后会进行此事件通知
* 3.QOS == 2,发送消息后会等待PubRECV回执,知道收到PubCOMP后会进行此事件通知
* @param mqttMessageDeliveredEvent
*/
@EventListener(classes = MqttMessageDeliveredEvent.class)
public void listenerAction(MqttMessageDeliveredEvent mqttMessageDeliveredEvent) {
log.info("已传输完成的事件通知");
}
/**
* 消息订阅的事件通知
* @param mqttSubscribedEvent
*/
@EventListener(classes = MqttSubscribedEvent.class)
public void listenerAction(MqttSubscribedEvent mqttSubscribedEvent) {
log.info("消息订阅的事件通知");
}
}
7、接口测试
@Resource
private MqttGateway mqttGateway;
/**
* sendData 消息
* topic 订阅主题
**/
@RequestMapping(value = "/sendMqtt",method = RequestMethod.POST)
public String sendMqtt(String sendData, String topic) {
MqttMessage mqttMessage = new MqttMessage();
mqttGateway.sendToMqtt(topic, sendData);
//mqttGateway.sendToMqttObject(topic, sendData.getBytes());
return "OK";
}
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持软件开发网。