线程同步工具(四)在同一个点同步任务

声明:本文是《 Java 7 Concurrency Cookbook 》的第三章, 作者: Javier Fernández González 译者:郑玉婷

在同一个点同步任务

Java 并发 API 提供了可以允许2个或多个线程在在一个确定点的同步应用。它是 CyclicBarrier 类。此类与在此章节的等待多个并发事件完成指南中的 CountDownLatch 类相似,但是它有一些特殊性让它成为更强大的类。

CyclicBarrier 类有一个整数初始值,此值表示将在同一点同步的线程数量。当其中一个线程到达确定点,它会调用await() 方法来等待其他线程。当线程调用这个方法,CyclicBarrier阻塞线程进入休眠直到其他线程到达。当最后一个线程调用CyclicBarrier 类的await() 方法,它唤醒所有等待的线程并继续执行它们的任务。

CyclicBarrier 类有个有趣的优势是,你可以传递一个外加的 Runnable 对象作为初始参数,并且当全部线程都到达同一个点时,CyclicBarrier类 会把这个对象当做线程来执行。此特点让这个类在使用 divide 和 conquer 编程技术时,可以充分发挥任务的并行性,

在这个指南,你将学习如何使用 CyclicBarrier 类来让一组线程在一个确定点同步。你也将使用 Runnable 对象,它将会在全部线程都到达确定点后被执行。在这个例子里,你将在数字矩阵中查找一个数字。矩阵会被分成多个子集(使用divide 和 conquer 技术),所以每个线程会在一个子集中查找那个数字。一旦全部行程运行结束,会有一个最终任务来统一他们的结果。

准备

指南中的例子是使用Eclipse IDE 来实现的。如果你使用Eclipse 或者其他的IDE,例如NetBeans, 打开并创建一个新的java项目。

怎么做呢

按照这些步骤来实现下面的例子::

