JAVA8 stream 中Spliterator的使用(二)

JAVA8 stream 中Spliterator的使用(一)给出了Spliterator的两种使用,但是遗憾的是,代码并不正确。这篇说明下原因,并对Spliterator进行更深入的分析。

  1. 首先来看下sorted方法,将代码调用countNum处注释掉,改为如下方法:

[code lang=”text”]
parallelStream.sorted().forEach(System.out::print);
[/code]

代码将报错。

[code lang=”text”]
Exception in thread "main" java.lang.NullPointerException
at java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:52)
at java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:1)

at java.util.TimSort.binarySort(TimSort.java:296)
at java.util.TimSort.sort(TimSort.java:239)
at java.util.Arrays.parallelSort(Arrays.java:1113)
at java.util.stream.SortedOps$OfRef.opEvaluateParallel(SortedOps.java:158)
at java.util.stream.AbstractPipeline.opEvaluateParallelLazy(AbstractPipeline.java:704)
at java.util.stream.AbstractPipeline.sourceSpliterator(AbstractPipeline.java:431)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at T2.main(T2.java:13)
[/code]

跟进代码中看,在Comparators处打断点进行调试(默认jdk的source包是不能打断点的,需要自己重新打包出含有调试信息的source包),在

[code lang=”text”]
java.util.stream.SortedOps.OfRef<T>类的T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);
[/code]

方法中,解析出的flattenedData数组中有null的元素。跟进helper.evaluate方法,如果是并行流,在

[code lang=”text”]
java.util.stream.Nodes.collect(PipelineHelper<P_OUT>, Spliterator<P_IN>, boolean, IntFunction<P_OUT[]>)
[/code]

方法中会首先初始化一个全null的数组。后面的逻辑是将数据中的元素根据Spliterator分割后的各元素插入到这个数组里面。

问题就出在(一)中的第二个Spliterator修改了原始的char[]数组的内容,因为在trySplit方法中,将char[]数组中不是数字的char给忽略了,trySplit分割后的两个Spliterator都没有处理非数字的char,这样会导致上文中全null数组中在非数字index的位置没有被元素填充,导致在sorted比较的时候报出空指针错误。

这里暴露的问题说明,在编写Spliterator的时候,不能修改stream的元素内容,这和stream不可修改性也是一脉相承的。
2. 修改代码,改成在trySplit方法中将非数字的char划归到分割后的第一个Spliterator中。
运行代码,报如下错误

[code lang=”text”]
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 1
at java.util.stream.SortedOps$SizedRefSortingSink.accept(SortedOps.java:364)
at NumCounterSpliterator2.tryAdvance(NumCounterSpliterator2.java:24)
at java.util.Spliterator.forEachRemaining(Spliterator.java:326)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:1)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at NumCounterTest2.main(NumCounterTest2.java:32)
[/code]

跟进源码

[code lang=”text”]
java.util.stream.Nodes.SizedCollectorTask.OfRef.accept(P_OUT)中
public void accept(P_OUT value) {
if (index >= fence) {
throw new IndexOutOfBoundsException(Integer.toString(index));
}
array[index++] = value;
}
[/code]

发现stream的sorted实现时,会根据estimateSize返回的值赋值给fence,如果进行排序比较的元素的index值超过estimateSize返回值,就会抛出异常。因此,在sorted使用过程中,estimateSize方法并不是一个可以随意返回值的。

修改方式有两种,另一种修改方式在后面将stream的characteristics参数时介绍。
1.将estimateSize方法改成准确的计算方式即可:
@Override
public long estimateSize() {
return end – currentChar + 1;
}

编码过程中还发现一个小问题,用parallel并行stream的时候,遍历元素是需要采用forEachOrdered而不是forEach方法,具体可以参见【1】

3. stream的characteristics参数介绍

基本参数使用,大家可以参见源码注释,这里介绍下一些注意的地方:
3.1 java.util.Spliterator.DISTINCT 属性 表明该stream已经是distinct的了,因此,如果Spliterator含有此属性,则在stream.distinct()调用的时候,是直接输出该stream的,也就是distinct方法不进行通常意义上的唯一性过滤。
举例:
将文末示例代码中的characteristics方法返回值,加入DISTINCT属性。即:

