串并行转换 - Future模式


假设阿亮同学突发奇想的想要做一顿美餐, 发现没有菜和锅, 怎么办呢?

Future是什么

按年纪大的老亮的思路呢,自己先到菜场买菜,再到商店去买个锅,最后回来做饭 而年轻的小亮,网上买菜+买锅,自己在家里等快递送上门.等待期间可以听听歌,看看书(好同学)

虽然送货上门和老亮自己买回来都要花时间,但这里要注意是 ,花的是谁的时间

并发编程的目的就是把事情分给别人(别的线程/其他CPU核心)干

A代表买菜,B代表买锅,C代表做饭

老亮的思路用代码写就是

static main(String[] args){
    A();
    B();
    C();
}

小亮的思路:

static main(String[] args){
      Thread t1 = new Thread(new Runable(){void run(){
        A();
    }}).start;
      Thread t2 = new Thread(new Runable(){void run(){
        B();
    }}).start;
    //小亮在等待送货上门的过程中, 可以read一下book;
    readBook();
    t1.join();
    t2.join();
    C();
}

但上面小亮思路的实现有一点点问题. t2 线程会在 t1线程执行完成后才启动,也就是菜送到后,锅才开始送货. 无疑使用join等待的方式其效率大打折扣

可以用修改内存的方式替换join


TaskState cb = new Object(){
    public boolean isDone = false;
    public Object result ;
    void getResult(){
       return Reuslt;
    }
}
static main(String[] args){
    TaskState ts1 =  new TaskState();    
    TaskState ts2 =  new TaskState();
      Thread t1 = new Thread(new Runable(ts1){void run(CallBack cb){
        A();
        ts1.result = xxx;
        ts1.isDone = true;
    }}).start;
      Thread t2 = new Thread(new Runable(ts2){void run(CallBack cb){
        B();
        ts2.result = yyy;
        ts2.isDone = true;
    }}).start;

    //小亮在等待送货上门的过程中, 可以read一下book;
    readBook();
    while(ts1.isDone&&ts2.isDone){
        C();
    }

}

大家会发现上述的重复的代码很多,可以封装起来,其实JDK为我们做了一个封装,源码如下:

package java.util.concurrent;

public interface Future<V> {
    //试图取消对此任务的执行
    boolean cancel(boolean mayInterruptIfRunning);

    //如果在任务正常完成前将其取消,则返回 true
    boolean isCancelled();

    //如果任务已完成,则返回 true
    boolean isDone();

    //如有必要,等待计算完成,然后获取其结果
    V get() throws InterruptedException, ExecutionException;

    //如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

同时大家会发现 TaskState的代码是否总是和Runnable一起出现,JDK也为我们进一步做了封装.

FutureTask 类是 Future 的一个实现, Future 可实现 Runnable,所以可通过 Executor 来执。


  FutureTask<String> future = new FutureTask<String>(new Callable<String>() {
    public String call() {
       return searcher.search(target);
    }
 });
 executor.execute(future);

 while(futrue.isDone()){
     ...
 }

Future的缺点

Future目前代码的写法有一个问题就是,它有一个while死循环,浪费CPU. 虽然还有一个get方法可以挂起当前线程,直到异步任务完成被唤醒. 但这不适用于一些主线程不能被挂起的场景.

比如Android JavaScipt

我们换成回调的方式,改一改代码:


public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

    //定义一个回调接口
    public interface CompletedListener<V>{
        void onCompleted(V v);
    }
    //任务完成时调用回调函数通知监听者,并把结果传递给监听者
    void addCompletedListener(CompletedListener cb);
}

这样堵塞主线程的问题就解决了. 但是, 又引起了一个问题 回调地狱

假设阿亮需要等买回来菜的实际品质来决定烹饪的手法,那么代码的写法:

onCompleted1(){
    onCompleted2(){
        onCompleted3(){
            ...
        }
    }
}

很容易代码写成层层嵌套, 来看看更好的解决方案CompletableFuture

Future的改进-CompletableFuture

CompletableFuturethenCompose,可以将多个异步任务串联起来而不会挂起堵塞主线程,同时写法上和老亮的思路一致;

A().then(B()).then(C());

下面利用CompletableFuture模拟一个经典GUI框架的例子:

  • GUI程序通常有一个主循环.
  • GUI启动前需要加载一些系统资源完成初始化,如图片,数据库
  • 初始化过程中可以更新进度条,完成后再执行应用层的代码
static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(4);
static volatile boolean isRun = true;
static volatile boolean isInit = false;

public static void GUILoop() {
    System.out.println("GUI.onDraw");
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
    initTask().thenAccept(result -> {
        System.out.println("loading task is complete");
        isInit = true;
    });
    int count = 0;
    while (isRun) {
        if (!isInit) {
            count++;
            System.out.println("loading... "+count +" s");
        } else {
            GUILoop();
        }
        Thread.sleep(1000);
    }
    System.out.println("loop  exit");
    fixedThreadPool.shutdown();
}

下面是异步加载资源的具体示例代码


private static CompletableFuture initTask() {

        CompletableFuture<String> completableFuture1 = CompletableFuture
                .supplyAsync(() -> {
                    //模拟执行耗时任务
                    System.out.println(Thread.currentThread().getName() + "-> load res1 doing...");
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //返回结果
                    return "result1";
                }, fixedThreadPool);
        completableFuture1.thenAccept(s -> System.out.println(Thread.currentThread().getName() + "-> load res1 done ,result:" + s));

        //等第一个任务完成后,将任务结果传给参数result,执行后面的任务并返回一个代表任务的completableFuture
        CompletableFuture<String> completableFuture2 = completableFuture1
                .thenComposeAsync(result -> CompletableFuture.supplyAsync(() -> {
                    //模拟执行耗时任务
                    System.out.println(Thread.currentThread().getName() + "-> load res2 doing...");
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //返回结果
                    return "result2";
                }, fixedThreadPool));

        CompletableFuture initTask = completableFuture2.thenAccept(s -> {
            System.out.println(Thread.currentThread().getName() + "-> load res2 done");
        });

        return initTask;
    }

最终输出如下:

"C:\Program Files\Java\jdk1.8.0_91\bin\java" -j...
pool-1-thread-1-> load res1 doing...
loading... 1 s
loading... 2 s
loading... 3 s
pool-1-thread-1-> load res1 done ,result:result1
loading... 4 s
pool-1-thread-2-> load res2 doing...
loading... 5 s
loading... 6 s
loading... 7 s
pool-1-thread-2-> load res2 done
loading task is complete
GUI onDraw
GUI onDraw
GUI onDraw
GUI onDraw

没有堵塞主线程, 没有回调嵌套

总结

Future是并行异步编程思维的封装

CompletableFuture是对future的改进,当然CompletableFuture还提供了50做种操控执行顺序的方式, 是并行代码串行的体现

其他语言还有更高级的语法糖. 如TypeScript的await(编译器帮忙写回调)

不管是Future还是CompletableFuture或者awit,其本质上都是基于回调,但做到了 代码读上去是串行,实际上并行

最后更新于 10th May 2019