[code language=”java”]
//1. 我们从实现2个辅助类开始。首先,创建一个类名为 MatrixMock。此类随机生成一个在1-10之间的 数字矩阵,我们将从中查找数字。
public class MatrixMock {

//2. 声明私有 int matrix,名为 data。
private int data[][];

//3. 实现类的构造函数。此构造函数将接收矩阵的行数,行的长度,和我们将要查找的数字作为参数。3个参数全部int 类型。
public MatrixMock(int size, int length, int number){

//4. 初始化构造函数将使用的变量和对象。
int counter=0;
data=new int[size][length];
Random random=new Random();

//5. 用随机数字填充矩阵。每生成一个数字就与要查找的数字对比,如果相等,就增加counter值。
for (int i=0; i<size; i++) {
for (int j=0; j<length; j++){
data[i][j]=random.nextInt(10);
if (data[i][j]==number){
counter++;
}
}
}

//6. 最后,在操控台打印一条信息,表示查找的数字在生成的矩阵里的出现次数。此信息是用来检查线程们获得的正确结果的。
System.out.printf("Mock: There are %d ocurrences of number in generated data.\n",counter,number); //译者注:把字符串里的number改为%d.

//7. 实现 getRow() 方法。此方法接收一个 int为参数,是矩阵的行数。返回行数如果存在,否则返回null。
public int[] getRow(int row){
if ((row>=0)&&(row<data.length)){
return data[row];
}
return null;
}

//8. 现在,实现一个类名为 Results。此类会在array内保存被查找的数字在矩阵的每行里出现的次数。
public class Results {

//9. 声明私有 int array 名为 data。
private int data[];

//10. 实现类的构造函数。此构造函数接收一个表明array元素量的整数作为参数。
public Results(int size){
data=new int[size];
}

//11. 实现 setData() 方法。此方法接收array的某个位置和一个值作为参数,然后把array的那个位置设定为那个值。
public void setData(int position, int value){
data[position]=value;
}

//12. 实现 getData() 方法。此方法返回结果 array。
public int[] getData(){
return data;
}

//13. 现在你有了辅助类,是时候来实现线程了。首先,实现 Searcher 类。这个类会在随机数字的矩阵中的特定的行里查找数字。创建一个类名为Searcher 并一定实现 Runnable 接口.
public class Searcher implements Runnable {

//14. 声明2个私有int属性名为 firstRow 和 lastRow。这2个属性是用来确定将要用的子集的行。
private int firstRow;
private int lastRow;

//15. 声明一个私有 MatrixMock 属性,名为 mock。
private MatrixMock mock;

//16. 声明一个私有 Results 属性,名为 results。
private Results results;

//17. 声明一个私有 int 属性名为 number,用来储存我们要查找的数字。
private int number;

//18. 声明一个 CyclicBarrier 对象,名为 barrier。
private final CyclicBarrier barrier;

//19. 实现类的构造函数,并初始化之前声明的全部属性。
public Searcher(int firstRow, int lastRow, NumberMock mock, Results results, int number, CyclicBarrier barrier){
this.firstRow=firstRow;
this.lastRow=lastRow;
this.mock=mock;
this.results=results;
this.number=number;
this.barrier=barrier;
}

//20. 实现 run() 方法,用来查找数字。它使用内部变量,名为counter,用来储存数字在每行出现的次数。
@Override
public void run() {
int counter;

//21. 在操控台打印一条信息表明被分配到这个对象的行。
System.out.printf("%s: Processing lines from %d to %d.\n",Thread.currentThread().getName(),firstRow,lastRow);

//22. 处理分配给这个线程的全部行。对于每行,记录正在查找的数字出现的次数,并在相对于的 Results 对象中保存此数据。
for (int i=firstRow; i<lastRow; i++){
int row[]=mock.getRow(i);
counter=0;
for (int j=0; j<row.length; j++){
if (row[j]==number){
counter++;
}
}

results.setData(i, counter);
}

//23. 打印信息到操控台表明此对象已经结束搜索。
System.out.printf("%s: Lines processed.\n",Thread. currentThread().getName());

//24. 调用 CyclicBarrier 对象的 await() 方法 ,由于可能抛出的异常,要加入处理 InterruptedException and BrokenBarrierException 异常的必需代码。
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}

//25. 现在,实现一个类来计算数字在这个矩阵里出现的总数。它使用储存了矩阵中每行里数字出现次数的 Results 对象来进行运算。创建一个类,名为 Grouper 并一定实现 Runnable 接口.
public class Grouper implements Runnable {

//26. 声明一个私有 Results 属性,名为 results。
private Results results;

//27. 实现类的构造函数,并初始化 Results 属性。
public Grouper(Results results){
this.results=results;
}

//28.实现 run() 方法,用来计算结果array里数字出现次数的总和。
@Override
public void run() {

//29. 声明一个 int 变量并写在操控台写一条信息表明开始处理了。
int finalResult=0;
System.out.printf("Grouper: Processing results…\n");

//30. 使用 results 对象的 getData() 方法来获得每行数字出现的次数。然后,处理array的全部元素,把每个元素的值加给 finalResult 变量。
int data[]=results.getData();
for (int number:data){
finalResult+=number;
}

//31. 在操控台打印结果。
System.out.printf("Grouper: Total result: %d.\n",finalResult);

//32. 最后, 实现例子的 main 类,通过创建一个类,名为 Main 并为其添加 main() 方法。
public class Main {

public static void main(String[] args) {

//33. 声明并初始5个常熟来储存应用的参数。
final int ROWS=10000;
final int NUMBERS=1000;
final int SEARCH=5;
final int PARTICIPANTS=5;
final int LINES_PARTICIPANT=2000;

//34. Create a MatrixMock 对象,名为 mock. 它将有 10,000 行,每行1000个元素。现在,你要查找的数字是5。
MatrixMock mock=new MatrixMock(ROWS, NUMBERS,SEARCH);

//35. 创建 Results 对象,名为 results。它将有 10,000 元素。
Results results=new Results(ROWS);

//36. 创建 Grouper 对象,名为 grouper。
Grouper grouper=new Grouper(results);

//37. 创建 CyclicBarrier 对象,名为 barrier。此对象会等待5个线程。当此线程结束后,它会执行前面创建的 Grouper 对象。
CyclicBarrier barrier=new CyclicBarrier(PARTICIPANTS,grouper);

//38. 创建5个 Searcher 对象,5个执行他们的线程,并开始这5个线程。
Searcher searchers[]=new Searcher[PARTICIPANTS];
for (int i=0; i<PARTICIPANTS; i++){
searchers[i]=new Searcher(i*LINES_PARTICIPANT, (i*LINES_ PARTICIPANT)+LINES_PARTICIPANT, mock, results, 5,barrier);
Thread thread=new Thread(searchers[i]);
thread.start();
}
System.out.printf("Main: The main thread has finished.\n");
[/code]

它是怎么工作的…

以下裁图是例子的运行结果:

例子中解决的问题比较简单。我们有一个很大的随机的整数矩阵,然后你想知道这矩阵里面某个数字出现的次数。为了更好的执行,我们使用了 divide 和 conquer 技术。我们 divide 矩阵成5个子集,然后在每个子集里使用一个线程来查找数字。这些线程是 Searcher 类的对象。

我们使用 CyclicBarrier 对象来同步5个线程的完成,并执行 Grouper 任务处理个别结果,最后计算最终结果。

如我们之前提到的,CyclicBarrier 类有一个内部计数器控制到达同步点的线程数量。每次线程到达同步点,它调用 await() 方法告知 CyclicBarrier 对象到达同步点了。CyclicBarrier 把线程放入睡眠状态直到全部的线程都到达他们的同步点。

当全部的线程都到达他们的同步点,CyclicBarrier 对象叫醒全部正在 await() 方法中等待的线程们,然后,选择性的,为CyclicBarrier的构造函数 传递的 Runnable 对象(例子里,是 Grouper 对象)创建新的线程执行外加任务。

更多…

CyclicBarrier 类有另一个版本的 await() 方法:

  • await(long time, TimeUnit unit): 线程会一直休眠直到被中断;内部计数器到达0,或者特定的时间过去了。TimeUnit类有多种常量: DAYS, HOURS, MICROSECONDS, MILLISECONDS, MINUTES, NANOSECONDS, and SECONDS.

此类也提供了 getNumberWaiting() 方法,返回被 await() 方法阻塞的线程数,还有 getParties() 方法,返回将与CyclicBarrier同步的任务数。

重置 CyclicBarrier 对象
CyclicBarrier 类与CountDownLatch有一些共同点,但是也有一些不同。最主要的不同是,CyclicBarrier对象可以重置到它的初始状态,重新分配新的值给内部计数器,即使它已经被初始过了。

可以使用 CyclicBarrier的reset() 方法来进行重置操作。当这个方法被调用后,全部的正在await() 方法里等待的线程接收到一个 BrokenBarrierException 异常。此异常在例子中已经用打印stack trace处理了,但是在一个更复制的应用,它可以执行一些其他操作,例如重新开始执行或者在中断点恢复操作。

破坏 CyclicBarrier 对象
CyclicBarrier 对象可能处于一个特殊的状态,称为 broken。当多个线程正在 await() 方法中等待时,其中一个被中断了,此线程会收到 InterruptedException 异常,但是其他正在等待的线程将收到 BrokenBarrierException 异常,并且 CyclicBarrier 会被置于broken 状态中。

CyclicBarrier 类提供了isBroken() 方法,如果对象在 broken 状态,返回true,否则返回false。

参见

第三章,线程同步应用:等待多个并发事件

原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: 线程同步工具(四)在同一个点同步任务

  • Trackback 关闭
  • 评论 (7)
    • yunnick
    • 2013/10/24 11:42上午

    内容出错了啊,这是第4节的

      • 郑玉婷
      • 2013/10/30 8:28上午

      恩。已改。

    • kun
    • 2013/10/25 8:43下午

    是的,关于CountDownLatch的章节丢掉了,CyclicBarrier重复了两次。

    • Iven
    • 2013/11/06 4:09下午

    代码 077 应该是 NumberMock mock —> MatrixMock mock

    • zwm512327
    • 2013/12/04 8:20上午

    我想问下,这种方式和在主线程中join等待5个线程结束后再统计有什么区别还是有什么优势?

      • yf
      • 2014/09/16 10:31上午

      Yes, “t.join()” makes the current thread waiting for “t” thread is finished and we can prepare a chain of threads when a thread is waiting for some other. But sometimes CountDownLatch/CyclicBarrier are more convenient.

      First of all, CountDownLatch/CyclicBarrier don’t require all working threads should be finished. The threads can be running all the time the application is running. They just let us say that “some work” is done a number of times. Moreover, if we have N jobs and M threads and N > M, some threads can do a job several times until their common barier N is 0. This example shows that CountDownLatch/CyclicBarrier are very useful primitives to share N tasks between M threads.

      Also, to use join(), each thread should have a reference to another thread to call join(). It makes your code a bit dirty especially when you have more than 2 working threads. Sharing of one instance of CountDownLatch/CyclicBarrier looks more clear.

      The main difference between CyclicBarrier and CountDownLatch is that CyclicBarrier is reusable and CountDownLatch is not. You can reuse CyclicBarrier by calling reset() method which resets the barrier to its initial state.

      CountDownLatch is good for one time event like application/module start-up time and CyclicBarrier can be used to in case of recurrent event e.g. concurrently (re-)calculating each time when the input data changed.

      You can find some good examples at:

      http://javarevisited.blogspot.sg/2012/07/countdownlatch-example-in-java.html http://javarevisited.blogspot.ru/2012/07/cyclicbarrier-example-java-5-concurrency-tutorial.html

    • 龙大叔
    • 2018/03/29 11:20下午

    所以这里为什么要 Results results=new Results(ROWS);
    而不是 new Results(PARTICIPANTS); ???

return top