Browse Source

mqtt提交

linzhiwei 2 years ago
parent
commit
b4896c8854

+ 31 - 0
snws-monitor/snws-monitor-boot/pom.xml

@@ -91,6 +91,37 @@
             <groupId>org.springframework</groupId>
             <artifactId>spring-test</artifactId>
         </dependency>
+        <!-- Mqtt -->
+        <dependency>
+            <groupId>org.springframework.integration</groupId>
+            <artifactId>spring-integration-mqtt</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.paho</groupId>
+            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+            <version>1.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.10.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>2.10.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <version>2.10.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.eclipse.paho</groupId>
+            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+            <version>1.2.0</version>
+        </dependency>
         <dependency>
             <groupId>org.jooq</groupId>
             <artifactId>joou-java-6</artifactId>

+ 122 - 106
snws-monitor/snws-monitor-boot/src/main/java/com/tofly/monitor/config/MqttConfig.java

@@ -1,112 +1,128 @@
 package com.tofly.monitor.config;
 
 
+import com.tofly.monitor.mqtt.MqttMessageHandler;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
+import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+
+
 import javax.annotation.Resource;
 
-//@Configuration
-//public class MqttConfig {
-//
-//    @Resource
-//    private MqttMessageHandler messageHandler;
-//
-//    /**
-//     * 1、先创建连接
-//     */
-//
-//    /**
-//     * 创建MqttPahoClientFactory,设置MQTT Broker连接属性,如果使用SSL验证,也在这里设置。
-//     *
-//     * @return factory
-//     */
-//    @Bean
-//    public MqttPahoClientFactory mqttClientFactory() {
-//        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
-//        MqttConnectOptions options = new MqttConnectOptions();
-//
-//        // 设置代理端的URL地址,可以是多个
-//        options.setServerURIs(new String[]{"tcp://60.255.72.86:1833"});
-//        options.setUserName("admin");
-//        options.setPassword("mosquitto_20221220".toCharArray());
-//        factory.setConnectionOptions(options);
-//        return factory;
-//    }
-//
-//
-//    /**
-//     * 2、入站通道
-//     */
-//    @Bean
-//    public MessageChannel mqttInputChannel() {
-//        return new DirectChannel();
-//    }
-//
-//    /**
-//     * 入站,配置mq,监听主题topic
-//     */
-//    @Bean
-//    public MessageProducer inbound() {
-//        // Paho客户端消息驱动通道适配器,主要用来订阅主题
-//        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("consumerClient-paho",
-//                mqttClientFactory(), "boat", "collector", "battery", "+/sensor", "demo");
-//        adapter.setCompletionTimeout(5000);
-//
-//        // Paho消息转换器
-//        DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
-//        // 按字节接收消息
-////        defaultPahoMessageConverter.setPayloadAsBytes(true);
-//        adapter.setConverter(defaultPahoMessageConverter);
-//        adapter.setQos(1); // 设置QoS
-//        adapter.setOutputChannel(mqttInputChannel());
-//        return adapter;
-//    }
-//
-//
-//    /**
-//     * 3、消息转化,中间站
-//     */
-//
-//    @Bean
-//    // ServiceActivator注解表明:当前方法用于处理MQTT消息,inputChannel参数指定了用于消费消息的channel。
-//    @ServiceActivator(inputChannel = "mqttInputChannel")
-//    public MessageHandler handler() {
-//        return message -> {
-//            messageHandler.handleMessage(message);
-//        };
-//    }
-//
-//
-//    /**
-//     * 4、消息出去
-//     */
-//
-//    /**
-//     * 出站通道
-//     */
-//    @Bean
-//    public MessageChannel mqttOutboundChannel() {
-//        return new DirectChannel();
-//    }
-//
-//    /**
-//     * 出站
-//     */
-//    @Bean
-//    @ServiceActivator(inputChannel = "mqttOutboundChannel")
-//    public MessageHandler outbound() {
-//
-//        // 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
-//        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("publishClient", mqttClientFactory());
-//        messageHandler.setAsync(true); // 如果设置成true,即异步,发送消息时将不会阻塞。
-//        messageHandler.setDefaultTopic("command");
-//        messageHandler.setDefaultQos(1); // 设置默认QoS
-//
-//        // Paho消息转换器
-//        DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
-//
-//        // defaultPahoMessageConverter.setPayloadAsBytes(true); // 发送默认按字节类型发送消息
-//        messageHandler.setConverter(defaultPahoMessageConverter);
-//        return messageHandler;
-//    }
-//
-//}
+@Configuration
+public class MqttConfig {
+
+    @Resource
+    private MqttMessageHandler messageHandler;
+
+    /**
+     * 1、先创建连接
+     */
+
+    /**
+     * 创建MqttPahoClientFactory,设置MQTT Broker连接属性,如果使用SSL验证,也在这里设置。
+     *
+     * @return factory
+     */
+    @Bean
+    public MqttPahoClientFactory mqttClientFactory() {
+        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+        MqttConnectOptions options = new MqttConnectOptions();
+
+        // 设置代理端的URL地址,可以是多个
+        options.setServerURIs(new String[]{"tcp://60.255.72.86:1833"});
+        options.setUserName("admin");
+        options.setPassword("mosquitto_20221220".toCharArray());
+        factory.setConnectionOptions(options);
+        return factory;
+    }
+
+
+    /**
+     * 2、入站通道
+     */
+    @Bean
+    public MessageChannel mqttInputChannel() {
+        return new DirectChannel();
+    }
+
+    /**
+     * 入站,配置mq,监听主题topic
+     */
+    @Bean
+    public MessageProducer inbound() {
+        // Paho客户端消息驱动通道适配器,主要用来订阅主题
+        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("consumerClient-paho",
+                mqttClientFactory(), "boat", "collector", "battery", "+/sensor", "demo");
+        adapter.setCompletionTimeout(5000);
+
+        // Paho消息转换器
+        DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
+        // 按字节接收消息
+//        defaultPahoMessageConverter.setPayloadAsBytes(true);
+        adapter.setConverter(defaultPahoMessageConverter);
+        adapter.setQos(1); // 设置QoS
+        adapter.setOutputChannel(mqttInputChannel());
+        return adapter;
+    }
+
+
+    /**
+     * 3、消息转化,中间站
+     */
+
+    @Bean
+    // ServiceActivator注解表明:当前方法用于处理MQTT消息,inputChannel参数指定了用于消费消息的channel。
+    @ServiceActivator(inputChannel = "mqttInputChannel")
+    public MessageHandler handler() {
+        return message -> {
+            messageHandler.handleMessage(message);
+        };
+    }
+
+
+    /**
+     * 4、消息出去
+     */
+
+    /**
+     * 出站通道
+     */
+    @Bean
+    public MessageChannel mqttOutboundChannel() {
+        return new DirectChannel();
+    }
+
+    /**
+     * 出站
+     */
+    @Bean
+    @ServiceActivator(inputChannel = "mqttOutboundChannel")
+    public MessageHandler outbound() {
+
+        // 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
+        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("publishClient", mqttClientFactory());
+        messageHandler.setAsync(true); // 如果设置成true,即异步,发送消息时将不会阻塞。
+        messageHandler.setDefaultTopic("command");
+        messageHandler.setDefaultQos(1); // 设置默认QoS
+
+        // Paho消息转换器
+        DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
+
+        // defaultPahoMessageConverter.setPayloadAsBytes(true); // 发送默认按字节类型发送消息
+        messageHandler.setConverter(defaultPahoMessageConverter);
+        return messageHandler;
+    }
+
+}
 

