开发者的天空

 

 

搜索
开发者的天空 论坛 Java编程基础 利用CyclicBarrier进行多线程编程
查看: 2079|回复: 1
go

[Java综合] 利用CyclicBarrier进行多线程编程  

Rank: 75Rank: 75Rank: 75

发表于 2010-7-14 22:05 |显示全部帖子
从Java SE 5开始,在java.util.concurrent包中提供了很多线程相关的工具类,如CyclicBarrier和CountdownLatch类。在我们使用多线程进行编程的时候,这些类能够提供很大的帮助。下面我们以一个例子来说明怎样使用CyclicBarrier类。例子中的题目是一个面试题:
有一个很大的整数list,需要求这个list中所有整数的和,写一个可以充分利用多核CPU的代码,来计算结果。

一:分析题目
从题中可以看到“很大的List”以及“充分利用多核CPU”,这就已经充分告诉我们要采用多线程(任务)进行编写。具体怎么做呢?大概的思路就是分割List,每一小块的List采用一个线程(任务)进行计算其和,最后等待所有的线程(任务)都执行完后就可得到这个“很大的List”中所有整数的和。
二:具体分析和技术方案
既然我们已经决定采用多线程(任务),并且还要分割List,每一小块的List采用一个线程(任务)进行计算其和,那么我们必须要等待所有的线程(任务)完成之后才能得到正确的结果,那么怎么才能保证“等待所有的线程(任务)完成之后输出结果呢”?这就要靠 java.util.concurrent包中的CyclicBarrier类了。它是一个同步辅助类,它允许一组线程(任务)互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程(任务)的程序中,这些线程(任务)必须不时地互相等待,此时 CyclicBarrier 很有用。简单的概括其适应场景就是:当一组线程(任务)并发的执行一件工作的时候,必须等待所有的线程(任务)都完成时才能进行下一个步骤。具体技术方案步骤如下:

    * 分割List,根据采用的线程(任务)数平均分配,即list.size()/threadCounts。
    * 定义一个记录“很大List”中所有整数和的变量sum,采用一个线程(任务)处理一个分割后的子List,计算子List中所有整数和(subSum),然后把和(subSum)累加到sum上。
    * 等待所有线程(任务)完成后输出总和(sum)的值。
7d88cf28-5541-3965-bf99-0a39976f1248.png

三:详细编码实现
代码中有很详细的注释,这里就不解释了。
  1. /**
  2. * 计算List中所有整数的和<br>
  3. * 采用多线程,分割List计算
  4. * @author 飞雪无情
  5. * @since 2010-7-12
  6. */
  7. public class CountListIntegerSum {
  8.         private long sum;//存放整数的和
  9.         private CyclicBarrier barrier;//障栅集合点(同步器)
  10.         private List<Integer> list;//整数集合List
  11.         private int threadCounts;//使用的线程数
  12.         public CountListIntegerSum(List<Integer> list,int threadCounts) {
  13.                 this.list=list;
  14.                 this.threadCounts=threadCounts;
  15.         }
  16.         /**
  17.          * 获取List中所有整数的和
  18.          * @return
  19.          */
  20.         public long getIntegerSum(){
  21.                 ExecutorService exec=Executors.newFixedThreadPool(threadCounts);
  22.                 int len=list.size()/threadCounts;//平均分割List
  23.                 //List中的数量没有线程数多(很少存在)
  24.                 if(len==0){
  25.                         threadCounts=list.size();//采用一个线程处理List中的一个元素
  26.                         len=list.size()/threadCounts;//重新平均分割List
  27.                 }
  28.                 barrier=new CyclicBarrier(threadCounts+1);
  29.                 for(int i=0;i<threadCounts;i++){
  30.                         //创建线程任务
  31.                         if(i==threadCounts-1){//最后一个线程承担剩下的所有元素的计算
  32.                                 exec.execute(new SubIntegerSumTask(list.subList(i*len,list.size())));
  33.                         }else{
  34.                                 exec.execute(new SubIntegerSumTask(list.subList(i*len, len*(i+1)>list.size()?list.size():len*(i+1))));
  35.                         }
  36.                 }
  37.                 try {
  38.                         barrier.await();//关键,使该线程在障栅处等待,直到所有的线程都到达障栅处
  39.                 } catch (InterruptedException e) {
  40.                         System.out.println(Thread.currentThread().getName()+":Interrupted");
  41.                 } catch (BrokenBarrierException e) {
  42.                         System.out.println(Thread.currentThread().getName()+":BrokenBarrier");
  43.                 }
  44.                 exec.shutdown();
  45.                 return sum;
  46.         }
  47.         /**
  48.          * 分割计算List整数和的线程任务
  49.          * @author lishuai
  50.          *
  51.          */
  52.         public class SubIntegerSumTask implements Runnable{
  53.                 private List<Integer> subList;
  54.                 public SubIntegerSumTask(List<Integer> subList) {
  55.                         this.subList=subList;
  56.                 }
  57.                 public void run() {
  58.                         long subSum=0L;
  59.                         for (Integer i : subList) {
  60.                                 subSum += i;
  61.                         }  
  62.                         synchronized(CountListIntegerSum.this){//在CountListIntegerSum对象上同步
  63.                                 sum+=subSum;
  64.                         }
  65.                         try {
  66.                                 barrier.await();//关键,使该线程在障栅处等待,直到所有的线程都到达障栅处
  67.                         } catch (InterruptedException e) {
  68.                                 System.out.println(Thread.currentThread().getName()+":Interrupted");
  69.                         } catch (BrokenBarrierException e) {
  70.                                 System.out.println(Thread.currentThread().getName()+":BrokenBarrier");
  71.                         }
  72.                         System.out.println("分配给线程:"+Thread.currentThread().getName()+"那一部分List的整数和为:\tSubSum:"+subSum);
  73.                 }
  74.                
  75.         }
  76.        
  77. }
