ScadaServiceImpl.java 87 KB


  1. package com.tofly.scada.service.impl;
  2. import cn.hutool.core.date.DateUtil;
  3. import com.alibaba.fastjson.JSON;
  4. import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
  5. import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  6. import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
  7. import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
  8. import com.tofly.common.core.util.StringUtil;
  9. import com.tofly.common.oauth.exception.ToflyDeniedException;
  10. import com.tofly.scada.common.FlowConstant;
  11. import com.tofly.scada.common.MessageTopicEnum;
  12. import com.tofly.scada.entity.*;
  13. import com.tofly.scada.entity.dto.DeviceQuery;
  14. import com.tofly.scada.entity.dto.ScadaQuery;
  15. import com.tofly.scada.entity.vo.*;
  16. import com.tofly.scada.mapper.AllocationMapper;
  17. import com.tofly.scada.mapper.DeviceArchiveManageMapper;
  18. import com.tofly.scada.mapper.ScadaMapper;
  19. import com.tofly.scada.mapper.ScadaReportMapper;
  20. import com.tofly.scada.service.*;
  21. import com.tofly.scada.util.SendTidingsUtil;
  22. import com.tofly.scada.util.TimeUtils;
  23. import lombok.SneakyThrows;
  24. import lombok.extern.slf4j.Slf4j;
  25. import org.slf4j.Logger;
  26. import org.slf4j.LoggerFactory;
  27. import org.springframework.beans.BeanUtils;
  28. import org.springframework.beans.factory.annotation.Autowired;
  29. import org.springframework.beans.factory.annotation.Value;
  30. import org.springframework.context.annotation.Configuration;
  31. import org.springframework.scheduling.annotation.Async;
  32. import org.springframework.scheduling.annotation.Scheduled;
  33. import org.springframework.stereotype.Service;
  34. import org.springframework.transaction.annotation.Transactional;
  35. import org.springframework.util.CollectionUtils;
  36. import org.springframework.util.ObjectUtils;
  37. import org.springframework.util.StringUtils;
  38. import javax.annotation.PostConstruct;
  39. import java.math.BigDecimal;
  40. import java.math.RoundingMode;
  41. import java.text.ParseException;
  42. import java.text.SimpleDateFormat;
  43. import java.time.LocalDate;
  44. import java.time.LocalDateTime;
  45. import java.util.*;
  46. import java.util.concurrent.CountDownLatch;
  47. import java.util.concurrent.ExecutorService;
  48. import java.util.concurrent.Executors;
  49. import java.util.concurrent.atomic.AtomicReference;
  50. import java.util.regex.Matcher;
  51. import java.util.regex.Pattern;
  52. import java.util.stream.Collectors;
  53. /**
  54. * @author HaiQiu
  55. * @date 2022/4/25
  56. */
  57. @Service
  58. @Slf4j
  59. public class ScadaServiceImpl extends ServiceImpl<ScadaMapper, Scada> implements ScadaService {
  60. /**
  61. * 统计量放行记录的code
  62. */
  63. // @Value("${statisticsScadaNew.code}")
  64. private List<String> llj = new ArrayList<>();
  65. private final Logger logger = LoggerFactory.getLogger(ScadaServiceImpl.class);
  66. private static final Pattern pattern = Pattern.compile("-?[0-9]+.?[0-9]*");
  67. public static final String SCADA_HISTORY = "SCADA_HISTORY";
  68. @Autowired
  69. private ScadaMapper scadaMapper;
  70. @Autowired
  71. private StatisticsScadaService statisticsScadaService;
  72. @Autowired
  73. private AllocationMapper allocationMapper;
  74. @Autowired
  75. private DeviceArchiveManageMapper deviceArchiveManageMapper;
  76. @Autowired
  77. private ScadaReportMapper reportMapper;
  78. @Autowired
  79. private SendTidingsUtil sendTidingsUtil;
  80. @Autowired
  81. private ScadaReportServiceImpl scadaReportService;
  82. @Autowired
  83. private TfScadaReportListService tfScadaReportListService;
  84. @Autowired
  85. private AllocationService allocationService;
  86. @Autowired
  87. private DmatableService dmatableService;
  88. @Autowired
  89. private ScadaMonitorService scadaMonitorService;
  90. /**
  91. * 根据当前时间动态自动创建月份表,创建完成之后返回表名
  92. * <p>
  93. * 往后往前推迟月份,1即创建下一月的表,-1表示创建上个月表
  94. *
  95. * @return 月份表名
  96. */
  97. //每月最后28日凌晨2点执行创建下个月的数据表
  98. @Scheduled(cron = "0 0 2 28 * ? ")
  99. public void createMonths() {
  100. Calendar cal = Calendar.getInstance();
  101. cal.setTime(new Date());
  102. cal.add(Calendar.MONTH, 1);
  103. SimpleDateFormat format = new SimpleDateFormat("yyyy_MM");
  104. String date = format.format(cal.getTime());
  105. String tableName = "SCADA_MONTH_" + date;
  106. logger.info("开始检测Scada月份表状态");
  107. if (scadaMapper.verifyTableMonthsIsExits(tableName) <= 0) {
  108. logger.info("Scada月份表状态:不存在,开始创建");
  109. scadaMapper.createTableMonths(tableName);
  110. logger.info("Scada月份表状态:创建完成");
  111. }
  112. logger.info("结束检测Scada月份表状态");
  113. }
  114. public String createMonths(Date time) {
  115. Calendar cal = Calendar.getInstance();
  116. cal.setTime(time == null ? new Date() : time);
  117. SimpleDateFormat format = new SimpleDateFormat("yyyy_MM");
  118. String date = format.format(cal.getTime());
  119. String tableName = "SCADA_MONTH_" + date;
  120. logger.info("开始检测Scada月份表状态");
  121. if (scadaMapper.verifyTableMonthsIsExits(tableName) <= 0) {
  122. logger.info("Scada月份表状态:不存在,开始创建");
  123. scadaMapper.createTableMonths(tableName);
  124. logger.info("Scada月份表状态:创建完成");
  125. }
  126. logger.info("结束检测Scada月份表状态");
  127. return tableName;
  128. }
  129. /**
  130. * 同步24小时数据,支持自定义时间添加
  131. */
  132. //每小时执行一次(整点整分)
  133. @Scheduled(cron = "0 0 * * * ? ")
  134. public void insert24H() {
  135. logger.info("同步scada历史数据到24小时表:正在进行时间调整");
  136. //整个日历调整时间
  137. Calendar calendar = Calendar.getInstance();
  138. Date date = new Date();
  139. calendar.setTime(date);
  140. //延迟一个小时
  141. calendar.add(Calendar.HOUR_OF_DAY, -1);
  142. calendar.set(Calendar.MINUTE, 0);
  143. calendar.set(Calendar.SECOND, 0);
  144. calendar.set(Calendar.MILLISECOND, 0);
  145. Date startTime = calendar.getTime();
  146. String start = DateUtil.format(calendar.getTime(), "yyyy-MM-dd HH:mm:ss");
  147. //拉长时间到59分59秒
  148. calendar.set(Calendar.MINUTE, 59);
  149. calendar.set(Calendar.SECOND, 59);
  150. calendar.set(Calendar.MILLISECOND, 999);
  151. Date endTime = calendar.getTime();
  152. String end = DateUtil.format(calendar.getTime(), "yyyy-MM-dd HH:mm:ss");
  153. logger.info("同步scada历史数据到24小时表:正在进行查询历史数据");
  154. //查询历史数据
  155. // List<Scada> scadas = getScadasDatas(code, startTime, endTime);
  156. List<String> codes = getCodes(null);
  157. // 获取对应时间范围内数据参数
  158. List<StatisticsScadaCalVo> scadaLists = scadaMapper.selectHistoryMaxAndMinByScadaTime(codes, startTime, endTime, 0);
  159. // 删除旧数据
  160. statisticsScadaService.delete(0, codes, startTime, endTime);
  161. //保存数据
  162. saveStatisticsCode(scadaLists, FlowConstant.INTEGER_ZERO, true);
  163. logger.info("同步scada历史数据到24小时表结束");
  164. }
  165. public void insert24H(String code, Date startTime, Date endTime, Integer type) {
  166. logger.info("同步scada历史数据到24小时表:正在进行时间调整");
  167. List<String> codes = getCodes(code);
  168. // 获取对应时间范围内数据参数
  169. List<StatisticsScadaCalVo> scadaLists = scadaMapper.selectHistoryMaxAndMinByScadaTime(codes, startTime, endTime, type);
  170. // 删除旧数据
  171. statisticsScadaService.delete(0, codes, startTime, endTime);
  172. //保存数据
  173. saveStatisticsCode(scadaLists, FlowConstant.INTEGER_ZERO, false);
  174. }
  175. private Date getDateType(String timeStr, Integer type) {
  176. if (0 == type) {
  177. String s = timeStr + ":00:00";
  178. LocalDateTime localDateTime = TimeUtils.fromString2LocalDateTime(s, "yyyy-MM-dd HH:mm:ss");
  179. return TimeUtils.convertLDTToDate(localDateTime);
  180. } else if (1 == type) {
  181. return TimeUtils.localDateToDate(TimeUtils.fromString2LocalDate(timeStr, "yyyy-MM-dd"));
  182. } else {
  183. String s = timeStr + "-01";
  184. return TimeUtils.localDateToDate(TimeUtils.fromString2LocalDate(s, "yyyy-MM-dd"));
  185. }
  186. }
  187. /**
  188. * 同步每天数据,每天凌晨一点半同步昨天的数据
  189. */
  190. @Scheduled(cron = "0 30 1 * * ? ")
  191. public void syncDaysData() {
  192. logger.info("同步scada历史数据到表:正在进行时间调整");
  193. //整个日历调整时间
  194. Calendar calendar = Calendar.getInstance();
  195. Date date = new Date();
  196. calendar.setTime(date);
  197. //延迟一天
  198. calendar.add(Calendar.DAY_OF_MONTH, -1);
  199. calendar.set(Calendar.HOUR_OF_DAY, 0);
  200. calendar.set(Calendar.MINUTE, 0);
  201. calendar.set(Calendar.SECOND, 0);
  202. calendar.set(Calendar.MILLISECOND, 0);
  203. Date startTime = calendar.getTime();
  204. String start = DateUtil.format(calendar.getTime(), "yyyy-MM-dd HH:mm:ss");
  205. //拉长时间到59分59秒
  206. calendar.set(Calendar.HOUR_OF_DAY, 23);
  207. calendar.set(Calendar.MINUTE, 59);
  208. calendar.set(Calendar.SECOND, 59);
  209. calendar.set(Calendar.MILLISECOND, 999);
  210. Date endTime = calendar.getTime();
  211. String end = DateUtil.format(calendar.getTime(), "yyyy-MM-dd HH:mm:ss");
  212. logger.info("同步scada历史数据到24小时表:正在进行查询历史数据");
  213. //查询历史数据
  214. // List<Scada> scadas = getScadasDatas(code, startTime, endTime);
  215. List<String> codes = getCodes(null);
  216. // 获取对应时间范围内数据参数
  217. List<StatisticsScadaCalVo> scadaLists = scadaMapper.selectHistoryMaxAndMinByScadaTime(codes, startTime, endTime, 1);
  218. // 删除旧数据
  219. statisticsScadaService.delete(1, codes, startTime, endTime);
  220. saveStatisticsCode(scadaLists, FlowConstant.INTEGER_ONE, true);
  221. }
  222. @Async
  223. public void syncDaysData(String code, Date startTime, Date endTime, Integer type) {
  224. logger.info("同步scada历史数据到表:正在进行时间调整");
  225. List<String> codes = getCodes(code);
  226. // 获取对应时间范围内数据参数
  227. List<StatisticsScadaCalVo> scadaLists = scadaMapper.selectHistoryMaxAndMinByScadaTime(codes, startTime, endTime, type);
  228. // 删除旧数据
  229. statisticsScadaService.delete(1, codes, startTime, endTime);
  230. saveStatisticsCode(scadaLists, FlowConstant.INTEGER_ONE, true);
  231. }
  232. /**
  233. * 每月1号凌晨1点同步更新历史数据与上个月表数据,支持自定义时间添加
  234. */
  235. @Scheduled(cron = "0 0 1 1 * ? ")
  236. public void syncDataIntoMonths() {
  237. logger.info("同步scada历史数据到月份表:正在进行时间调整");
  238. //整个日历调整时间
  239. Calendar calendar = Calendar.getInstance();
  240. calendar.setTime(new Date());
  241. //倒退一个月开始时间
  242. calendar.add(Calendar.MONTH, -1);
  243. calendar.set(Calendar.DATE, 1);
  244. calendar.set(Calendar.HOUR_OF_DAY, 0);
  245. calendar.set(Calendar.MINUTE, 0);
  246. calendar.set(Calendar.SECOND, 0);
  247. calendar.set(Calendar.MILLISECOND, 0);
  248. Date startTime = calendar.getTime();
  249. String start = DateUtil.format(calendar.getTime(), "yyyy-MM-dd HH:mm:ss");
  250. //倒退一个月最后一天时间
  251. calendar.set(Calendar.DATE, calendar.getActualMaximum(Calendar.DATE));
  252. calendar.set(Calendar.HOUR_OF_DAY, 23);
  253. calendar.set(Calendar.MINUTE, 59);
  254. calendar.set(Calendar.SECOND, 59);
  255. calendar.set(Calendar.MILLISECOND, 999);
  256. Date endTime = calendar.getTime();
  257. String end = DateUtil.format(calendar.getTime(), "yyyy-MM-dd HH:mm:ss");
  258. logger.info("同步scada历史数据到月份表:正在进行历史数据查询....");
  259. //查询历史数据
  260. // List<Scada> scadas = getScadasDatas(code, startTime, endTime);
  261. List<String> codes = getCodes(null);
  262. // 获取对应时间范围内数据参数
  263. List<StatisticsScadaCalVo> scadaLists = scadaMapper.selectHistoryMaxAndMinByScadaTime(codes, startTime, endTime, 2);
  264. // 删除旧数据
  265. statisticsScadaService.delete(2, codes, startTime, endTime);
  266. saveStatisticsCode(scadaLists, FlowConstant.INTEGER_TWO, true);
  267. }
  268. @Async
  269. public void syncDataIntoMonths(String code, Date startTime, Date endTime, Integer type) {
  270. logger.info("同步scada历史数据到月份表:正在进行时间调整");
  271. List<String> codes = getCodes(code);
  272. // 获取对应时间范围内数据参数
  273. List<StatisticsScadaCalVo> scadaLists = scadaMapper.selectHistoryMaxAndMinByScadaTime(codes, startTime, endTime, 2);
  274. // 删除旧数据
  275. statisticsScadaService.delete(2, codes, startTime, endTime);
  276. saveStatisticsCode(scadaLists, FlowConstant.INTEGER_TWO, true);
  277. }
  278. /**
  279. * 普通进程跑
  280. *
  281. * @param startTime
  282. * @param endTime
  283. * @param codes
  284. * @param DateType
  285. */
  286. public void saveStatisticsByCode(Date startTime, Date endTime, List<String> codes, Integer DateType) {
  287. // if (!CollectionUtils.isEmpty(codes)) {
  288. // logger.info("同步scada历史数据到表:正在进行数据分组");
  289. // //有数据就分组
  290. // logger.info("同步scada历史数据到表:正在进行分组循环计算");
  291. // //分组循环计算添加
  292. // for (String code : codes) {
  293. // //当前线程名称
  294. // String name = Thread.currentThread().getName();
  295. // logger.info("当前线程:{}----统计添加数据:{}中。。。。", name, code);
  296. // QueryWrapper<Scada> scadaQueryWrapper = new QueryWrapper<>();
  297. // scadaQueryWrapper.between("SCADA_TIME", startTime, endTime)
  298. // .in("CODE", code);
  299. // List<Scada> scadaList = scadaMapper.selectList(scadaQueryWrapper);
  300. // if (CollectionUtils.isEmpty(scadaList)) {
  301. // return;
  302. // }
  303. //
  304. // String defaultValue = "0";
  305. //
  306. // BigDecimal sum = BigDecimal.ZERO;
  307. // BigDecimal average = BigDecimal.ZERO;
  308. // BigDecimal maxVal = new BigDecimal(scadaList.get(0).getValue() == null ? "0" : scadaList.get(0).getValue());
  309. // BigDecimal minVal = new BigDecimal(scadaList.get(0).getValue() == null ? "0" : scadaList.get(0).getValue());
  310. //
  311. // for (String s : llj) {
  312. // if (scadaList.get(0).getCode().contains(s)) {
  313. // Iterator<Scada> iterator = scadaList.iterator();
  314. // while (iterator.hasNext()) {
  315. // Scada next = iterator.next();
  316. // for (String s1 : llj) {
  317. // if (next.getCode().contains(s1) && !next.getValue().equals(defaultValue)) {
  318. // maxVal = new BigDecimal(next.getValue());
  319. // minVal = maxVal;
  320. // break;
  321. // }
  322. // }
  323. // }
  324. // }
  325. // }
  326. //
  327. //
  328. // for (Scada scada : scadaList) {
  329. // for (String s : llj) {
  330. // if (scada.getCode().contains(s) && scada.getValue().equals(defaultValue)) {
  331. // continue;
  332. // }
  333. // }
  334. // BigDecimal decimal = new BigDecimal(scada.getValue());
  335. // sum = decimal.add(sum);
  336. // if (decimal.compareTo(maxVal) > 0) {
  337. // maxVal = decimal;
  338. // }
  339. // if (decimal.compareTo(minVal) < 0) {
  340. // minVal = decimal;
  341. // }
  342. // }
  343. // average = sum.divide(BigDecimal.valueOf(scadaList.size()), RoundingMode.HALF_UP);
  344. //
  345. // StatisticsScada statisticsScada = new StatisticsScada();
  346. // statisticsScada.setCode(scadaList.get(0).getCode());
  347. // statisticsScada.setValue(scadaList.get(scadaList.size() - 1).getValue());
  348. // statisticsScada.setAve(average.toString());
  349. // statisticsScada.setMax(maxVal.toString());
  350. // statisticsScada.setMin(minVal.toString());
  351. // statisticsScada.setFold(sum.toString());
  352. // statisticsScada.setStartTime(startTime);
  353. // statisticsScada.setEndTime(endTime);
  354. // statisticsScada.setStatisticsType(DateType);
  355. //
  356. // //查询指标报警上下限计算合格率相关
  357. // QueryWrapper<Allocation> allocationQueryWrapper = new QueryWrapper<>();
  358. // allocationQueryWrapper.eq("VARIABLE_CODE", scadaList.get(0).getCode());
  359. // List<Allocation> allocations = allocationMapper.selectList(allocationQueryWrapper);
  360. // if (!CollectionUtils.isEmpty(allocations) && StringUtils.hasText(allocations.get(0).getType())) {
  361. // Set<Long> reportIdSets = getReportIdSets(allocations);
  362. // if (!CollectionUtils.isEmpty(reportIdSets)) {
  363. // logger.info("计算指标报警上下限计算合格率相关....");
  364. // QueryWrapper<ScadaReport> scadaReportQueryWrapper = new QueryWrapper<>();
  365. // scadaReportQueryWrapper.in("ID", reportIdSets);
  366. // List<ScadaReport> scadaReports = reportMapper.selectList(scadaReportQueryWrapper);
  367. // if (!CollectionUtils.isEmpty(scadaReports)) {
  368. // //超标次数
  369. // int errorCount = 0;
  370. // //不合格超标时长
  371. // long errorTimeTotal = 0;
  372. // //超标总时长
  373. // long timeTotal = (scadaList.get(scadaList.size() - 1).getScadaTime().getTime()
  374. // - scadaList.get(0).getScadaTime().getTime()) / 1000;
  375. // //错误时间
  376. // Date errorTime = null;
  377. // for (Scada scada : scadaList) {
  378. // for (String s : llj) {
  379. // if (scada.getCode().contains(s) && scada.getCode().equals("0")) {
  380. // continue;
  381. // }
  382. // }
  383. // for (ScadaReport report : scadaReports) {
  384. // if (Double.parseDouble(scada.getValue()) < Double.parseDouble(report.getReportLower())
  385. // || Double.parseDouble(scada.getValue()) > Double.parseDouble(report.getReportUpper())) {
  386. // errorCount = errorCount + 1;
  387. // errorTime = scada.getScadaTime();
  388. // break;
  389. // } else if (errorTime != null) {
  390. // errorTimeTotal = errorTimeTotal +
  391. // (scada.getScadaTime().getTime() - errorTime.getTime()) / 1000;
  392. // }
  393. // }
  394. // }
  395. // //合格率
  396. // float pass = 0;
  397. // if (timeTotal != 0) {
  398. // BigDecimal num1 = new BigDecimal(String.valueOf(errorTimeTotal));
  399. // BigDecimal num2 = new BigDecimal(String.valueOf(timeTotal));
  400. // pass = num1.divide(num2, RoundingMode.HALF_UP).floatValue();
  401. // }
  402. //
  403. // statisticsScada.setErrorCount(String.valueOf(errorCount));
  404. // statisticsScada.setErrorTimeTotal(String.valueOf(errorTimeTotal == 0 ? timeTotal : errorTimeTotal));
  405. // statisticsScada.setPass(String.valueOf(pass));
  406. // statisticsScada.setTimeTotal(String.valueOf(timeTotal));
  407. // }
  408. // }
  409. // statisticsScada.setDataType(String.valueOf(allocations.get(0).getType()));
  410. // }
  411. //
  412. // //删除旧数据
  413. //
  414. // int delete = statisticsScadaService.delete(DateType, statisticsScada.getCode(),
  415. // DateUtil.format(statisticsScada.getStartTime(), "yyyy-MM-dd HH:mm:ss"),
  416. // DateUtil.format(statisticsScada.getEndTime(), "yyyy-MM-dd HH:mm:ss"));
  417. // logger.info("成功删除{}条旧数据", delete);
  418. // //添加数据
  419. // logger.info("统计添加数据.....:{}", statisticsScada);
  420. // try {
  421. // statisticsScadaService.save(statisticsScada);
  422. // logger.info("统计添加数据完成:{}", statisticsScada);
  423. // } catch (Exception e) {
  424. // e.printStackTrace();
  425. // log.error("统计数据添加出现错误----数据:{}---错误:{}", statisticsScada, e.getMessage());
  426. // }
  427. // logger.info("统计添加数据完成:{}", statisticsScada);
  428. // }
  429. // } else {
  430. // logger.info("获取历史数据为空");
  431. // }
  432. }
  433. /**
  434. * 设置统计数据
  435. *
  436. * @param startTime 开始时间
  437. * @param endTime 结束时间
  438. * @param scadas 数据集合
  439. */
  440. @Async
  441. public void saveStatisticsData(Date startTime, Date endTime, List<Scada> scadas, Integer DateType) {
  442. // if (!CollectionUtils.isEmpty(scadas)) {
  443. // logger.info("同步scada历史数据到表:正在进行数据分组");
  444. // //有数据就分组
  445. // Map<String, List<Scada>> listMap = scadas.stream().filter(scada -> scada.getCode() != null && scada.getValue() != null)
  446. // .collect(Collectors.groupingBy(Scada::getCode));
  447. // logger.info("同步scada历史数据到表:正在进行分组循环计算");
  448. // ExecutorService executorService = Executors.newFixedThreadPool(listMap.entrySet().size());
  449. // CountDownLatch countDownLatch = new CountDownLatch(listMap.entrySet().size());
  450. // //分组循环计算添加
  451. // for (Map.Entry<String, List<Scada>> entity : listMap.entrySet()) {
  452. // //开启多线程
  453. // executorService.submit(new Runnable() {
  454. // @Override
  455. // public void run() {
  456. // //当前线程名称
  457. // String name = Thread.currentThread().getName();
  458. // logger.info("当前线程:{}----统计添加数据中。。。。", name);
  459. // List<Scada> scadaList = entity.getValue();
  460. //
  461. //// double average = scadaList.stream().mapToLong(value -> Long.parseLong(value.getValue())).average().getAsDouble();
  462. //// double max = scadaList.stream().mapToLong(value -> Long.parseLong(value.getValue())).max().getAsLong();
  463. //// double min = scadaList.stream().mapToLong(value -> Long.parseLong(value.getValue())).min().getAsLong();
  464. //// double sum = scadaList.stream().mapToLong(value -> Long.parseLong(value.getValue())).sum();
  465. // BigDecimal sum = BigDecimal.ZERO;
  466. // BigDecimal average = BigDecimal.ZERO;
  467. // BigDecimal maxVal = new BigDecimal(scadaList.get(0).getValue());
  468. // BigDecimal minVal = new BigDecimal(scadaList.get(0).getValue());
  469. // for (Scada scada : scadaList) {
  470. // BigDecimal decimal = new BigDecimal(scada.getValue());
  471. // sum = decimal.add(sum);
  472. // if (decimal.compareTo(maxVal) > 0) {
  473. // maxVal = decimal;
  474. // }
  475. // if (decimal.compareTo(minVal) < 0) {
  476. // minVal = decimal;
  477. // }
  478. // }
  479. // average = sum.divide(BigDecimal.valueOf(scadaList.size()), RoundingMode.HALF_UP);
  480. //
  481. // StatisticsScada statisticsScada = new StatisticsScada();
  482. // statisticsScada.setCode(scadaList.get(0).getCode());
  483. // statisticsScada.setValue(scadaList.get(scadaList.size() - 1).getValue());
  484. // statisticsScada.setAve(average.toString());
  485. // statisticsScada.setMax(maxVal.toString());
  486. // statisticsScada.setMin(minVal.toString());
  487. // statisticsScada.setFold(sum.toString());
  488. // statisticsScada.setStartTime(startTime);
  489. // statisticsScada.setEndTime(endTime);
  490. // statisticsScada.setStatisticsType(DateType);
  491. //
  492. // //查询指标报警上下限计算合格率相关
  493. // QueryWrapper<Allocation> allocationQueryWrapper = new QueryWrapper<>();
  494. // allocationQueryWrapper.eq("VARIABLE_CODE", scadaList.get(0).getCode());
  495. // List<Allocation> allocations = allocationMapper.selectList(allocationQueryWrapper);
  496. // if (!CollectionUtils.isEmpty(allocations) && StringUtils.hasText(allocations.get(0).getType())) {
  497. // Set<Long> reportIdSets = getReportIdSets(allocations);
  498. // if (!CollectionUtils.isEmpty(reportIdSets)) {
  499. // logger.info("计算指标报警上下限计算合格率相关....");
  500. // QueryWrapper<ScadaReport> scadaReportQueryWrapper = new QueryWrapper<>();
  501. // scadaReportQueryWrapper.in("ID", reportIdSets);
  502. // List<ScadaReport> scadaReports = reportMapper.selectList(scadaReportQueryWrapper);
  503. // if (!CollectionUtils.isEmpty(scadaReports)) {
  504. // //超标次数
  505. // int errorCount = 0;
  506. // //不合格超标时长
  507. // long errorTimeTotal = 0;
  508. // //超标总时长
  509. // long timeTotal = (scadaList.get(scadaList.size() - 1).getScadaTime().getTime()
  510. // - scadaList.get(0).getScadaTime().getTime()) / 1000;
  511. // //错误时间
  512. // Date errorTime = null;
  513. // for (Scada scada : scadaList) {
  514. // for (ScadaReport report : scadaReports) {
  515. // if (Double.parseDouble(scada.getValue()) < Double.parseDouble(report.getReportLower())
  516. // || Double.parseDouble(scada.getValue()) > Double.parseDouble(report.getReportUpper())) {
  517. // errorCount = errorCount + 1;
  518. // errorTime = scada.getScadaTime();
  519. // break;
  520. // } else if (errorTime != null) {
  521. // errorTimeTotal = errorTimeTotal +
  522. // (scada.getScadaTime().getTime() - errorTime.getTime()) / 1000;
  523. // }
  524. // }
  525. // }
  526. // //合格率
  527. // float pass = 0;
  528. // if (timeTotal != 0) {
  529. // BigDecimal num1 = new BigDecimal(String.valueOf(errorTimeTotal));
  530. // BigDecimal num2 = new BigDecimal(String.valueOf(timeTotal));
  531. // pass = num1.divide(num2, RoundingMode.HALF_UP).floatValue();
  532. // }
  533. //
  534. // statisticsScada.setErrorCount(String.valueOf(errorCount));
  535. // statisticsScada.setErrorTimeTotal(String.valueOf(errorTimeTotal == 0 ? timeTotal : errorTimeTotal));
  536. // statisticsScada.setPass(String.valueOf(pass));
  537. // statisticsScada.setTimeTotal(String.valueOf(timeTotal));
  538. // }
  539. // }
  540. // statisticsScada.setDataType(String.valueOf(allocations.get(0).getType()));
  541. // }
  542. //
  543. // //删除旧数据
  544. //
  545. // int delete = statisticsScadaService.delete(DateType, statisticsScada.getCode(),
  546. // DateUtil.format(statisticsScada.getStartTime(), "yyyy-MM-dd HH:mm:ss"),
  547. // DateUtil.format(statisticsScada.getEndTime(), "yyyy-MM-dd HH:mm:ss"));
  548. // logger.info("成功删除{}条旧数据", delete);
  549. // //添加数据
  550. // logger.info("统计添加数据完成:{}", statisticsScada);
  551. // try {
  552. // statisticsScadaService.save(statisticsScada);
  553. // } catch (Exception e) {
  554. // e.printStackTrace();
  555. // log.error("统计数据添加出现错误----数据:{}---错误:{}", statisticsScada, e.getMessage());
  556. // }
  557. // logger.info("统计添加数据完成:{}", statisticsScada);
  558. // // 闭锁
  559. // countDownLatch.countDown();
  560. // }
  561. // });
  562. // }
  563. // } else {
  564. // logger.info("获取历史数据为空");
  565. // }
  566. }
  567. /**
  568. * 截取数组长度批量插入
  569. *
  570. * @param scadas 数据集合
  571. * @param tableName 表名
  572. * @return 成功条数
  573. */
  574. public int insertMouthBatch(List<Scada> scadas, String tableName) {
  575. int size = scadas.size();
  576. int limit = 1000;
  577. int startIndex = 0;
  578. int endIndex;
  579. if (size <= limit) {
  580. //添加新数据
  581. int insertMouth = scadaMapper.insertMouth(scadas, tableName);
  582. log.info("新增数据:{}条", insertMouth);
  583. } else {
  584. while (startIndex < size) {
  585. //结束索引范围
  586. endIndex = startIndex + limit;
  587. //最后索引不能超过本身长度
  588. endIndex = endIndex > size ? size : endIndex;
  589. //截取数据
  590. List<Scada> scadaList = scadas.subList(startIndex, endIndex);
  591. //下次索引位置
  592. startIndex = endIndex;
  593. //添加新数据
  594. int insertMouth = scadaMapper.insertMouth(scadaList, tableName);
  595. log.info("新增数据:{}条", insertMouth);
  596. }
  597. }
  598. return size;
  599. }
  600. /**
  601. * 分code进行多线程存储
  602. *
  603. * @param dateType 统计类型
  604. * @param isAuto 是否为自动定时任务,true定时任务,false不是定时任务
  605. */
  606. @Transactional(rollbackFor = Exception.class)
  607. public void saveStatisticsCode(List<StatisticsScadaCalVo> scadaLists, Integer dateType, boolean isAuto) {
  608. if (!CollectionUtils.isEmpty(scadaLists)) {
  609. logger.info("计算scada统计数据:正在进行数据大小{}", scadaLists.size());
  610. // 存储新增统计数据
  611. List<StatisticsScada> saveList = new ArrayList<>();
  612. if (CollectionUtils.isEmpty(scadaLists)) {
  613. return;
  614. }
  615. scadaLists.forEach(k -> {
  616. StatisticsScada statisticsScada = new StatisticsScada();
  617. statisticsScada.setCode(k.getCode());
  618. String maxValue = k.getMaxValue() == null ? "0" : k.getMaxValue();
  619. statisticsScada.setValue(maxValue);
  620. statisticsScada.setAve(k.getAvgValue());
  621. statisticsScada.setMin(k.getMinValue() == null ? "0" : k.getMinValue());
  622. statisticsScada.setMax(maxValue);
  623. statisticsScada.setFold(k.getDifferenceValue());
  624. List<Date> dates = getDateListByType(getDateType(k.getDateTimeStr(), dateType), dateType);
  625. if (!CollectionUtils.isEmpty(dates) && dates.size() == 2) {
  626. statisticsScada.setStartTime(dates.get(0));
  627. statisticsScada.setEndTime(dates.get(1));
  628. }
  629. statisticsScada.setStatisticsType(dateType);
  630. statisticsScada.setDataType(k.getAllocationType());
  631. //添加数据
  632. saveList.add(statisticsScada);
  633. });
  634. // 新增
  635. logger.info("新增scada数据大小{}", saveList.size());
  636. statisticsScadaService.saveBatch(saveList);
  637. } else {
  638. logger.info("获取历史数据为空");
  639. }
  640. }
  641. /**
  642. * 获取时间的小时,天,月份的开始时间和结束时间
  643. *
  644. * @param time 时间
  645. * @param type 类型
  646. * @return
  647. */
  648. public List<Date> getDateListByType(Date time, Integer type) {
  649. //整个日历调整时间
  650. Calendar calendar = Calendar.getInstance();
  651. calendar.setTime(time == null ? new Date() : time);
  652. List<Date> dates = new ArrayList<>();
  653. if (type.equals(FlowConstant.INTEGER_ZERO)) {
  654. //一个小时
  655. calendar.set(Calendar.MINUTE, 0);
  656. calendar.set(Calendar.SECOND, 0);
  657. calendar.set(Calendar.MILLISECOND, 0);
  658. Date startTime = calendar.getTime();
  659. String start = DateUtil.format(calendar.getTime(), "yyyy-MM-dd HH:mm:ss");
  660. //拉长时间到59分59秒
  661. calendar.set(Calendar.MINUTE, 59);
  662. calendar.set(Calendar.SECOND, 59);
  663. calendar.set(Calendar.MILLISECOND, 999);
  664. Date endTime = calendar.getTime();
  665. String end = DateUtil.format(calendar.getTime(), "yyyy-MM-dd HH:mm:ss");
  666. dates.add(startTime);
  667. dates.add(endTime);
  668. } else if (type.equals(FlowConstant.INTEGER_ONE)) {
  669. //一天
  670. calendar.set(Calendar.HOUR_OF_DAY, 0);
  671. calendar.set(Calendar.MINUTE, 0);
  672. calendar.set(Calendar.SECOND, 0);
  673. calendar.set(Calendar.MILLISECOND, 0);
  674. Date startTime = calendar.getTime();
  675. String start = DateUtil.format(calendar.getTime(), "yyyy-MM-dd HH:mm:ss");
  676. //拉长时间到59分59秒
  677. calendar.set(Calendar.HOUR_OF_DAY, 23);
  678. calendar.set(Calendar.MINUTE, 59);
  679. calendar.set(Calendar.SECOND, 59);
  680. calendar.set(Calendar.MILLISECOND, 999);
  681. Date endTime = calendar.getTime();
  682. String end = DateUtil.format(calendar.getTime(), "yyyy-MM-dd HH:mm:ss");
  683. dates.add(startTime);
  684. dates.add(endTime);
  685. } else if (type.equals(FlowConstant.INTEGER_TWO)) {
  686. //一个月
  687. calendar.set(Calendar.DATE, 1);
  688. calendar.set(Calendar.HOUR_OF_DAY, 0);
  689. calendar.set(Calendar.MINUTE, 0);
  690. calendar.set(Calendar.SECOND, 0);
  691. calendar.set(Calendar.MILLISECOND, 0);
  692. Date startTime = calendar.getTime();
  693. String start = DateUtil.format(calendar.getTime(), "yyyy-MM-dd HH:mm:ss");
  694. //倒退一个月最后一天时间
  695. calendar.set(Calendar.DATE, calendar.getActualMaximum(Calendar.DATE));
  696. calendar.set(Calendar.HOUR_OF_DAY, 23);
  697. calendar.set(Calendar.MINUTE, 59);
  698. calendar.set(Calendar.SECOND, 59);
  699. calendar.set(Calendar.MILLISECOND, 999);
  700. Date endTime = calendar.getTime();
  701. dates.add(startTime);
  702. dates.add(endTime);
  703. }
  704. return dates;
  705. }
  706. /**
  707. * 获取历史数据
  708. *
  709. * @param start 开始时间
  710. * @param end 结束时间
  711. * @return 历史数据
  712. */
  713. public List<Scada> getScadasDatas(String code, Date start, Date end) {
  714. List<Allocation> allocations = allocationMapper.selectList(Wrappers.<Allocation>lambdaQuery()
  715. .eq(StringUtils.hasText(code), Allocation::getVariableCode, code));
  716. List<String> codes = allocations.stream().filter(allocation -> allocation.getVariableCode() != null)
  717. .map(Allocation::getVariableCode).collect(Collectors.toList());
  718. List<Scada> scadas = new ArrayList<>();
  719. if (!CollectionUtils.isEmpty(codes)) {
  720. QueryWrapper<Scada> scadaQueryWrapper = new QueryWrapper<>();
  721. scadaQueryWrapper.between("SCADA_TIME", start, end)
  722. .in("CODE", codes);
  723. scadas = scadaMapper.selectList(scadaQueryWrapper);
  724. }
  725. return scadas;
  726. }
  727. public List<String> getCodes(String code) {
  728. if (StringUtils.hasText(code) && code.contains(",")) {
  729. String[] codeList = code.split(",");
  730. List<Allocation> allocations = allocationMapper.selectList(Wrappers.<Allocation>lambdaQuery()
  731. .in(!CollectionUtils.isEmpty(Arrays.asList(codeList)), Allocation::getVariableCode, codeList));
  732. List<String> codes = allocations.stream().filter(allocation -> allocation.getVariableCode() != null)
  733. .map(Allocation::getVariableCode).collect(Collectors.toList());
  734. if (!CollectionUtils.isEmpty(codes)) {
  735. return codes;
  736. }
  737. }
  738. List<Allocation> allocations = allocationMapper.selectList(Wrappers.<Allocation>lambdaQuery()
  739. .eq(StringUtils.hasText(code), Allocation::getVariableCode, code));
  740. List<String> codes = allocations.stream().filter(allocation -> allocation.getVariableCode() != null)
  741. .map(Allocation::getVariableCode).collect(Collectors.toList());
  742. if (!CollectionUtils.isEmpty(codes)) {
  743. return codes;
  744. }
  745. return null;
  746. }
  747. @Async
  748. public void insertMouth(List<Scada> scadas, String tableName) {
  749. int size = scadas.size();
  750. int i = 0;
  751. while (size > 600) {
  752. scadaMapper.insertMouth(scadas.subList(i, i + 600), tableName);
  753. i = i + 600;
  754. size = size - 600;
  755. }
  756. if (size > 0) {
  757. scadaMapper.insertMouth(scadas.subList(i, i + size), tableName);
  758. }
  759. }
  760. @Override
  761. public List<Scada> getListData() {
  762. List<Scada> scadaList = scadaMapper.selectMonitoring("SCADA_MONITOR");
  763. for (Scada scada : scadaList) {
  764. if (StringUtils.hasText(scada.getValue()) && scada.getValue().startsWith(".")) {
  765. scada.setValue("0" + scada.getValue());
  766. }
  767. }
  768. return scadaList;
  769. }
  770. @Override
  771. public boolean fetch24HoursData(String syncTime) {
  772. Date date = null;
  773. if (StringUtils.isEmpty(syncTime)) {
  774. date = new Date();
  775. Calendar cal = Calendar.getInstance();
  776. cal.setTime(date);
  777. cal.set(Calendar.MINUTE, 0);
  778. cal.set(Calendar.SECOND, 0);
  779. } else {
  780. SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
  781. try {
  782. Date parse = format.parse(syncTime);
  783. Date now = new Date();
  784. Calendar cal = Calendar.getInstance();
  785. cal.setTime(now);
  786. cal.set(Calendar.MINUTE, 0);
  787. cal.set(Calendar.SECOND, 0);
  788. if (parse.getTime() >= now.getTime()) {
  789. throw new RuntimeException("时间参数不允许大于等于当前时间:" + format.format(now));
  790. }
  791. date = parse;
  792. } catch (ParseException e) {
  793. throw new RuntimeException("时间参数格式传入错误:" + e);
  794. }
  795. }
  796. // insert24H(null, date);
  797. return true;
  798. }
  799. @Override
  800. public void fetchMonthsData(String syncTime) {
  801. // Date date = null;
  802. // if (StringUtils.isEmpty(syncTime)) {
  803. // date = new Date();
  804. // Calendar cal = Calendar.getInstance();
  805. // cal.setTime(date);
  806. // cal.set(Calendar.MINUTE, 0);
  807. // cal.set(Calendar.SECOND, 0);
  808. // } else {
  809. // SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
  810. // try {
  811. // Date parse = format.parse(syncTime);
  812. // Date now = new Date();
  813. // Calendar cal = Calendar.getInstance();
  814. // cal.setTime(now);
  815. // if (parse.getTime() > now.getTime()) {
  816. // throw new RuntimeException("时间参数不允许大于当前时间:" + format.format(now));
  817. // }
  818. // date = parse;
  819. // } catch (ParseException e) {
  820. // throw new RuntimeException("时间参数格式传入错误:" + e);
  821. // }
  822. // }
  823. // syncDataIntoMonths(null, date);
  824. }
  825. @Override
  826. public void fetchDaysData(String syncTime) {
  827. // Date date = null;
  828. // if (StringUtils.isEmpty(syncTime)) {
  829. // date = new Date();
  830. // Calendar cal = Calendar.getInstance();
  831. // cal.setTime(date);
  832. // cal.set(Calendar.MINUTE, 0);
  833. // cal.set(Calendar.SECOND, 0);
  834. // } else {
  835. // SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
  836. // try {
  837. // Date parse = format.parse(syncTime);
  838. // Date now = new Date();
  839. // Calendar cal = Calendar.getInstance();
  840. // cal.setTime(now);
  841. // if (parse.getTime() > now.getTime()) {
  842. // throw new RuntimeException("时间参数不允许大于当前时间:" + format.format(now));
  843. // }
  844. // date = parse;
  845. // } catch (ParseException e) {
  846. // throw new RuntimeException("时间参数格式传入错误:" + e);
  847. // }
  848. // }
  849. // syncDaysData(null, date);
  850. }
  851. @Override
  852. public Page<ScadaVo> get24HoursData(Page page, ScadaQuery scadaQuery) {
  853. QueryWrapper<ScadaQuery> queryWrapper = new QueryWrapper<>();
  854. if (StringUtils.hasText(scadaQuery.getCode())) {
  855. queryWrapper.like("hours.CODE", scadaQuery.getCode());
  856. }
  857. if (StringUtils.hasText(scadaQuery.getValue())) {
  858. queryWrapper.like("hours.VALUE", scadaQuery.getValue());
  859. }
  860. if (scadaQuery.getStart() != null && scadaQuery.getEnd() != null) {
  861. queryWrapper.between("hours.SCADA_TIME", scadaQuery.getStart(), scadaQuery.getEnd());
  862. }
  863. queryWrapper.orderByDesc("hours.SCADA_TIME");
  864. return scadaMapper.pageList(page, queryWrapper, "SCADA_HOURS");
  865. }
  866. @Override
  867. public Page getMonthsData(Page page, ScadaQuery scadaQuery) {
  868. QueryWrapper<ScadaQuery> queryWrapper = new QueryWrapper<>();
  869. queryWrapper.orderByDesc("hours.SCADA_TIME");
  870. if (StringUtils.hasText(scadaQuery.getCode())) {
  871. queryWrapper.like("hours.CODE", scadaQuery.getCode());
  872. }
  873. if (StringUtils.hasText(scadaQuery.getValue())) {
  874. queryWrapper.like("hours.VALUE", scadaQuery.getValue());
  875. }
  876. if (scadaQuery.getStart() != null && scadaQuery.getEnd() != null) {
  877. if (scadaQuery.getStart().getYear() == scadaQuery.getEnd().getYear() &&
  878. scadaQuery.getStart().getMonth() == scadaQuery.getEnd().getMonth()) {
  879. List<Date> startAndEndByDate = TimeUtils.getMonthsToStartAndEndByDate(scadaQuery.getStart());
  880. queryWrapper.between("hours.SCADA_TIME", scadaQuery.getStart(), scadaQuery.getEnd());
  881. Calendar cal = Calendar.getInstance();
  882. cal.setTime(startAndEndByDate.get(0));
  883. SimpleDateFormat format = new SimpleDateFormat("yyyy_MM");
  884. String date = format.format(cal.getTime());
  885. String tableName = "SCADA_MONTH_" + date;
  886. return scadaMapper.pageList(page, queryWrapper, tableName);
  887. }
  888. }
  889. return scadaMapper.pageList(page, queryWrapper, SCADA_HISTORY);
  890. }
  891. @Override
  892. public List<DeviceArchiveManageVo> getAppData(DeviceQuery deviceQuery) {
  893. QueryWrapper<DeviceArchiveManage> queryWrapper = getQueryWrapper(deviceQuery);
  894. List<DeviceArchiveManage> deviceArchiveManages = deviceArchiveManageMapper.pageList(queryWrapper);
  895. if (CollectionUtils.isEmpty(deviceArchiveManages)) {
  896. return null;
  897. }
  898. //查询所有指标
  899. List<Allocation> allocations = allocationMapper.selectList(new QueryWrapper<Allocation>().lambda().eq(StringUtil.isNotNull(deviceQuery.getIsDisplay()),Allocation::getIsDisplay,deviceQuery.getIsDisplay()));
  900. //查询所有报警
  901. List<ScadaReport> scadaReports = reportMapper.selectList(null);
  902. //查询最新scada数据
  903. List<Scada> scadas = getListData();
  904. List<DeviceArchiveManageVo> manageVos = deviceArchiveManages.stream().map(deviceArchiveManage -> {
  905. DeviceArchiveManageVo vo = new DeviceArchiveManageVo();
  906. BeanUtils.copyProperties(deviceArchiveManage, vo);
  907. //查询绑定的指标
  908. if (!CollectionUtils.isEmpty(allocations)) {
  909. //获取符合设备绑定的指标
  910. List<AllocationVo> allocationVos = new HashSet<>(allocations).stream().filter(allocation -> allocation.getDeviceId() != null &&
  911. allocation.getDeviceId().equals(vo.getId())).map(allocation -> {
  912. AllocationVo allocationVo = new AllocationVo();
  913. BeanUtils.copyProperties(allocation, allocationVo);
  914. Scada scadaBelong = null;
  915. //查询scada数据
  916. if (!CollectionUtils.isEmpty(scadas)) {
  917. List<Scada> scadaList = scadas.stream()
  918. .filter(scada -> scada.getCode() != null && scada.getCode().equals(allocation.getVariableCode()))
  919. .collect(Collectors.toList());
  920. if (!CollectionUtils.isEmpty(scadaList)) {
  921. scadaBelong = scadaList.get(0);
  922. allocationVo.setScada(scadaList.get(0));
  923. }
  924. }
  925. //设置报警相关信息
  926. Set<Long> reportIdSets = getReportIdSets(Collections.singletonList(allocation));
  927. if (!CollectionUtils.isEmpty(scadaReports) && !CollectionUtils.isEmpty(reportIdSets)) {
  928. //筛选出报警信息列表
  929. List<ScadaReport> reports = scadaReports.stream().filter(scadaReport -> scadaReport.getId() != null
  930. && reportIdSets.contains(scadaReport.getId())).collect(Collectors.toList());
  931. //设置指标报警状态为未报警
  932. allocationVo.setIsAlarm("0");
  933. //判断指标是否报警
  934. if(scadaBelong != null && scadaBelong.getValue() != null){
  935. for (ScadaReport scadaReport : reports) {
  936. if(scadaReport.getReportLower() == null || scadaReport.getReportUpper() == null){
  937. continue;
  938. }
  939. if(!isNumber(scadaReport.getReportLower())
  940. || !isNumber(scadaReport.getReportUpper())
  941. || !isNumber(scadaBelong.getValue())){
  942. continue;
  943. }
  944. Double lower = Double.parseDouble(scadaReport.getReportLower());
  945. Double upper = Double.parseDouble(scadaReport.getReportUpper());
  946. Double value = Double.parseDouble(scadaBelong.getValue());
  947. if(value.compareTo(lower) >= 0 && value.compareTo(upper) <= 0){
  948. allocationVo.setIsAlarm("1");
  949. break;
  950. }
  951. }
  952. }
  953. allocationVo.setScadaReports(reports);
  954. }
  955. return allocationVo;
  956. }).collect(Collectors.toList());
  957. vo.setAllocations(allocationVos);
  958. }
  959. if (org.apache.commons.lang3.StringUtils.isNotBlank(deviceQuery.getDeviceTypeIds())) {
  960. List<String> deviceTypeList = Arrays.stream(deviceQuery.getDeviceTypeIds().split(",")).map(String::trim)
  961. .collect(Collectors.toList());
  962. for (String s : deviceTypeList) {
  963. if (vo.getDeviceType().equals(s)) {
  964. return vo;
  965. }
  966. }
  967. return null;
  968. }
  969. return vo;
  970. }).filter(Objects::nonNull).collect(Collectors.toList());
  971. return manageVos;
  972. }
  973. /**
  974. * 获取关联指标
  975. *
  976. * @param vo
  977. */
  978. private void getAllocation(DeviceArchiveManageVo vo) {
  979. QueryWrapper<Allocation> allocationQueryWrapper = new QueryWrapper<>();
  980. allocationQueryWrapper.eq("DEVICE_ID", vo.getId());
  981. List<Allocation> allocations = allocationMapper.selectList(allocationQueryWrapper);
  982. if (!CollectionUtils.isEmpty(allocations)) {
  983. List<AllocationVo> allocationVos = allocations.stream().map(allocation -> {
  984. AllocationVo allocationVo = new AllocationVo();
  985. BeanUtils.copyProperties(allocation, allocationVo);
  986. //查询scada数据
  987. List<Scada> dataByCode = scadaMapper.getListDataByCode(allocation.getVariableCode());
  988. if (!CollectionUtils.isEmpty(dataByCode)) {
  989. allocationVo.setScada(dataByCode.get(0));
  990. }
  991. //查询绑定的报警列表
  992. Set<Long> reportIdSets = getReportIdSets(Collections.singletonList(allocation));
  993. if (!CollectionUtils.isEmpty(reportIdSets)) {
  994. logger.info("获取指标报警相关....");
  995. QueryWrapper<ScadaReport> scadaReportQueryWrapper = new QueryWrapper<>();
  996. scadaReportQueryWrapper.in("ID", reportIdSets);
  997. List<ScadaReport> scadaReports = reportMapper.selectList(scadaReportQueryWrapper);
  998. if (!CollectionUtils.isEmpty(scadaReports)) {
  999. allocationVo.setScadaReports(scadaReports);
  1000. }
  1001. }
  1002. //返回数据
  1003. return allocationVo;
  1004. }).collect(Collectors.toList());
  1005. vo.setAllocations(allocationVos);
  1006. }
  1007. }
  1008. /**
  1009. * 设置查询条件
  1010. *
  1011. * @param deviceQuery
  1012. * @return
  1013. */
  1014. private QueryWrapper<DeviceArchiveManage> getQueryWrapper(DeviceQuery deviceQuery) {
  1015. QueryWrapper<DeviceArchiveManage> queryWrapper = new QueryWrapper<>();
  1016. if (!ObjectUtils.isEmpty(deviceQuery)) {
  1017. if (StringUtils.hasText(deviceQuery.getType())) {
  1018. queryWrapper.eq("manage.TYPE", deviceQuery.getType());
  1019. }
  1020. if (StringUtils.hasText(deviceQuery.getDeviceType())) {
  1021. queryWrapper.eq("manage.DEVICE_TYPE", deviceQuery.getDeviceType());
  1022. }
  1023. if (StringUtils.hasText(deviceQuery.getStatus())) {
  1024. queryWrapper.eq("manage.STATUS", deviceQuery.getStatus());
  1025. }
  1026. if (StringUtils.hasText(deviceQuery.getQuery())) {
  1027. queryWrapper.and(q -> {
  1028. q.like("manage.ADDRESS", deviceQuery.getQuery())
  1029. .or().like("manage.NAME", deviceQuery.getQuery())
  1030. .or().like("manage.DESCRIBE", deviceQuery.getQuery());
  1031. });
  1032. }
  1033. if (StringUtils.hasText(deviceQuery.getName())) {
  1034. queryWrapper.like("manage.NAME", deviceQuery.getName());
  1035. }
  1036. if (StringUtils.hasText(deviceQuery.getCode())) {
  1037. queryWrapper.like("allocation.VARIABLE_CODE", deviceQuery.getCode());
  1038. }
  1039. if (deviceQuery.getDeviceId() != null) {
  1040. queryWrapper.eq("manage.ID", deviceQuery.getDeviceId());
  1041. }
  1042. if (deviceQuery.getLatitude() != null && deviceQuery.getLongitude() != null) {
  1043. queryWrapper.eq("manage.LATITUDE", deviceQuery.getLatitude())
  1044. .eq("manage.LONGITUDE", deviceQuery.getLongitude());
  1045. }
  1046. }
  1047. return queryWrapper;
  1048. }
  1049. // @Override
  1050. // public void insertDataAndMouth(Scada scada) {
  1051. // if (ObjectUtils.isEmpty(scada)) {
  1052. // throw new ToflyDeniedException("数据为空");
  1053. // }
  1054. // if (scada.getId() != null && StringUtils.isEmpty(scada.getValue()) && scada.getScadaTime() == null) {
  1055. // scada = getById(scada.getId());
  1056. // }
  1057. // log.info("获取到同步scada信息数据:-----{}-----", scada.toString());
  1058. // if (!ObjectUtils.isEmpty(scada)) {
  1059. // //存储到分表月份
  1060. // String tableName = createMonths(scada.getScadaTime() == null ? new Date() : scada.getScadaTime());
  1061. // log.info("创建月份表成功:-----{}-----", tableName);
  1062. // //检测数据是否存在,不存在就插入
  1063. // if (CollectionUtils.isEmpty(scadaMapper.selectHistoryById(tableName, scada.getId()))) {
  1064. // log.info("scada数据不存在月份表中,正在执行新增数据");
  1065. // scadaMapper.insertByOne(scada, tableName);
  1066. // log.info("新增数据成功:-----{}-----", scada);
  1067. // }
  1068. // //检测报警信息
  1069. // //sendNotice(scada);
  1070. // }
  1071. // }
  1072. @Override
  1073. @Async
  1074. public void insertData(Scada scada) {
  1075. if (ObjectUtils.isEmpty(scada)) {
  1076. throw new ToflyDeniedException("数据为空");
  1077. }
  1078. if (!ObjectUtils.isEmpty(scada)) {
  1079. //存储到分表月份
  1080. // String tableName = createMonths(scada.getScadaTime() == null ? new Date() : scada.getScadaTime());
  1081. // log.info("创建月份表成功:-----{}-----", tableName);
  1082. // //检测数据是否存在,不存在就插入
  1083. // if (CollectionUtils.isEmpty(scadaMapper.selectHistoryById(tableName, scada.getId()))) {
  1084. // log.info("scada数据不存在月份表中,正在执行新增数据");
  1085. // scadaMapper.insertByOne(scada, tableName);
  1086. // log.info("新增数据成功:-----{}-----", scada);
  1087. // }
  1088. //检测报警信息
  1089. sendNotice(scada);
  1090. }
  1091. }
  1092. /**
  1093. * 获取实时表SCADA_MONITORING_REALTIME中的数据根据ID
  1094. *
  1095. * @param scada
  1096. * @return
  1097. */
  1098. private Scada getScada(Scada scada) {
  1099. if (scada != null && scada.getId() != null && StringUtils.isEmpty(scada.getValue()) && scada.getScadaTime() == null) {
  1100. List<Scada> scadas = baseMapper.selectScadaMonitoringRealtimeById("SCADA_MONITORING_REALTIME", scada.getId());
  1101. if (!CollectionUtils.isEmpty(scadas)) {
  1102. scada = scadas.get(0);
  1103. }
  1104. }
  1105. return scada;
  1106. }
  1107. /**
  1108. * 定时任务,每分钟1执行一次
  1109. */
  1110. @Scheduled(cron = "0 0/1 * * * ?")
  1111. @Async
  1112. public void timingInsertData() {
  1113. Calendar calendar = Calendar.getInstance();
  1114. calendar.setTime(new Date());
  1115. Date end = calendar.getTime();
  1116. calendar.add(Calendar.MINUTE, -5);
  1117. Date start = calendar.getTime();
  1118. // List<String> codes = getCodes(null);
  1119. // if (CollectionUtils.isEmpty(codes)) {
  1120. // return;
  1121. // }
  1122. List<Scada> scadas = scadaMapper.selectList(new QueryWrapper<Scada>().lambda()
  1123. .between(Scada::getUpdateTime, start, end));
  1124. log.info("时间{}-----{}----->查询每分钟定时任务实时数据{}", start, end, scadas);
  1125. if (!CollectionUtils.isEmpty(scadas)) {
  1126. //存储到分表月份
  1127. // String tableName = createMonths(scadas.get(0).getScadaTime() == null ? new Date() : scadas.get(0).getScadaTime());
  1128. // int i = insertMouthDataBatch(scadas, tableName);
  1129. // log.info("{}新增合计数据:{}条", tableName, i);
  1130. //监控+报警
  1131. for (Scada scada : scadas) {
  1132. // todo 暂时不使用 实时监控表
  1133. // if (scada.getValue().startsWith(".")) {
  1134. // scada.setValue("0" + scada.getValue());
  1135. // }
  1136. // Scada monitoring = scadaMapper.selectDataByMonitoring(scada.getCode());
  1137. // log.info("查询存在的实时数据{}", monitoring);
  1138. // if (ObjectUtils.isEmpty(monitoring)) {
  1139. // scadaMapper.insertMonitoring(scada);
  1140. // log.info("无实时数据,进行新增{}", scada);
  1141. // } else {
  1142. // scadaMapper.updateMonitoring(scada);
  1143. // log.info("有实时数据,进行更新{}", scada);
  1144. // }
  1145. //检测报警信息
  1146. sendNotice(scada, DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"));
  1147. }
  1148. }
  1149. }
  1150. /**
  1151. * 截取数组长度批量插入
  1152. *
  1153. * @param scadas 数据集合
  1154. * @param tableName 表名
  1155. * @return 成功条数
  1156. */
  1157. public int insertMouthDataBatch(List<Scada> scadas, String tableName) {
  1158. int size = scadas.size();
  1159. int limit = 1000;
  1160. int startIndex = 0;
  1161. int endIndex;
  1162. if (size <= limit) {
  1163. //添加新数据
  1164. int insertMouth = scadaMapper.insertMouth(scadas, tableName);
  1165. log.info("新增数据:{}条", insertMouth);
  1166. } else {
  1167. while (startIndex < size) {
  1168. //结束索引范围
  1169. endIndex = startIndex + limit;
  1170. //最后索引不能超过本身长度
  1171. endIndex = endIndex > size ? size : endIndex;
  1172. //截取数据
  1173. List<Scada> scadaList = scadas.subList(startIndex, endIndex);
  1174. //下次索引位置
  1175. startIndex = endIndex;
  1176. //添加新数据
  1177. int insertMouth = scadaMapper.insertMouth(scadaList, tableName);
  1178. log.info("{}新增数据:{}条", tableName, insertMouth);
  1179. }
  1180. }
  1181. return size;
  1182. }
  1183. @Override
  1184. @Transactional(rollbackFor = Exception.class)
  1185. public void fetchData(String code, String start, String end, Integer type) {
  1186. Date startTime;
  1187. Date endTime;
  1188. try {
  1189. SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
  1190. startTime = format.parse(start);
  1191. endTime = format.parse(end);
  1192. } catch (Exception e) {
  1193. throw new RuntimeException("时间格式有误");
  1194. }
  1195. long s = System.currentTimeMillis();
  1196. //统计类型0小时,1按天,2月份,3年份
  1197. logger.info("同步scada历史数据开始");
  1198. switch (type) {
  1199. case 0: {
  1200. log.info("开始进行统计计算-----{}", type);
  1201. insert24H(code, startTime, endTime, type);
  1202. long e = System.currentTimeMillis();
  1203. logger.info("统计计算小时结束,用时{}", e - s);
  1204. break;
  1205. }
  1206. case 1: {
  1207. log.info("开始进行统计计算-----{}", type);
  1208. syncDaysData(code, startTime, endTime, type);
  1209. long e = System.currentTimeMillis();
  1210. logger.info("统计计算天结束,用时{}", e - s);
  1211. break;
  1212. }
  1213. case 2: {
  1214. log.info("开始进行统计计算-----{}", type);
  1215. syncDataIntoMonths(code, startTime, endTime, type);
  1216. long e = System.currentTimeMillis();
  1217. logger.info("统计计算月结束,用时{}", e - s);
  1218. break;
  1219. }
  1220. case 3: {
  1221. //TODO 暂时不同步年数据
  1222. // List<Date> dates = findDates(startTime, endTime,0);
  1223. // if (!CollectionUtils.isEmpty(dates)){
  1224. // for (Date date : dates) {
  1225. // syncDaysData(date);
  1226. // }
  1227. // }
  1228. break;
  1229. }
  1230. }
  1231. }
  1232. @Override
  1233. public Page<StatisticsScadaVo> selectHistoryByMouthsTables(String mouthTable, Page page,
  1234. ScadaQuery scadaQuery, boolean isPage) {
  1235. if (isPage) {
  1236. //特殊值处理
  1237. // defaultValHandle(statisticsScadaVoPage);
  1238. return baseMapper.selectHistoryByMouthsTablePage(mouthTable, page, scadaQuery);
  1239. } else {
  1240. Page<StatisticsScadaVo> pageDb = new Page<>();
  1241. List<StatisticsScadaVo> statisticsScadaVoPage = baseMapper.selectHistoryByMouthsTableList(mouthTable, scadaQuery);
  1242. pageDb.setRecords(statisticsScadaVoPage);
  1243. //特殊值处理
  1244. // defaultValHandle(pageDb);
  1245. return pageDb;
  1246. }
  1247. }
  1248. /**
  1249. * 特殊值处理
  1250. *
  1251. * @param statisticsScadaVoPage 数据集合
  1252. */
  1253. private void defaultValHandle(Page<StatisticsScadaVo> statisticsScadaVoPage) {
  1254. for (StatisticsScadaVo scadaVo : statisticsScadaVoPage.getRecords()) {
  1255. if (!StringUtils.hasText(scadaVo.getValue())) {
  1256. scadaVo.setValue("0");
  1257. } else if (StringUtils.hasText(scadaVo.getValue()) && scadaVo.getValue().startsWith(".")) {
  1258. String value = scadaVo.getValue();
  1259. scadaVo.setValue("0" + value);
  1260. }
  1261. }
  1262. }
  1263. @Override
  1264. public void fetchDataMouth(String code, String startStr, String endStr) {
  1265. Calendar calendar = Calendar.getInstance();
  1266. calendar.setTime(new Date());
  1267. Date end = DateUtil.parseDateTime(endStr);
  1268. calendar.add(Calendar.MINUTE, -1);
  1269. Date start = DateUtil.parseDateTime(startStr);
  1270. List<String> codes = getCodes(code);
  1271. if (CollectionUtils.isEmpty(codes)) {
  1272. return;
  1273. }
  1274. List<Scada> scadas = scadaMapper.selectList(new QueryWrapper<Scada>().lambda()
  1275. .between(Scada::getScadaTime, start, end)
  1276. .in(Scada::getCode, codes));
  1277. log.info("查询每分钟定时任务实时数据{}", scadas);
  1278. if (!CollectionUtils.isEmpty(scadas)) {
  1279. for (Scada scada : scadas) {
  1280. //实时监控表
  1281. if (scada.getValue().startsWith(".")) {
  1282. scada.setValue("0" + scada.getValue());
  1283. }
  1284. Scada monitoring = scadaMapper.selectDataByMonitoring(scada.getCode());
  1285. log.info("查询存在的实时数据{}", monitoring);
  1286. if (ObjectUtils.isEmpty(monitoring)) {
  1287. scadaMapper.insertMonitoring(scada);
  1288. log.info("无实时数据,进行新增{}", scada);
  1289. } else {
  1290. scadaMapper.updateMonitoring(scada);
  1291. log.info("有实时数据,进行更新{}", scada);
  1292. }
  1293. //月份表-警告通知
  1294. insertData(scada);
  1295. }
  1296. }
  1297. }
  1298. /**
  1299. * 获取日期列表
  1300. *
  1301. * @param start 开始时间:2022-01-01 00:00:00
  1302. * @param end 结束时间:2022-10-01 14:00:00
  1303. * @param type 统计类型 //统计类型0小时,1按天,2月份,3年份
  1304. * @return
  1305. */
  1306. public static List<Date> findDates(Date start, Date end, Integer type) {
  1307. Calendar s = Calendar.getInstance();
  1308. s.setTime(start);
  1309. // Calendar e = Calendar.getInstance();
  1310. // e.setTime(end);
  1311. // if (start.getYear() == end.getYear() && start.getMonth() == end.getMonth() && start.getDay() == end.getDay()) {
  1312. // List<Date> dates = new ArrayList<>();
  1313. // dates.add(start);
  1314. // return dates;
  1315. // }
  1316. // if (start.getYear() == end.getYear() && start.getMonth() == end.getMonth() && type == 0) {
  1317. // List<Date> dates = new ArrayList<>();
  1318. // dates.add(start);
  1319. // return dates;
  1320. // }
  1321. //结束时间在开始时间之后
  1322. List<Date> dates = new ArrayList<>();
  1323. dates.add(start);
  1324. dates.add(end);
  1325. while (end.getTime() > s.getTime().getTime()) {
  1326. if (type == 1) {
  1327. s.add(Calendar.DAY_OF_MONTH, 1);
  1328. } else if (type == 2) {
  1329. s.add(Calendar.MONTH, 1);
  1330. } else if (type == 3) {
  1331. s.add(Calendar.DAY_OF_YEAR, 1);
  1332. } else if (type == 0) {
  1333. s.add(Calendar.HOUR_OF_DAY, 1);
  1334. } else if (type == 4) {
  1335. s.add(Calendar.MINUTE, 5);
  1336. }
  1337. dates.add(s.getTime());
  1338. }
  1339. return dates;
  1340. }
  1341. /**
  1342. * 异步执行同步信息
  1343. *
  1344. * @param data
  1345. */
  1346. public void syncSave(Scada data) {
  1347. if (ObjectUtils.isEmpty(data)) {
  1348. throw new RuntimeException("参数为空");
  1349. }
  1350. System.out.println("主线程 =====> 开始 =====> " + System.currentTimeMillis());
  1351. ExecutorService executorService = Executors.newSingleThreadExecutor();
  1352. executorService.submit(() -> {
  1353. System.out.println("异步线程 =====> 开始 =====> " + System.currentTimeMillis());
  1354. try {
  1355. Thread.sleep(5000);
  1356. syncScadaData(data);
  1357. } catch (InterruptedException e) {
  1358. e.printStackTrace();
  1359. }
  1360. System.out.println("异步线程 =====> 结束 =====> " + System.currentTimeMillis());
  1361. });
  1362. executorService.shutdown(); // 回收线程池
  1363. try {
  1364. Thread.sleep(2000);
  1365. } catch (InterruptedException e) {
  1366. throw new RuntimeException(e);
  1367. }
  1368. System.out.println("主线程 =====> 结束 =====> " + System.currentTimeMillis());
  1369. }
  1370. private void syncScadaData(Scada scada) {
  1371. // Scada scada = null;
  1372. // if (data.getId() != null) {
  1373. // scada = scadaMapper.selectById(data.getId());
  1374. // } else if (StringUtils.hasText(data.getCode())) {
  1375. // Page<Scada> page = new Page<>(1, 1);
  1376. // QueryWrapper<Scada> scadaQueryWrapper = new QueryWrapper<>();
  1377. // scadaQueryWrapper.eq("CODE", data.getCode())
  1378. // .orderByDesc("SCADA_TIME");
  1379. // Page<Scada> scadas = scadaMapper.selectPage(page, scadaQueryWrapper);
  1380. // List<Scada> records = scadas.getRecords();
  1381. // if (!CollectionUtils.isEmpty(records)) {
  1382. // scada = records.get(0);
  1383. // }
  1384. // }
  1385. log.info("获取到同步scada信息数据:-----{}-----", scada.toString());
  1386. if (!ObjectUtils.isEmpty(scada)) {
  1387. //检测是否为系统创建的指标
  1388. log.info("检测是否为系统创建的指标");
  1389. Integer selectCount = allocationMapper.selectCount(new QueryWrapper<Allocation>()
  1390. .eq("VARIABLE_CODE", scada.getCode()));
  1391. if (selectCount == 0) {
  1392. log.info("数据不存在指标绑定配置,无法进行同步更新存储");
  1393. throw new RuntimeException("数据不存在指标绑定配置,无法进行同步更新存储");
  1394. }
  1395. //存储到历史表
  1396. scadaMapper.insert(scada);
  1397. //存储到分表月份
  1398. String tableName = createMonths(scada.getScadaTime() == null ? new Date() : scada.getScadaTime());
  1399. log.info("创建月份表成功:-----{}-----", tableName);
  1400. //检测数据是否存在,不存在就插入
  1401. if (CollectionUtils.isEmpty(scadaMapper.selectHistoryById(tableName, scada.getId()))) {
  1402. log.info("scada数据不存在月份表中,正在执行新增数据");
  1403. scadaMapper.insertByOne(scada, tableName);
  1404. log.info("新增数据成功:-----{}-----", scada);
  1405. }
  1406. //检测报警信息
  1407. sendNotice(scada);
  1408. }
  1409. }
  1410. /**
  1411. * 发送报警信息
  1412. *
  1413. * @param scada scada数据
  1414. */
  1415. @SneakyThrows
  1416. @Async
  1417. void sendNotice(Scada scada) {
  1418. Thread.sleep(5000);
  1419. scada = getScada(scada);
  1420. log.info("获取到同步scada信息数据:-----{}-----", scada.toString());
  1421. if (!StringUtils.hasText(scada.getValue())) {
  1422. logger.info("数据:{},存在value值为空,报警流程结束", scada);
  1423. return;
  1424. }
  1425. QueryWrapper<Allocation> allocationQueryWrapper = new QueryWrapper<>();
  1426. allocationQueryWrapper.eq("VARIABLE_CODE", scada.getCode())
  1427. .eq("IS_ALARM", "1");
  1428. List<Allocation> allocations = allocationMapper.selectList(allocationQueryWrapper);
  1429. if (!CollectionUtils.isEmpty(allocations)) {
  1430. for (Allocation allocation : allocations) {
  1431. if (!StringUtils.hasText(allocation.getIsAlarm()) || allocation.getIsAlarm().equals("0")) {
  1432. logger.info("未设置报警开关,正在退出这次匹配");
  1433. break;
  1434. }
  1435. List<Long> reports = getReportIds(Collections.singletonList(allocation));
  1436. QueryWrapper<ScadaReport> scadaReportQueryWrapper = new QueryWrapper<>();
  1437. scadaReportQueryWrapper.eq("IS_DEL", FlowConstant.SHORT_ZERO)
  1438. .in("ID", reports);
  1439. List<ScadaReport> scadaReports = reportMapper.selectList(scadaReportQueryWrapper);
  1440. if (!CollectionUtils.isEmpty(scadaReports)) {
  1441. logger.info("匹配到告警模版...");
  1442. //报警值
  1443. BigDecimal scadaVal = new BigDecimal(scada.getValue());
  1444. for (ScadaReport scadaReport : scadaReports) {
  1445. BigDecimal upVal = new BigDecimal(scadaReport.getReportUpper());
  1446. BigDecimal downVal = new BigDecimal(scadaReport.getReportLower());
  1447. //当值不在正常的范围里,需要报警处理
  1448. if (scadaVal.compareTo(upVal) > 0 || scadaVal.compareTo(downVal) < 0) {
  1449. MessagepushW messagepush = new MessagepushW();
  1450. // TODO 报警模版后续处理
  1451. messagepush.setTopic(MessageTopicEnum.REPORT_MSG.getTopic());
  1452. messagepush.setMessage(createMsg(scadaReport.getMsgTemplate(), scadaReport));
  1453. messagepush.setType("8");
  1454. messagepush.setTableName(MessageTopicEnum.REPORT_MSG.getTableName());
  1455. messagepush.setCreateTime(scada.getScadaTime());
  1456. sendTidingsUtil.sendMsg(messagepush, scadaReport.getLiaisonPeople());
  1457. logger.info("发送报警消息:{}", messagepush);
  1458. //保存报警信息列表数据
  1459. saveReportListData(scada, scadaReports, allocations);
  1460. logger.info("记录到报警列表:{}", scada);
  1461. }
  1462. }
  1463. log.info("--------报警处理结束----------");
  1464. } else {
  1465. log.info("--------未匹配到报警模版---报警处理结束-------");
  1466. }
  1467. }
  1468. }
  1469. }
  1470. /**
  1471. * 发送报警信息(通过定时任务获取的数据)
  1472. *
  1473. * @param scada scada数据
  1474. */
  1475. @SneakyThrows
  1476. @Async
  1477. void sendNotice(Scada scada, String date) {
  1478. if (!StringUtils.hasText(scada.getValue())) {
  1479. logger.info("数据:{},存在value值为空,报警流程结束", scada);
  1480. return;
  1481. }
  1482. QueryWrapper<Allocation> allocationQueryWrapper = new QueryWrapper<>();
  1483. allocationQueryWrapper.eq("VARIABLE_CODE", scada.getCode())
  1484. .eq("IS_ALARM", "1");
  1485. List<Allocation> allocations = allocationMapper.selectList(allocationQueryWrapper);
  1486. if (!CollectionUtils.isEmpty(allocations)) {
  1487. for (Allocation allocation : allocations) {
  1488. if (!StringUtils.hasText(allocation.getIsAlarm()) || allocation.getIsAlarm().equals("0")) {
  1489. logger.info("未设置报警开关,正在退出这次匹配");
  1490. break;
  1491. }
  1492. List<Long> reports = getReportIds(Collections.singletonList(allocation));
  1493. QueryWrapper<ScadaReport> scadaReportQueryWrapper = new QueryWrapper<>();
  1494. scadaReportQueryWrapper.eq("IS_DEL", FlowConstant.SHORT_ZERO)
  1495. .in("ID", reports);
  1496. List<ScadaReport> scadaReports = reportMapper.selectList(scadaReportQueryWrapper);
  1497. if (!CollectionUtils.isEmpty(scadaReports)) {
  1498. //报警值
  1499. BigDecimal scadaVal = new BigDecimal(scada.getValue());
  1500. for (ScadaReport scadaReport : scadaReports) {
  1501. BigDecimal upVal = new BigDecimal(scadaReport.getReportUpper());
  1502. BigDecimal downVal = new BigDecimal(scadaReport.getReportLower());
  1503. //当值不在正常的范围里,需要报警处理
  1504. if (scadaVal.compareTo(upVal) > 0 || scadaVal.compareTo(downVal) < 0) {
  1505. MessagepushW messagepush = new MessagepushW();
  1506. // TODO 报警模版后续处理
  1507. messagepush.setTopic(MessageTopicEnum.REPORT_MSG.getTopic());
  1508. messagepush.setMessage(createMsg(scadaReport.getMsgTemplate(), scadaReport));
  1509. messagepush.setType("8");
  1510. messagepush.setTableName(MessageTopicEnum.REPORT_MSG.getTableName());
  1511. messagepush.setCreateTime(scada.getScadaTime());
  1512. sendTidingsUtil.sendMsg(messagepush, scadaReport.getLiaisonPeople());
  1513. logger.info("发送报警消息:{}", messagepush);
  1514. //保存报警信息列表数据
  1515. saveReportListData(scada, scadaReports, allocations);
  1516. }
  1517. }
  1518. log.info("--------报警处理结束----------");
  1519. } else {
  1520. log.info("--------未匹配到报警模版---报警处理结束-------");
  1521. }
  1522. }
  1523. }
  1524. }
  1525. private void saveReportListData(Scada scada, List<ScadaReport> scadaReports, List<Allocation> allocations) {
  1526. TfScadaReportList tfScadaReportList = new TfScadaReportList();
  1527. BeanUtils.copyProperties(scada, tfScadaReportList);
  1528. List<String> levelTitles = scadaReports.stream().filter(scadaReport -> StringUtils.hasText(scadaReport.getReportLevel()))
  1529. .map(ScadaReport::getReportLevel).collect(Collectors.toList());
  1530. if (!CollectionUtils.isEmpty(levelTitles)) {
  1531. String join = org.apache.commons.lang3.StringUtils.join(levelTitles);
  1532. tfScadaReportList.setReportLevel(join);
  1533. }
  1534. List<String> reportUppers = scadaReports.stream().filter(scadaReport -> StringUtils.hasText(scadaReport.getReportUpper()))
  1535. .map(ScadaReport::getReportUpper).collect(Collectors.toList());
  1536. if (!CollectionUtils.isEmpty(reportUppers)) {
  1537. String join = org.apache.commons.lang3.StringUtils.join(reportUppers);
  1538. tfScadaReportList.setReportUpper(join);
  1539. }
  1540. List<String> reportLowers = scadaReports.stream().filter(scadaReport -> StringUtils.hasText(scadaReport.getReportLower()))
  1541. .map(ScadaReport::getReportLower).collect(Collectors.toList());
  1542. if (!CollectionUtils.isEmpty(reportLowers)) {
  1543. String join = org.apache.commons.lang3.StringUtils.join(reportLowers);
  1544. tfScadaReportList.setReportLower(join);
  1545. }
  1546. List<String> displayNames = allocations.stream().filter(scadaReport -> StringUtils.hasText(scadaReport.getDisplayName()))
  1547. .map(Allocation::getDisplayName).collect(Collectors.toList());
  1548. if (!CollectionUtils.isEmpty(displayNames)) {
  1549. String join = org.apache.commons.lang3.StringUtils.join(displayNames);
  1550. tfScadaReportList.setDisplayName(join);
  1551. }
  1552. tfScadaReportList.setStatus("0");
  1553. tfScadaReportListService.insert(tfScadaReportList);
  1554. log.info("添加列表报警数据成功:{}", tfScadaReportList);
  1555. }
  1556. /**
  1557. * 构建信息内容模版
  1558. *
  1559. * @param template
  1560. * @param scadaReport
  1561. * @return
  1562. */
  1563. public String createMsg(String template, ScadaReport scadaReport) {
  1564. if (template.contains("#TIME#")) {
  1565. String date = DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss");
  1566. template = template.replaceAll("#TIME#", date);
  1567. }
  1568. if (template.contains("#YHXM#")) {
  1569. String user = scadaReportService.getUser(scadaReport.getLiaisonPeople());
  1570. template = template.replace(user, "#YHXM#");
  1571. }
  1572. if (template.contains("#LXDH#")) {
  1573. template = template.replace(scadaReport.getLiaisonPhone(), "#LXDH#");
  1574. }
  1575. return template;
  1576. }
  1577. /**
  1578. * 获取报警ID集合
  1579. *
  1580. * @param allocations
  1581. * @return
  1582. */
  1583. private List<Long> getReportIds(List<Allocation> allocations) {
  1584. List<Long> reports = allocations.stream().filter(allocation -> StringUtils.hasText(allocation.getReportId()))
  1585. .map(allocation -> {
  1586. if (allocation.getReportId().contains(",")) {
  1587. String[] reportIds = allocation.getReportId().split(",");
  1588. for (String reportId : reportIds) {
  1589. return Long.parseLong(reportId);
  1590. }
  1591. } else {
  1592. return Long.parseLong(allocation.getReportId());
  1593. }
  1594. return null;
  1595. }).collect(Collectors.toList());
  1596. return reports;
  1597. }
  1598. /**
  1599. * 获取报警ID集合(去重)
  1600. *
  1601. * @param allocations
  1602. * @return
  1603. */
  1604. public static Set<Long> getReportIdSets(List<Allocation> allocations) {
  1605. Set<Long> reports = allocations.stream().filter(allocation -> StringUtils.hasText(allocation.getReportId()))
  1606. .map(allocation -> {
  1607. if (allocation.getReportId().contains(",")) {
  1608. String[] reportIds = allocation.getReportId().split(",");
  1609. for (String reportId : reportIds) {
  1610. return Long.parseLong(reportId);
  1611. }
  1612. } else {
  1613. return Long.parseLong(allocation.getReportId());
  1614. }
  1615. return null;
  1616. }).collect(Collectors.toSet());
  1617. return reports;
  1618. }
  1619. @Scheduled(cron = "0 0/5 * * * ? ")
  1620. @Transactional(rollbackFor = Exception.class)
  1621. public void automaticSendNetCumulativeFlow() {
  1622. Calendar calendar = Calendar.getInstance();
  1623. calendar.setTime(new Date());
  1624. Date endTime = calendar.getTime();
  1625. calendar.add(Calendar.MINUTE, -5);
  1626. Date startTime = calendar.getTime();
  1627. sendNetCumulativeFlow(startTime, endTime);
  1628. }
  1629. @Override
  1630. @Transactional(rollbackFor = Exception.class)
  1631. public void manualSendNetCumulativeFlow(Date sDate, Date eDate) {
  1632. log.info("手动同步scada正反数据");
  1633. sendNetCumulativeFlow(sDate, eDate);
  1634. }
  1635. private void sendNetCumulativeFlow(Date sDate, Date eDate) {
  1636. long l = System.currentTimeMillis();
  1637. List<Scada> scadaList = new ArrayList<>();
  1638. // 获取时间范围内设备类型deviceType 为 2,3 的指标类型为正反17 18的监测数据
  1639. logger.info("同步scada正反数据时间区间: {}", DateUtil.format(sDate, "yyyy-MM-dd HH:mm:ss") + "-" + DateUtil.format(eDate, "yyyy-MM-dd HH:mm:ss"));
  1640. List<NetCumulativeFlow> netCumulativeFlow = scadaMapper.getNetCumulativeFlow(sDate, eDate);
  1641. logger.info("同步scada正反数据获取大小: {}", netCumulativeFlow.size());
  1642. // 根据设备id分组
  1643. Map<Long, List<NetCumulativeFlow>> longListMap = netCumulativeFlow.stream().collect(Collectors.groupingBy(NetCumulativeFlow::getDeviceId));
  1644. // 处理虚拟指标,获取对应虚拟指标信息
  1645. Map<Long, Allocation> newAllocation = newAllocation(longListMap.keySet());
  1646. logger.info("同步scada虚拟指标: {}", JSON.toJSONString(newAllocation));
  1647. // 循环处理计算每个设备净累计
  1648. longListMap.forEach((k, v) -> {
  1649. Allocation allocation = newAllocation.get(k);
  1650. // 按照时间分组
  1651. Map<Date, List<NetCumulativeFlow>> dateListMap = v.stream().collect(Collectors.groupingBy(NetCumulativeFlow::getScadaTime));
  1652. dateListMap.forEach((n, m) -> {
  1653. AtomicReference<BigDecimal> value = new AtomicReference<>(BigDecimal.ZERO);
  1654. m.forEach(i -> {
  1655. BigDecimal bigDecimal = BigDecimal.valueOf(Double.parseDouble(i.getValue() == null ? "0" : i.getValue()));
  1656. value.set(value.get().subtract(bigDecimal).abs());
  1657. });
  1658. Scada scada = new Scada();
  1659. BigDecimal bigDecimal = value.get();
  1660. scada.setCode(allocation.getVariableCode());
  1661. scada.setScadaTime(n);
  1662. scada.setValue(value.get().toString());
  1663. //
  1664. if (null != bigDecimal && bigDecimal.compareTo(BigDecimal.ZERO) != 0) {
  1665. scadaList.add(scada);
  1666. }
  1667. });
  1668. });
  1669. logger.info("同步scada新增scada历史数据大小: {}", scadaList.size());
  1670. // 新增scada历史数据
  1671. if (scadaList.size() > 0) {
  1672. // 清空编码后缀为.JLJ的指标数据
  1673. this.remove(new QueryWrapper<Scada>().lambda().like(Scada::getCode, ".JLJ").ge(Scada::getScadaTime, sDate).le(Scada::getScadaTime,eDate));
  1674. scadaMonitorService.remove(new QueryWrapper<ScadaMonitor>().lambda().like(ScadaMonitor::getCode, ".JLJ"));
  1675. this.saveBatch(scadaList,2000);
  1676. // 获取最大时间数据更新实时数据
  1677. Map<String, List<Scada>> collect = scadaList.stream().collect(Collectors.groupingBy(Scada::getCode));
  1678. List<ScadaMonitor> scadaMonitorList = new ArrayList<>();
  1679. collect.forEach((k,v)->{
  1680. Optional<Scada> max = v.stream().max(Comparator.comparingDouble(d->d.getScadaTime().getTime()));
  1681. Scada scada = max.get();
  1682. ScadaMonitor scadaMonitor = new ScadaMonitor(scada.getCode(),scada.getValue(),scada.getScadaTime());
  1683. scadaMonitorList.add(scadaMonitor);
  1684. });
  1685. logger.info("同步scada新增scada实时数据: {}", JSON.toJSONString(scadaMonitorList));
  1686. scadaMonitorService.saveBatch(scadaMonitorList);
  1687. }
  1688. long l1 = System.currentTimeMillis();
  1689. log.info("同步scada正反数据,用时{}mm", l1 - l);
  1690. }
  1691. /**
  1692. * 处理虚拟指标
  1693. *
  1694. * @param deviceIdList
  1695. * @return
  1696. */
  1697. private Map<Long, Allocation> newAllocation(Set<Long> deviceIdList) {
  1698. // 存放需要新增的虚拟指标列表
  1699. Set<Long> xnDeviceIdSet = new HashSet<>();
  1700. // 获取虚拟指标
  1701. List<Allocation> allocations = allocationMapper.selectList(new QueryWrapper<Allocation>().lambda().eq(Allocation::getType, "19"));
  1702. if (null != allocations && allocations.size() > 0) {
  1703. // 虚拟指标设备id分组 一个设备只能有一个虚拟指标
  1704. Map<Long, List<Allocation>> xnCodeMap = allocations.stream().collect(Collectors.groupingBy(Allocation::getDeviceId));
  1705. Set<Long> collect = deviceIdList.stream().filter(deviceId -> !xnCodeMap.containsKey(deviceId)).collect(Collectors.toSet());
  1706. xnDeviceIdSet.addAll(collect);
  1707. } else {
  1708. // 全部赋值需要新增虚拟表的设备
  1709. xnDeviceIdSet.addAll(deviceIdList);
  1710. }
  1711. // 新增虚拟指标
  1712. List<Allocation> newAllocationList = new ArrayList<>();
  1713. if (xnDeviceIdSet.size() > 0) {
  1714. // 获取需要新增虚拟的指标设备列表
  1715. List<Allocation> oldAllocationList = allocationMapper.selectList(new QueryWrapper<Allocation>().lambda().in(Allocation::getDeviceId, xnDeviceIdSet));
  1716. // 按照设备id分组
  1717. Map<Long, List<Allocation>> mapAllocation = oldAllocationList.stream().collect(Collectors.groupingBy(Allocation::getDeviceId));
  1718. mapAllocation.forEach((k, v) -> {
  1719. // 判断当前设备下是否存在正向指标 若不存在则获取反向属性
  1720. Map<String, Allocation> allocationMap = v.stream().collect(Collectors.toMap(Allocation::getType, t -> t, (key1, key2) -> key2));
  1721. Allocation allocation;
  1722. if (allocationMap.containsKey("17")) {
  1723. allocation = allocationMap.get("17");
  1724. } else {
  1725. allocation = allocationMap.get("18");
  1726. }
  1727. allocation.setId(null);
  1728. allocation.setVariableCode(allocation.getVariableCode() + ".JLJ");
  1729. allocation.setVariableName(allocation.getVariableName() + ".净累计");
  1730. allocation.setDisplayName(allocation.getDisplayName() + ".净累计");
  1731. allocation.setType("19");
  1732. newAllocationList.add(allocation);
  1733. });
  1734. // 新增指标记录
  1735. // 同步到区域表
  1736. if (newAllocationList.size() > 0) {
  1737. allocationService.saveBatch(newAllocationList);
  1738. List<Dmatable> dmaTableList = newAllocationList.stream().map(v -> new Dmatable(v.getDeviceId(), Double.parseDouble("0"))).collect(Collectors.toList());
  1739. dmatableService.saveBatch(dmaTableList);
  1740. }
  1741. }
  1742. // 返回所有 设备id 虚拟指标设备
  1743. if (null != allocations && allocations.size() > 0) {
  1744. newAllocationList.addAll(allocations);
  1745. }
  1746. return newAllocationList.stream().collect(Collectors.toMap(Allocation::getDeviceId, t -> t, (key1, key2) -> key2));
  1747. }
  1748. /**
  1749. * 判断字符串内容是否为数字类型
  1750. *
  1751. * @param str
  1752. * @return
  1753. */
  1754. public static boolean isNumber(String str) {
  1755. Matcher m = pattern.matcher(str);
  1756. return m.matches();
  1757. }
  1758. }