`

定时JOB,去请求数据,并找出最新数据持久化

 
阅读更多

JOB:

 

/**
 * 同步警情json 到t_alarm。如果服务器down掉几天,会遗漏期间的数据
 * @author wj
 * @date 2016-12-30
 *
 */
public class AlarmJob implements Job {
	
	
	
	public static AtomicReference<Date>  maxBjsj  ;
	
	static{
		if(CacheUtils.get(Contants.jq_maxDateCache, Contants.jq_maxDate_key) == null){
			maxBjsj = new  AtomicReference<Date>();
		}else{
			maxBjsj = (AtomicReference<Date>)CacheUtils.get(Contants.jq_maxDateCache, Contants.jq_maxDate_key);
		}
	}
	
	public static int count = 0;
	
	//可以缓存到内存
//	@Cacheable(value="sysCache" )
	@Override
	public void execute(JobExecutionContext context) throws JobExecutionException {

		String url = Contants.alarm_url+DateUtils.getDate("yyyyMMdd");
		
		try {
			 long t1 = System.currentTimeMillis(); // 排序前取得当前时间  
			 
			if(new HttpClient().get(url,maxBjsj,Contants.TYPE_JQ)){
				
				 long t2 = System.currentTimeMillis(); // 排序后取得当前时间  
				  Calendar c = Calendar.getInstance();  
			        c.setTimeInMillis(t2 - t1);  
			        System.out.println("----第"+ ++count +"次 同步结束----"+"耗时: " + c.get(Calendar.MINUTE) + "分 "  
		                + c.get(Calendar.SECOND) + "秒 " + c.get(Calendar.MILLISECOND)  
		                + " 毫秒");
			}
//		
		} catch (Exception e) {
			e.printStackTrace();
		}
	
	}

	
}

 httpclient:

public boolean get(String url ,AtomicReference<Date> maxBjsj,String type)  throws Exception{
		CloseableHttpClient httpclient = HttpClients.createDefault();
		try {
			// 创建httpget.  
			HttpGet httpget = new HttpGet(url);
//			httpget.setParams(params);
			
			System.out.println("\n-----------------------");
			logger.debug("executing request " + httpget.getURI());
			// 执行get请求.  
			CloseableHttpResponse response = httpclient.execute(httpget);
			try {
				// 获取响应实体  
				HttpEntity entity = response.getEntity();
				
				// 打印响应状态  
				logger.info(response.getStatusLine().toString());	
				
				if(200==(response.getStatusLine().getStatusCode())){
					if (entity != null) {
						String s = EntityUtils.toString(entity, "UTF-8");
						// 打印响应内容长度  
						logger.debug("Response content length: " + entity.getContentLength());
						// 打印响应内容  
						logger.debug("Response content: " + s);
						
						Map<String,List<Map<String,Object> >> map = FastJsonUtil.stringToCollect(s);
						List<Map<String,Object> > data = map.get("data");
						
						data = getCurrentDateData(data, type);
						
						if( maxBjsj.get()==null ){ //只会执行第一次,如果缓存里没有值
							
//							maxBjsj =  new AtomicReference<Date>(getBJSJDate(data.get(0)))   ;
							maxBjsj.set(getBJSJDate(data.get(0),type)); 
						}else{
							data = filterDateGt(maxBjsj.get(), data,type);
						}
						
						
						List<Map<String,Object> > listWithoutDup = data;
						
						String jobName = "";
						String cacheName = "";
						String cacheKeyName = "";
						if(Contants.TYPE_JQ.equals(type)){
							//去重 ,获取下一次最大日期
							listWithoutDup =  withoutDuplicate(data, "JQBH",maxBjsj,type);  //警情去重
							jobName="警情";
							cacheName=Contants.jq_maxDateCache;
							cacheKeyName=Contants.jq_maxDate_key;
						}else if(Contants.TYPE_DPZD.equals(type)){  //没有主键,无法去重
							setMaxDate(data, maxBjsj,type);
							jobName="调派中队";
							cacheName=Contants.dpzd_maxDateCache;
							cacheKeyName=Contants.dpzd_maxDate_key;
						}else if(Contants.TYPE_DPCL.equals(type)){
							setMaxDate(data, maxBjsj,type);
							jobName="调派车辆";
							cacheName=Contants.dpcl_maxDateCache;
							cacheKeyName=Contants.dpcl_maxDate_key;
						}else{
							throw new RuntimeException("不应该到这");
						}
						
					
						
						boolean ret = true;
						if(!CollectionUtils.isEmpty(listWithoutDup)){
							for(Map<String,Object> cur: listWithoutDup){
								
								try{
									
									BeanMapper.setAndPersist(cur,type); //如果这里报错,因为已经在上面有了最大日期,下一次JOB、数据就为空除非有新数据
									
								}catch (org.springframework.dao.DuplicateKeyException e){ // 过滤掉之前日期,不应该到这
//									maxBjsj.set(null);  //清空最大日期,让下一次JOB继续插入(下一次数据也许会变)
									logger.error(e.getMessage()+"--duplicate: "+(String)cur.get("JQBH"));
									ret = false;
								}
								
							}
							if(ret)
							  logger.info("-------------【"+jobName+"】成功导入"+listWithoutDup.size() +"条数据-----------------------");
						}else{
							logger.info("-------------【"+jobName+"】没有最新数据-----------------------");
							ret=  false;
						}
						
						
						// 可以比较一下缓存里的日期和这里的最大日期。不如直接put;比较还得get再比较,然后再put
						//将最大日期缓存磁盘(用于防止server关闭)。如果没有关闭,最大日期值 依然在外面静态变量里
						CacheUtils.put(cacheName, cacheKeyName,maxBjsj);
						//看看有没有持久化成功
						org.springframework.util.Assert.isTrue(comparDate(((AtomicReference<Date>)CacheUtils.get(cacheName, cacheKeyName)).get(), maxBjsj.get())==0   ,"缓存日期和刚才日应该相等");
						
						CacheUtils.flush(cacheName);
						
						
						return ret;
						
						
					}
				}else {
					logger.warn("------请求["+url+"]错误:"+response.getStatusLine().toString());//   HTTP/1.1 404 Not Found
					return false;
					
				}
				
			} finally {
				response.close();
				
			}
		} catch (ClientProtocolException e) {
			e.printStackTrace();
		} catch (ParseException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}catch (SQLIntegrityConstraintViolationException e) {
			logger.error("--------------主键冲突异常:"+e.getMessage()+"-----------------------");
		} finally {
			// 关闭连接,释放资源  
			try {
				httpclient.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		return false;
		
	}

 其他私有辅助方法:

/**
	 * 去重,如果重复,只保留第一个。并获得最大日期
	 * @author wj
	 * @param list
	 * @param key 要去重的map的key
	 * @return
	 */
	private List<Map<String,Object>>  withoutDuplicate(List<Map<String,Object>> list,String key,AtomicReference<Date> maxBjsj,String type){
		
			if(!CollectionUtils.isEmpty(list)){
				 List<Map<String,Object>> tmpList=new ArrayList<Map<String,Object>>();
			        Set<Object> keysSet = new HashSet<Object>();
			        for(Map<String,Object> map : list){
			            Object keys = map.get(key);
			            int beforeSize = keysSet.size();
			            keysSet.add(keys);
			            int afterSize = keysSet.size();
			            if(afterSize == beforeSize + 1){
			                tmpList.add(map);
			                
			                Date date =  getBJSJDate(map,type);
			                if(comparDate(date, maxBjsj.get())==1){
//			                	if(date>maxBjsj){
//			                	maxBjsj =  new AtomicReference<Date>(date)    ;
			                	maxBjsj.set(date);    
			                }
			                
			            }
			        }
			        return tmpList;
				
			}
			return null;
		  
	}
	private void  setMaxDate(List<Map<String,Object>> list,AtomicReference<Date> maxBjsj,String type){
		
		if(!CollectionUtils.isEmpty(list)){
			for(Map<String,Object> map : list){
					Date date =  getBJSJDate(map,type);
					if(comparDate(date, maxBjsj.get())==1){
						maxBjsj.set(date);    
					}
					
			}
			
		}
		
	}
	/**
	 * 查询日期大于{@code date} 的数据
	 * @param date
	 * @param list
	 * @return
	 */
	private List<Map<String,Object>>  filterDateGt(Date date ,List<Map<String,Object>> list,String type){
		
		if(!CollectionUtils.isEmpty(list)){
			List<Map<String,Object>> tmpList=new ArrayList<Map<String,Object>>();
			for(Map<String,Object> map : list){
				Date date1 = getBJSJDate(map,type);
				if(comparDate(date1, date) == 1){
					tmpList.add(map);
				}
			}
			return tmpList;
			
		}
		return null;
		
	}
	
	private Date getBJSJDate(Map<String,Object> map,String type){
		String BJSJ ="";
		if(Contants.TYPE_JQ.equals(type)){
			 BJSJ = (String)map.get("BJSJ");//报警时间  --是否对应到 ALARM_TIME
		}else if(Contants.TYPE_DPZD.equals(type)){
			 BJSJ = (String)map.get("DPSJ");
		}else if(Contants.TYPE_DPCL.equals(type)){//
			 BJSJ = (String)map.get("DPSJ");
		}else{
			throw new RuntimeException("不应该到这");
		}
		
		
		
		try {
			Date BJSJDate = DateUtils.parseDate(BJSJ, "yyyy/MM/dd HH:mm:ss.SSS");//2016/12/29 13:56:22.000
			return BJSJDate;
		} catch (java.text.ParseException e) {
			e.printStackTrace();
		} 
		return null;
	}
	
	public static int comparDate(Date DATE1, Date DATE2) {
        try {
        	if(DATE1 ==null && DATE2==null){
        		return 0;
        	}else if(DATE1 ==null ||  DATE2==null){
        		throw new Exception();
        	}
            if (DATE1.getTime() > DATE2.getTime()) {
//                System.out.println("dt1 在dt2前");
                return 1;
            } else if (DATE1.getTime() < DATE2.getTime()) {
//                System.out.println("dt1在dt2后");
                return -1;
            } else {
                return 0;
            }
        } catch (Exception exception) {
            exception.printStackTrace();
        }
        return 0;
    }
	
	
	public static List<Map<String,Object> > getCurrentDateData(List<Map<String,Object> >  list,String type) throws java.text.ParseException{
		
		if(!CollectionUtils.isEmpty(list)){
			List<Map<String,Object>> tmpList=new ArrayList<Map<String,Object>>();
			for(Map<String,Object> map : list){
				
				if(Contants.TYPE_JQ.equals(type)){
					
					String BJSJ = (String)map.get("BJSJ");//报警时间  --是否对应到 ALARM_TIME
					Date BJSJDate = DateUtils.parseDate(BJSJ, "yyyy/MM/dd HH:mm:ss.SSS"); //2016/12/29 13:56:22.000
					BJSJ = DateUtils.formatDate(BJSJDate);
					
					if(BJSJ.equals(DateUtils.getDate("yyyy-MM-dd"))){
						tmpList.add(map);
					}
					
				}else if(Contants.TYPE_DPZD.equals(type)){
					
					String DPSJ = (String)map.get("DPSJ");
					Date DPSJDate = DateUtils.parseDate(DPSJ, "yyyy/MM/dd HH:mm:ss.SSS"); //2016/12/29 13:56:22.000
					DPSJ = DateUtils.formatDate(DPSJDate);
					
					if(DPSJ.equals(DateUtils.getDate("yyyy-MM-dd"))){
						tmpList.add(map);
					}
					
				}else if(Contants.TYPE_DPCL.equals(type)){
					
					String DPSJ = (String)map.get("DPSJ");
					Date DPSJDate = DateUtils.parseDate(DPSJ, "yyyy/MM/dd HH:mm:ss.SSS"); //2017/01/01 08:44:31.000
					DPSJ = DateUtils.formatDate(DPSJDate);
					
					if(DPSJ.equals(DateUtils.getDate("yyyy-MM-dd"))){
						tmpList.add(map);
					}
					
				}else{
					throw new RuntimeException("不应该到这");
				}
				
			}
			
			return tmpList;
		
		}
		return null;
		
	}

 用到ehcache缓存最大时间到磁盘,缓存可以看我另一篇博客

0
0
分享到:
评论
1 楼 faradayroger 2017-01-09  
[color=green][color=red][/color][/color]不错不错,没看懂

相关推荐

Global site tag (gtag.js) - Google Analytics