需求
某数据采集平台,统计各个设备每个月的到报情况
- 建议使用hive,预处理或者跑任务的方式,因为mysql不适合做相关统计的功能。性能 hive>oracle>mysql
思路
- 使用多线程并发查询每一天的数据在合并。
代码
/**
* 获取自动站站点汇总数据
* @param start
* @param end
*/
public Map<Integer,ElementReportVo> getZidongzhanSum(LocalDate start,LocalDate end) throws Exception{
//获取时间列表
List<String> betweenDates = ReportStatisUtil.getBetweenDate(start,end);
//定义固定长度的线程池 防止线程过多
ExecutorService executorService = Executors.newFixedThreadPool(15);
List<ElementReportVo> result = new ArrayList<>();//返回结果
List<Callable<List<ElementReportVo>>> tasks = new ArrayList<>();
for (String str : betweenDates) {
Callable<List<ElementReportVo>> blockZidongzhan = getBlockZidongzhan(str+" 00:00:00",str+" 23:59:59");
tasks.add(blockZidongzhan);
}
//Future用于获取结果
List<Future<List<ElementReportVo>>> futures=executorService.invokeAll(tasks);
//处理线程返回结果
if(futures!=null&&futures.size()>0){
for (Future<List<ElementReportVo>> future:futures){
result.addAll(future.get());
}
}
executorService.shutdown();//关闭线程池
Map<Integer, ElementReportVo> zidongzhanMap = getZidongzhanMap(result);
return zidongzhanMap;
}
/**
* 查询自动站某一天的数据
* @param startTime
* @param endTime
* @return
*/
private Callable<List<ElementReportVo>> getBlockZidongzhan(String startTime,String endTime){
Callable<List<ElementReportVo>> callable = new Callable<List<ElementReportVo>>() {
public List<ElementReportVo> call() throws Exception {
HashMap<String,String> maps = new HashMap<>();
maps.put("startTime", startTime);
maps.put("endTime", endTime);
List<ElementReportVo> list=stationQualityDetailInfoMapper.getZidongzhanLists(maps);
return list;
}
};
return callable;
};
/**
* 获取自动站maps
* @param list
* @return
*/
public Map<Integer,ElementReportVo> getZidongzhanMap(List<ElementReportVo> list){
Map<Integer, List<ElementReportVo>> collect = list.stream().collect(Collectors.groupingBy(ElementReportVo::getStationInfoId));
Map<Integer,ElementReportVo> maps = new HashMap<Integer, ElementReportVo>();
for (Entry<Integer, List<ElementReportVo>> elEntry : collect.entrySet()) {
//一个月的数据
List<ElementReportVo> elementReportVos = elEntry.getValue();
int spd = elementReportVos.stream().mapToInt(ElementReportVo::getSpd).sum();
int dir = elementReportVos.stream().mapToInt(ElementReportVo::getDir).sum();
int hum = elementReportVos.stream().mapToInt(ElementReportVo::getHum).sum();
int pa = elementReportVos.stream().mapToInt(ElementReportVo::getPa).sum();
int rain = elementReportVos.stream().mapToInt(ElementReportVo::getRain).sum();
int temp = elementReportVos.stream().mapToInt(ElementReportVo::getTemp).sum();
ElementReportVo elementReportVo = maps.get(elEntry.getKey());
if(elementReportVo == null){
elementReportVo = new ElementReportVo();
}
elementReportVo.setStationInfoId(elEntry.getKey());
elementReportVo.setDir(dir);
elementReportVo.setHum(hum);
elementReportVo.setPa(pa);
elementReportVo.setRain(rain);
elementReportVo.setSpd(spd);
elementReportVo.setTemp(temp);
maps.put(elEntry.getKey(), elementReportVo);
}
return maps;
}