《Java 7并发编程实战手册》第六章并发集合

Snip20140120_1
人民邮电出版社出版的《Java 7并发编程实战手册》终于出版了,译者是俞黎敏和申绍勇,该书将于近期上架。之前并发编程网组织翻译过此书,由于邮电出版社在并发网联系他们之前就找到了译者,所以没有采用并发网的译稿,但邮电出版社将于并发网展开合作,发布该书的样章(样章由并发网挑选,你也可以回帖告诉我们你想看哪一章的样章),并组织赠书活动回馈给活跃读者。活动详情请时刻关注并发网的微博和微信(微信号:ifeves),最后祝各位用餐愉快!:)

本章将介绍下列内容:

  • 使用非阻塞式线程安全列表
  • 使用阻塞式线程安全列表
  • 使用按优先级排序的阻塞式线程安全列表
  • 使用带有延迟元素的线程安全列表
  • 使用线程安全可遍历映射
  • 生成并发随机数
  • 使用原子变量
  • 使用原子数组

6.1 简介

数据结构(Data Structure)是编程中的基本元素,几乎每个程序都使用一种或多种数据结构来存储和管理数据。Java API提供了包含接口、类和算法的Java集合框架(Java Collection Framework),它实现了可用在程序中的大量数据结构。

当需要在并发程序中使用数据集合时,必须要谨慎地选择相应的实现方式。大多数集合类不能直接用于并发应用,因为它们没有对本身数据的并发访问进行控制。如果一些并发任务共享了一个不适用于并发任务的数据结构,将会遇到数据不一致的错误,并将影响程序的准确运行。这类数据结构的一个例子是ArrayList类。

Java提供了一些可以用于并发程序中的数据集合,它们不会引起任何问题。一般来说,Java提供了两类适用于并发应用的集合。

  • 阻塞式集合(Blocking Collection):这类集合包括添加和移除数据的方法。当集合已满或为空时,被调用的添加或者移除方法就不能立即被执行,那么调用这个方法的线程将被阻塞,一直到该方法可以被成功执行。
  • 非阻塞式集合(Non-Blocking Collection):这类集合也包括添加和移除数据的方法。如果方法不能立即被执行,则返回null或抛出异常,但是调用这个方法的线程不会被阻塞。

通过本章的各个小节,你将学会如何在并发应用中使用一些Java集合。

  • 非阻塞式列表对应的实现类:ConcurrentLinkedDeque类;
  • 阻塞式列表对应的实现类:LinkedBlockingDeque 类;
  • 用于数据生成或消费的阻塞式列表对应的实现类:LinkedTransferQueue类;
  • 按优先级排序列表元素的阻塞式列表对应的实现类:PriorityBlockingQueue类;
  • 带有延迟列表元素的阻塞式列表对应的实现类:DelayQueue类;
  • 非阻塞式可遍历映射对应的实现类:ConcurrentSkipListMap类;
  • 随机数字对应的实现类:ThreadLocalRandom类;
  • 原子变量对应的实现类:AtomicLong和AtomicIntegerArray类。

6.2 使用非阻塞式线程安全列表

最基本的集合类型是列表(List)。一个列表包含的元素数量不定,可以在任何位置添加、读取或移除元素。并发列表允许不同的线程在同一时间添加或移除列表中的元素,而不会造成数据不一致。
在本节,将会学到如何在并发程序中使用非阻塞式列表。非阻塞式列表提供了一些操作,如果被执行的操作不能够立即运行(例如,在列表为空时,从列表取出一个元素),方法会抛出异常或返回null。Java 7引入了ConcurrentLinkedDeque类来实现非阻塞式并发列表。
将要实现的范例包括以下两个不同的任务:

  • 添加大量的数据到一个列表中;
  • 从同一个列表中移除大量的数据。

准备工作

本节的范例是在Eclipse IDE里完成的。无论你使用Eclipse还是其他的IDE(比如NetBeans),都可以打开这个IDE并且创建一个新的Java工程。

范例实现

按照接下来的步骤实现本节的范例。
1.创建一个名为AddTask的类,实现Runnable接口。

