|
@@ -1,8 +1,11 @@
|
|
|
package com.tofly.scada.service.impl;
|
|
|
|
|
|
+import cn.hutool.core.collection.CollUtil;
|
|
|
+import cn.hutool.core.date.DateUtil;
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
|
|
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
|
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
|
|
+import com.tofly.scada.common.FlowConstant;
|
|
|
import com.tofly.scada.entity.Allocation;
|
|
|
import com.tofly.scada.entity.Scada;
|
|
|
import com.tofly.scada.entity.ScadaReport;
|
|
@@ -14,11 +17,15 @@ import com.tofly.scada.entity.vo.StatisticsScadaVo;
|
|
|
import com.tofly.scada.mapper.AllocationMapper;
|
|
|
import com.tofly.scada.mapper.ScadaReportMapper;
|
|
|
import com.tofly.scada.mapper.StatisticsScadaMapper;
|
|
|
+import com.tofly.scada.service.ScadaService;
|
|
|
import com.tofly.scada.service.StatisticsScadaService;
|
|
|
+import lombok.SneakyThrows;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.context.annotation.Lazy;
|
|
|
+import org.springframework.scheduling.annotation.Async;
|
|
|
+import org.springframework.scheduling.annotation.Scheduled;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import org.springframework.transaction.annotation.Transactional;
|
|
|
import org.springframework.util.CollectionUtils;
|
|
@@ -27,8 +34,7 @@ import org.springframework.util.StringUtils;
|
|
|
import java.math.BigDecimal;
|
|
|
import java.math.RoundingMode;
|
|
|
import java.util.*;
|
|
|
-import java.util.concurrent.ExecutorService;
|
|
|
-import java.util.concurrent.Executors;
|
|
|
+import java.util.concurrent.*;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
@@ -50,6 +56,9 @@ public class StatisticsScadaServiceImpl extends ServiceImpl<StatisticsScadaMappe
|
|
|
@Lazy
|
|
|
private ScadaServiceImpl scadaService;
|
|
|
|
|
|
+ @Autowired
|
|
|
+ @Lazy
|
|
|
+ private ScadaService scada1Service;
|
|
|
|
|
|
|
|
|
* 根据开始结束时间获取月份表集合
|
|
@@ -351,21 +360,176 @@ public class StatisticsScadaServiceImpl extends ServiceImpl<StatisticsScadaMappe
|
|
|
return;
|
|
|
}
|
|
|
List<Scada> historyTempList = scadaMoreQuery.getHistoryTempList();
|
|
|
- if (historyTempList == null || historyTempList.size() == 0) {
|
|
|
+ if (CollUtil.isEmpty(historyTempList)) {
|
|
|
return;
|
|
|
}
|
|
|
- ExecutorService executorService = Executors.newSingleThreadExecutor();
|
|
|
- executorService.execute(() -> {
|
|
|
-
|
|
|
- if (historyTempList.size() <= 2000) {
|
|
|
- baseMapper.insertBacthHistoryTemp(historyTempList);
|
|
|
- } else {
|
|
|
- int times = historyTempList.size() / 2000;
|
|
|
- for (int i = 0; i <= times; i++) {
|
|
|
- baseMapper.insertBacthHistoryTemp(historyTempList.subList(i * 2000, Math.min((i + 1) * 2000, historyTempList.size())));
|
|
|
+ ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
|
|
|
+ 12,
|
|
|
+ 24,
|
|
|
+ 120,
|
|
|
+ TimeUnit.SECONDS,
|
|
|
+ new LinkedBlockingDeque<>(200),
|
|
|
+ (r)->{
|
|
|
+ Thread t = new Thread(r);
|
|
|
+ t.setName("thread-" + ((int)(Math.random()*1000)));
|
|
|
+ return t;},
|
|
|
+ new ThreadPoolExecutor.CallerRunsPolicy());
|
|
|
+
|
|
|
+ if (historyTempList.size() <= 2000) {
|
|
|
+ threadPoolExecutor.execute(() -> baseMapper.insertBacthHistoryTemp(historyTempList));
|
|
|
+ } else {
|
|
|
+ int times = historyTempList.size() / 2000;
|
|
|
+ for (int i = 0; i <= times; i++) {
|
|
|
+ int finalI = i;
|
|
|
+ threadPoolExecutor.execute(() -> baseMapper.insertBacthHistoryTemp(historyTempList.subList(finalI * 2000, Math.min((finalI + 1) * 2000, historyTempList.size()))));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ logger.info("新增scada历史数据完成");
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * 同步实时数据
|
|
|
+ *
|
|
|
+ */
|
|
|
+ @Async
|
|
|
+ @Scheduled(cron = "0 0 0/1 * * ? ")
|
|
|
+ @SneakyThrows
|
|
|
+ public void syncMonitorData(){
|
|
|
+ Calendar calendar = Calendar.getInstance();
|
|
|
+ Date endTime = calendar.getTime();
|
|
|
+ calendar.add(Calendar.HOUR, -1);
|
|
|
+ calendar.set(Calendar.SECOND, 0);
|
|
|
+ Date startTime = calendar.getTime();
|
|
|
+ updateMonitor(startTime, endTime);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void updateMonitor(Date startTime, Date endTime) {
|
|
|
+ List<Scada> scadaHistoryList = baseMapper.selectScadaHistoryList(DateUtil.format(startTime, "yyyy-MM-dd HH:mm:ss"), DateUtil.format(endTime, "yyyy-MM-dd HH:mm:ss"));
|
|
|
+ scadaHistoryList = scadaHistoryList.stream().sorted(Comparator.comparing(Scada::getScadaTime).reversed()).collect(Collectors.toList());
|
|
|
+ TreeSet<Scada> treeSet = new TreeSet<>((o1, o2) -> {
|
|
|
+ String key1 = o1.getCode();
|
|
|
+ String key2 = o2.getCode();
|
|
|
+ return key1.compareTo(key2);
|
|
|
+ });
|
|
|
+ treeSet.addAll(scadaHistoryList);
|
|
|
+ List<Scada> monitorList = new ArrayList<>(treeSet);
|
|
|
+
|
|
|
+ if (monitorList.size() <= 2000) {
|
|
|
+ baseMapper.updateRealBatchTemp(monitorList);
|
|
|
+ } else {
|
|
|
+ int times = monitorList.size() / 2000;
|
|
|
+ for (int i = 0; i <= times; i++) {
|
|
|
+ baseMapper.updateRealBatchTemp(monitorList.subList(i * 2000, Math.min((i + 1) * 2000, monitorList.size())));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * 更新异常点位实时数据
|
|
|
+ *
|
|
|
+ */
|
|
|
+ @Async
|
|
|
+ @Scheduled(cron = "0 0 0/6 * * ? ")
|
|
|
+ @SneakyThrows
|
|
|
+ public void syncAbnormalPointData(){
|
|
|
+ Calendar calendar = Calendar.getInstance();
|
|
|
+ calendar.set(Calendar.MINUTE, 0);
|
|
|
+ calendar.add(Calendar.MINUTE, -1);
|
|
|
+ calendar.set(Calendar.SECOND, 59);
|
|
|
+ Date endTime = calendar.getTime();
|
|
|
+ calendar.add(Calendar.MINUTE, +1);
|
|
|
+ calendar.add(Calendar.HOUR, -6);
|
|
|
+ calendar.set(Calendar.SECOND, 0);
|
|
|
+ Date startTime = calendar.getTime();
|
|
|
+ List<Scada> scadaHistoryList = baseMapper.selectScadaHistoryList(DateUtil.format(startTime, "yyyy-MM-dd HH:mm:ss"), DateUtil.format(endTime, "yyyy-MM-dd HH:mm:ss"));
|
|
|
+ Calendar last = Calendar.getInstance();
|
|
|
+ last.set(Calendar.MINUTE, 0);
|
|
|
+ last.add(Calendar.HOUR, -6);
|
|
|
+ last.add(Calendar.MINUTE, -1);
|
|
|
+ last.set(Calendar.SECOND, 59);
|
|
|
+ Date endTimeLast = calendar.getTime();
|
|
|
+ last.add(Calendar.MINUTE, +1);
|
|
|
+ last.add(Calendar.HOUR, -6);
|
|
|
+ last.set(Calendar.SECOND, 0);
|
|
|
+ Date startTimeLast = calendar.getTime();
|
|
|
+ List<Scada> scadaHistoryListLast = baseMapper.selectScadaHistoryList(DateUtil.format(startTimeLast, "yyyy-MM-dd HH:mm:ss"), DateUtil.format(endTimeLast, "yyyy-MM-dd HH:mm:ss"));
|
|
|
+
|
|
|
+ List<Map<String, Object>> codes = baseMapper.getAbnormalPointIndexList();
|
|
|
+ List<Date> dates = getFiveIntervalDates(startTime, endTime);
|
|
|
+ List<Scada> monitorList = new ArrayList<>();
|
|
|
+ codes.forEach(info -> {
|
|
|
+ if (String.valueOf(info.get("type")).equals("13")) {
|
|
|
+ for (Date date : dates) {
|
|
|
+ Scada scada = new Scada();
|
|
|
+ scada.setCode(String.valueOf(info.get("code")));
|
|
|
+ Random rand = new Random();
|
|
|
+ int randomNum = rand.nextInt(800 - 600 + 1) + 600;
|
|
|
+ scada.setValue(String.valueOf(randomNum));
|
|
|
+ scada.setScadaTime(date);
|
|
|
+ scada.setUpdateTime(new Date());
|
|
|
+ monitorList.add(scada);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (String.valueOf(info.get("type")).equals("17")
|
|
|
+ || String.valueOf(info.get("type")).equals("18")
|
|
|
+ || String.valueOf(info.get("type")).equals("19")) {
|
|
|
+ long value = 1079674L;
|
|
|
+ List<Scada> scadaHistoryListLastF = scadaHistoryListLast.stream().filter(e -> e.getCode().equals(info.get("code"))).collect(Collectors.toList());
|
|
|
+ if (CollUtil.isNotEmpty(scadaHistoryListLastF)) {
|
|
|
+ Scada one = scadaHistoryListLastF.get(0);
|
|
|
+ value = Long.parseLong(one.getValue());
|
|
|
+ }
|
|
|
+ for (Date date : dates) {
|
|
|
+ Scada scada = new Scada();
|
|
|
+ scada.setCode(String.valueOf(info.get("code")));
|
|
|
+ Random rand = new Random();
|
|
|
+ long randomNum = rand.nextInt(100 + 1);
|
|
|
+ value = value + randomNum;
|
|
|
+ scada.setValue(String.valueOf(value));
|
|
|
+ scada.setScadaTime(date);
|
|
|
+ scada.setUpdateTime(new Date());
|
|
|
+ monitorList.add(scada);
|
|
|
}
|
|
|
}
|
|
|
});
|
|
|
+
|
|
|
+ scada1Service.remove(new QueryWrapper<Scada>().in("ID",scadaHistoryList.stream().map(Scada::getId).collect(Collectors.toList())));
|
|
|
+ ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
|
|
|
+ 12,
|
|
|
+ 24,
|
|
|
+ 120,
|
|
|
+ TimeUnit.SECONDS,
|
|
|
+ new LinkedBlockingDeque<>(200),
|
|
|
+ (r)->{
|
|
|
+ Thread t = new Thread(r);
|
|
|
+ t.setName("thread-" + ((int)(Math.random()*1000)));
|
|
|
+ return t;},
|
|
|
+ new ThreadPoolExecutor.CallerRunsPolicy());
|
|
|
+
|
|
|
+ if (monitorList.size() <= 2000) {
|
|
|
+ threadPoolExecutor.execute(() -> baseMapper.insertBacthHistoryTemp(monitorList));
|
|
|
+ } else {
|
|
|
+ int times = monitorList.size() / 2000;
|
|
|
+ for (int i = 0; i <= times; i++) {
|
|
|
+ int finalI = i;
|
|
|
+ threadPoolExecutor.execute(() -> baseMapper.insertBacthHistoryTemp(monitorList.subList(finalI * 2000, Math.min((finalI + 1) * 2000, monitorList.size()))));
|
|
|
+ }
|
|
|
+ }
|
|
|
logger.info("新增scada历史数据完成");
|
|
|
+ }
|
|
|
+
|
|
|
+ private List<Date> getFiveIntervalDates(Date startTime, Date endTime) {
|
|
|
+ List<Date> res = new ArrayList<>();
|
|
|
+ res.add(startTime);
|
|
|
+
|
|
|
+ Calendar calendar = Calendar.getInstance();
|
|
|
+ calendar.setTime(startTime);
|
|
|
+ while (calendar.getTime().compareTo(endTime) < 0) {
|
|
|
+ calendar.add(Calendar.MINUTE, +5);
|
|
|
+ res.add(calendar.getTime());
|
|
|
}
|
|
|
+ res.remove(res.size() -1);
|
|
|
+ return res;
|
|
|
+ }
|
|
|
+
|
|
|
}
|