多线程应用中,经常会遇到这种场景:后面的处理,依赖前面的N个线程的处理结果,必须等前面的线程执行完毕后,后面的代码才允许执行。
在我不知道CyclicBarrier之前,最容易想到的就是放置一个公用的static变量,假如有10个线程,每个线程处理完上去累加下结果,然后后面用一个死循环(或类似线程阻塞的方法),去数这个结果,达到10个,说明大家都爽完了,可以进行后续的事情了,这个想法虽然土鳖,但是基本上跟语言无关,几乎所有主流编程语言都支持。
package yjmyzz.test; public class ThreadLockTest { public static int flag = 0;//公用变量 public static void main(String[] args) throws Exception { ThreadLockTest testObj = new ThreadLockTest(); final int threadNum = 10; for (int i = 0; i < threadNum; i++) { new Thread(new MyRunable(i, testObj)).start(); } while (true) { if (testObj.flag >= threadNum) { System.out.println("-----------\n所有thread执行完成!"); break; } Thread.sleep(10); } } static class MyRunable implements Runnable { int _i = 0; ThreadLockTest _test; public MyRunable(int i, ThreadLockTest test) { this._i = i; this._test = test; } @Override public void run() { try { Thread.sleep((long) (Math.random() * 10)); System.out.println("thread " + _i + " done"); //利用synchronized获得同步锁 synchronized (_test) { _test.flag += 1; } System.out.println("thread " + _i + " => " + _test.flag);//测试用 } catch (Exception e) { e.printStackTrace(); } } }}
输出结果:
thread 0 done
thread 0 => 1 thread 9 done thread 9 => 2 thread 1 done thread 1 => 3 thread 3 done thread 3 => 4 thread 7 done thread 7 => 5 thread 6 done thread 6 => 6 thread 2 done thread 2 => 7 thread 4 done thread 4 => 8 thread 8 done thread 8 => 9 thread 5 done thread 5 => 10 ----------- 所有thread执行完成!除了这个方法,还可以借助FutureTask,达到类似的效果,其get方法会阻塞线程,等到该异步处理完成。缺点就是,FutureTask调用的是Callable,必须要有返回值,所以就算你不想要返回值,也得返回点啥
package yjmyzz.test; import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.FutureTask; public class FutureTaskTest { public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask[] tasks = new FutureTask[10]; for (int i = 0; i < tasks.length; i++) { final int j = i; tasks[i] = new FutureTask (new Callable () { @Override public String call() throws Exception { Thread.sleep((long) (Math.random() * 100)); return "task" + j + " done"; } }); new Thread(tasks[i]).start(); } for (int i = 0; i < tasks.length; i++) { System.out.println(tasks[i].get());//依次等待所有task执行完毕 } System.out.println("-----------\n所有task执行完成!"); }}
执行结果:
task0 done
task1 done task2 done task3 done task4 done task5 done task6 done task7 done task8 done task9 done ----------- 所有task执行完成!此外,Thread的Join方法也可以实现类似的效果,主要代码如下:
public static void main(String[] args) throws Exception { final int threadNum = 10; Thread[] threads = new Thread[threadNum]; for (int i = 0; i < threadNum; i++) { threads[i] = new Thread(new MyRunable(i)); threads[i].start(); } for (int i = 0; i < threadNum; i++) { threads[i].join(); } System.out.println("-----------\n所有thread执行完成!"); }
当然,这个需求最“正统”的解法应该是使用CyclicBarrier,它可以设置一个所谓的“屏障点”(或称集合点),好比在一项团队活动中,每个人都是一个线程,但是规定某一项任务开始前,所有人必须先到达集合点,集合完成后,才能继续后面的任务。
package yjmyzz.test; import java.util.concurrent.CyclicBarrier; public class ThreadTest { public static void main(String[] args) throws Exception { final int threadNum = 10; CyclicBarrier cb = new CyclicBarrier(threadNum + 1);//注意:10个子线程 + 1个主线程 for (int i = 0; i < threadNum; i++) { new Thread(new MyRunable(cb, i)).start(); } cb.await(); System.out.println("-----------\n所有thread执行完成!"); } static class MyRunable implements Runnable { CyclicBarrier _cb; int _i = 0; public MyRunable(CyclicBarrier cb, int i) { this._cb = cb; this._i = i; } @Override public void run() { try { Thread.sleep((long) (Math.random() * 100)); System.out.println("thread " + _i + " done,正在等候其它线程完成..."); _cb.await(); } catch (Exception e) { e.printStackTrace(); } } }}
thread 9 done,正在等候其它线程完成...
thread 5 done,正在等候其它线程完成... thread 0 done,正在等候其它线程完成... thread 6 done,正在等候其它线程完成... thread 4 done,正在等候其它线程完成... thread 2 done,正在等候其它线程完成... thread 3 done,正在等候其它线程完成... thread 8 done,正在等候其它线程完成... thread 7 done,正在等候其它线程完成... thread 1 done,正在等候其它线程完成... ----------- 所有thread执行完成!