开发过程中,经常会遇到一键操作这样的功能,当数据量较少或者业务逻辑单一的时候没什么问题,但是当遇到数据量较大,而且业务逻辑较为复杂的情况,就比较棘手了,一键执行后,仿佛整个世界都在跟着转圈圈,直到请求超时,更有甚者,服务器直接驾崩。
最近,在帮客户做微信会员资料更新操作时就遇到这样的情况,最开始是最简单的遍历执行,发现行不通;于是又换用分页批量执行的方法,结果发现换汤不换药,还是请求超时;最后没办发,只能通过使用多线程机制,通过开启多线程,以增加系统开销来节省请求时间,终于把问题解决了。也许会有更好的方法,但是目前实现功能要紧。话不多说,关门,放代码。。。
对此,我写了两套方案,其一是通过实现Callable接口异步执行业务方法,最后返回执行结果;
package com.web.demo.thread;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.concurrent.Callable;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;/** * 异步执行业务方法,实现Callable接口,返回执行结果 * * @author jiangyf */public class AsyncTask implements Callable
其二是通过继承Thread类异步执行业务方法,最后不返回执行结果。
package com.web.demo.thread;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutionException;/** * 异步执行业务方法,继承Thread类,不返回执行结果 * * @author jiangyf */public class AsyncJob extends Thread { // 执行任务名称 private String jobName; // 执行任务时间 private long jobTime; // 线程同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待 private CountDownLatch latch; public AsyncJob(String jobName, long jobTime, CountDownLatch latch) { super(); this.jobName = jobName; this.jobTime = jobTime; this.latch = latch; } public void run() { // 任务开始时间 long begin = System.currentTimeMillis(); System.out.println(jobName + " 任务开始...."); // 执行具体业务 try { Thread.sleep(jobTime * 1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(jobName + " 任务结束...."); // 任务结束时间 long end = System.currentTimeMillis(); jobTime = (end - begin) / 1000; System.out.println(jobName + "任务用时:" + jobTime + "秒"); if (latch != null) { // 任务完成,计数器减一 latch.countDown(); } } public static void main(String[] args) throws InterruptedException, ExecutionException { // 任务开始时间 long begin = System.currentTimeMillis(); // 初始化计数器 CountDownLatch latch = new CountDownLatch(2); // 初始化线程 AsyncJob job = new AsyncJob("running", 5, latch); AsyncJob job2 = new AsyncJob("walking", 2, latch); job.start(); job2.start(); // 全部任务执行完成前,会一直阻塞当前线程,直到计时器的值为0 latch.await(); // 任务结束时间 long end = System.currentTimeMillis(); System.out.println("任务总用时:" + ((end - begin) / 1000) + "秒"); }}
以下为业务代码实现示例:
// 会员资料更新失败的卡号 private static StringBuffer cardNoStr = new StringBuffer(); /** * 同步微信会员信息 */ public String syncVipInfo(String weixinId) { log.info("-------------同步微信会员信息开始"); String msg = ""; setWeixinInfo(weixinId); try { ListvipInfos = vipInfoDao.getByWeixinId(weixinId, null, null); int totalRows = vipInfos.size(); msg = "需要同步的微信会员数:" + totalRows; log.info(msg); if (totalRows == 0) { return msg; } Map map = PropertiesUtil .propertiesToMap("syncvipinfo.properties"); if (map.get("max_num") == null || map.get("max_thread") == null) { msg = "同步微信会员信息配置文件错误"; log.info(msg); return msg; } int offset = 0; int rows = Integer.parseInt(map.get("max_num")); int threadNum = Integer.parseInt(map.get("max_thread")); int count = totalRows / rows; if ((totalRows % rows) > 0) { count += 1; } if (count > threadNum) { if ((totalRows % threadNum) > 0) { count = threadNum - 1; rows = totalRows / count; if ((totalRows % count) > 0) { count += 1; } } else { count = threadNum; rows = totalRows / count; } } log.info("需要开启线程数量:" + count); // 任务开始时间 long begin = System.currentTimeMillis(); // 初始化计数器 CountDownLatch latch = new CountDownLatch(count); // 初始化线程池 ExecutorService executorService = Executors .newFixedThreadPool(count); // 初始化线程 for (int i = 0; i < count; i++) { Future future = executorService.submit(new AsyncTask( latch, vipInfos, offset, rows)); offset += rows; } executorService.shutdown(); log.info("线程池是否已关闭:" + executorService.isShutdown()); // 初始化线程 /* for (int i = 0; i < count; i++) { AsyncJob job = new AsyncJob(latch, vipInfos, offset, rows); job.start(); offset += rows; } */ if (count > 0) { // 全部任务执行完成前,会一直阻塞当前线程,直到计时器的值为0 latch.await(); } if (!"".equals(cardNoStr.toString())) { msg = "同步微信会员信息失败的会员卡号有:" + cardNoStr.toString().substring(0, cardNoStr.lastIndexOf(",")); } else { msg = "同步微信会员信息成功"; } // 任务结束时间 long end = System.currentTimeMillis(); log.info("同步微信会员信息任务用时:" + ((end - begin) / 1000) + "秒"); } catch (Exception e) { msg = "同步微信会员信息出现异常"; log.error(msg + e.getMessage()); } log.info("执行结果-------" + msg + "-------"); log.info("-------------同步微信会员信息结束"); return msg; } /** * 异步执行业务方法,实现Callable接口,返回执行结果 * * @author jiangyf */ static class AsyncTask implements Callable { private Logger log = LoggerFactory.getLogger(WXWebService.class); // 线程同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待 private CountDownLatch latch; private List vipInfos; private int offset; private int rows; public AsyncTask(CountDownLatch latch, List vipInfos, int offset, int rows) { super(); this.latch = latch; this.vipInfos = vipInfos; this.offset = offset; this.rows = rows; } public String call() { // 任务开始时间 long begin = System.currentTimeMillis(); log.info("当前线程更新会员数量:------" + vipInfos.size()); // 执行具体业务 for (int i = 0; i < rows; i++) { int num = offset + i; if (num >= vipInfos.size()) { break; } VipInfo vipInfo = vipInfos.get(num); if (vipInfo != null) { String cardNo = vipInfo.getCardNo(); WeixinUserInfo userInfo = getWeixinUserInfo(vipInfo .getOpenId()); try { updateVipInfo(vipInfo, userInfo); log.info("会员卡号为" + cardNo + "的会员资料更新成功"); } catch (SQLException e) { cardNoStr.append(cardNo + ","); log.info("会员卡号为" + cardNo + "的会员资料更新失败"); } } } // 任务结束时间 long end = System.currentTimeMillis(); log.info("当前线程执行任务用时:" + ((end - begin) / 1000) + "秒"); if (latch != null) { latch.countDown();// 任务完成,计数器减一 } return cardNoStr.toString(); } } static class AsyncJob extends Thread { private Logger log = LoggerFactory.getLogger(WXWebService.class); // 线程同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待 private CountDownLatch latch; private List vipInfos; private int offset; private int rows; public AsyncJob(CountDownLatch latch, List vipInfos, int offset, int rows) { super(); this.latch = latch; this.vipInfos = vipInfos; this.offset = offset; this.rows = rows; } public void run() { // 任务开始时间 long begin = System.currentTimeMillis(); log.info("当前线程更新会员数量:------" + vipInfos.size()); // 执行具体业务 for (int i = 0; i < rows; i++) { int num = offset + i; if (num >= vipInfos.size()) { break; } VipInfo vipInfo = vipInfos.get(num); if (vipInfo != null) { String cardNo = vipInfo.getCardNo(); WeixinUserInfo userInfo = getWeixinUserInfo(vipInfo .getOpenId()); try { updateVipInfo(vipInfo, userInfo); log.info("会员卡号为" + cardNo + "的会员资料更新成功"); } catch (SQLException e) { cardNoStr.append(cardNo + ","); log.info("会员卡号为" + cardNo + "的会员资料更新失败"); } } } // 任务结束时间 long end = System.currentTimeMillis(); log.info("当前线程执行任务用时:" + ((end - begin) / 1000) + "秒"); if (latch != null) { latch.countDown();// 任务完成,计数器减一 } } }
代码地址: