Browse Source

南溪物联网MQ数据对接配置

heshan 1 year ago
parent
commit
e59eb0e92b

+ 16 - 18
application/src/main/java/org/thingsboard/server/config/RabbitmqConfig.java

@@ -1,7 +1,7 @@
 package org.thingsboard.server.config;
 
-//import org.springframework.amqp.core.*;
-//import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.amqp.core.*;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.context.annotation.Bean;
@@ -17,7 +17,6 @@ import org.thingsboard.server.service.nanxi.ScadaMonitorService;
  */
 @Configuration
 public class RabbitmqConfig {
-/*
 
 
     public static final String QUEUE_INFORM_SCADA = "queue_inform_nx_scada";
@@ -27,27 +26,26 @@ public class RabbitmqConfig {
 
 
     //声明交换机
-//    @Bean(EXCHANGE_TOPICS_INFORM)
-//    public Exchange EXCHANGE_TOPICS_INFORM() {
-//        //durable(true) 持久化,mq重启之后交换机还在
-//        return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
-//    }
+   @Bean(EXCHANGE_TOPICS_INFORM)
+   public Exchange EXCHANGE_TOPICS_INFORM() {
+        //durable(true) 持久化,mq重启之后交换机还在
+        return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
+    }
 
     //声明QUEUE_INFORM_SCADA队列
-//    @Bean(QUEUE_INFORM_SCADA)
-//    public Queue QUEUE_INFORM_SCADA() {
-//        return new Queue(QUEUE_INFORM_SCADA);
-//    }
+    @Bean(QUEUE_INFORM_SCADA)
+    public Queue QUEUE_INFORM_SCADA() {
+        return new Queue(QUEUE_INFORM_SCADA);
+    }
 
     //声明QUEUE_INFORM_SMS队列
 
     //ROUTINGKEY_SCADA队列绑定交换机,指定routingKey
-//    @Bean
-//    public Binding BINDING_QUEUE_INFORM_SCADA(@Qualifier(QUEUE_INFORM_SCADA) Queue queue,
-//                                              @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
-//        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SCADA).noargs();
-//    }
+    @Bean
+    public Binding BINDING_QUEUE_INFORM_SCADA(@Qualifier(QUEUE_INFORM_SCADA) Queue queue,
+                                              @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
+        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SCADA).noargs();
+    }
 
-*/
 
 }

+ 121 - 120
application/src/main/java/org/thingsboard/server/config/ReceiveHandler.java

@@ -1,15 +1,16 @@
 package org.thingsboard.server.config;
 
-//import cn.hutool.core.collection.CollUtil;
-//import cn.hutool.core.date.DateUtil;
-//import com.alibaba.fastjson.JSON;
-//import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-//import com.baomidou.mybatisplus.core.toolkit.Wrappers;
-//import com.rabbitmq.client.Channel;
+import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.date.DateUtil;
+import com.alibaba.fastjson.JSON;
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import com.rabbitmq.client.Channel;
 import lombok.extern.slf4j.Slf4j;
