使用stream操作表达更高级的数据处理请求, Part 1

使用stream操作表达更高级的数据处理请求,Part 1

原文链接 作者:Raoul-Gabriel Urma 译者:石头狮子(v1.lion@qq.com) 校对:吴京润

没有了集合你会怎么做?几乎每一个Java应用都建立和处理集合。对于许多编程任务而言,这是基础的技术:集合分组和处理数据。例如,你可能想要建立一个银行交易集合来代表用户的账户记录。然后,你想要处理所有的集合找出用户花费了多少金额。尽管集合如此重要,但是Java的实现远非完美。

首先,典型的集合处理模式有点像SQL操作,例如”查找”(查找最大值的交易)或”分组”(编组所有与杂货购买有关的交易)。大部分的数据库可以允许我们声明式地指定这些操作。例如,后面的SQL查询可以让我们找出最高值的交易ID:”SELECT id, MAX(value) from transactions”。

正如所见,我们并不需要去实现如何计算最大值(例如,使用循环,一个变量跟踪最大的值)。我仅需要表达我们需要的。这个原则意味着,你并不需要担忧如何明确地实现这些查询–这完全不需要你处理。为什么我们不能让集合做相同的事情呢?想想你使用循环一次又一次的重新实现了这些操作几次?

其次,我们怎样才能有效率的处理大型的集合?理论上讲,需要加快处理的速度,可能要使用多核架构。然而,写出并行处理的代码并不容易,而且也容易出错。

Java SE 8 解决了这个问题。 Java API的设计者使用新的称为Stream的抽象更新了API,使得可以声明式的处理数据。此外,streams可以使用多核心架构,而你并不需要写任何一行关于多核心处理的代码。听起来很美,确实是这样吗?这就是本系列文章要表述的内容。

在我们详细表述使用streams可以做什么之前,先让我们看看一个例子。以便有一个使用Java SE 8 streams新的编程方式的概念。假设我们需要找出所有类型为grocery的交易,返回以交易金额为降序的交易ID列表。Java SE 7中,我们所做的如Listing 1Java SE 8中,我们所做的如Listing 2

[code lang=”java”]
List<Transaction> groceryTransactions = new Arraylist<>();
for(Transaction t: transactions){
if(t.getType() == Transaction.GROCERY){
groceryTransactions.add(t);
}
}
Collections.sort(groceryTransactions, new Comparator(){
public int compare(Transaction t1, Transaction t2){
return t2.getValue().compareTo(t1.getValue());
}
});
List<Integer> transactionIds = new ArrayList<>();
for(Transaction t: groceryTransactions){
transactionsIds.add(t.getId());
}
[/code]

Listing 1

[code lang=”java”]
List<Integer> transactionsIds =
transactions.stream()
.filter(t -> t.getType() == Transaction.GROCERY)
.sorted(comparing(Transaction::getValue).reversed())
.map(Transaction::getId)
.collect(toList());
[/code]

Listing 2

Figure 1 描述了Java SE 8的代码。首先,我们使用List上可用的stream()方法从transactions(数据源)列表上取到stream。随后,几个操作(filter,sorted,map,collect)串联起来形成pipeline(管道),pipeline可以看成是对数据查询的一种形式。

Figure 1

可是,如何才能并行执行代码呢?对于Java SE 8来说,这是否容易做到:只要使用parallelStream()替换stream()方法,正如Listing 3所示Streams API内部会分解你的查询,使用你电脑上的多个核心。

[code lang=”java”]</pre>
List<Integer> transactionsIds = transactions.parallelStream()
.filter(t -> t.getType() == Transaction.GROCERY)
.sorted(comparing(Transaction::getValue).reversed())
.map(Transaction::getId)
.collect(toList());
[/code]

Listing 3

不必担忧这段代码是否无法理解。我们会在下一章中继续探究代码是如何工作的。注意到lambda 表达式(例如, t-> t.getCategory() == Transaction.GROCERY),和方法引用(例如,Transaction::getId)的使用。这些概念目前你应该是熟悉的。

现在,已经看到stream作为有效表达的抽象,就像集合数据上的SQL操作。此外,这些操作可以简洁的使用lambda 表达式参数化。

在学习Java SE 8 streams系列文章之后,你应该能够使用Streams API写出类似Listing 3上的代码,表达出强有力的查询。

使用Streams基础
我们先从一些理论开始。一个stream的定义是什么?简短的定义是”从一个支持聚集操作的源上获取的一序列元素”。让我们逐个解释:

序列元素:stream为特定元素类型值集合提供了一个接口。但是,stream并不实际存储元素;元素只在需要的时候被计算。
:Stream从数据提供源上消费数据,源可以是集合、数组、I/O资源等。
聚集操作,Stream支持类SQL的操作,和函数式编程语言的共通操作,例如 filter, map, reduce, find, match, sorted等等。