+ 15 - 11
snws-monitor/snws-monitor-boot/src/main/java/com/tofly/monitor/mqtt/DemoTopicHandler.java

@@ -1,14 +1,18 @@
 package com.tofly.monitor.mqtt;
 
 
-//@Service("demoTopicHandler")
-//@Slf4j
-//class DemoTopicHandler implements TopicHandler {
-//
-//
-//    @Override
-//    public void handler(Message<?> message) {
-//        String payload = String.valueOf(message.getPayload());
-//        log.info("handler(),串口数据接收,payload:{}", payload);
-//    }
-//}
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.messaging.Message;
+import org.springframework.stereotype.Service;
+
+@Service("demoTopicHandler")
+@Slf4j
+class DemoTopicHandler implements TopicHandler {
+
+
+    @Override
+    public void handler(Message<?> message) {
+        String payload = String.valueOf(message.getPayload());
+        log.info("handler(),串口数据接收,payload:{}", payload);
+    }
+}

+ 11 - 8
snws-monitor/snws-monitor-boot/src/main/java/com/tofly/monitor/mqtt/MqttGateway.java

@@ -1,12 +1,15 @@
 package com.tofly.monitor.mqtt;
 
 
-
-//@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
-//public interface MqttGateway {
-//    // 定义重载方法,用于消息发送
-//    void sendToMqtt(String payload);
-//    // 指定topic进行消息发送
-//    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
-//}
+import com.netflix.ribbon.proxy.annotation.Http;
+import org.springframework.integration.annotation.MessagingGateway;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+
+@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
+public interface MqttGateway {
+    // 定义重载方法,用于消息发送
+    void sendToMqtt(String payload);
+    // 指定topic进行消息发送
+    void sendToMqtt(@Http.Header(name = "", value = MqttHeaders.TOPIC) String topic, String payload);
+}
 

