博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
使用多线程机制异步执行业务方法
阅读量:6637 次
发布时间:2019-06-25

本文共 9023 字,大约阅读时间需要 30 分钟。

hot3.png

    开发过程中,经常会遇到一键操作这样的功能,当数据量较少或者业务逻辑单一的时候没什么问题,但是当遇到数据量较大,而且业务逻辑较为复杂的情况,就比较棘手了,一键执行后,仿佛整个世界都在跟着转圈圈,直到请求超时,更有甚者,服务器直接驾崩。

    最近,在帮客户做微信会员资料更新操作时就遇到这样的情况,最开始是最简单的遍历执行,发现行不通;于是又换用分页批量执行的方法,结果发现换汤不换药,还是请求超时;最后没办发,只能通过使用多线程机制,通过开启多线程,以增加系统开销来节省请求时间,终于把问题解决了。也许会有更好的方法,但是目前实现功能要紧。话不多说,关门,放代码。。。

    对此,我写了两套方案,其一是通过实现Callable接口异步执行业务方法,最后返回执行结果;

package com.web.demo.thread;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.concurrent.Callable;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;/** * 异步执行业务方法,实现Callable接口,返回执行结果 *  * @author jiangyf */public class AsyncTask implements Callable
>> { // 执行任务名称 private String taskName; // 执行任务时间 private long taskTime; // 线程同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待 private CountDownLatch latch; // 任务执行结果 List
> resultList; private Map
resultMap; public AsyncTask(String taskName, long taskTime, CountDownLatch latch) { super(); this.taskName = taskName; this.taskTime = taskTime; this.latch = latch; } @Override public List
> call() throws Exception { resultList = new ArrayList
>(); resultMap = new HashMap
(); // 任务开始时间 long begin = System.currentTimeMillis(); System.out.println(taskName + " 任务开始...."); // 执行具体业务 Thread.sleep(taskTime * 1000); System.out.println(taskName + " 任务结束...."); // 任务结束时间 long end = System.currentTimeMillis(); taskTime = (end - begin) / 1000; resultMap.put("taskName", taskName); resultMap.put("taskTime", taskTime); resultList.add(resultMap); System.out.println(taskName + "任务用时:" + taskTime + "秒"); if (latch != null) { // 任务完成,计数器减一 latch.countDown(); } return resultList; } public static void main(String[] args) throws InterruptedException, ExecutionException { // 任务开始时间 long begin = System.currentTimeMillis(); // 初始化计数器 CountDownLatch latch = new CountDownLatch(2); // 初始化线程池 ExecutorService executorService = Executors.newFixedThreadPool(2); // 初始化线程 Future
>> future = executorService .submit(new AsyncTask("running", 2, latch)); Future
>> future2 = executorService .submit(new AsyncTask("walking", 5, latch)); executorService.shutdown(); // 全部任务执行完成前,会一直阻塞当前线程,直到计时器的值为0 latch.await(); List
> result = future.get(); List
> result2 = future2.get(); result.addAll(result2); System.out.println(result.size()); // 任务结束时间 long end = System.currentTimeMillis(); System.out.println("任务总用时:" + ((end - begin) / 1000) + "秒"); }}

其二是通过继承Thread类异步执行业务方法,最后不返回执行结果。

package com.web.demo.thread;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutionException;/** * 异步执行业务方法,继承Thread类,不返回执行结果 *  * @author jiangyf */public class AsyncJob extends Thread {	// 执行任务名称	private String jobName;	// 执行任务时间	private long jobTime;	// 线程同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待	private CountDownLatch latch;	public AsyncJob(String jobName, long jobTime, CountDownLatch latch) {		super();		this.jobName = jobName;		this.jobTime = jobTime;		this.latch = latch;	}	public void run() {		// 任务开始时间		long begin = System.currentTimeMillis();		System.out.println(jobName + " 任务开始....");		// 执行具体业务		try {			Thread.sleep(jobTime * 1000);		} catch (InterruptedException e) {			e.printStackTrace();		}		System.out.println(jobName + " 任务结束....");		// 任务结束时间		long end = System.currentTimeMillis();		jobTime = (end - begin) / 1000;		System.out.println(jobName + "任务用时:" + jobTime + "秒");		if (latch != null) {			// 任务完成,计数器减一			latch.countDown();		}	}	public static void main(String[] args) throws InterruptedException,			ExecutionException {		// 任务开始时间		long begin = System.currentTimeMillis();		// 初始化计数器		CountDownLatch latch = new CountDownLatch(2);		// 初始化线程		AsyncJob job = new AsyncJob("running", 5, latch);		AsyncJob job2 = new AsyncJob("walking", 2, latch);		job.start();		job2.start();		// 全部任务执行完成前,会一直阻塞当前线程,直到计时器的值为0		latch.await();				// 任务结束时间		long end = System.currentTimeMillis();		System.out.println("任务总用时:" + ((end - begin) / 1000) + "秒");	}}

以下为业务代码实现示例:

// 会员资料更新失败的卡号	private static StringBuffer cardNoStr = new StringBuffer();	/**	 * 同步微信会员信息	 */	public String syncVipInfo(String weixinId) {		log.info("-------------同步微信会员信息开始");		String msg = "";		setWeixinInfo(weixinId);		try {			List
vipInfos = vipInfoDao.getByWeixinId(weixinId, null, null); int totalRows = vipInfos.size(); msg = "需要同步的微信会员数:" + totalRows; log.info(msg); if (totalRows == 0) { return msg; } Map
map = PropertiesUtil .propertiesToMap("syncvipinfo.properties"); if (map.get("max_num") == null || map.get("max_thread") == null) { msg = "同步微信会员信息配置文件错误"; log.info(msg); return msg; } int offset = 0; int rows = Integer.parseInt(map.get("max_num")); int threadNum = Integer.parseInt(map.get("max_thread")); int count = totalRows / rows; if ((totalRows % rows) > 0) { count += 1; } if (count > threadNum) { if ((totalRows % threadNum) > 0) { count = threadNum - 1; rows = totalRows / count; if ((totalRows % count) > 0) { count += 1; } } else { count = threadNum; rows = totalRows / count; } } log.info("需要开启线程数量:" + count); // 任务开始时间 long begin = System.currentTimeMillis(); // 初始化计数器 CountDownLatch latch = new CountDownLatch(count); // 初始化线程池 ExecutorService executorService = Executors .newFixedThreadPool(count); // 初始化线程 for (int i = 0; i < count; i++) { Future
future = executorService.submit(new AsyncTask( latch, vipInfos, offset, rows)); offset += rows; } executorService.shutdown(); log.info("线程池是否已关闭:" + executorService.isShutdown()); // 初始化线程 /* for (int i = 0; i < count; i++) { AsyncJob job = new AsyncJob(latch, vipInfos, offset, rows); job.start(); offset += rows; } */ if (count > 0) { // 全部任务执行完成前,会一直阻塞当前线程,直到计时器的值为0 latch.await(); } if (!"".equals(cardNoStr.toString())) { msg = "同步微信会员信息失败的会员卡号有:" + cardNoStr.toString().substring(0, cardNoStr.lastIndexOf(",")); } else { msg = "同步微信会员信息成功"; } // 任务结束时间 long end = System.currentTimeMillis(); log.info("同步微信会员信息任务用时:" + ((end - begin) / 1000) + "秒"); } catch (Exception e) { msg = "同步微信会员信息出现异常"; log.error(msg + e.getMessage()); } log.info("执行结果-------" + msg + "-------"); log.info("-------------同步微信会员信息结束"); return msg; } /** * 异步执行业务方法,实现Callable接口,返回执行结果 * * @author jiangyf */ static class AsyncTask implements Callable
{ private Logger log = LoggerFactory.getLogger(WXWebService.class); // 线程同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待 private CountDownLatch latch; private List
vipInfos; private int offset; private int rows; public AsyncTask(CountDownLatch latch, List
vipInfos, int offset, int rows) { super(); this.latch = latch; this.vipInfos = vipInfos; this.offset = offset; this.rows = rows; } public String call() { // 任务开始时间 long begin = System.currentTimeMillis(); log.info("当前线程更新会员数量:------" + vipInfos.size()); // 执行具体业务 for (int i = 0; i < rows; i++) { int num = offset + i; if (num >= vipInfos.size()) { break; } VipInfo vipInfo = vipInfos.get(num); if (vipInfo != null) { String cardNo = vipInfo.getCardNo(); WeixinUserInfo userInfo = getWeixinUserInfo(vipInfo .getOpenId()); try { updateVipInfo(vipInfo, userInfo); log.info("会员卡号为" + cardNo + "的会员资料更新成功"); } catch (SQLException e) { cardNoStr.append(cardNo + ","); log.info("会员卡号为" + cardNo + "的会员资料更新失败"); } } } // 任务结束时间 long end = System.currentTimeMillis(); log.info("当前线程执行任务用时:" + ((end - begin) / 1000) + "秒"); if (latch != null) { latch.countDown();// 任务完成,计数器减一 } return cardNoStr.toString(); } } static class AsyncJob extends Thread { private Logger log = LoggerFactory.getLogger(WXWebService.class); // 线程同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待 private CountDownLatch latch; private List
vipInfos; private int offset; private int rows; public AsyncJob(CountDownLatch latch, List
vipInfos, int offset, int rows) { super(); this.latch = latch; this.vipInfos = vipInfos; this.offset = offset; this.rows = rows; } public void run() { // 任务开始时间 long begin = System.currentTimeMillis(); log.info("当前线程更新会员数量:------" + vipInfos.size()); // 执行具体业务 for (int i = 0; i < rows; i++) { int num = offset + i; if (num >= vipInfos.size()) { break; } VipInfo vipInfo = vipInfos.get(num); if (vipInfo != null) { String cardNo = vipInfo.getCardNo(); WeixinUserInfo userInfo = getWeixinUserInfo(vipInfo .getOpenId()); try { updateVipInfo(vipInfo, userInfo); log.info("会员卡号为" + cardNo + "的会员资料更新成功"); } catch (SQLException e) { cardNoStr.append(cardNo + ","); log.info("会员卡号为" + cardNo + "的会员资料更新失败"); } } } // 任务结束时间 long end = System.currentTimeMillis(); log.info("当前线程执行任务用时:" + ((end - begin) / 1000) + "秒"); if (latch != null) { latch.countDown();// 任务完成,计数器减一 } } }

 

代码地址:

 

转载于:https://my.oschina.net/jiangyf/blog/805423

你可能感兴趣的文章
stm32 usb error : identifier "bool" is undefined
查看>>
Mycat配置文件rule.xml
查看>>
MATLAB图像处理工具箱
查看>>
【spring 注解 错误】使用controller 作为后台给前台ajax交互数据出错
查看>>
BZOJ3567 : AABB
查看>>
图解phpstorm常用快捷键
查看>>
h.264并行解码算法3D-Wave实现(基于多核共享内存系统)
查看>>
Scheduler
查看>>
iOS:抽屉侧滑动画两种形式(1、UIView侧滑 2、ViewController侧滑)
查看>>
HA模式手动切换namenode状态
查看>>
CSS优先级
查看>>
聊下 git remote prune origin
查看>>
[Intel Edison开发板] 04、Edison开发基于nodejs和redis的服务器搭建
查看>>
虚拟机NAT模式无法上网问题的解决办法
查看>>
python-学习笔记1-面向对象编程
查看>>
ArcGIS ArcMap 与 ArcServer关于Python的冲突
查看>>
ubuntu php 安装
查看>>
ASP.NET与ASP.NET Core用户验证Cookie并存解决方案
查看>>
Waiting For Debugger
查看>>
启动Tomcat报错:class path resource cannot be opened
查看>>