此外,stream操作有两个基本的特征,使得其和集合操作有极大的不同。

管道:许多stream 操作返回stream自身。这可以让操作串联成一个大的管道。这也使得某些优化技术,例如惰性(laziness)和短路(short-circuiting)得以实现,这些概念我们都会在后面阐释。
内部迭代:与集合相比,集合的迭代是明确地(外部迭代),而stream操作执行的迭代你无法感知到。

让我们重新看看之前的代码来阐述这个概念。Figure 2表述了Listing 2的更多细节。

Figure 2
首先,我们从transactions list上调用stream()获取到stream。数据源是transaction list,并且提供元素序列给stream。接下来,我们使用一系列stream上的聚合操作:filter (使用给定的predicate过滤元素), sorted (使用给定的comparator排序元素), and map (抽取信息)。所有这些操作除了collect之外,都返回stream。所以,这些操作可以串联形成一个管道,管道可以看成是对源查询的视图。

所有的操作只有在调用collect的时候才会执行。collect操作会开始处理管道,返回结果(一些不是stream;例子上是List)。不要太关心collect;我们会在之后的文章中详细阐述。现在,你可以把collect看成一个需要指定如何聚集stream元素汇总成结果的操作。例子中,toList()则描述了需要从Stream转换为List。

在我们阐述stream的方法之前,暂停并回顾一下stream 和collection之间的不同。

Streams Versus Collections

集合与stream在序列元素上所提供接口的新概念,都同时在java上存在。所以,不同的是什么?简而言之,集合是关于数据的,stream是关于计算的。想想存储在DVD上的电影。这就是集合(可能是字节,又可能是帧–这里,我们并不关心),因为其包含所有的数据结构。现在我们想想相同的视频,当视频是互联网上的流的情况。则这个时候就是stream(比特或帧)。视频流播放器只需要下载用户现在观看位置之前的几帧,所以你才可以从流的起始开始播放,在这之前,流里面的数据已经是被计算过了(想象下足球直播流)。

粗略的讲,集合和stream之间的不同则是在处理计算的事情时。集合是一个内存上的数据结构,持有所有的这个数据结构的值–集合上的每个元素在要添加进集合之前都需要被计算。相反,stream概念上是固定的数据结构,流内的每个元素只在需要的时候计算。

使用Collection接口则需要用户来完成迭代(例如,使用称为foreach的增强for循环);这个被叫做外部迭代。

相反,Streams库使用内部迭代–为你执行迭代操作并且在某处维护执行结果;你仅仅只要提供一个函数说我要完成这个。Listing 4里面的的代码(使用集合的外部迭代)和Listing 5(使用stream的内部迭代)则阐述了这点不同。

[code lang=”java”]
List<String> transactionIds = new ArrayList<>();
for(Transaction t: transactions){
transactionIds.add(t.getId());
}
[/code]

Listing 4

[code lang=”java”]
List<Integer> transactionIds =
transactions.stream()
.map(Transaction::getId)
.collect(toList());
[/code]

Listing 5

Listing 4上,我们明确地顺序迭代transactions list,抽取出每个交易ID并添加给聚集器。相反,当使用stream,并没有明确地迭代。Listing 5上的代码建立一个查询,其中map操作参数化为抽取交易ID,collect操作转换结果Stream到List。

到目前为止,你应该明确知道stream是什么,并且你可以使用它。现在,让我们看看stream提供的其他操作,这些操作可以让你表达你自己的数据处理查询。

Stream Operations: Exploiting Streams to Process Data

java.util .stream.Stream中的Stream接口定义了许多操作,主要可以分成两类。正如Figure 1里面的例子,可以看到如下的操作:

filter, sorted, 和map, 这些可以从管道上连接在一起的。
collect 关闭管道并放回结果。

Stream 上可以连接的操作称为中间操作。因为其返回的类型是Stream。关闭stream管道的操作称为结束操作。其从管道上产生结果,例如List,一个整数,甚至是void(任何非stream类型)。

你也许会疑惑这些物质的重要性。当然,中间操作在stream管道上执行结束之前是不会执行;中间操作是惰性的(Lazy),主要是因为中间操作通常是合并的,并且被结束操作处理进通道。

[code lang=”java”]
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
List<Integer> twoEvenSquares =
numbers.stream()
.filter(n -> {
System.out.println("filtering " + n);
return n % 2 == 0;
})
.map(n -> {
System.out.println("mapping " + n);
return n * n;
})
.limit(2)
.collect(toList());
[/code]

Listing 6

例如,看看Listing 6上的代码,计算给定number list上两个偶数的平方:

filtering 1
filtering 2
mapping 2
filtering 3
filtering 4
mapping 4

因为limit(2)使用短路特性;我们需要只处理stream的部分,并非全部地返回结果。这和计算用and串联操作的布尔表达式有点类似:只要一个表达式返回false,我们可以推断出整个表达式返回false,而不用全部计算。这里,limit操作返回大小为2的stream。

当然,filter和map操作合并到相同的通道中。

总结下我们目前学习到的,宏观上处理stream包括这三件事:

一个数据源(例如集合),在数据源上执行的查询
串联的中间操作,这些操作形成stream管道
一个结束操作, 执行stream管道,并且产生结果。

现在,先看看stream上可用的一些操作。查阅java.util .stream.Stream接口获取全部的列表,同样也是这篇文章后面引用的资源。

Filtering. 有几个操作可以用来从stream中过滤元素:
filter(Predicate): 使用predicate (java.util.function.Predicate)作为参数,并返回包含所有匹配给定predict元素的stream。

distinct: 返回一个有唯一元素的stream(根据stream中元素的equals实现)。
limit(n): 返回一个不长于给定大小n的stream。
skip(n): 返回一个丢弃了前面n个元素的stream。

Finding and matching. 一个通常的数据处理模式是决定是否某些元素匹配给定的属性。你可以使用anyMatch,allMatch和noneMatch操作来帮助你完成这些操作。所有这些操作使用Predicate作为参数,返回一个布尔值作为结果(因此,这些是决定式的操作)。例如,你可以使用allMatch检查transaction stream中所有交易额大于100的元素,如 Listing 7所示的。

[code lang=”java”]
boolean expensive = transactions.stream()
.allMatch(t -> t.getValue() > 100);
[/code]

Listing 7

Stream接口提供 findFirst 和findAny操作,用于从stream中取回任意的元素。主要可以用于连接其他的stream操作,例如filter。
findFirst 和findAny返回Optional对象,如Listing 8所示。

[code lang=”java”]
Optional<Transaction> = transactions.stream()
.filter(t -> t.getType() == Transaction.GROCERY)
.findAny();
[/code]

Listing 8

Optional<T>类(java.util .Optional)是一个容器类,用于代表一个值存在或不存在。Listing 8中,findAny可能并不会返回任何grocery类型的交易。

Optional类有一些方法用于测试元素是否存在。例如,如果有交易存在,我们可以选择使用ifPresent方法选择对optional对象上应用操作,如Listing 9(我们只是打印交易)。

[code lang=”java”]
transactions.stream()
.filter(t -> t.getType() == Transaction.GROCERY)
.findAny()
.ifPresent(System.out::println);
[/code]

Listing 9

Mapping. Stream支持map方法,使用function(java.util.function.Function)作为参数用于映射stream中的元素到另外一种形式。function会应用到每一个元素,映射元素到新的元素。

例如,你可能想要从stream的每个元素中抽出信息。Listing 10的例子中,我们从一个list上返回每个词长度的list。Reducing. 目前,我们所见的结束操作返回boolean(allMatch等),void(forEach),或一个Optional对象(findAny等)。并且同样已经使用collect组合所有stream中的元素为List。

[code lang=”java”]
List<String> words = Arrays.asList("Oracle", "Java", "Magazine");
List<Integer> wordLengths =
words.stream()
.map(String::length)
.collect(toList());
[/code]

Listing 10

当然,你同样可以组合stream中的所有元素表述成更复杂的处理请求,例如,最高ID的交易是什么?或计算所有交易额的总数。

这可以使用stream上的reduce操作,这个操作重复地为每个元素应用操作(例如,添加两个数字),直到产生结果。函数式程序中一般称这操作为折叠操作(fold),你可以把这个操作看成是重复地折叠纸张的一部分(你的stream),直到形成一个小正方形,这就是折叠操作的结果。

先看下我们如何使用for循环计算list的和:

[code lang=”java”]
int sum = 0;
for (int x : numbers) {
sum += x;
}
[/code]

Numbers list上的每个元素重复地使用添加操作来产生一个结果。实际上,我们缩小numbers list到一个数值。代码中则有两个参数:sum变量的初始值,例子上为0,和组合所有list元素的操作,例子上为+。

使用stream的reduce方法,我们可以累加所有的stream元素。如 Listing 11所示的。

reduce方法使用两个参数:

[code lang=”java”]
int sum = numbers.stream().reduce(0, (a, b) -> a + b);
[/code]

Listing 11

一个初始值,0

BinaryOperator<T>,用于组合两个元素并产生一个新的值。

reduce方法本质上抽象了重复的应用模式。其他查询例如”计算产品”或”计算最大值(见Listing 12)”则是成为reduce方法的特定例子。

