Procházet zdrojové kódy

批次集合、不重复下载、时间修正

hdc před 2 roky
rodič
revize
c4588fe331

+ 9 - 0
tongfei_river_data_collection/pom.xml

@@ -54,6 +54,15 @@
             <artifactId>lombok</artifactId>
             <optional>true</optional>
         </dependency>
+        <dependency>
+            <groupId>net.sf.ehcache</groupId>
+            <artifactId>ehcache</artifactId>
+            <version>2.10.9.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-context-support</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.jsoup</groupId>
             <artifactId>jsoup</artifactId>

+ 3 - 0
tongfei_river_data_collection/src/main/java/com/ublinkage/datacollection/Application.java

@@ -3,12 +3,15 @@ package com.ublinkage.datacollection;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.system.ApplicationHome;
+import org.springframework.cache.annotation.EnableCaching;
 
 import java.io.File;
 
 @SpringBootApplication
+@EnableCaching
 public class Application {
     public static void main(String[] args) {
+        System.setProperty(net.sf.ehcache.CacheManager.ENABLE_SHUTDOWN_HOOK_PROPERTY,"true");
         ApplicationHome home = new ApplicationHome(Application.class);
         String logPath = home.getDir().getAbsolutePath() + File.separator + "log";
         System.setProperty("LOG_PATH", logPath);

+ 12 - 0
tongfei_river_data_collection/src/main/java/com/ublinkage/datacollection/Constant.java

@@ -0,0 +1,12 @@
+package com.ublinkage.datacollection;
+
+import java.time.format.DateTimeFormatter;
+
+public class Constant {
+
+    public final static DateTimeFormatter BATCH_DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH_192");
+    public final static DateTimeFormatter BATCH_DATE_TIME_FORMATTER2 = DateTimeFormatter.ofPattern("yyyyMMdd_HH_192");
+    private Constant(){
+
+    }
+}

+ 2 - 5
tongfei_river_data_collection/src/main/java/com/ublinkage/datacollection/Util.java

@@ -20,9 +20,8 @@ public class Util {
 
     @NotNull
     private static LocalDateTime getLatestPublishLateTime() {
-        //LocalDateTime now = LocalDateTime.of(2023, 6, 27, 1, 0);
         LocalDateTime now = LocalDateTime.now(Clock.systemUTC());
-        long seconds = now.toEpochSecond(ZoneOffset.of("+8"));
+        long seconds = now.plusHours(-12).toEpochSecond(ZoneOffset.of("+0"));
         long hours = seconds / (60 * 60);
         long newHours = (hours / 6) * 6;
         LocalDateTime localDateTime = LocalDateTime.ofEpochSecond(newHours * (60 * 60), 0, ZoneOffset.UTC);
@@ -31,10 +30,8 @@ public class Util {
     }
 
     public static String getPublishDt(){
-        //LocalDateTime now = LocalDateTime.of(2023, 6, 27, 1, 0);
         LocalDateTime localDateTime = getLatestPublishLateTime();
-        DateTimeFormatter pattern = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH_192");
-        return pattern.format(localDateTime);
+        return localDateTime.format(Constant.BATCH_DATE_TIME_FORMATTER);
     }
 
     public static void main(String[] args) {

+ 387 - 236
tongfei_river_data_collection/src/main/java/com/ublinkage/datacollection/service/QxybDataService.java

@@ -2,12 +2,16 @@ package com.ublinkage.datacollection.service;
 
 import cn.hutool.core.io.FileUtil;
 import com.alibaba.fastjson.JSONObject;
+import com.ublinkage.datacollection.Constant;
 import com.ublinkage.datacollection.entity.PublishInfo;
 import lombok.extern.slf4j.Slf4j;
 import okhttp3.OkHttpClient;
 import okhttp3.Request;
 import okhttp3.Response;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.cache.Cache;
+import org.springframework.cache.CacheManager;
 import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
 
@@ -19,7 +23,9 @@ import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /**
  * 气象预报数据采集服务
@@ -42,6 +48,11 @@ public class QxybDataService {
     private String snowCdFile;
     @Value("${snow.qzk.file}")
     private String snowQzkFile;
+    @Autowired
+    private CacheManager cacheManager;
+
+    private final static String CACHE_NAME_DOWNLOAD_DATA = "cache_downloaded_data";
+    private final static String CACHE_KEY_DOWNLOAD_DATA = "key_downloaded_data";
 
     protected static OkHttpClient client = new OkHttpClient().newBuilder()
             .connectTimeout(10, TimeUnit.SECONDS)
@@ -61,28 +72,26 @@ public class QxybDataService {
         // 服务器不能并行
         try {
             log.info("###开始下载流域15分区降水预报:{}-{}", startTime, endTime);
-            this.downloadDIVRainData("15", startTime, endTime);
+            this.downloadDIVRainData("LCJ_DIV15", startTime, endTime);
             log.info("###结束下载流域15分区降水预报:{}-{}", startTime, endTime);
         } catch (Exception e) {
             log.error("###下载流域15分区降水预报异常", e);
         }
 
-
         try {
             log.info("###开始下载流域29分区降水预报:{}-{}", startTime, endTime);
-            this.downloadDIVRainData("29", startTime, endTime);
+            this.downloadDIVRainData("LCJ_DIV29", startTime, endTime);
             log.info("###结束下载流域29分区降水预报:{}-{}", startTime, endTime);
         } catch (Exception e) {
             log.error("下载下载流域29分区降水预报异常", e);
         }
 
-
         try {
-            log.info("###开始下载网格降水/温度预报:{}-{}", startTime, endTime);
-            this.downloadGridRainAndTempData(startTime, endTime);
-            log.info("###结束下载网格降水/温度预报:{}-{}", startTime, endTime);
+            log.info("###开始下载澜沧江上游高程带温度和降水预报:{}-{}", startTime, endTime);
+            this.downloadDomainTempAndRainData(startTime, endTime);
+            log.info("###结束下载澜沧江上游高程带温度和降水预报:{}-{}", startTime, endTime);
         } catch (Exception e) {
-            log.error("###下载网格降水/温度预报异常", e);
+            log.error("###下载澜沧江上游高程带温度和降水预报异常", e);
         }
 
         try {
@@ -94,11 +103,11 @@ public class QxybDataService {
         }
 
         try {
-            log.info("###开始下载澜沧江上游高程带温度和降水预报:{}-{}", startTime, endTime);
-            this.getDomainTempAndRain(startTime, endTime);
-            log.info("###结束下载澜沧江上游高程带温度和降水预报:{}-{}", startTime, endTime);
+            log.info("###开始下载网格降水/温度/流量预报:{}-{}", startTime, endTime);
+            this.downloadGridRainAndTempData(startTime, endTime);
+            log.info("###结束下载网格降水/温度/流量预报:{}-{}", startTime, endTime);
         } catch (Exception e) {
-            log.error("###下载澜沧江上游高程带温度和降水预报异常", e);
+            log.error("###下载网格降水/温度/流量预报异常", e);
         }
 
         log.info("###结束下载气象预报数据:{}-{}", startTime, endTime);
@@ -109,58 +118,308 @@ public class QxybDataService {
     /**
      * 下载流域15、29分区降水预报
      */
-    private void downloadDIVRainData(String divNum, String startTime, String endTime) {
-        String divisionCode = "LCJ_DIV" + divNum;
+    private void downloadDIVRainData(String divisionCode, String startTime, String endTime) {
         int hour = 1;
         List<PublishInfo> publishList = null;
         try {
-            String url = BASE_URL + "/getForePrecipPublishDT?divisionCode=" + divisionCode + "&hour=" + hour + "&ensemble=" + ensemble + "&startTime=" + startTime + "&endTime=" + endTime;
-            log.info("下载流域{}分区降水预报批次:{}", divNum, url);
+            String batchFileName = String.format("%s,%s,%s,%s,%s.json", "FRB", divisionCode, hour, forFileName(startTime), forFileName(endTime));
+            String url = String.format("%s/getForePrecipPublishDT?divisionCode=%s&hour=%s&startTime=%s&endTime=%s",
+                    BASE_URL, divisionCode, hour, startTime, endTime);
+            log.info("下载{}:{}", batchFileName, url);
             String batchResult = getResult(url);
+            writeFile(batchResult, downloadFilePath + batchFileName);
+            log.info("下载{}成功", batchFileName);
             JSONObject json = JSONObject.parseObject(batchResult);
             publishList = json.getJSONArray("data").toJavaList(PublishInfo.class);
-            writeFile(batchResult, downloadFilePath + "FRB," + divisionCode + "," + hour + "," + ensemble + "," + System.currentTimeMillis() + ".json");
-            log.info("下载成功");
+            Map<String, List<PublishInfo>> publishDtMap = publishList.stream()
+                    .collect(Collectors.groupingBy(e -> e.getPublishDT(), Collectors.toList()));
+
+            publishDtMap.forEach((k, v) -> {
+                v.add(new PublishInfo(k, "MPSE", getPublishTime(k)));
+                v.forEach(p -> {
+                    downloadDivRainData("FRD", divisionCode, p, 1);
+                });
+            });
+            publishDtMap.forEach((k, v) -> {
+                v.add(new PublishInfo(k, "MPSE", getPublishTime(k)));
+                v.forEach(p -> {
+                    downloadDivRainData("FRD", divisionCode, p, 24);
+                });
+            });
+
         } catch (Exception e) {
             log.error("下载失败", e);
-            return;
         }
+    }
 
-        for (PublishInfo publishInfo : publishList) {
-            try {
-                int hour2 = 1;
-                String url2 = BASE_URL + "/getForePrecipData?publishDT=" + publishInfo.getPublishDT() + "&divisionCode=" + divisionCode + "&hour=" + hour2 + "&ensemble=" + ensemble;
-                log.info("下载流域{}分区降水预报数据(小时尺度):{}", divNum, url2);
+    private void downloadDomainTempAndRainData(String startTime, String endTime) {
+        List<PublishInfo> publishList = null;
+        try {
+            String divisionCode = "LCJ_DGD";
+            int hour = 24;
+            String batchFileName = String.format("%s,%s,%s,%s.json", "DomainBatch", hour, forFileName(startTime), forFileName(endTime));
+            String url = String.format("%s/getForePrecipPublishDT?divisionCode=%s&hour=%s&startTime=%s&endTime=%s",
+                    BASE_URL, divisionCode, hour, startTime, endTime);
+            log.info("下载{}:{}", batchFileName, url);
+            String batchResult = getResult(url);
+            writeFile(batchResult, downloadFilePath + batchFileName);
+            log.info("下载{}成功", batchFileName);
+
+            JSONObject json = JSONObject.parseObject(batchResult);
+            publishList = json.getJSONArray("data").toJavaList(PublishInfo.class);
+            Map<String, List<PublishInfo>> publishDtMap = publishList.stream()
+                    .collect(Collectors.groupingBy(e -> e.getPublishDT(), Collectors.toList()));
+
+            publishDtMap.forEach((k, v) -> {
+                v.forEach(p -> {
+                    downloadDivRainData("DomainRain", divisionCode, p, 24);
+                });
+            });
+
+            publishDtMap.forEach((k, v) -> {
+                v.forEach(p -> {
+                    downloadDivTempData("DomainTemp", divisionCode, p, 24);
+                });
+            });
+
+        } catch (Exception e) {
+            log.error("下载失败", e);
+        }
+    }
+
+    private void downloadDivRainData(String prefix, String divisionCode, PublishInfo p, int hour) {
+        try {
+            String publishDT = p.getPublishDT();
+            String ensemble1 = p.getEnsemble();
+            String fileName = String.format("%s,%s,%s,%s,%s.json", prefix, divisionCode, publishDT, ensemble1, hour);
+            if (!isSuccess(fileName)) {
+                String url2 = String.format("%s/getForePrecipData?publishDT=%s&divisionCode=%s&hour=%s&ensemble=%s",
+                        BASE_URL, publishDT, divisionCode, hour, ensemble1.equals("MPSE") ? "" : ensemble1);
+                log.info("下载{}:{}", fileName, url2);
                 String result = getResult(url2);
-                writeFile(result, downloadFilePath + "FRD," + publishInfo.getPublishDT() + "," + divisionCode + "," + hour2 + "," + ensemble + "," + System.currentTimeMillis() + ".json");
-                log.info("下载成功");
-            } catch (Exception e) {
-                log.warn("下载失败", e);
-            }
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                // ignore ...
+                writeFile(result, downloadFilePath + fileName);
+                Cache cache = cacheManager.getCache(CACHE_NAME_DOWNLOAD_DATA);
+                cache.put(fileName, System.currentTimeMillis());
+                log.info("下载{}成功", fileName);
+            } else {
+                log.info("{}已下载", fileName);
             }
+        } catch (Exception e) {
+            log.error("下载失败", e);
+        }
+    }
 
-            try {
-                int hour2 = 24;
-                String url2 = BASE_URL + "/getForePrecipData?publishDT=" + publishInfo.getPublishDT() + "&divisionCode=" + divisionCode + "&hour=" + hour2 + "&ensemble=" + ensemble;
-                log.info("下载流域{}分区降水预报数据(日尺度):{}", divNum, url2);
+    private void downloadDivTempData(String prefix, String divisionCode, PublishInfo p, int hour) {
+        try {
+            String publishDT = p.getPublishDT();
+            String ensemble1 = p.getEnsemble();
+            String fileName = String.format("%s,%s,%s,%s.json", prefix, publishDT, ensemble1, hour);
+            if (!isSuccess(fileName)) {
+                String url2 = String.format("%s/getT_Forecast_24h_avg_division?publishDT=%s&divisionCode=%s&ensemble=%s",
+                        BASE_URL, publishDT, divisionCode, ensemble1.equals("MPSE") ? "" : ensemble1);
+                log.info("下载{}:{}", fileName, url2);
                 String result = getResult(url2);
-                writeFile(result, downloadFilePath + "FRD," + publishInfo.getPublishDT() + "," + divisionCode + "," + hour2 + "," + ensemble + "," + System.currentTimeMillis() + ".json");
-                log.info("下载成功");
-            } catch (Exception e) {
-                log.warn("下载失败", e);
+                writeFile(result, downloadFilePath + fileName);
+                Cache cache = cacheManager.getCache(CACHE_NAME_DOWNLOAD_DATA);
+                cache.put(fileName, System.currentTimeMillis());
+                log.info("下载{}成功", fileName);
+            } else {
+                log.info("{}已下载", fileName);
             }
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                // ignore ...
+        } catch (Exception e) {
+            log.error("下载失败", e);
+        }
+    }
+
+    private void downloadGridRainData(String rdField, String domain, PublishInfo p, int hour) {
+        try {
+            String prefix = "RainGrid";
+            String publishDT = p.getPublishDT();
+            String publishTime = p.getPublishTime();
+            String ensemble1 = p.getEnsemble();
+            String[] strings = publishDT.split("_");
+            String timeFlg = strings[1];
+            LocalDateTime ldtPulishTime = LocalDateTime.parse(publishTime, DATE_TIME_FORMATTER);
+
+            if (hour == 1) {
+                List<LocalDateTime> hoursForGridTemp = getHoursForGridRain(timeFlg, ldtPulishTime);
+
+                hoursForGridTemp.forEach(h -> {
+                    try {
+                        String dt1 = h.format(DATE_TIME_FORMATTER1);
+                        String dt2 = h.plusHours(1).format(DATE_TIME_FORMATTER1);
+                        String fileName = String.format("%s,%s,%s,%s,%s.json", prefix, publishDT, ensemble1, hour, dt1);
+                        if (!isSuccess(fileName)) {
+                            String url2 = String.format("%s/getForecastMatrixByWrfRDField?content=precipitation&RDField=%s&domain=%s&publishDT=%s&ensemble=%s&step=%s&DT1=%s&DT2=%s",
+                                    BASE_URL, rdField, domain, publishDT, ensemble1.equals("MPSE") ? "" : ensemble1, hour, dt1, dt2);
+                            log.info("下载{}:{}", fileName, url2);
+                            String result = getResult(url2);
+                            writeFile(result, downloadFilePath + fileName);
+                            Cache cache = cacheManager.getCache(CACHE_NAME_DOWNLOAD_DATA);
+                            cache.put(fileName, System.currentTimeMillis());
+                            log.info("下载{}成功", fileName);
+                            try {
+                                Thread.sleep(100);
+                            } catch (InterruptedException e) {
+                                // ignore...
+                            }
+                        } else {
+                            log.info("{}已下载", fileName);
+                        }
+
+                    } catch (Exception e) {
+                        log.warn("下载失败", e);
+                    }
+                });
+            } else {
+                List<LocalDateTime> hoursForGridTemp = getDaysForGridRain(timeFlg, ldtPulishTime);
+
+                hoursForGridTemp.forEach(h -> {
+                    try {
+                        String dt1 = h.format(DATE_TIME_FORMATTER1);
+                        String dt2 = h.plusDays(1).format(DATE_TIME_FORMATTER1);
+                        String fileName = String.format("%s,%s,%s,%s,%s.json", prefix, publishDT, ensemble1, hour, dt1);
+                        if (!isSuccess(fileName)) {
+                            String url2 = String.format("%s/getForecastMatrixByWrfRDField?content=precipitation&RDField=%s&domain=%s&publishDT=%s&ensemble=%s&step=%s&DT1=%s&DT2=%s",
+                                    BASE_URL, rdField, domain, publishDT, ensemble1.equals("MPSE") ? "" : ensemble1, hour, dt1, dt2);
+                            log.info("下载{}:{}", fileName, url2);
+                            String result = getResult(url2);
+                            writeFile(result, downloadFilePath + fileName);
+                            Cache cache = cacheManager.getCache(CACHE_NAME_DOWNLOAD_DATA);
+                            cache.put(fileName, System.currentTimeMillis());
+                            log.info("下载{}成功", fileName);
+                            try {
+                                Thread.sleep(100);
+                            } catch (InterruptedException e) {
+                                // ignore...
+                            }
+                        } else {
+                            log.info("{}已下载", fileName);
+                        }
+
+                    } catch (Exception e) {
+                        log.warn("下载失败", e);
+                    }
+                });
             }
+        } catch (Exception e) {
+            log.error("下载失败", e);
         }
     }
 
+    private void downloadGridTempData(String rdField, String domain, PublishInfo p, int hour) {
+        try {
+            String prefix = "TempGrid";
+            String publishDT = p.getPublishDT();
+            String publishTime = p.getPublishTime();
+            String ensemble1 = p.getEnsemble();
+            String[] strings = publishDT.split("_");
+            String timeFlg = strings[1];
+            LocalDateTime ldtPulishTime = LocalDateTime.parse(publishTime, DATE_TIME_FORMATTER);
+            List<LocalDateTime> hoursForGridTemp = getHoursForGridTemp(timeFlg, ldtPulishTime);
+
+            hoursForGridTemp.forEach(h -> {
+                try {
+                    String dt1 = h.format(DATE_TIME_FORMATTER1);
+                    String fileName = String.format("%s,%s,%s,%s,%s.json", prefix, publishDT, ensemble1, hour, dt1);
+                    if (!isSuccess(fileName)) {
+                        String url2 = String.format("%s/getForecastMatrixByWrfRDField?RDField=%s&domain=%s&publishDT=%s&ensemble=%s&step=%s&DT1=%s",
+                                BASE_URL, rdField, domain, publishDT, ensemble1.equals("MPSE") ? "" : ensemble1, hour, dt1);
+                        log.info("下载{}:{}", fileName, url2);
+                        String result = getResult(url2);
+                        writeFile(result, downloadFilePath + fileName);
+                        Cache cache = cacheManager.getCache(CACHE_NAME_DOWNLOAD_DATA);
+                        cache.put(fileName, System.currentTimeMillis());
+                        log.info("下载{}成功", fileName);
+                        try {
+                            Thread.sleep(100);
+                        } catch (InterruptedException e) {
+                            // ignore...
+                        }
+                    } else {
+                        log.info("{}已下载", fileName);
+                    }
+
+                } catch (Exception e) {
+                    log.warn("下载失败", e);
+                }
+            });
+        } catch (Exception e) {
+            log.error("下载失败", e);
+        }
+    }
+
+    private void downloadStationTempData(String rdField, PublishInfo p, int hour) {
+        try {
+            String prefix = "TempSiteData";
+            String publishDT = p.getPublishDT();
+            String ensemble1 = p.getEnsemble();
+
+            String fileName = String.format("%s,%s,%s,%s.json", prefix, publishDT, ensemble1, hour);
+            if (!isSuccess(fileName)) {
+                String url2 = String.format("%s/getForeTempData?RDField=%s&publishDT=%s&ensemble=%s",
+                        BASE_URL, rdField, publishDT, ensemble1.equals("MPSE") ? "" : ensemble1);
+                log.info("下载{}:{}", fileName, url2);
+                String result = getResult(url2);
+                writeFile(result, downloadFilePath + fileName);
+                Cache cache = cacheManager.getCache(CACHE_NAME_DOWNLOAD_DATA);
+                cache.put(fileName, System.currentTimeMillis());
+                log.info("下载{}成功", fileName);
+            } else {
+                log.info("{}已下载", fileName);
+            }
+        } catch (Exception e) {
+            log.error("下载失败", e);
+        }
+    }
+
+    private void downloadGridFlowData(String rdField, String domain, PublishInfo p, int hour) {
+        try {
+            String prefix = "FlowGrid";
+            String publishDT = p.getPublishDT();
+            String publishTime = p.getPublishTime();
+            String ensemble1 = p.getEnsemble();
+            String[] strings = publishDT.split("_");
+            String timeFlg = strings[1];
+            LocalDateTime ldtPulishTime = LocalDateTime.parse(publishTime, DATE_TIME_FORMATTER);
+            List<LocalDateTime> hoursForGridTemp = getHoursForGridFlow(timeFlg, ldtPulishTime);
+
+            hoursForGridTemp.forEach(h -> {
+                try {
+                    String dt1 = h.format(DATE_TIME_FORMATTER1);
+                    String fileName = String.format("%s,%s,%s,%s,%s.json", prefix, publishDT, ensemble1, hour, dt1);
+                    if (!isSuccess(fileName)) {
+                        String url2 = String.format("%s/getForecastHydroMatrixByWrfRDField?RDField=%s&domain=%s&publishDT=%s&ensemble=%s&step=%s&DT1=%s",
+                                BASE_URL, rdField, domain, publishDT, ensemble1.equals("MPSE") ? "" : ensemble1, hour, dt1);
+                        log.info("下载{}:{}", fileName, url2);
+                        String result = getResult(url2);
+                        writeFile(result, downloadFilePath + fileName);
+                        Cache cache = cacheManager.getCache(CACHE_NAME_DOWNLOAD_DATA);
+                        cache.put(fileName, System.currentTimeMillis());
+                        log.info("下载{}成功", fileName);
+                        try {
+                            Thread.sleep(100);
+                        } catch (InterruptedException e) {
+                            // ignore...
+                        }
+                    } else {
+                        log.info("{}已下载", fileName);
+                    }
+
+                } catch (Exception e) {
+                    log.warn("下载失败", e);
+                }
+            });
+        } catch (Exception e) {
+            log.error("下载失败", e);
+        }
+    }
+
+
+    private boolean isSuccess(String fileName) {
+        Cache cache = cacheManager.getCache(CACHE_NAME_DOWNLOAD_DATA);
+        return cache.get(fileName) != null;
+    }
 
     /**
      * 下载澜沧江上游高程带融雪覆盖预报
@@ -229,99 +488,36 @@ public class QxybDataService {
         log.info("###结束下载澜沧江上游高程带融雪覆盖预报,批次:{}", publishDt);
     }
 
-    /**
-     * 下载澜沧江上游高程带降水和气温预报
-     */
-    private void getDomainTempAndRain(String startTime, String endTime) {
-        List<PublishInfo> publishList = null;
-        String divisionCode = "LCJ_DGD";
-        int hour = 24;
-        try {
-            String url = BASE_URL + "/getForePrecipPublishDT?divisionCode=" + divisionCode + "&hour=" + hour + "&ensemble=" + ensemble + "&startTime=" + startTime + "&endTime=" + endTime;
-            log.info("下载澜沧江上游高程带降水和气温预报批次:{}", url);
-            String batchResult = getResult(url);
-            JSONObject json = JSONObject.parseObject(batchResult);
-            publishList = json.getJSONArray("data").toJavaList(PublishInfo.class);
-            writeFile(batchResult, downloadFilePath + "DomainBatch," + divisionCode + "," + hour + "," + ensemble + "," + System.currentTimeMillis() + ".json");
-            log.info("下载成功");
-        } catch (Exception e) {
-            log.warn("下载失败", e);
-            return;
-        }
-
-        for (PublishInfo publishInfo : publishList) {
-            try {
-                String url2 = BASE_URL + "/getForePrecipData?publishDT=" + publishInfo.getPublishDT() + "&divisionCode=" + divisionCode + "&hour=" + hour + "&ensemble=" + ensemble;
-                log.info("下载澜沧江上游高程带降水数据(日尺度):{}", url2);
-                String result = getResult(url2);
-                writeFile(result, downloadFilePath + "DomainRain," + publishInfo.getPublishDT() + "," + divisionCode + "," + hour + "," + ensemble + "," + System.currentTimeMillis() + ".json");
-                log.info("下载成功");
-            } catch (Exception e) {
-                log.warn("下载失败", e);
-                return;
-            }
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                // ignore...
-            }
-        }
-
-        for (PublishInfo publishInfo : publishList) {
-            try {
-                String url2 = BASE_URL + "/getT_Forecast_24h_avg_division?divisionCode=" + divisionCode + "&ensemble=" + ensemble + "&publishDT=" + publishInfo.getPublishDT();
-                log.info("下载澜沧江上游高程带气温数据(日尺度):{}", url2);
-                String result = getResult(url2);
-                writeFile(result, downloadFilePath + "DomainTemp," + publishInfo.getPublishDT() + "," + divisionCode + "," + hour + "," + ensemble + "," + System.currentTimeMillis() + ".json");
-                log.info("下载成功");
-            } catch (Exception e) {
-                log.warn("下载失败", e);
-                return;
-            }
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                // ignore...
-            }
-        }
-
-    }
-
     /**
      * 下载流域站点气温预报
      */
     private void downloadStationTempData(String startTime, String endTime) {
+
         List<PublishInfo> publishList = null;
         try {
-            String url = BASE_URL + "/getForeTempPublishDT?RDField=" + RDField + "&ensemble=" + ensemble + "&startTime=" + startTime + "&endTime=" + endTime;
-            log.info("下载流域站点气温预报批次:{}", url);
+
+            String rdField = "LCJDomain";
+            String batchFileName = String.format("%s,%s,%s.json", "TempSiteBatch", forFileName(startTime), forFileName(endTime));
+            String url = String.format("%s/getForeTempPublishDT?RDField=%s&startTime=%s&endTime=%s",
+                    BASE_URL, rdField, startTime, endTime);
+            log.info("下载{}:{}", batchFileName, url);
             String batchResult = getResult(url);
+            writeFile(batchResult, downloadFilePath + batchFileName);
+            log.info("下载{}成功", batchFileName);
             JSONObject json = JSONObject.parseObject(batchResult);
             publishList = json.getJSONArray("data").toJavaList(PublishInfo.class);
-            writeFile(batchResult, downloadFilePath + "TempSiteBatch," + this.toyyyy_MM_dd_HH_mm_ss(startTime) + "," + this.toyyyy_MM_dd_HH_mm_ss(endTime) + "," + ensemble + "," + System.currentTimeMillis() + ".json");
-            log.info("下载成功");
-        } catch (Exception e) {
-            log.warn("下载失败", e);
-            return;
-        }
+            Map<String, List<PublishInfo>> publishDtMap = publishList.stream()
+                    .collect(Collectors.groupingBy(e -> e.getPublishDT(), Collectors.toList()));
 
-        for (PublishInfo publishInfo : publishList) {
-            try {
-                int hour = 1;
-                String getForeTempUrl = BASE_URL + "/getForeTempData?publishDT=" + publishInfo.getPublishDT() + "&RDField=" + RDField + "&ensemble=" + ensemble;
-                log.info("下载流域站点气温预报数据(小时尺度):{}", getForeTempUrl);
-                String result = getResult(getForeTempUrl);
-                writeFile(result, downloadFilePath + "TempSiteData," + publishInfo.getPublishDT() + "," + hour + "," + ensemble + "," + System.currentTimeMillis() + ".json");
-                log.info("下载成功");
-            } catch (Exception e) {
-                log.warn("下载失败", e);
-            }
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                // ignore...
-            }
+            publishDtMap.forEach((k, v) -> {
+                v.add(new PublishInfo(k, "MPSE", getPublishTime(k)));
+                v.forEach(p -> {
+                    downloadStationTempData(rdField, p, 1);
+                });
 
+            });
+        } catch (Exception e) {
+            log.error("下载失败", e);
         }
 
     }
@@ -330,129 +526,55 @@ public class QxybDataService {
      * 网格降水/温度预报
      */
     private void downloadGridRainAndTempData(String startTime, String endTime) {
+
         List<PublishInfo> publishList = null;
         try {
-            String url = BASE_URL + "/getForecastBaseInfo?RDField=" + RDField + "&domain=" + domain + "&ensemble=" + ensemble + "&startTime=" + startTime + "&endTime=" + endTime;
-            log.info("下载网格降水/温度预报批次:{}", url);
+            String rdField = "LCJDomain";
+            String domain = "domain02";
+            String batchFileName = String.format("%s,%s,%s.json", "GridBatch", forFileName(startTime), forFileName(endTime));
+            String url = String.format("%s/getForecastBaseInfo?RDField=%s&domain=%s&startTime=%s&endTime=%s",
+                    BASE_URL, rdField, domain, startTime, endTime);
+            log.info("下载{}:{}", batchFileName, url);
             String batchResult = getResult(url);
+            writeFile(batchResult, downloadFilePath + batchFileName);
+            log.info("下载{}成功", batchFileName);
             JSONObject json = JSONObject.parseObject(batchResult);
             publishList = json.getJSONObject("data").getJSONArray("publishInfoList").toJavaList(PublishInfo.class);
-            writeFile(batchResult, downloadFilePath + "GridBatch," + this.toyyyy_MM_dd_HH_mm_ss(startTime) + "," + this.toyyyy_MM_dd_HH_mm_ss(endTime) + "," + ensemble + "," + System.currentTimeMillis() + ".json");
-            log.info("下载成功");
-        } catch (Exception e) {
-            log.warn("下载失败", e);
-            return;
-        }
-
-
-        for (PublishInfo publishInfo : publishList) {
-            String publishTime = publishInfo.getPublishTime();
-            String publishDT = publishInfo.getPublishDT();
-            String[] strings = publishDT.split("_");
-            String timeFlg = strings[1];
-            LocalDateTime ldtPulishTime = LocalDateTime.parse(publishTime, DATE_TIME_FORMATTER);
-            List<LocalDateTime> hoursForGridRain = getHoursForGridRain(timeFlg, ldtPulishTime);
-            hoursForGridRain.forEach(hour -> {
-                try {
-                    String dt1 = hour.format(DATE_TIME_FORMATTER1);
-                    String dt2 = hour.plusHours(1).format(DATE_TIME_FORMATTER1);
-                    String url2 = BASE_URL + "/getForecastMatrixByWrfRDField?RDField=" + RDField + "&publishDT=" + publishInfo.getPublishDT() + "&ensemble=" + publishInfo.getEnsemble() + "&domain=" + domain + "&content=precipitation&DT1=" + dt1 + "&DT2=" + dt2 + "&step=" + 1;
-                    log.info("下载网格降水预报数据(小时尺度):{}", url2);
-                    String result = getResult(url2);
-                    writeFile(result, downloadFilePath + "RainGrid," + publishInfo.getPublishDT() + "," + dt1 + "," + 1 + "," + ensemble + "," + System.currentTimeMillis() + ".json");
-                    log.info("下载成功");
-                } catch (Exception e) {
-                    log.warn("下载失败", e);
-                }
-                try {
-                    Thread.sleep(100);
-                } catch (InterruptedException e) {
-                    // ignore...
-                }
-            });
-
-            List<LocalDateTime> daysForGridRain = getDaysForGridRain(timeFlg, ldtPulishTime);
-            daysForGridRain.forEach(day -> {
-                try {
-                    String dt1 = day.format(DATE_TIME_FORMATTER1);
-                    String dt2 = day.plusDays(1).format(DATE_TIME_FORMATTER1);
-                    String url2 = BASE_URL + "/getForecastMatrixByWrfRDField?RDField=" + RDField + "&publishDT=" + publishInfo.getPublishDT() + "&ensemble=" + publishInfo.getEnsemble() + "&domain=" + domain + "&content=precipitation&DT1=" + dt1 + "&DT2=" + dt2 + "&step=" + 24;
-                    log.info("下载网格降水预报数据(日尺度):{}", url2);
-                    String result = getResult(url2);
-                    writeFile(result, downloadFilePath + "RainGrid," + publishInfo.getPublishDT() + "," + dt1 + "," + 24 + "," + ensemble + "," + System.currentTimeMillis() + ".json");
-                    log.info("下载成功");
-                } catch (Exception e) {
-                    log.warn("下载失败", e);
-                }
-                try {
-                    Thread.sleep(100);
-                } catch (InterruptedException e) {
-                    // ignore...
-                }
-            });
+            Map<String, List<PublishInfo>> publishDtMap = publishList.stream()
+                    .collect(Collectors.groupingBy(e -> e.getPublishDT(), Collectors.toList()));
+
+            publishDtMap.forEach((k, v) -> {
+                //v.add(new PublishInfo(k, "MPSE", getPublishTime2(k)));
+                v.forEach(p -> {
+                    downloadGridTempData(rdField, domain, p, 1);
+                    downloadGridRainData(rdField, domain, p, 1);
+                    downloadGridRainData(rdField, domain, p, 24);
+                    downloadGridFlowData(rdField, domain, p, 1);
+                });
 
-            List<LocalDateTime> hoursForGridTemp = getHoursForGridTemp(timeFlg, ldtPulishTime);
-            hoursForGridTemp.forEach(hour -> {
-                try {
-                    String dt1 = hour.format(DATE_TIME_FORMATTER1);
-                    String url2 = BASE_URL + "/getForecastMatrixByWrfRDField?RDField=" + RDField + "&publishDT="
-                            + publishInfo.getPublishDT() + "&ensemble="
-                            + publishInfo.getEnsemble() + "&domain="
-                            + domain + "&content=2mTC&DT1="
-                            + dt1 + "&step=" + 1;
-                    log.info("下载网格气温预报数据(小时尺度):{}", url2);
-                    String result = getResult(url2);
-                    writeFile(result, downloadFilePath + "TempGrid," + publishInfo.getPublishDT()
-                            + "," + dt1 + "," + 1 + "," + ensemble + ","
-                            + System.currentTimeMillis() + ".json");
-                    log.info("下载成功");
-                } catch (Exception e) {
-                    log.warn("下载失败", e);
-                }
-                try {
-                    Thread.sleep(100);
-                } catch (InterruptedException e) {
-                    // ignore...
-                }
             });
-
-            hoursForGridRain.forEach(hour -> {//网格流量与网格降水预报预见期一样
-                try {
-                    String dt1 = hour.format(DATE_TIME_FORMATTER1);
-                    String url2 = BASE_URL + "/getForecastHydroMatrixByWrfRDField?RDField=" + RDField + "&publishDT="
-                            + publishInfo.getPublishDT() + "&ensemble="
-                            + publishInfo.getEnsemble() + "&domain="
-                            + domain + "&content=2mTC&DT1="
-                            + dt1 + "&step=" + 1;
-                    log.info("下载网格流量预报数据(小时尺度):{}", url2);
-                    String result = getResult(url2);
-                    writeFile(result, downloadFilePath + "FlowGrid," + publishInfo.getPublishDT()
-                            + "," + dt1 + "," + 1 + "," + ensemble + ","
-                            + System.currentTimeMillis() + ".json");
-                    log.info("下载成功");
-                } catch (Exception e) {
-                    log.warn("下载失败", e);
-                }
-                try {
-                    Thread.sleep(100);
-                } catch (InterruptedException e) {
-                    // ignore...
-                }
-            });
-
-
+        } catch (Exception e) {
+            log.error("下载失败", e);
         }
 
     }
 
+    private String getPublishTime(String publishDt) {
+        return LocalDateTime
+                .parse(publishDt, Constant.BATCH_DATE_TIME_FORMATTER2)
+                .plusHours(8)
+                .format(DATE_TIME_FORMATTER);
+    }
+
     /**
      * 转换格式
      *
-     * @param str
+     * @param datetime
      * @return
      */
-    private String toyyyy_MM_dd_HH_mm_ss(String str) {
-        return str.replace("-", "_").replace(":", "_").replace(" ", "_");
+    private String forFileName(String datetime) {
+        LocalDateTime time = LocalDateTime.parse(datetime, DATE_TIME_FORMATTER);
+        return time.format(DATE_TIME_FORMATTER1);
     }
 
     private String getResult(String url) {
@@ -554,6 +676,35 @@ public class QxybDataService {
         return list;
     }
 
+
+    /**
+     * 流域网格流量预报(小时尺度)小时列
+     *
+     * @param timeFlg 日批次:00、06、12、18
+     * @param time    发布时间(北京时间)
+     * @return
+     */
+    private static List<LocalDateTime> getHoursForGridFlow(String timeFlg, LocalDateTime time) {
+        List<LocalDateTime> list = new ArrayList<>();
+        LocalDateTime startTime = time.plusHours(12);
+        int hours = 0;
+        switch (timeFlg) {
+            case "00":
+            case "06":
+            case "18":
+                hours = 84;
+                break;
+            case "12":
+                hours = 348;
+                break;
+            default:
+                throw new RuntimeException("日批次号无效");
+                //break;
+        }
+        for (int i = 0; i < hours; list.add(startTime.plusHours(i++))) ;
+        return list;
+    }
+
     /**
      * 流域网格降水预报(日尺度)日期序列
      *
@@ -578,11 +729,11 @@ public class QxybDataService {
                 break;
             case "12":
                 days = 14;
-                startTime = LocalDateTime.of(time.toLocalDate().plusDays(1), LocalTime.of(14, 0));
+                startTime = LocalDateTime.of(time.toLocalDate().plusDays(1), LocalTime.of(8, 0));
                 break;
             case "18":
                 days = 3;
-                startTime = LocalDateTime.of(time.toLocalDate().plusDays(1), LocalTime.of(8, 0));
+                startTime = LocalDateTime.of(time.toLocalDate().plusDays(1), LocalTime.of(14, 0));
                 break;
             default:
                 throw new RuntimeException("日批次号无效");

+ 28 - 0
tongfei_river_data_collection/src/main/resources/ehcache.xml

@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<ehcache xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://ehcache.org/ehcache.xsd"
+         updateCheck="false">
+
+    <!--diskStore: 为缓存路径,ehcache分为内存和磁盘两级,此属性定义磁盘的缓存位置-->
+    <diskStore path="./cache" />
+    <!--默认缓存策略,当ehcache找不到定义的缓存时,则使用这个缓存策略。只能定义一个。-->
+    <defaultCache
+            maxElementsInMemory="10000"
+            eternal="true"
+            overflowToDisk="true"
+            maxElementsOnDisk="0"
+            diskPersistent="true"
+            memoryStoreEvictionPolicy="LRU"/>
+
+    <!--自定缓存策略,为自定义的缓存策略-->
+    <cache name="cache_downloaded_data"
+           maxElementsInMemory="10000"
+           eternal="true"
+           overflowToDisk="true"
+           maxElementsOnDisk="0"
+           diskPersistent="true"
+           memoryStoreEvictionPolicy="LRU">
+        <!--<cacheEventListenerFactory class="net.sf.ehcache.distribution.RMICacheReplicatorFactory" />
+        <bootstrapCacheLoaderFactory class="net.sf.ehcache.distribution.RMIBootstrapCacheLoaderFactory"/>-->
+    </cache>
+
+</ehcache>