-//import org.jetbrains.annotations.NotNull;
-//import org.springframework.amqp.core.Message;
-//import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.jetbrains.annotations.NotNull;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.transaction.annotation.Transactional;
@@ -46,117 +47,117 @@ public class ReceiveHandler {
 
 
     //监听scada队列
-//    @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SCADA})
-//    @Transactional(rollbackFor = Exception.class)
-//    public void receive_scada(Object msg, Message message, Channel channel) {
-//
-//        byte[] body = message.getBody();
-//        String encode = Base64.getEncoder().encodeToString(body);
-//        byte[] decode = Base64.getDecoder().decode(encode);
-//        String s = new String(decode);
-//        log.info("获取到物联网设备发送的监听数据:{}", s);
-//
-//        String s1 = StrUtils.underlineToHump(s);
-//        ScadaDeviceData scadaDeviceData = JSON.parseObject(s1, ScadaDeviceData.class);
-//        if (!ObjectUtils.isEmpty(scadaDeviceData)) {
-//            scadaDeviceData.setJsonData(s);
-//            boolean b = deviceDataService.save(scadaDeviceData);
-//            if (b) {
-//                //保存数据到历史表
-//                saveScadaHistoryData(scadaDeviceData);
-//            }
-//
-//        }
-//    }
-
-
-
-//    @Transactional(rollbackFor = Exception.class)
-//    void saveScadaHistoryData(ScadaDeviceData scadaDeviceData) {
-//        Date generationTime = StringUtils.hasText(scadaDeviceData.getGenerationTime())
-//                ? DateUtil.parse(scadaDeviceData.getGenerationTime(), YYYY_MM_DD_HH_MM_SS) : new Date();
-//        List<ScadaHistory> scadaHistories = new ArrayList<>();
-//        //保存指标:压力
-//        if (StringUtils.hasText(scadaDeviceData.getPressure())) {
-//            String value = matcherVal(scadaDeviceData.getPressure());
-//            String codeType = JsonItemConst.g_pressure;
-//            ScadaHistory history = getScadaHistory(scadaDeviceData, generationTime, value, codeType);
-//            scadaHistories.add(history);
-//        }
-//        //保存指标:正向累计流
-//        if (StringUtils.hasText(scadaDeviceData.getCumPosFlows())) {
-//            String value = matcherVal(scadaDeviceData.getCumPosFlows());
-//            String codeType = JsonItemConst.g_cum_pos_flows;
-//            ScadaHistory history = getScadaHistory(scadaDeviceData, generationTime, value, codeType);
-//            scadaHistories.add(history);
-//        }
-//        //保存指标:逆向累计流量
-//        if (StringUtils.hasText(scadaDeviceData.getCumNegFlowsNum())) {
-//            String value = matcherVal(scadaDeviceData.getCumNegFlowsNum());
-//            String codeType = JsonItemConst.g_cum_neg_flows;
-//            ScadaHistory history = getScadaHistory(scadaDeviceData, generationTime, value, codeType);
-//            scadaHistories.add(history);
-//        }
-//        //保存指标:瞬时流量
-//        if (StringUtils.hasText(scadaDeviceData.getInstantaneousFlow())) {
-//            String value = matcherVal(scadaDeviceData.getInstantaneousFlow());
-//            String codeType = JsonItemConst.g_instantaneous_flow;
-//            ScadaHistory history = getScadaHistory(scadaDeviceData, generationTime, value, codeType);
-//            scadaHistories.add(history);
-//        }
-//        //保存指标:信号质量
-//        if (StringUtils.hasText(scadaDeviceData.getSignalQuality())) {
-//            String value = matcherVal(scadaDeviceData.getSignalQuality());
-//            String codeType = JsonItemConst.g_signal_quality;
-//            ScadaHistory history = getScadaHistory(scadaDeviceData, generationTime, value, codeType);
-//            scadaHistories.add(history);
-//        }
-//        //保存指标:电池电压
-//        if (StringUtils.hasText(scadaDeviceData.getBatteryVoltage())) {
-//            String value = matcherVal(scadaDeviceData.getBatteryVoltage());
-//            String codeType = JsonItemConst.g_battery_voltage;
-//            ScadaHistory history = getScadaHistory(scadaDeviceData, generationTime, value, codeType);
-//            scadaHistories.add(history);
-//        }
-//        List<ScadaHistory> scadaHistoryList = new ArrayList<>();
-//        if (!CollectionUtils.isEmpty(scadaHistories)){
-//            for (ScadaHistory scadaHistory : scadaHistories) {
-//                if (CollUtil.isNotEmpty(scadaHistoryService.list(new QueryWrapper<ScadaHistory>()
-//                        .eq("code", scadaHistory.getCode())
-//                        .eq("scada_time", scadaHistory.getScadaTime())))) {
-//                    continue;
-//                }
-//                scadaHistoryList.add(scadaHistory);
-//            }
-//            scadaHistoryService.saveBatch(scadaHistoryList);
-//            if(CollUtil.isNotEmpty(scadaHistoryList)){
-//                //实时数据先清空再新增
-//                scadaMonitorService.remove(Wrappers.<ScadaMonitor>query().lambda().isNotNull(ScadaMonitor::getId));
-//                List<ScadaMonitor> scadaMonitors = new ArrayList<>();
-//                scadaHistoryList.forEach(d->{
-//                    ScadaMonitor scadaMonitor = new ScadaMonitor();
-//                    scadaMonitor.setCode(d.getCode());
-//                    scadaMonitor.setValue(d.getValue());
-//                    scadaMonitor.setScadaTime(d.getScadaTime());
-//                    scadaMonitor.setUpdateTime(d.getUpdateTime());
-//                    scadaMonitor.setType(0);
-//                    scadaMonitor.setCodeType(d.getCodeType());
-//                    scadaMonitors.add(scadaMonitor);
-//                });
-//                scadaMonitorService.saveBatch(scadaMonitors);
-//            }
-//        }
-//    }
-//
-//    private ScadaHistory  getScadaHistory(ScadaDeviceData scadaDeviceData, Date generationTime, String value, String codeType) {
-//        ScadaHistory history = new ScadaHistory();
-//        history.setCode(scadaDeviceData.getDeviceName());
-//        history.setCodeType(codeType);
-//        history.setValue(value);
-//        history.setScadaTime(generationTime);
-//        history.setUpdateTime(new Date());
-//        return history;
-//    }
+    @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SCADA})
+    @Transactional(rollbackFor = Exception.class)
+    public void receive_scada(Object msg, Message message, Channel channel) {
+
+        byte[] body = message.getBody();
+        String encode = Base64.getEncoder().encodeToString(body);
+        byte[] decode = Base64.getDecoder().decode(encode);
+        String s = new String(decode);
+        log.info("获取到物联网设备发送的监听数据:{}", s);
+
+        String s1 = StrUtils.underlineToHump(s);
+        ScadaDeviceData scadaDeviceData = JSON.parseObject(s1, ScadaDeviceData.class);
+        if (!ObjectUtils.isEmpty(scadaDeviceData)) {
+            scadaDeviceData.setJsonData(s);
+            boolean b = deviceDataService.save(scadaDeviceData);
+            if (b) {
+                //保存数据到历史表
+                saveScadaHistoryData(scadaDeviceData);
+            }
+
+        }
+    }
+
+
+
+    @Transactional(rollbackFor = Exception.class)
+    void saveScadaHistoryData(ScadaDeviceData scadaDeviceData) {
+        Date generationTime = StringUtils.hasText(scadaDeviceData.getGenerationTime())
+                ? DateUtil.parse(scadaDeviceData.getGenerationTime(), YYYY_MM_DD_HH_MM_SS) : new Date();
+        List<ScadaHistory> scadaHistories = new ArrayList<>();
+        //保存指标:压力
+        if (StringUtils.hasText(scadaDeviceData.getPressure())) {
+            String value = matcherVal(scadaDeviceData.getPressure());
+            String codeType = JsonItemConst.g_pressure;
+            ScadaHistory history = getScadaHistory(scadaDeviceData, generationTime, value, codeType);
+            scadaHistories.add(history);
+        }
+        //保存指标:正向累计流
+        if (StringUtils.hasText(scadaDeviceData.getCumPosFlows())) {
+            String value = matcherVal(scadaDeviceData.getCumPosFlows());
+            String codeType = JsonItemConst.g_cum_pos_flows;
+            ScadaHistory history = getScadaHistory(scadaDeviceData, generationTime, value, codeType);
+            scadaHistories.add(history);
+        }
+        //保存指标:逆向累计流量
+        if (StringUtils.hasText(scadaDeviceData.getCumNegFlowsNum())) {
+            String value = matcherVal(scadaDeviceData.getCumNegFlowsNum());
+            String codeType = JsonItemConst.g_cum_neg_flows;
+            ScadaHistory history = getScadaHistory(scadaDeviceData, generationTime, value, codeType);
+            scadaHistories.add(history);
+        }
+        //保存指标:瞬时流量
+        if (StringUtils.hasText(scadaDeviceData.getInstantaneousFlow())) {
+            String value = matcherVal(scadaDeviceData.getInstantaneousFlow());
+            String codeType = JsonItemConst.g_instantaneous_flow;
+            ScadaHistory history = getScadaHistory(scadaDeviceData, generationTime, value, codeType);
+            scadaHistories.add(history);
+        }
+        //保存指标:信号质量
+        if (StringUtils.hasText(scadaDeviceData.getSignalQuality())) {
+            String value = matcherVal(scadaDeviceData.getSignalQuality());
+            String codeType = JsonItemConst.g_signal_quality;
+            ScadaHistory history = getScadaHistory(scadaDeviceData, generationTime, value, codeType);
+            scadaHistories.add(history);
+        }
+        //保存指标:电池电压
+        if (StringUtils.hasText(scadaDeviceData.getBatteryVoltage())) {
+            String value = matcherVal(scadaDeviceData.getBatteryVoltage());
+            String codeType = JsonItemConst.g_battery_voltage;
+            ScadaHistory history = getScadaHistory(scadaDeviceData, generationTime, value, codeType);
+            scadaHistories.add(history);
+        }
+        List<ScadaHistory> scadaHistoryList = new ArrayList<>();
+        if (!CollectionUtils.isEmpty(scadaHistories)){
+            for (ScadaHistory scadaHistory : scadaHistories) {
+                if (CollUtil.isNotEmpty(scadaHistoryService.list(new QueryWrapper<ScadaHistory>()
+                        .eq("code", scadaHistory.getCode())
+                        .eq("scada_time", scadaHistory.getScadaTime())))) {
+                    continue;
+                }
+                scadaHistoryList.add(scadaHistory);
+            }
+            scadaHistoryService.saveBatch(scadaHistoryList);
+            if(CollUtil.isNotEmpty(scadaHistoryList)){
+                //实时数据先清空再新增
+                scadaMonitorService.remove(Wrappers.<ScadaMonitor>query().lambda().isNotNull(ScadaMonitor::getId));
+                List<ScadaMonitor> scadaMonitors = new ArrayList<>();
+                scadaHistoryList.forEach(d->{
+                    ScadaMonitor scadaMonitor = new ScadaMonitor();
+                    scadaMonitor.setCode(d.getCode());
+                    scadaMonitor.setValue(d.getValue());
+                    scadaMonitor.setScadaTime(d.getScadaTime());
+                    scadaMonitor.setUpdateTime(d.getUpdateTime());
+                    scadaMonitor.setType(0);
+                    scadaMonitor.setCodeType(d.getCodeType());
+                    scadaMonitors.add(scadaMonitor);
+                });
+                scadaMonitorService.saveBatch(scadaMonitors);
+            }
+        }
+    }
+
+    private ScadaHistory  getScadaHistory(ScadaDeviceData scadaDeviceData, Date generationTime, String value, String codeType) {
+        ScadaHistory history = new ScadaHistory();
+        history.setCode(scadaDeviceData.getDeviceName());
+        history.setCodeType(codeType);
+        history.setValue(value);
+        history.setScadaTime(generationTime);
+        history.setUpdateTime(new Date());
+        return history;
+    }
 
 
     /**

+ 7 - 0
application/src/main/resources/bootstrap.yml

@@ -402,6 +402,13 @@ spring:
     password: "${SPRING_DATASOURCE_PASSWORD:tofly@028..}"
     hikari:
       maximumPoolSize: "${SPRING_DATASOURCE_MAXIMUM_POOL_SIZE:16}"
+  spring:
+    rabbitmq:
+      host: 221.182.8.141
+      port: 11051
+      username: nxsmx
+      password: nxsmx
+      virtualHost: /
       
    
 

+ 2 - 2
common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java

@@ -25,8 +25,8 @@ import org.springframework.util.StringUtils;
 import org.thingsboard.server.common.data.TbTransportService;
 import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.msg.queue.ServiceType;
-import org.thingsboard.server.gen.transport.TransportProtos;
-import org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo;
+//import org.thingsboard.server.gen.transport.TransportProtos;
+//import org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo;
 import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
 import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
 import org.thingsboard.server.queue.util.AfterContextReady;