复制代码
有人可能对barrier=new CyclicBarrier(threadCounts+1);//创建的线程数和主线程main有点不解,不是采用的线程(任务)数是 threadCounts个吗?怎么为CyclicBarrier设置的给定数量的线程参与者比我们要采用的线程数多一个呢?答案就是这个多出来的一个用于控制main主线程的,主线程也要等待,它要等待其他所有的线程完成才能输出sum值,这样才能保证sum值的正确性,如果main不等待的话,那么结果将是不可预料的。
  1. /**
  2. * 计算List中所有整数的和测试类
  3. * @author 飞雪无情
  4. * @since 2010-7-12
  5. */
  6. public class CountListIntegerSumMain {

  7.         /**
  8.          * @param args
  9.          */
  10.         public static void main(String[] args) {
  11.                 List<Integer> list = new ArrayList<Integer>();
  12.                 int threadCounts = 10;//采用的线程数
  13.                 //生成的List数据
  14.                 for (int i = 1; i <= 1000000; i++) {
  15.                         list.add(i);
  16.                 }
  17.                 CountListIntegerSum countListIntegerSum=new CountListIntegerSum(list,threadCounts);
  18.                 long sum=countListIntegerSum.getIntegerSum();
  19.                 System.out.println("List中所有整数的和为:"+sum);
  20.         }

  21. }
复制代码
四:总结
本文主要通过一个淘宝的面试题为引子,介绍了并发的一点小知识,主要是介绍通过CyclicBarrier同步辅助器辅助多个并发任务共同完成一件工作。Java SE5的java.util.concurrent引入了大量的设计来解决并发问题,使用它们有助于我们编写更加简单而健壮的并发程序。

附mathfox提到的ExecutorService.invokeAll()方法的实现
这个不用自己控制等待,invokeAll执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。sdh5724也说用了同步,性能不好。这个去掉了同步,根据返回结果的 Future 列表相加就得到总和了。
  1. /**
  2. * 使用ExecutorService的invokeAll方法计算
  3. * @author 飞雪无情
  4. *
  5. */
  6. public class CountSumWithCallable {

  7.         /**
  8.          * @param args
  9.          * @throws InterruptedException
  10.          * @throws ExecutionException
  11.          */
  12.         public static void main(String[] args) throws InterruptedException, ExecutionException {
  13.                 int threadCounts =19;//使用的线程数
  14.                 long sum=0;
  15.                 ExecutorService exec=Executors.newFixedThreadPool(threadCounts);
  16.                 List<Callable<Long>> callList=new ArrayList<Callable<Long>>();
  17.                 //生成很大的List
  18.                 List<Integer> list = new ArrayList<Integer>();
  19.                 for (int i = 0; i <= 1000000; i++) {
  20.                         list.add(i);
  21.                 }
  22.                 int len=list.size()/threadCounts;//平均分割List
  23.                 //List中的数量没有线程数多(很少存在)
  24.                 if(len==0){
  25.                         threadCounts=list.size();//采用一个线程处理List中的一个元素
  26.                         len=list.size()/threadCounts;//重新平均分割List
  27.                 }
  28.                 for(int i=0;i<threadCounts;i++){
  29.                         final List<Integer> subList;
  30.                         if(i==threadCounts-1){
  31.                                 subList=list.subList(i*len,list.size());
  32.                         }else{
  33.                                 subList=list.subList(i*len, len*(i+1)>list.size()?list.size():len*(i+1));
  34.                         }
  35.                         //采用匿名内部类实现
  36.                         callList.add(new Callable<Long>(){
  37.                                 public Long call() throws Exception {
  38.                                         long subSum=0L;
  39.                                         for(Integer i:subList){
  40.                                                 subSum+=i;
  41.                                         }
  42.                                         System.out.println("分配给线程:"+Thread.currentThread().getName()+"那一部分List的整数和为:\tSubSum:"+subSum);
  43.                                         return subSum;
  44.                                 }
  45.                         });
  46.                 }
  47.                 List<Future<Long>> futureList=exec.invokeAll(callList);
  48.                 for(Future<Long> future:futureList){
  49.                         sum+=future.get();
  50.                 }
  51.                 exec.shutdown();
  52.                 System.out.println(sum);
  53.         }

  54. }
复制代码

Rank: 3Rank: 3Rank: 3

发表于 2010-10-15 16:37 |显示全部帖子
饿,这个难度不小吧,我还是看不懂啊,不过好东西我就收藏了,慢慢学习!
你需要登录后才可以回帖 登录 | 注册

Archiver|开发者的天空

GMT+8, 2012-2-8 19:52

Powered by Discuz! X1.5

© 2001-2010 Comsenz Inc.