本笔记来源于 :尚硅谷JUC并发编程(对标阿里P6-P7)b站视频
文章来自: https://www.yuque.com/gongxi-wssld/csm31d/ln0mq1w7wp1oy99g https://www.yuque.com/liuyanntes/vx9leh/fpy93i https://blog.csdn.net/dolpin_ink/category_11847910.html
脑图 本地:尚硅谷JUC并发编程
在线:尚硅谷JUC并发编程
在线脑图加载时间超长。
1、Future和Callable接口
1.1 FutureTask 实现类
FutureTak(实现了x接口,x接口又继承了a和v接口)
在源码可以看到,他既继承了RunnableFuture
接口,也在构造方法中实现了Callable
接口(有返回值、可抛出异常)和Runnable
接口
(ctrl
+alt
+u
)
完成上面目的的代码 - 多线程/有返回/异步
一个主线程,一个mythread | 步执行了 | 返回了”hello callable”
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class CompletableFutureDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { FutureTask<String> futureTask = new FutureTask <>(new MyThread ()); Thread t1 = new Thread (futureTask,"t1" ); t1.start(); System.out.println(futureTask.get()); } }class MyThread implements Callable <String>{ @Override public String call () throws Exception { System.out.println("-----come in call() ----异步执行" ); return "hello Callable 返回值" ; } }
2、Future到CompletableFuture 2.1 Future优点
方案二,3个任务3个线程,利用线程池(假如每次new一个Thread,太浪费资源,会有GC这些工作),大概400毫秒 。
2.2 Future缺点 1 get()阻塞 一旦调用get()方法,不管是否计算完成,都会导致阻塞(所以一般get方法放到最后)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public class FutureAPIDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { FutureTask<String> futureTask = new FutureTask <String>(()->{ System.out.println(Thread.currentThread().getName()+"\t ------副线程come in" ); try { TimeUnit.SECONDS.sleep(5 ); } catch (InterruptedException e) { e.printStackTrace(); } return "task over" ; }); Thread t1 = new Thread (futureTask,"t1" ); t1.start(); System.out.println(Thread.currentThread().getName()+"\t-------主线程忙其他任务了" ); System.out.println(futureTask.get()); } }public class FutureAPIDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { FutureTask<String> futureTask = new FutureTask <String>(()->{ System.out.println(Thread.currentThread().getName()+"\t ------副线程come in" ); try { TimeUnit.SECONDS.sleep(5 ); } catch (InterruptedException e) { e.printStackTrace(); } return "task over" ; }); Thread t1 = new Thread (futureTask,"t1" ); t1.start(); System.out.println(futureTask.get()); System.out.println(Thread.currentThread().getName()+"\t-------主线程忙其他任务了" ); } }
2 isDone()轮询 利用if(futureTask.isDone())的方式使得他在结束之后才get(),但是也会消耗cpu
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public class FutureAPIDemo { public static void main (String[] args) throws ExecutionException, InterruptedException, TimeoutException { FutureTask<String> futureTask = new FutureTask <String>(()->{ System.out.println(Thread.currentThread().getName()+"\t ------副线程come in" ); try { TimeUnit.SECONDS.sleep(5 ); } catch (InterruptedException e) { e.printStackTrace(); } return "task over" ; }); Thread t1 = new Thread (futureTask,"t1" ); t1.start(); System.out.println(Thread.currentThread().getName()+"\t-------主线程忙其他任务了" ); while (true ){ if (futureTask.isDone()){ System.out.println(futureTask.get()); break ; }else { try { TimeUnit.MILLISECONDS.sleep(500 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("正在处理中------------正在处理中" ); } } } }
2.3 Future应用现状 对于简单的业务场景使用Future完全OK
回调通知
前面的isDone()
方法耗费cpu资源,一般应该还是利用回调函数 ,在Future结束时自动调用该回调函数。应对Future的完成时间,完成了可以告诉我,也就是我们的回调通知
创建异步任务
多个任务前后依赖可以组合处理(水煮鱼)
想将多个异步任务的计算结果组合起来,后一个异步任务的计算结果需要前一个异步任务的值,将两个或多个异步计算合成一个异步计算,这几个异步计算相互独立,同时后面这个又依赖前一个处理的结果
比如买鱼-加料-烹饪
对计算速度选最快完成的(并返回结果)
当Future集合中某个任务最快结束时,返回结果,返回第一名处理结果。
…
3、CompletableFuture基本介绍
阻塞的方式和异步编程的设计理念相违背,而轮询的方式会消耗无畏的CPU资源。因此,JDK8设计出CompletableFuture
3.1 CompletableFuture 1 public class CompletableFuture <T> implements Future <T>, CompletionStage<T> {
在Java 8中, CompletableFuture提供了非常强大的Future的扩展功能, 可以帮助我们简化异步编程的复杂性, 并且提供了函数式编程的能力, 可以通过回调的方式处理计算结果, 也提供了转换和组合CompletableFuture的方法。
它可能代表一个明确完成的Future, 也有可能代表一个完成阶段(Completion Stage) , 它支持在计算完成以后触发一些函数或执行某些动作。
它实现了Future
和Completion Stage
接口
3.2 CompletionStage
CompletionStage代表异步计算过程中的某一个阶段 , 一个阶段完成以后可能会触发另外一个阶段
一个阶段的计算执行可以是一个Function, Consumer或者Runnable。比如:stage.then Apply(x->square(x) ) .then Accept (x->System.out.print(x) ) .then Run() ->System.out.print In() )
,一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发。
3.3 核心的四个静态方法(分为两组)
runAsync无返回值 1 runAsync 1 public static CompletableFuture<Void> runAsync (Runnable runnable)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class CompletableFutureBuildDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println(voidCompletableFuture.get()); } }
2 runAsync+线程池 1 2 public static CompletableFuture<Void> runAsync (Runnable runnable, Executor executor)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class CompletableFutureBuildDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(3 ); CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } },executorService); System.out.println(voidCompletableFuture.get()); } }
supplyAsync有返回值 3 supplyAsync 1 public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class CompletableFutureBuildDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(3 ); CompletableFuture<String> objectCompletableFuture = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } return "helllo supplyasync" ; }); System.out.println(objectCompletableFuture.get()); } }
4 supplyAsync+线程池 1 2 public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier, Executor executor)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class CompletableFutureBuildDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(3 ); CompletableFuture<String> objectCompletableFuture = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } return "helllo supplyasync" ; },executorService); System.out.println(objectCompletableFuture.get()); } }
3.4 CompletableFuture使用演示(日常使用) 基本功能
CompletableFuture
可以完成Future
的功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class CompletableFutureUseDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Object> objectCompletableFuture = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()+"----副线程come in" ); int result = ThreadLocalRandom.current().nextInt(10 ); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("1秒钟后出结果" +result); return result; }); System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务" ); System.out.println(objectCompletableFuture.get()); } }
减少阻塞和轮询whenComplete
CompletableFuture
通过whenComplete
来减少阻塞和轮询 (自动回调)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class CompletableFutureUseDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()+"--------副线程come in" ); int result = ThreadLocalRandom.current().nextInt(10 ); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } return result; }).whenComplete((v,e) -> { if (e == null ){ System.out.println("------------------计算完成,更新系统updataValue" +v); } }).exceptionally(e->{ e.printStackTrace(); System.out.println("异常情况" +e.getCause()+"\t" +e.getMessage()); return null ; }); System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务" ); try { TimeUnit.SECONDS.sleep(3 ); } catch (InterruptedException e) { e.printStackTrace(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class CompletableFutureUseDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(3 ); CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()+"--------副线程come in" ); int result = ThreadLocalRandom.current().nextInt(10 ); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } return result; },threadPool).whenComplete((v,e) -> { if (e == null ){ System.out.println("------------------计算完成,更新系统updataValue" +v); } }).exceptionally(e->{ e.printStackTrace(); System.out.println("异常情况" +e.getCause()+"\t" +e.getMessage()); return null ; }); System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务" ); try { TimeUnit.SECONDS.sleep(3 ); } catch (InterruptedException e) { e.printStackTrace(); } } }
异常情况的展示,设置一个异常 int i = 10 / 0 ;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public class CompletableFutureUseDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(3 ); CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()+"--------副线程come in" ); int result = ThreadLocalRandom.current().nextInt(10 ); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("-----结果---异常判断值---" +result); if (result > 2 ){ int i = 10 / 0 ; } return result; },threadPool).whenComplete((v,e) -> { if (e == null ){ System.out.println("------------------计算完成,更新系统updataValue" +v); } }).exceptionally(e->{ e.printStackTrace(); System.out.println("异常情况" +e.getCause()+"\t" +e.getMessage()); return null ; }); System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务" ); try { TimeUnit.SECONDS.sleep(3 ); } catch (InterruptedException e) { e.printStackTrace(); } } }
3.5 CompletableFuture优点总结
异步任务结束时,会自动回调 某个对象的方法;
主线程设置好毁掉后,不再关心异步任务的执行,异步任务之间可以顺序执行
异步任务出错时,会自动回调某个对象的方法。
4、CompletableFuture案例精讲
4.1 编程必备技能准备 函数式接口
函数式接口 的定义:
任何接口 ,如果只包含唯一一个 抽象方法,那么它就是一个函数式接口 。对于函数式接口,我们可以通过lambda表达式 来创建该接口的对象。
1 2 3 public interface Runnable { public abstract void run () ; }
https://www.runoob.com/java/java8-functional-interfaces.html
Runnable
1 2 3 4 @FunctionalInterface public interface Runnable { public abstract void run () ; }
Function
1 2 3 4 @FunctionalInterface public interface Function <T, R> { R apply (T t) ; }
Consumer(消费接口)
1 2 3 4 @FunctionalInterface public interface Consumer <T> { void accept (T t) ; }
Supplier(生产接口)
1 2 3 4 5 6 7 8 9 10 @FunctionalInterface public interface Supplier <T> { T get () ; }
Biconsumer(Bi代表两个的意思,我们要传入两个参数,在上面的案例中是v和e)
1 2 3 4 5 @FunctionalInterface public interface BiConsumer <T, U> { void accept (T t, U u) ; }
函数式接口名称
方法名称
参数
返回值
Runnable
run
无参数
无返回值
Function
apply
1个参数
有返回值
Consume
accept
1个参数
无返回值
Supplier
get
没有参数
有返回值
Biconsumer
accept
2个参数
无返回值
链式调用、链式编程、链式写法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class Chain { public static void main (String[] args) { new Student ().setId(1 ).setName("大卡" ).setMajor("cs" ); } }@NoArgsConstructor @AllArgsConstructor @Data @Accessors(chain = true) class Student { private int id; private String name; private String major; }
join和get对比
功能几乎一样,区别在于编码时是否需要抛出异常
get()方法需要抛出异常
join()方法不需要抛出异常
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class Chain { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { return "hello 12345" ; }); System.out.println(completableFuture.get()); } }public class Chain { public static void main (String[] args) { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { return "hello 12345" ; }); System.out.println(completableFuture.join()); } }
4.2 实战精讲-比价网站case 需求 1 2 3 4 5 6 7 8 9 10 11 12 13 1 需求说明1.1 同一款产品,同时搜索出同款产品在各大电商平台的售价;1.2 同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少2 输出返回: 出来结果希望是同款产品的在不同地方的价格清单列表, 返回一个List<String> 《mysql》in jd price is 88.05 《mysql》in dang dang price is 86.11 《mysql》in tao bao price is 90.43 3 解决方案,比对同一个商品在各个平台上的价格,要求获得一个清单列表1 stepbystep , 按部就班, 查完京东查淘宝, 查完淘宝查天猫......2 all in ,万箭齐发,一口气多线程异步任务同时查询。。。
基本框架搭建
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public class Case { static List<NetMall> list = Arrays.asList( new NetMall ("jd" ), new NetMall ("dangdang" ), new NetMall ("taobao" ) ); public static List<String> getPrice (List<NetMall> list,String productName) { return list .stream() .map(netMall -> String.format(productName + " in %s price is %.2f" , netMall.getNetMallName(), netMall.calcPrice(productName))).collect(Collectors.toList()); } public static void main (String[] args) { long startTime = System.currentTimeMillis(); List<String> list1 = getPrice(list, "mysql" ); for (String element:list1){ System.out.println(element); } long endTime = System.currentTimeMillis(); System.out.println("---当前操作花费时间----costTime:" +(endTime-startTime)+"毫秒" ); } }class NetMall { @Getter private String netMallName; public NetMall (String netMallName) { this .netMallName = netMallName; } public double calcPrice (String productName) { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0 ); } }
从功能到性能:利用CompletableFuture
这里是利用异步线程,万箭齐发
此处用了两步流式编程 。
性能差距巨大
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 public class Case { static List<NetMall> list = Arrays.asList( new NetMall ("jd" ), new NetMall ("dangdang" ), new NetMall ("taobao" ) ); public static List<String> getPrice (List<NetMall> list,String productName) { return list .stream() .map(netMall -> String.format(productName + " in %s price is %.2f" , netMall.getNetMallName(), netMall.calcPrice(productName))).collect(Collectors.toList()); } public static List<String> getPricesByCompletableFuture (List<NetMall> list,String productName) { return list.stream().map(netMall -> CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f" , netMall.getNetMallName(), netMall.calcPrice(productName)))) .collect(Collectors.toList()) .stream() .map(s->s.join()) .collect(Collectors.toList()); } public static void main (String[] args) { long startTime = System.currentTimeMillis(); List<String> list1 = getPrice(list, "mysql" ); for (String element:list1){ System.out.println(element); } long endTime = System.currentTimeMillis(); System.out.println("--普通版----当前操作花费时间----costTime:" +(endTime-startTime)+"毫秒" ); System.out.println("------------------------------分割----------------------" ); startTime = System.currentTimeMillis(); List<String> list2 = getPricesByCompletableFuture(list, "mysql" ); for (String element:list2){ System.out.println(element); } endTime = System.currentTimeMillis(); System.out.println("--性能版-当前操作花费时间----costTime:" +(endTime-startTime)+"毫秒" ); } }class NetMall { @Getter private String netMallName; public NetMall (String netMallName) { this .netMallName = netMallName; } public double calcPrice (String productName) { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0 ); } }
5、CompletableFuture常用API
getNow调用的时候如果计算完了,就拿取这个计算完的值;否则就拿备胎值
1.获得结果和触发计算 获取结果
public T get()
不见不散,容易阻塞
public T get(long timeout,TimeUnit unit)
过时不候,超过时间会爆异常
public T join()
类似于get(),区别在于是否需要抛出异常
public T getNow(T valueIfAbsent)
没有计算完成的情况下,给一个替代结果
立即获取结果不阻塞
计算完,返回计算完成后的结果
没算完,返回设定的valueAbsent(直接返回了备胎值xxx)
主动触发计算
public boolean complete(T value)
是否立即打断get()方法返回括号值
(执行要2s,等待只有1s,所以还没执行完就被打断了。返回true表示打断了获取这个过程,直接返回了备胎值complete;如果没打断,返回false和原来的abc)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class CompletableFutureAPIDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> uCompletableFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { e.printStackTrace(); } return "abc" ; }); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(uCompletableFuture.complete("completeValue" )+"\t" +uCompletableFuture.get()); } }
2.对计算结果进行处理
thenApply
计算结果存在在依赖关系,使得线程串行化。因为依赖关系,所以一旦有异常,直接叫停。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class CompletableFutureDemo2 { public static void main (String[] args) throws ExecutionException, InterruptedException{ CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("111" ); return 1024 ; }).thenApply(f -> { System.out.println("222" ); return f + 1 ; }).thenApply(f -> { System.out.println("333" ); return f + 1 ; }).whenCompleteAsync((v,e) -> { System.out.println("*****v: " +v); }).exceptionally(e -> { e.printStackTrace(); return null ; }); System.out.println("-----主线程结束,END" ); try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { e.printStackTrace(); } } }
handle
类似于thenApply,但是有异常的话仍然可以往下走一步。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public class CompletableFutureDemo2 { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("111" ); return 1024 ; }).handle((f,e) -> { int age = 10 /0 ; System.out.println("222" ); return f + 1 ; }).handle((f,e) -> { System.out.println("333" ); return f + 1 ; }).whenCompleteAsync((v,e) -> { System.out.println("*****v: " +v); }).exceptionally(e -> { e.printStackTrace(); return null ; }); System.out.println("-----主线程结束,END" ); try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { e.printStackTrace(); } } }
3.对计算结果进行消费
接收任务的处理结果,并消费处理,无返回结果 、消费型函数式接口 ,之前的是Function
thenAccept
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture.supplyAsync(() -> { return 1 ; }).thenApply(f -> { return f + 2 ; }).thenApply(f -> { return f + 3 ; }).thenApply(f -> { return f + 4 ; }).thenAccept(r -> System.out.println(r)); }
补充:Code之任务之间的顺序执行
thenRun
thenRun(Runnable runnable)
任务A执行完执行B,并且B不需要A的结果
thenAccept
thenAccept(Consumer action)
任务A执行完执行B,B需要A的结果,但是任务B无返回值
thenApply
thenApply(Function fn)
任务A执行完执行B,B需要A的结果,同时任务B有返回值
1 2 3 4 5 6 7 8 9 System.out.println(CompletableFuture.supplyAsync(() -> "resultA" ).thenRun(() -> {}).join()); System.out.println(CompletableFuture.supplyAsync(() -> "resultA" ).thenAccept(resultA -> {}).join()); System.out.println(CompletableFuture.supplyAsync(() -> "resultA" ).thenApply(resultA -> resultA + " resultB" ).join());
4.CompleteFuture和线程池说明(非常重要)
上面的几个方法都有普通版本和后面加Async 的版本
以thenRun
和thenRunAsync
为例,有什么区别?
先看结论
没有传入自定义线程池,都用默认线程池ForkJoinPool
传入了一个自定义线程池如果你执行第一个任务的时候,传入了一个自定义线程池
调用thenRun
方法执行第二个任务的时候,则第二个任务和第一个任务是用同一个线程池
调用thenRunAsync
执行第二个任务的时候,则第一个任务使用的是你自己 传入的线程池,第二个任务使用的是ForkJoin线程池
也有可能处理太快,系统优化切换原则,直接使用main线程处理(把sleep去掉)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class CompletableFutureAPIDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(5 ); CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{ try {TimeUnit.MILLISECONDS.sleep(20 );} catch (InterruptedException e) {e.printStackTrace();} System.out.println("1号任务" +"\t" +Thread.currentThread().getName()); return "abcd" ; },threadPool).thenRun(()->{ try {TimeUnit.MILLISECONDS.sleep(20 );} catch (InterruptedException e) {e.printStackTrace();} System.out.println("2号任务" +"\t" +Thread.currentThread().getName()); }).thenRun(()->{ try {TimeUnit.MILLISECONDS.sleep(20 );} catch (InterruptedException e) {e.printStackTrace();} System.out.println("3号任务" +"\t" +Thread.currentThread().getName()); }).thenRun(()->{ try {TimeUnit.MILLISECONDS.sleep(20 );} catch (InterruptedException e) {e.printStackTrace();} System.out.println("4号任务" +"\t" +Thread.currentThread().getName()); }); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class CompletableFutureAPIDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(5 ); CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{ try {TimeUnit.MILLISECONDS.sleep(20 );} catch (InterruptedException e) {e.printStackTrace();} System.out.println("1号任务" +"\t" +Thread.currentThread().getName()); return "abcd" ; },threadPool).thenRunAsync(()->{ try {TimeUnit.MILLISECONDS.sleep(20 );} catch (InterruptedException e) {e.printStackTrace();} System.out.println("2号任务" +"\t" +Thread.currentThread().getName()); }).thenRun(()->{ try {TimeUnit.MILLISECONDS.sleep(20 );} catch (InterruptedException e) {e.printStackTrace();} System.out.println("3号任务" +"\t" +Thread.currentThread().getName()); }).thenRun(()->{ try {TimeUnit.MILLISECONDS.sleep(20 );} catch (InterruptedException e) {e.printStackTrace();} System.out.println("4号任务" +"\t" +Thread.currentThread().getName()); }); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class CompletableFutureAPIDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(5 ); CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{ System.out.println("1号任务" +"\t" +Thread.currentThread().getName()); return "abcd" ; },threadPool).thenRun(()->{ System.out.println("2号任务" +"\t" +Thread.currentThread().getName()); }).thenRun(()->{ System.out.println("3号任务" +"\t" +Thread.currentThread().getName()); }).thenRun(()->{ System.out.println("4号任务" +"\t" +Thread.currentThread().getName()); }); } }
1 2 3 4 5 6 7 8 public CompletableFuture<Void> thenRun (Runnable action) { return uniRunStage(null , action); } public CompletableFuture<Void> thenRunAsync (Runnable action) { return uniRunStage(asyncPool, action); }
1 2 3 4 5 6 7 8 9 10 private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism() > 1 );private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor ();
5.对计算速度进行选用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class CompletableFutureDemo2 {public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> play1 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in " ); try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { e.printStackTrace(); } return "play1 " ; }); CompletableFuture<String> play2 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in " ); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } return "play2" ; }); CompletableFuture<String> thenCombineResult = play1.applyToEither(play2, f -> { return f + " is winner" ; }); System.out.println(Thread.currentThread().getName() + "\t" + thenCombineResult.get()); } }
6.对计算结果进行合并
thenCombine
合并
两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCOmbine来处理
先完成的先等着,等待其它分支任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class CompletableFutureDemo2 { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in " ); return 10 ; }); CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in " ); return 20 ; }); CompletableFuture<Integer> thenCombineResult = completableFuture1.thenCombine(completableFuture2, (x, y) -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in " ); return x + y; }); System.out.println(thenCombineResult.get()); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class CompletableFutureDemo2 { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> thenCombineResult = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in 1" ); return 10 ; }).thenCombine(CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in 2" ); return 20 ; }), (x,y) -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in 3" ); return x + y; }).thenCombine(CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in 4" ); return 30 ; }),(a,b) -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in 5" ); return a + b; }); System.out.println("-----主线程结束,END" ); System.out.println(thenCombineResult.get()); try { TimeUnit.SECONDS.sleep(10 ); } catch (InterruptedException e) { e.printStackTrace(); } } }