public class AddTask implements Runnable {

2.声明一个私有的ConcurrentLinkedDeque属性list,并指定它的泛型参数是String型的。

private ConcurrentLinkedDeque list;

3.实现类的构造器来初始化属性。

public AddTask(ConcurrentLinkedDeque list) {
this.list=list;
}

4.实现run()方法。这个方法将10,000个字符串存放到列表中,这些字符串由当前执行任务的线程的名称和数字组成。

@Override
public void run() {
String name=Thread.currentThread().getName();
for (int i=0; i<10000; i++){
list.add(name+": Element "+i);
}
}

5.创建名为PollTask的类,并实现Runnable接口。

public class PollTask implements Runnable {

6.声明一个私有的ConcurrentLinkedDeque属性list,并指定它的泛型参数是String型的。

private ConcurrentLinkedDeque list;

7.实现类的构造器来初始化属性。

public PollTask(ConcurrentLinkedDeque list) {
this.list=list;
}

8.实现run()方法。这个方法将列表中的10,000个字符串取出,总共取5,000次,每次取两个元素。

@Override
public void run() {
for (int i=0; i<5000; i++) {
list.pollFirst();
list.pollLast();
}
}

9.创建范例的主类Main,并添加main()方法。

public class Main {
public static void main(String[] args) {

10.创建ConcurrentLinkedDeque对象,并指定它的泛型参数是String型的。

ConcurrentLinkedDeque list=new
ConcurrentLinkedDeque<>();

11.创建线程数组threads,它包含100个线程。

Thread threads[]=new Thread[100];

12.创建100个AddTask对象及其对应的运行线程。将每个线程存放到上一步创建的数组中,然后启动线程。

for (int i=0; i AddTask task=new AddTask(list);
threads[i]=new Thread(task);
threads[i].start();
}
System.out.printf("Main: %d AddTask threads have been
launched\n",threads.length);

13.使用join()方法等待线程完成。

for (int i=0; i<threads.length; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

14.将列表的元素数量打印到控制台。

System.out.printf("Main: Size of the List: %d\n",list.size());

15.创建100个PollTask对象及其对应的运行线程。将每个线程存放到上一步创建的数组中,然后启动线程。

for (int i=0; i< threads.length; i++){
PollTask task=new PollTask(list);
threads[i]=new Thread(task);
threads[i].start();
}
System.out.printf("Main: %d PollTask threads have been
launched\n",threads.length);

16.使用join()方法等待线程完成。

for (int i=0; i<threads.length; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

17.将列表的元素数量打印到控制台。

System.out.printf("Main: Size of the List: %d\n",list.size());

工作原理

本节使用的泛型参数是String类的ConcurrentLinkedDeque对象,用来实现一个非阻塞式并发数据列表。下面的截屏显示了程序的运行结果。

首先,执行100个AddTask任务将元素添加到ConcurrentLinkedDeque对象list中。每个任务使用add()方法向这个列表中插入10,000个元素。add()方法将新元素添加到列表尾部。当所有任务运行完毕,列表中的元素数量将被打印到控制台。在这一刻,列表中有1,000,000个元素。
接下来,执行100个PollTask任务将元素从列表中移除。每个任务使用pollFirst()和pollLast()方法从列表中移除10,000个元素。pollFirst()方法返回并移除列表中的第一个元素,pollLast()方法返回并移除列表中的最后一个元素。如果列表为空,这些方法返回null。当所有任务运行完毕,列表中的元素数量将被打印到控制台。在这一刻,列表中有0个元素。

使用size()方法输出列表中的元素数量。需要注意的是,这个方法返回的值可能不是真实的,尤其当有线程在添加数据或移除数据时,这个方法需要遍历整个列表来计算元素数量,而遍历过的数据可能已经改变。仅当没有任何线程修改列表时,才能保证返回的结果是准确的。

更多信息

ConcurrentLinkedDeque类提供了其他从列表中读取数据的方法。

  • getFirst()和getLast():分别返回列表中第一个和最后一个元素,返回的元素不会从列表中移除。如果列表为空,这两个方法抛出NoSuchElementExcpetion异常。
  • peek()、peekFirst()和peekLast():分别返回列表中第一个和最后一个元素,返回的元素不会从列表中移除。如果列表为空,这些方法返回null。
  • remove()、removeFirst()和removeLast():分别返回列表中第一个和最后一个元素,返回的元素将会从列表中移除。如果列表为空,这些方法抛出NoSuchElementExcpetion异常。

6.3 使用阻塞式线程安全列表

最基本的集合类型是列表。一个列表包含的元素数量不定,可以在任何位置添加、读取或移除元素。并发列表允许不同的线程在同一时间添加或移除列表中的元素,而不会造成数据不一致。
在本节,你会学到如何在并发程序中使用阻塞式列表。阻塞式列表与非阻塞式列表的主要差别是:阻塞式列表在插入和删除操作时,如果列表已满或为空,操作不会被立即执行,而是将调用这个操作的线程阻塞队列直到操作可以执行成功。Java引入了LinkedBlocking Deque类来实现阻塞式列表。
将要实现的范例包括以下两个不同的任务:

  • 添加大量的数据到一个列表中;
  • 从同一个列表中移除大量的数据。

准备工作

本节的范例是在Eclipse IDE里完成的。无论你使用Eclipse还是其他的IDE(比如NetBeans),都可以打开这个IDE并且创建一个新的Java工程。

范例实现

按照接下来的步骤实现本节的范例。
1.创建名为Client的类,并实现Runnable接口。

public class Client implements Runnable{

2.声明一个私有的LinkedBlockingDeque属性requestList,并指定它的泛型参数是String型的。

private LinkedBlockingDeque requestList;

3.实现类的构造器来初始化属性。

public Client (LinkedBlockingDeque requestList) {
this.requestList=requestList;
}

4.实现run()方法。使用requestList对象的put()方法,每两秒向列表requestList中插入5个字符串。重复3次。

@Override
public void run() {
for (int i=0; i<3; i++) {
for (int j=0; j<5; j++) {
StringBuilder request=new StringBuilder();
request.append(i);
request.append(":");
request.append(j);
try {
requestList.put(request.toString());
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("Client: %s at %s.\n",request,new
Date());
}
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.printf("Client: End.\n");
}

5. 创建范例的主类Main,并添加main()方法。

public class Main {
public static void main(String[] args) throws Exception {

6. 声明并创建LinkedBlockingDeque属性list,并指定它的泛型参数是String型的。

LinkedBlockingDeque list=new LinkedBlockingDeque<>(3);

7.将client作为传入参数创建线程Thread并启动。

Client client=new Client(list);
Thread thread=new Thread(client);
thread.start();

8.使用list对象的take()方法,每300毫秒从列表中取出3个字符串对象,重复5次。在控制台输出字符串。

for (int i=0; i for (int j=0; j<3; j++) {
String request=list.take();
System.out.printf("Main: Request: %s at %s. Size:
%d\n",request,new Date(),list.size());
}
TimeUnit.MILLISECONDS.sleep(300);
}

9.输出一条表示程序结束的消息。

System.out.printf("Main: End of the program.\n");

工作原理

本节使用的泛型参数是String的LinkedBlockingDeque对象,用来实现一个阻塞式并发数据列表。
Client类使用put()方法将字符串插入到列表中。如果列表已满(列表生成时指定了固定的容量),调用这个方法的线程将被阻塞直到列表中有了可用的空间。

Main类使用take()方法从列表中取字符串。如果列表为空,调用这个方法的线程将被阻塞直到列表不为空(即有可用的元素)。

这个例子中使用了LinkedBlockingDeque对象的两个方法,调用它们的线程可能会被阻塞,在阻塞时如果线程被中断,方法会抛出InterruptedException异常,所以必须捕获和处理这个异常。

更多信息

LinkedBlockingDeque类也提供了其他存取元素的方法,这些方法不会引起阻塞,而是抛出异常或返回null。

  • takeFirst()和takeLast():分别返回列表中第一个和最后一个元素,返回的元素会从列表中移除。如果列表为空,调用方法的线程将被阻塞直到列表中有可用的元素出现。
  • getFirst()和getLast():分别返回列表中第一个和最后一个元素,返回的元素不会从列表中移除。如果列表为空,则抛出NoSuchElementExcpetinon异常。
  • peek()、peekFirst()和peekLast():分别返回列表中第一个和最后一个元素,返回的元素不会从列表中移除。如果列表为空,返回null。
  • poll()、pollFirst()和pollLast():分别返回列表中第一个和最后一个元素,返回的元素将会从列表中移除。如果列表为空,返回null。
  • add()、addFirst()和addLast(): 分别将元素添加到列表中第一位和最后一位。如果列表已满(列表生成时指定了固定的容量),这些方法将抛出IllegalStateException异常。

参见

参见6.3节。

原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: 《Java 7并发编程实战手册》第六章并发集合

方 腾飞

花名清英,并发网(ifeve.com)创始人,畅销书《Java并发编程的艺术》作者,蚂蚁金服技术专家。目前工作于支付宝微贷事业部,关注互联网金融,并发编程和敏捷实践。微信公众号aliqinying。

Latest posts by 方 腾飞 (see all)

FavoriteLoading添加本文到我的收藏
  • Trackback are closed
  • Comments (3)
  1. 哦,好评,求赠书。。。。

  2. 非常精彩,很棒的网站,学到很多东西。能赠书吗?

    • yhw7del
    • 01/28. 2014 3:03pm

    求关于并发测试的样章

You must be logged in to post a comment.

return top