[code lang=”text”]
public int characteristics() {
return ORDERED | SIZED | SUBSIZED | NONNULL | IMMUTABLE |DISTINCT;
}
[/code]

运行如下代码:

[code lang=”text”]
String arr = "123123";
System.out.println(arr);

Spliterator<Character> spliterator = new NumCounterSpliterator2(0,arr.length()-1,arr.toCharArray(),true);
// 传入true表示是并行流
Stream<Character> parallelStream = StreamSupport.stream(spliterator, true);

parallelStream.distinct().forEach(System.out::print);
[/code]

结果输出:
123123
123123

可见并未做唯一性处理。如果去掉distinct属性,则输出结果:
123123
132

3.2 SORTED属性

3.2.1. SORTED属性和getComparator一起使用。如果该流已经按照默认字典序(natural order)排序好了,则返回null。
如果将Spliterator的getComparator返回null,并且设置SORTED属性,则sorted()方法直接返回原stream流,不会做任何排序,原因和distinct相同,因为此流已经排好序了。

如果stream是非并行流,则返回直接和原stream流相同。如果是并行流,注意,因为并行流是会被trysplit处理的,每个分割后的Spliterator是sorted的。因为流属性已经是sorted并且返回的getComparator是null,已经是排好序的了,因此每个子线程分割后的Spliterator直接输出即可。

但是这里注意,stream本质上底层是f/j代码,而f/j分割时候,是基于trySplit进行分割的。查看了java.util.stream.Streams.RangeIntSpliterator源码后发现,trySplit的分割是需要从[begin,end]返回一个以begin 为开始的Spliterator,例如分割为[begin,end1],将当前Spliterator的begin修改为end1+1,即分割为[end1+1,end].
原因应该是f/j的子线程fork和join有关,因为我们直到fork和join应该是相反序来写的。例如:

[code lang=”text”]
f1.fork();
f2.fork();
f2.join();
f1.join();
[/code]

因此,从f/j的多线程栈来说,f2 在 f1的上面,f2.join会导致f2先执行。return的[begin,end1]保证了先执行,
而f1的[end1+1,end] 任务后执行,这样才是以encounter order顺序执行的并发。
因此,代码中trysplit应该这么写:

[code lang=”text”]
public Spliterator&lt;Character&gt; trySplit() {
int i = currentChar;
int currentCharOld = currentChar;
for(;canSplit &amp;&amp; i &lt;= end; ++i){
if(!Character.isDigit(str[i])){
int splitBeforeEnd = end;
canSplit = false;
if(i + 1 &lt;= splitBeforeEnd){
currentChar = i + 1;
return new NumCounterSpliterator3(currentCharOld,i,str,true);
}else{
return null;
}
}
}
canSplit = false;
return null;
}
[/code]

而不能写成

[code lang=”text”]
@Override
public Spliterator&lt;Character&gt; trySplit() {
int i = currentChar;
for(;canSplit &amp;&amp; i &lt;= end; ++i){
if(!Character.isDigit(str[i])){
int splitBeforeEnd = end;
end = i ;
canSplit = false;
if(i + 1 &lt;= splitBeforeEnd){
return new NumCounterSpliterator2(i+1,splitBeforeEnd,str,true);
}else{
return null;
}
}
}

canSplit = false;
return null;
}
[/code]

。当然如上文说,如果不是并行流,不涉及trysplit方法,则getComparator返回null,直接就返回的原始流

3.2.2 3.2.1讲了如果返回的是null的情况,那么如果返回的不是null呢?很不幸,那设置了sorted和没设置没有任何区别,
即使你用hasCharacteristics(Spliterator.SORTED)方法,的确返回true。
为什么?
看下源码:

[code lang=”text”]
java.util.stream.StreamOpFlag.fromCharacteristics(Spliterator)
if ((characteristics &amp; Spliterator.SORTED) != 0 &amp;&amp; spliterator.getComparator() != null) {
// Do not propagate the SORTED characteristic if it does not correspond
// to a natural sort order
return characteristics &amp; SPLITERATOR_CHARACTERISTICS_MASK &amp; ~Spliterator.SORTED;
}
else {
return characteristics &amp; SPLITERATOR_CHARACTERISTICS_MASK;
}
[/code]