+ 28 - 23
snws-monitor/snws-monitor-boot/src/main/java/com/tofly/monitor/mqtt/MqttMessageHandler.java

@@ -1,27 +1,32 @@
 package com.tofly.monitor.mqtt;
 
 
+import cn.hutool.extra.spring.SpringUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.stereotype.Component;
 
-//@Component("messageHandler")
-//@Slf4j
-//public class MqttMessageHandler implements MessageHandler {
-//
-//    @Override
-//    public void handleMessage(Message<?> message) {
-//        String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
-//        log.info("handleMessage(),消息处理,topic:{}", topic);
-//        MqttTopicEnum mqttTopicEnum = MqttTopicEnum.byTopic(topic);
-//        if (null == mqttTopicEnum) {
-//            log.warn("handleMessage(),消息处理,warnMsg:topic消费未定义,不处理");
-//            return;
-//        }
-//
-//        TopicHandler topicHandler = SpringUtil.getBean(mqttTopicEnum.getTopicHandlerBeanName(), TopicHandler.class);
-//        if (null == topicHandler) {
-//            log.warn("handleMessage(),消息处理,warnMsg:topicHandler未实现");
-//            return;
-//        }
-//        // topic消费处理
-//        topicHandler.handler(message);
-//    }
-//}
+@Component("messageHandler")
+@Slf4j
+public class MqttMessageHandler implements MessageHandler {
+
+    @Override
+    public void handleMessage(Message<?> message) {
+        String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
+        log.info("handleMessage(),消息处理,topic:{}", topic);
+        MqttTopicEnum mqttTopicEnum = MqttTopicEnum.byTopic(topic);
+        if (null == mqttTopicEnum) {
+            log.warn("handleMessage(),消息处理,warnMsg:topic消费未定义,不处理");
+            return;
+        }
+
+        TopicHandler topicHandler = SpringUtil.getBean(mqttTopicEnum.getTopicHandlerBeanName(), TopicHandler.class);
+        if (null == topicHandler) {
+            log.warn("handleMessage(),消息处理,warnMsg:topicHandler未实现");
+            return;
+        }
+        // topic消费处理
+        topicHandler.handler(message);
+    }
+}

+ 28 - 22
snws-monitor/snws-monitor-boot/src/main/java/com/tofly/monitor/mqtt/MqttTopicEnum.java

@@ -1,26 +1,32 @@
 package com.tofly.monitor.mqtt;
 
 
-//@AllArgsConstructor
-//@Getter
-//public enum MqttTopicEnum {
-//    Serial_PORT("demo", "测试topic", "demoTopicHandler", DemoTopicHandler.class),
-//    ;
-//    private String topic;
-//    private String text;
-//    private String topicHandlerBeanName;
-//    private Class<? extends TopicHandler> clazz;
-//
-//    private static final Map<String, MqttTopicEnum> DATA_MAP;
-//    static {
-//        DATA_MAP = new HashMap<>();
-//        for (MqttTopicEnum i : MqttTopicEnum.values()) {
-//            DATA_MAP.put(i.getTopic(), i);
-//        }
-//    }
-//
-//    public static MqttTopicEnum byTopic(String topic) {
-//        return DATA_MAP.get(topic);
-//    }
-//}
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@AllArgsConstructor
+@Getter
+public enum MqttTopicEnum {
+    Serial_PORT("demo", "测试topic", "demoTopicHandler", DemoTopicHandler.class),
+    ;
+    private String topic;
+    private String text;
+    private String topicHandlerBeanName;
+    private Class<? extends TopicHandler> clazz;
+
+    private static final Map<String, MqttTopicEnum> DATA_MAP;
+    static {
+        DATA_MAP = new HashMap<>();
+        for (MqttTopicEnum i : MqttTopicEnum.values()) {
+            DATA_MAP.put(i.getTopic(), i);
+        }
+    }
+
+    public static MqttTopicEnum byTopic(String topic) {
+        return DATA_MAP.get(topic);
+    }
+}
 

+ 10 - 8
snws-monitor/snws-monitor-boot/src/main/java/com/tofly/monitor/mqtt/TopicHandler.java

@@ -1,14 +1,16 @@
 package com.tofly.monitor.mqtt;
 
 
-//public interface TopicHandler {
-
-//    /**
-//     * 处理
-//     * @param message
-//     */
-//    void handler(Message<?> message);
-//}
+import org.springframework.messaging.Message;
+
+public interface TopicHandler {
+
+    /**
+     * 处理
+     * @param message
+     */
+    void handler(Message<?> message);
+}