RT,事情的起因是这样的:
"之前的老代码执行缓慢问题找到了吗?"
"emm,那个是这样的,arthas不支持过长的方法,我们的那个代码我觉得挺短的"
"挺短的?1000行了都,给我拆了!"
"emm,好"
然后我小心翼翼的拆分了祖传的作文代码,经过三两下分析我发现想让我们的列表变快,除去SQL的优化之外只有异步和多线程处理了。于是我开始准备使用异步和多线程的方式去处理这个类似问题:
for (Object o : list){
sloveObject(o);
}
parallelStream
java8提供的方法,以多核计算著称,流并行处理程序的代替方法。并行多线底层使用ForkJoinPool,来看下这里的策略:
public void compute() {
Spliterator<S> rightSplit = spliterator, leftSplit;
long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
if ((sizeThreshold = targetSize) == 0L)
targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
boolean forkRight = false;
Sink<S> taskSink = sink;
ForEachTask<S, T> task = this;
while (!isShortCircuit || !taskSink.cancellationRequested()) {
if (sizeEstimate <= sizeThreshold ||
(leftSplit = rightSplit.trySplit()) == null) {
task.helper.copyInto(taskSink, rightSplit);
break;
}
ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
task.addToPendingCount(1);
ForEachTask<S, T> taskToFork;
if (forkRight) {
forkRight = false;
rightSplit = leftSplit;
taskToFork = task;
task = leftTask;
}
else {
forkRight = true;
taskToFork = leftTask;
}
taskToFork.fork();
sizeEstimate = rightSplit.estimateSize();
}
task.spliterator = null;
task.propagateCompletion();
}
策略是将任务分成两份,对于已经分开的部分依旧会继续分割,当值达到sizeThreshold的值时停止这个操作,这里是他forEach时执行的策略。但时按照订单列表的场景往往需要排序什么的,stream流的方法提供了保证原列表顺序的forEachOrdered,使用了不同的计算方法:
private static <S, T> void doCompute(ForEachOrderedTask<S, T> task) {
Spliterator<S> rightSplit = task.spliterator, leftSplit;
long sizeThreshold = task.targetSize;
boolean forkRight = false;
while (rightSplit.estimateSize() > sizeThreshold &&
(leftSplit = rightSplit.trySplit()) != null) {
ForEachOrderedTask<S, T> leftChild =
new ForEachOrderedTask<>(task, leftSplit, task.leftPredecessor);
ForEachOrderedTask<S, T> rightChild =
new ForEachOrderedTask<>(task, rightSplit, leftChild);
task.addToPendingCount(1);
rightChild.addToPendingCount(1);
task.completionMap.put(leftChild, rightChild);
if (task.leftPredecessor != null) {
leftChild.addToPendingCount(1);
if (task.completionMap.replace(task.leftPredecessor, task, leftChild)) {
task.addToPendingCount(-1);
} else {
leftChild.addToPendingCount(-1);
}
}
ForEachOrderedTask<S, T> taskToFork;
if (forkRight) {
forkRight = false;
rightSplit = leftSplit;
task = leftChild;
taskToFork = rightChild;
} else {
forkRight = true;
task = rightChild;
taskToFork = leftChild;
}
taskToFork.fork();
if (task.getPendingCount() > 0) {
@SuppressWarnings("unchecked")
IntFunction<T[]> generator = size -> (T[]) new Object[size];
Node.Builder<T> nb = task.helper.makeNodeBuilder(
task.helper.exactOutputSizeIfKnown(rightSplit),
generator);
task.node = task.helper.wrapAndCopyInto(nb, rightSplit).build();
task.spliterator = null;
}
task.tryComplete();
}
}
两个看起来大致策略是一样的,但是有一个地方是不同的:forEachOrdered构造器是这么写的:
protected ForEachOrderedTask(PipelineHelper<T> helper,
Spliterator<S> spliterator,
Sink<T> action) {
super(null);
this.helper = helper;
this.spliterator = spliterator;
this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
// Size map to avoid concurrent re-sizes
this.completionMap = new ConcurrentHashMap<>(Math.max(16, AbstractTask.LEAF_TARGET << 1));
this.action = action;
this.leftPredecessor = null;
}
相较于普通的foreach,forEachOrdered这里新加了一个当前公共线程池最大线程数二倍(最大16)的completionMap,来确认任务执行的并发数,最大的不同在于当前计算执行完的合并策略:
public final void tryComplete() {
CountedCompleter<?> a = this, s = a;
for (int c;;) {
if ((c = a.pending) == 0) {
a.onCompletion(s);
if ((a = (s = a).completer) == null) {
s.quietlyComplete();
return;
}
}
else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
return;
}
}
不同于forEach的是,在a.pending的时候多了一句a.onCompletion(s):
public void onCompletion(CountedCompleter<?> caller) {
if (node != null) {
node.forEach(action);
node = null;
}
else if (spliterator != null) {
helper.wrapAndCopyInto(action, spliterator);
spliterator = null;
}
ForEachOps.ForEachOrderedTask<S, T> leftDescendant = completionMap.remove(this);
if (leftDescendant != null)
leftDescendant.tryComplete();
}
显然,未完成的任务,只要是子任务未执行完会循环等待,通俗点讲,forEachOrdered并不会发挥出并行分治,多任务计算的优势。所以对于parallelStream ,若想他能够实现多线程计算,不能和forEachOrdered共用,用了就和普通的for循环差距不大,所以需要处理完再进行重排序。
除了上面的问题,parallelStream不保证线程安全,且并行处理提高的是cpu运算并不适合IO频繁的场景。也就是说在上面的 sloveObject()方法中出现了list、map的add\put操作,需要注意线程安全问题,且处理的方法内部不能有过多的io操作。看了下我的sloveObject()方法,我还是放弃了这种简便的并行处理。因为无法自定义线程池(定义只能是启动通过vm参数来确认:),且我的方法io操作蛮多,可能后续会有扩展的地方,现在写成consumer的方式,不利于以后的维护。
但是使用这种方式,代码简洁,可读性高。然后这里有一篇介绍ForkJoin原理的blog:java8新特性(六):Stream多线程并行数据处理
Future
我印象里除了流处理之外,再就是future的异步处理和阻塞等待了,对于Future而言,线程池的高自由度和Runnable式的执行其实是比较适合使用的。相较于parallelStream,优势在于自由度。
这样一来需要注意线程安全问题,而不需要关注列表顺序,当然,若果在 sloveObject()方法中出现了list.add的操作则需要注意一下顺序重排。
List<Future> futureList = new ArrayList<>();
list.forEach(o -> futureList.add(executor.submit(() -> sloveObject(o))));
futureList.forEach(future -> {
try {
future.get();
}catch (Exception e){
log.error("ERROR:",e);
}
});
Future提供了不少接口,供查看任务执行的进度、状态以及等待结果返回、等一系列方法,相较于Java8提供的并发流,更加自由,这里有详细介绍:彻底理解Java的Future模式
针对我的问题,其实最开始的并发流就能解决,而Future也能更好的去处理这个问题,但是java要出13了,总不能一直停在1.5就提供的方法上,我相信高版本的处理和底层优化上一定是优于低版本的。
CompletableFuture
总是有人说Future是残缺的,主要体现在但不限于:多次不同计算的结果合并、任务完成的监听(便宜的isdone()并不好用),提供统一的结果完成事件。其实上面这些缺陷在1.8版本的时候java提供了一套完整的解决方案:CompletableFuture,除了实现Future的所有接口之外CompletableFuture也提供了一些船新的方法:JDK8新特性之CompletableFuture详解,这样以来我的写法发生了一些略微的变化:
List<CompletableFuture> completableFutures = new ArrayList<>();
list.forEach(o -> completableFutures.add(CompletableFuture.runAsync(() ->sloveObject(o),executor)));
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[list.size()]));
其同样是多线程处理,上面我使用了自定义的executor,也可以使用其默认的线程池(ForkJoin.commonPool,Java8官方指定公有线程池昂,通常可以传入参数去自定义,灵活度拉满)
总结
1.执行方式和效率
在选择这些方法作为我们列表底层Convert的操作步骤时我写了一个测试类来测试这几种不同的实现方式对于同一列表convert的所需时间,自定义线程设置为:
private static ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(){{
setCorePoolSize(16);
setMaxPoolSize(64);
setQueueCapacity(16);
setThreadNamePrefix("TestThreadPoolTask_");
setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
setKeepAliveSeconds(60);
}};
对于公有的ForkJoin使用参数调控:
// 这里说下 对于默认的值 是Runtime.getRuntime().availableProcessors() - 1
// 由于foreachOrdered map的限制这里的值大于16是没有意义的
// 对标自己的PoolSize
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "16");
对于处理方法考虑到convert方法中http请求的不确定性写个随机函数,生成100-999ms的消耗时间:
public static int buildRandom(int length) {
int num = 1;
double random = Math.random();
if (random < 0.1) {
random = random + 0.1;
}
for (int i = 0; i < length; i++) {
num = num * 10;
}
return (int) ((random * num));
}
private static void sleepStock(long time){
try {
TimeUnit.MILLISECONDS.sleep(time);
}catch (Exception e){
log.error("Error:",e);
}
}
这里模拟的方法如下:
private static void invoke(Integer in, List<Integer> sks){
int x = buildRandom(3);
sleepStock(x);
sks.add(in);
}
测试段落如下:
log.info("---start CompletableFuture---");
time = System.currentTimeMillis();
List<CompletableFuture> completableFutures = new ArrayList<>();
in.forEach(integer -> completableFutures.add(CompletableFuture.runAsync(() ->invoke(integer,sks),executor)));
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[in.size()]));
completableFutures.forEach(future -> {
try {
future.get();
}catch (Exception e){
log.error("ERROR:",e);
}
});
log.info("CompletableFuture -{},cost:{}",sks.size(),System.currentTimeMillis() -time);
sks.clear();
sleepStock(1000L);
log.info("---start Future---");
time = System.currentTimeMillis();
List<Future> futureList = new ArrayList<>();
in.forEach(integer -> futureList.add(executor.submit(() ->invoke(integer,sks))));
futureList.forEach(future -> {
try {
future.get();
}catch (Exception e){
log.error("ERROR:",e);
}
});
log.info("Future-{},cost:{}",sks.size(),System.currentTimeMillis() -time);
sks.clear();
sleepStock(1000L);
log.info("---start parallelStream forEach---");
time = System.currentTimeMillis();
in.parallelStream().forEach(integer ->invoke(integer,sks));
log.info("parallelStream-{},cost:{}",sks.size(),System.currentTimeMillis() -time);
sks.clear();
sleepStock(1000L);
log.info("---start parallelStream forEachOrdered---");
time = System.currentTimeMillis();
in.parallelStream().forEachOrdered(integer ->invoke(integer,sks));
// 重排序
log.info("parallelStream forEachOrdered-{},cost:{}",sks.size(),System.currentTimeMillis() -time);
在操作数有16的时候结果如下:
original size:17,om:[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]
---start forEach---
For -17,cost:10579
---start CompletableFuture---
CompletableFuture -17,cost:1080
---start Future---
Future-17,cost:1004
---start parallelStream forEach---
parallelStream-17,cost:1001
---start parallelStream forEachOrdered---
parallelStream forEachOrdered-17,cost:8537
操作数有100的时候:
original size:101,om:[.......]
---start forEach---
For -101,cost:50772
---start CompletableFuture---
CompletableFuture -101,cost:1554
---start Future---
Future-101,cost:3352
---start parallelStream forEach---
parallelStream-101,cost:3703
---start parallelStream forEachOrdered---
parallelStream forEachOrdered-101,cost:48792
对比结果可以明显看出,在任务数较低时,当前配置的线程数16几个方式操作得出的结果除开坑爹的forEachOrdered和for循环,大多都达到了标准,当实际任务数需要排队等待时,CompletableFuture优势就展示了出来,这里说一下为什么CompletableFuture执行效率会比Future高,我们可以稍微看下CompletableFuture的实现逻辑:
public void run() {
CompletableFuture<Void> d; Runnable f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
f.run();
d.completeNull();
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
首先说明下,我是用的是可以自定义线程池无返回参数的runAsync,而其AsyncRun类继承 ForkJoinTask,实现 Runnable, AsynchronousCompletionTask ,Future则是最终交给了FutureTask的run方法:
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
粗看的话是CompletableFuture.runAsync底层使用Runnable的run方法,是无返回值的,这里造成了变量不一样,修改程序:
in.forEach(integer -> completableFutures.add(CompletableFuture.supplyAsync(() ->invoke(integer,sks),executor)));
得到新结果:依旧是是completableFutures快:
CompletableFuture -101,cost:1370
Future-101,cost:1985
此处supplyAsync是由返回值的,同样其AsyncSupply继承 ForkJoinTask,实现 Runnable, AsynchronousCompletionTask,为啥还能取到值?原因在于:
public void run() {
CompletableFuture<T> d; Supplier<T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
d.completeValue(f.get());
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
得益于CompletableFuture丰富的方法,在判断完成的时候直接将Supplier产生的值返回了,比上Future的Callable的call方法,精明和鸡贼了很多。快也就是情理之中了。
说了这么多 终于到了要做选择的时候了:首先受限于业务,目前的convert是无需返回值且无后续复杂操作的,便于扩展而言CompletableFuture是最优的选择,而parallelStream相较于其他的唯一的优势就是可读性.....何况convert里经常出现一些蜜汁高io的DBselect的操作。pass掉parallelStream,至于CompletableFuture和Future选谁我上面说这么多,懂的都懂。更何况业务场景有一次加载130-150个订单情况存在,所以我之前说的100个效率问题是真实存在的,到此我决定使用CompletableFuture。
2.计算原则和配置
确定了使用的方法之后又到了纯粹抱头哥的发呆时间:线程池怎么办。因为领导其实很反感API层面的线程池,因为伴随着线程池出现的问题往往包括但不限于:失败的异常处理,错误的核心线程数配置,虚假的阻塞执行策略,离谱的线程工厂类等等等,使用默认的又不太易于管理....
《Java并发编程实战》中给出一个计算公式:
Number = NCpu * Ucpu * ( 1 + W/C)
Number : 线程数量
NCpu : 处理器核数
UCpu : 期望cpu利用率
W/C : 等待时间与计算时间比
既然这样子,就生产一个bean作为convert方法的线程池把....
3.要注意的地方
- 多线程往往要看线程安全问题,得格外关注
- 合理配置线程池
- 寻找适合自己业务场景方法,不一定最快的就是最适合的
- Java高版本的方法大概率比低版的效率高,不论是8的HashMap改写还11的Jvm优化和GC的新增,总是向好的方向去发展,有机会别太留念老方法,毕竟9012了。
- 这期copy的太多源码 别说是我懒,是因为写不出来