java多线程之优雅的并发批量查询大数据量

需求

某数据采集平台,统计各个设备每个月的到报情况

  • 建议使用hive,预处理或者跑任务的方式,因为mysql不适合做相关统计的功能。性能 hive>oracle>mysql

思路

  • 使用多线程并发查询每一天的数据在合并。

代码

/**
	 * 获取自动站站点汇总数据
	 * @param start
	 * @param end
	 */
	public Map<Integer,ElementReportVo>  getZidongzhanSum(LocalDate start,LocalDate end) throws Exception{
		//获取时间列表
		List<String> betweenDates = ReportStatisUtil.getBetweenDate(start,end);
		 //定义固定长度的线程池  防止线程过多
        ExecutorService executorService = Executors.newFixedThreadPool(15);

        List<ElementReportVo> result = new ArrayList<>();//返回结果

        List<Callable<List<ElementReportVo>>> tasks = new ArrayList<>();
	    for (String str : betweenDates) {
	    	 Callable<List<ElementReportVo>> blockZidongzhan = getBlockZidongzhan(str+" 00:00:00",str+" 23:59:59");
	         tasks.add(blockZidongzhan);
		}
	    //Future用于获取结果
        List<Future<List<ElementReportVo>>> futures=executorService.invokeAll(tasks);
        //处理线程返回结果
        if(futures!=null&&futures.size()>0){
            for (Future<List<ElementReportVo>> future:futures){    
             result.addAll(future.get());
            }
        }

        executorService.shutdown();//关闭线程池

        Map<Integer, ElementReportVo> zidongzhanMap = getZidongzhanMap(result);
		return zidongzhanMap;
	}
/**
	 * 查询自动站某一天的数据
	 * @param startTime
	 * @param endTime
	 * @return
	 */
	 private Callable<List<ElementReportVo>> getBlockZidongzhan(String startTime,String endTime){
		 Callable<List<ElementReportVo>> callable = new Callable<List<ElementReportVo>>() {
	            public List<ElementReportVo> call() throws Exception {
	                HashMap<String,String> maps = new HashMap<>();
	            	maps.put("startTime", startTime);
	            	maps.put("endTime", endTime);
	                List<ElementReportVo> list=stationQualityDetailInfoMapper.getZidongzhanLists(maps);
	                return list;
	            }
	        };
		 return callable; 
	 };
/**
	 * 获取自动站maps
	 * @param list
	 * @return
	 */
	public Map<Integer,ElementReportVo> getZidongzhanMap(List<ElementReportVo> list){
		Map<Integer, List<ElementReportVo>> collect = list.stream().collect(Collectors.groupingBy(ElementReportVo::getStationInfoId));
		Map<Integer,ElementReportVo> maps = new HashMap<Integer, ElementReportVo>();
		for (Entry<Integer, List<ElementReportVo>> elEntry : collect.entrySet()) {
			//一个月的数据
			List<ElementReportVo> elementReportVos = elEntry.getValue();
			int spd = elementReportVos.stream().mapToInt(ElementReportVo::getSpd).sum();
			int dir = elementReportVos.stream().mapToInt(ElementReportVo::getDir).sum();
			int hum = elementReportVos.stream().mapToInt(ElementReportVo::getHum).sum();
			int pa = elementReportVos.stream().mapToInt(ElementReportVo::getPa).sum();
			int rain = elementReportVos.stream().mapToInt(ElementReportVo::getRain).sum();
			int temp = elementReportVos.stream().mapToInt(ElementReportVo::getTemp).sum();

			ElementReportVo elementReportVo = maps.get(elEntry.getKey());
			if(elementReportVo == null){
				elementReportVo = new ElementReportVo();
			}
			elementReportVo.setStationInfoId(elEntry.getKey());
			elementReportVo.setDir(dir);
			elementReportVo.setHum(hum);
			elementReportVo.setPa(pa);
			elementReportVo.setRain(rain);
			elementReportVo.setSpd(spd);
			elementReportVo.setTemp(temp);
			maps.put(elEntry.getKey(), elementReportVo);
		}
		return maps;
	}