本文章主要是通过基础篇应用Executor框架
一、如何寻找可并行的任务
为了使用Executor框架,你一定要把任务描述成Runnable,所以要设计这样的框架,主要是要做好确定任务的边界:一个好的任务边界是非常困难,因为涉及到任务的划分,每个任务有特定的运行时间,如果你划分的任务运行的时间不一致(运行长的时间的任务远远大于运行短时间的任务)时候,那么就会严重影响你应用程序的并发性,下面有具体的例子演示。
下面的例子主要是说明应用程序如何进行并行化
二、举例子
下面是一个用户请求页面的例子:
也就是页面的渲染:页面包含了文本文字和图片,下面有几个版本都是对这个需求进行并行化使得应用程序达到更好的性能。
版本一、使用单线程的思想开发这个应用程序如:
public class SingleThreadRenderer {
void renderPage(CharSequence source) {
renderText(source);
List<ImageData> imageData = new ArrayList<ImageData>();
for (ImageInfo imageInfo : scanForImageInfo(source))
imageData.add(imageInfo.downloadImage());
for (ImageData data : imageData)
renderImage(data);
}
}
出现的问题:由于下载文本文字十分的快,但是下载图片涉及到I/O瓶颈,在下载图片时候cup做了很少的事情。如此这样的设计就会导致用户等待很长的时候才能看到请求的页面。
解决方案:充分利用并发性来解决出现的问题
版本二:
用Future和ExecutorService解决这个问题,把下载图片的任务全部提交给Executor框架,返回一个Future,然后调用future.get()方法(等待call方法中返回的结果集)。
public class FutureRenderer {
private final ExecutorService executor = ...;
void renderPage(CharSequence source) {
final List<ImageInfo> imageInfos = scanForImageInfo(source);
Callable<List<ImageData>> task =
new Callable<List<ImageData>>() {
public List<ImageData> call() {
List<ImageData> result
= new ArrayList<ImageData>();
for (ImageInfo imageInfo : imageInfos)
result.add(imageInfo.downloadImage());
return result;
}
};
Future<List<ImageData>> future = executor.submit(task);
renderText(source);
try {
List<ImageData> imageData = future.get();
for (ImageData data : imageData)
renderImage(data);
} catch (InterruptedException e) {
// Re-assert the thread's interrupted status
Thread.currentThread().interrupt();
// We don't need the result, so cancel the task too
future.cancel(true);
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
}
该类是使用了两个任务:一个渲染text,另外一个是下载图片,如果渲染text比下载图片快很多,那么这个划分的并发程序与单线程程序差不多,那么如何进一步寻找应用程序的并发呢?请看下一个版本
该版本的优点:用户请求能够很快响应出页面文本信息
缺点: 图片的下载,要全部下载完成后才能够渲染
记住一条原则:
真正高性能并发的程序是多个独立的相同任务并发的执行也就是说多个任务处理的时间差不多
版本三:
基于上个版本的劣势,那么最理想的做法应该是下载一张图片应该上马响应给客户。接下来该版本说明如何解决这个问题,讲这个问题之前要说明一下一个类和里一个接口。
CompletionService接口:
将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者 submit 执行的任务。使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。也就是说不需要等待全部的任务的结果完成后才返回,只要有任务的结果就会存放在一个队列中;如果需要任务的结果只需要调用take方法即可。
下面讲一下实现该接口的ExecutorCompletionService类:
大概源码如:
public class ExecutorCompletionService<V> implements CompletionService<V> {
private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
/**
* FutureTask extension to enqueue upon completion
*/
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
private RunnableFuture<V> newTaskFor(Callable<V> task) {
if (aes == null)
return new FutureTask<V>(task);
else
return aes.newTaskFor(task);
}
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
if (aes == null)
return new FutureTask<V>(task, result);
else
return aes.newTaskFor(task, result);
}
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
由源码可以看出该类借助Future和一个阻塞队列LinkedBlockingQueue
从submit方法,传了一个QueueingFuture对象,该类的父类是FutureTask(它实现了Runnable接口),并且还是实现了done方法,表示任务完成后就把结果存放在阻塞队列中。
从源码中可以看出Executor使用了多个ExecutorCompletionService,即提交了多个任务。这个实现可以在submit中体会得到。
由于ExecutorCompletionService类可以对多个任务提交,不必要等待整个任务返回结果才能得到结果,只需要某个任务完成就可以利用take方法,获取结果。
本版本我了充分利用并发性能,我们为每张图片下载使用一个线程,放在Executor框架中执行即线程池。
代码:
public class Renderer {
private final ExecutorService executor;
Renderer(ExecutorService executor) { this.executor = executor; }
void renderPage(CharSequence source) {
final List<ImageInfo> info = scanForImageInfo(source);
CompletionService<ImageData> completionService =
new ExecutorCompletionService<ImageData>(executor);
for (final ImageInfo imageInfo : info)
completionService.submit(new Callable<ImageData>() {
public ImageData call() {
return imageInfo.downloadImage();
}
});
renderText(source);
try {
for (int t = 0, n = info.size(); t < n; t++) {
take方法相当于在阻塞队列拿去任务执行完的结果
Future<ImageData> f = completionService.take();
ImageData imageData = f.get();
renderImage(imageData);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
}
从上面的代码可以看出多个ExecutorCompletionService共享一个Executor。
三、另外一个例子---旅游公司标价
需求说明:
想象一下一个旅游的网站,用户输入旅游数据和条件,要显示标价---不同的航空公司、旅店和租赁公司。得到公司的标价可能需要调用一个Web服务、访问数据库、执行EDI事务或者一些其他的机制。所以用户要得到相应的数据,就需要设计好应用程序才能得到及时的响应。应该对于那些不能及时响应的公司(设定一定的时间),那么页面应该忽略或者用一个默认的数据代替。
下面是其代码:
Page renderPageWithAd() throws InterruptedException {
long endNanos = System.nanoTime() + TIME_BUDGET;
Future<Ad> f = exec.submit(new FetchAdTask());
// Render the page while waiting for the ad
Page page = renderPageBody();
Ad ad;
try {
// Only wait for the remaining time budget
long timeLeft = endNanos - System.nanoTime();
ad = f.get(timeLeft, NANOSECONDS);
} catch (ExecutionException e) {
ad = DEFAULT_AD;
} catch (TimeoutException e) {
ad = DEFAULT_AD;
f.cancel(true);
}
page.setAd(ad);
return page;
}
从一个公司获取标价与从另外一个公司获取标价是互不影响,所以为了使该应用程序得到更好的性能,则获取所有公司的标价放在Executor池中批量执行(获取每个公司的标价就使用一个线程)。
也就是说很容易的创建n个任务,把他们提交给线程池,获得Futures,从Future获取结果时加一个时间限制。
下面使用ExecutorService中的invokeAll方法(带有时间限制),
private class QuoteTask implements Callable<TravelQuote> {
private final TravelCompany company;
private final TravelInfo travelInfo;
...
public TravelQuote call() throws Exception {
return company.solicitQuote(travelInfo);
}
}
public List<TravelQuote> getRankedTravelQuotes(
TravelInfo travelInfo, Set<TravelCompany> companies,
Comparator<TravelQuote> ranking, long time, TimeUnit unit)
throws InterruptedException {
List<QuoteTask> tasks = new ArrayList<QuoteTask>();
for (TravelCompany company : companies)
tasks.add(new QuoteTask(company, travelInfo));
List<Future<TravelQuote>> futures =
exec.invokeAll(tasks, time, unit);
List<TravelQuote> quotes =
new ArrayList<TravelQuote>(tasks.size());
Iterator<QuoteTask> taskIter = tasks.iterator();
for (Future<TravelQuote> f : futures) {
QuoteTask task = taskIter.next();
try {
quotes.add(f.get());
} catch (ExecutionException e) {
quotes.add(task.getFailureQuote(e.getCause()));
} catch (CancellationException e) {
quotes.add(task.getTimeoutQuote(e));
}
}
Collections.sort(quotes, ranking);
return quotes;
}
分享到:
相关推荐
azkaban-executor-server-2.5.0-tar.gz azkaban-web-server-2.5.0-tar.gz azkaban-sql-script-2.5.0-tar.gz
高效率 快捷操作
xxl-job-executor-sample-springboot-2.2.0.jar 与xxl-job配套的执行器包,用于 Docker-compose搭建xxl-job(并配置Python3环境xxl-job执行器) 中相应的文件
该文档详细记录了Executor框架结构、使用示意图、ThreadPoolExecutor使用示例、线程池原理分析、几种常见线程池(FixedThreadPool、SingleThreadExecutor、CachedThreadPool)的详解以及线程池大小确定等内容
python库,解压后可用。 资源全名:qcg_pilotjob_executor_api-0.12.3-py3-none-any.whl
资源来自pypi官网。 资源全名:qcg_pilotjob_executor_api-0.12.3-py3-none-any.whl
普罗米修斯执行者 -am-executor是一个HTTP服务器,它从接收警报,并执行将警报详细信息设置为环境变量的给定命令。 建造 要求 1.克隆此存储库 git clone https://github.com/imgix/prometheus-am-executor.git 2....
async-global-executor-在async-executor和async-io之上构建的全局执行器async-global-executor在async-executor和async-io之上构建的全局执行器功能async-io:如果启用,则async-global -executor将在内部使用async...
NULL 博文链接:https://jsx112.iteye.com/blog/865884
xxl-job-executor-go-master
001 - Spark框架 - 简介.avi 002 - Spark框架 - Vs Hadoop.avi 003 - Spark框架 - 核心模块 - 介绍.avi 005 - Spark框架 - ...020 - Spark框架 - 核心概念 - Executor & Core & 并行度.avi 023 - SparkCore - 分布式
官方版本,亲测可用
官方版本,亲测可用
官方版本,亲测可用
executor 查找工具,方便查找应用软件,可以自定义文件快捷键。
python库,解压后可用。 资源全名:Flask_Executor-0.3.1-py3-none-any.whl
包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。 异步计算的结果 。...
xxl-job-executor的gin中间件背景xxl-job-executor-go是xxl-job的golang执行器,可以独立运行,有时候我们要与项目或者框架(如:gin框架)集成起来合并为一个服务,本项目因此而生。执行器项目地址与gin集成示例...
执行程序同步sls错误日志 执行者-被动同步-存储在sls的服务的错误日志 在适当的内容的基础上添加服务责任人并@
Storm Executor Task QuartzJob流式框架定时任务处理框架