[code lang=”java”]
int product = numbers.stream().reduce(1, (a, b) -> a * b);
int product = numbers.stream().reduce(1, Integer::max);
[/code]

Listing 12

Numeric Streams

现在,已经看过了使用reduce方法用于计算整数stream和的例子。但是,这其中还是有一定的开销:我们执行多次装箱(boxing)操作,重复的在integer对象上求和。如果可以调用一个sum方法,可能会更好一点,正如Listing 13所示,是否更明确我们代码的目的?

[code lang=”java”]
int statement = transactions.stream()
.map(Transaction::getValue)
.sum(); // error since Stream has no sum method
[/code]

Listing 13

Java SE 8 引入3个特定的primitive stream接口用于处理这个问题–IntStream,DoubleStream和LongStream–各自代表stream中的元素是int,double和long。

通常要转换stream到特定版本的stream所执行的方法是mapToInt,mapToDouble和mapToLong。这些方法工作起来完全像是我们之前见到的map方法,不同的是这些方法返回特定的stream而不是Stream<T>。例如,我们可以改进Listing 13的代码,如Listing 14所展示的。你同样可以通过装箱(boxed)操作从primitive stream转换为某个对象stream。

[code lang=”java”]
int statementSum =
transactions.stream()
.mapToInt(Transaction::getValue)
.sum(); // works!
[/code]

Listing 14

最后,另一个numeric streams有用的形式是数字范围(numeric ranges)。例如,你可能想要产生所有1到100之间的数值。Java SE 8则引入了 IntStream, DoubleStream, 和LongStream上可用的2个静态方法辅助产生这样的范围:range和rangeClosed。

这两个方法都使用范围的起始作为首个参数,范围的结束作为第二个参数。range方法是开区间,而rangeClosed是闭区间的。 Listing 15则是一个使用rangeClose方法的例子,返回10到30之间数值的stream。

[code lang=”java”]
IntStream oddNumbers =
IntStream.rangeClosed(10, 30)
.filter(n -> n % 2 == 1);
[/code]

Listing 15

Building Streams

有几种方式用于构建stream。我们已经看到如何从集合上获取到stream。同样,我也使用了number stream。你同样可以从值、数组或文件上建立stream。此外甚至可以从一个函数上获取stream 来产生无限的stream。

从值或从数组上建立stream十分简单:只要为值调用Stream.of的静态方法和为数组调用Arrays.stream生成。如 Listing 16所示。

[code lang=”java”]
Stream<Integer> numbersFromValues = Stream.of(1, 2, 3, 4);
int[] numbers = {1, 2, 3, 4};
IntStream numbersFromArray = Arrays.stream(numbers);
[/code]

Listing 16

同样也可以使用Files.lines静态方法将文件转换为一个stream。例如,Listing 17计算文件中的行数。

[code lang=”java”]
long numberOfLines =
     Files.lines(Paths.get(“yourFile.txt”), Charset.defaultCharset())
         .count();
[/code]

Listing 17

Infinite streams. 最后,在我们结束关于stream的这篇文章之前,还有一个令人兴奋的概念。到目前为止,应该理解stream内的元素是按需产生的。这里有两个静态方法–Stream.iterate 和 Stream.generate可以从函数上建立stream。然而,由于元素是按需计算的,这两个操作可以一直产生元素。这就是为什么称为 infinite stream:没有固定大小的stream,与我们从固定集合建立的流相比。

Listing 18 是使用iterate的例子,创建一个所有10倍数的数字stream。Iterate方法使用一个初始值(例子上是,0)和一个用于连续地产生每个新值的lambda(类型为UnaryOperator<T>)。

[code lang=”java”]
Stream<Integer> numbers = Stream.iterate(0, n -> n + 10);
[/code]

Listing 18
我们可以把这个无限的stream转换成固定大小的stream,通过使用limit操作。例如,我们可以限制stream的大小为5,如Listing 19所示。

[code lang=”java”]
numbers.limit(5).forEach(System.out::println); // 0, 10, 20, 30, 40
[/code]

Listing 19

Conclusion

Java SE 8 引入的stream API,可以让我们表达更复杂的数据处理逻辑。本文中,你已经看到stream支持许多方法,例如filter,map,reduce和iterate,这些方法组合可以写出简洁的代码并表达数据处理查询。这种新的代码编写方式与Java SE8 之前你要处理的集合十分的不同。显然,这有许多好处。首先,Stream API使用了许多技术,例如惰性和短路来优化数据处理查询。其次,stream可以是并行自动地使用多核心架构。本系列的下一章节中,我们会表述更高级的操作,例如flatMap和collect。

原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: 使用stream操作表达更高级的数据处理请求, Part 1

  • Trackback 关闭
  • 评论 (0)
  1. 暂无评论

return top