如果具有SORTED属性,同时getComparator()返回的不为null,则& ~Spliterator.SORTED会将sorted属性抹去,
则此stream不具有sorted属性。不具有sorted属性,则stream的sorted方法,就直接按照字典序排序了。

3.2.3 sorted()方法还有一个可以传入Comparator的重写方法,如果使用了传入Comparator的sorted方法,则以这个
Comparator进行排序,和原stream是否具有sorted属性无关。
源码如下:

[code lang=”text”]
java.util.stream.SortedOps.OfRef
OfRef(AbstractPipeline&lt;?, T, ?&gt; upstream) {
super(upstream, StreamShape.REFERENCE,
StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
this.isNaturalSort = true;
// Will throw CCE when we try to sort if T is not Comparable
@SuppressWarnings(&quot;unchecked&quot;)
Comparator&lt;? super T&gt; comp = (Comparator&lt;? super T&gt;) Comparator.naturalOrder();
this.comparator = comp;
}

/**
* Sort using the provided comparator.
*
* @param comparator The comparator to be used to evaluate ordering.
*/
OfRef(AbstractPipeline&lt;?, T, ?&gt; upstream, Comparator&lt;? super T&gt; comparator) {
super(upstream, StreamShape.REFERENCE,
StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
this.isNaturalSort = false;
this.comparator = Objects.requireNonNull(comparator);
}
[/code]

3.2.3 SIZED | SUBSIZED属性

SIZED | SUBSIZED可以和estimateSize返回Long.MAX_VALUE一起配合使用。
如果stream没有SIZED | SUBSIZED属性,则可以将estimateSize返回为Long.MAX_VALUE.这说明这个stream
的estimateSize计算很复杂或本身就是一个infinite的steam流。这样设置后,性能上会差一些,但是,不会对sorted
方法产生影响。2中提到的错误,也可也用这种方法处理。

最后附上全部代码:

[code lang=”text”]
NumCounterSpliterator3

import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;

public class NumCounterSpliterator3 implements Spliterator<Character> {

private char[] str;
private int currentChar = 0;
private int end = Integer.MAX_VALUE;
private boolean canSplit = true;

public NumCounterSpliterator3(int currentChar,int end,char[] str,boolean canSplit) {
this.str = str;
this.currentChar = currentChar;
this.canSplit = canSplit;
this.end = end;
}

@Override
public boolean tryAdvance(Consumer<? super Character> action) {
action.accept( str[currentChar++] );
return currentChar <= end;
}

@Override
public Spliterator<Character> trySplit() {
int i = currentChar;
int currentCharOld = currentChar;
for(;canSplit && i <= end; ++i){
if(!Character.isDigit(str[i])){
int splitBeforeEnd = end;
canSplit = false;
if(i + 1 &lt;= splitBeforeEnd){
currentChar = i + 1;
return new NumCounterSpliterator3(currentCharOld,i,str,true);
}else{
return null;
}
}
}

canSplit = false;
return null;
}

@Override
public long estimateSize() {
return end – currentChar + 1 /*Long.MAX_VALUE*/ ;
}

public Comparator<? super Character> getComparator() {
return null;
}

@Override
public int characteristics() {
return ORDERED | SIZED | SUBSIZED | NONNULL | IMMUTABLE /*|SORTED*/;
}
}

NumCounterTest2

import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class NumCounterTest2 {
public static void main(String[] args) {
String arr = "12%3 21sdas s34d dfsdz45 R3 jo34 sjkf8 3$1P 213ikflsd fdg55 kfd";
System.out.println(arr);

Spliterator<Character> spliterator = new NumCounterSpliterator3(0,arr.length()-1,arr.toCharArray(),true);
// 传入true表示是并行流
Stream<Character> parallelStream = StreamSupport.stream(spliterator, true);
System.out.println("parallel total: " + countNum(parallelStream));
}

private static int countNum(Stream<Character> stream){
NumCounter numCounter = stream.reduce(new NumCounter(0, 0, false), NumCounter::accumulate, NumCounter::combine);
return numCounter.getSum();
}
}
[/code]

【1】 java 8 parallelStream() with sorted()

原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: JAVA8 stream 中Spliterator的使用(二)

  • Trackback 关闭
  • 评论 (0)
  1. 暂无评论

return top