JAVA8 stream 中Spliterator的使用(二)

JAVA8 stream 中Spliterator的使用(一)给出了Spliterator的两种使用,但是遗憾的是,代码并不正确。这篇说明下原因,并对Spliterator进行更深入的分析。

  1. 首先来看下sorted方法,将代码调用countNum处注释掉,改为如下方法:
parallelStream.sorted().forEach(System.out::print);

代码将报错。

Exception in thread "main" java.lang.NullPointerException
    at java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:52)
    at java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:1)

    at java.util.TimSort.binarySort(TimSort.java:296)
    at java.util.TimSort.sort(TimSort.java:239)
    at java.util.Arrays.parallelSort(Arrays.java:1113)
    at java.util.stream.SortedOps$OfRef.opEvaluateParallel(SortedOps.java:158)
    at java.util.stream.AbstractPipeline.opEvaluateParallelLazy(AbstractPipeline.java:704)
    at java.util.stream.AbstractPipeline.sourceSpliterator(AbstractPipeline.java:431)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at T2.main(T2.java:13)

跟进代码中看,在Comparators处打断点进行调试(默认jdk的source包是不能打断点的,需要自己重新打包出含有调试信息的source包),在

java.util.stream.SortedOps.OfRef<T>类的T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);

方法中,解析出的flattenedData数组中有null的元素。跟进helper.evaluate方法,如果是并行流,在

java.util.stream.Nodes.collect(PipelineHelper<P_OUT>, Spliterator<P_IN>, boolean, IntFunction<P_OUT[]>)

方法中会首先初始化一个全null的数组。后面的逻辑是将数据中的元素根据Spliterator分割后的各元素插入到这个数组里面。

问题就出在(一)中的第二个Spliterator修改了原始的char[]数组的内容,因为在trySplit方法中,将char[]数组中不是数字的char给忽略了,trySplit分割后的两个Spliterator都没有处理非数字的char,这样会导致上文中全null数组中在非数字index的位置没有被元素填充,导致在sorted比较的时候报出空指针错误。

这里暴露的问题说明,在编写Spliterator的时候,不能修改stream的元素内容,这和stream不可修改性也是一脉相承的。
2. 修改代码,改成在trySplit方法中将非数字的char划归到分割后的第一个Spliterator中。
运行代码,报如下错误

Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 1
    at java.util.stream.SortedOps$SizedRefSortingSink.accept(SortedOps.java:364)
    at NumCounterSpliterator2.tryAdvance(NumCounterSpliterator2.java:24)
    at java.util.Spliterator.forEachRemaining(Spliterator.java:326)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:1)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at NumCounterTest2.main(NumCounterTest2.java:32)

跟进源码

    java.util.stream.Nodes.SizedCollectorTask.OfRef.accept(P_OUT)中
            public void accept(P_OUT value) {
                if (index >= fence) {
                    throw new IndexOutOfBoundsException(Integer.toString(index));
                }
                array[index++] = value;
            }

发现stream的sorted实现时,会根据estimateSize返回的值赋值给fence,如果进行排序比较的元素的index值超过estimateSize返回值,就会抛出异常。因此,在sorted使用过程中,estimateSize方法并不是一个可以随意返回值的。

修改方式有两种,另一种修改方式在后面将stream的characteristics参数时介绍。
1.将estimateSize方法改成准确的计算方式即可:
@Override
public long estimateSize() {
return end – currentChar + 1;
}

编码过程中还发现一个小问题,用parallel并行stream的时候,遍历元素是需要采用forEachOrdered而不是forEach方法,具体可以参见【1】

3. stream的characteristics参数介绍

基本参数使用,大家可以参见源码注释,这里介绍下一些注意的地方:
3.1 java.util.Spliterator.DISTINCT 属性 表明该stream已经是distinct的了,因此,如果Spliterator含有此属性,则在stream.distinct()调用的时候,是直接输出该stream的,也就是distinct方法不进行通常意义上的唯一性过滤。
举例:
将文末示例代码中的characteristics方法返回值,加入DISTINCT属性。即:

    public int characteristics() {
        return ORDERED | SIZED | SUBSIZED | NONNULL | IMMUTABLE |DISTINCT;
    }

运行如下代码:

        String arr = "123123";
        System.out.println(arr);

        Spliterator<Character> spliterator = new NumCounterSpliterator2(0,arr.length()-1,arr.toCharArray(),true);
        // 传入true表示是并行流
        Stream<Character> parallelStream = StreamSupport.stream(spliterator, true);

        parallelStream.distinct().forEach(System.out::print);

结果输出:
123123
123123

可见并未做唯一性处理。如果去掉distinct属性,则输出结果:
123123
132

3.2 SORTED属性

3.2.1. SORTED属性和getComparator一起使用。如果该流已经按照默认字典序(natural order)排序好了,则返回null。
如果将Spliterator的getComparator返回null,并且设置SORTED属性,则sorted()方法直接返回原stream流,不会做任何排序,原因和distinct相同,因为此流已经排好序了。

如果stream是非并行流,则返回直接和原stream流相同。如果是并行流,注意,因为并行流是会被trysplit处理的,每个分割后的Spliterator是sorted的。因为流属性已经是sorted并且返回的getComparator是null,已经是排好序的了,因此每个子线程分割后的Spliterator直接输出即可。

但是这里注意,stream本质上底层是f/j代码,而f/j分割时候,是基于trySplit进行分割的。查看了java.util.stream.Streams.RangeIntSpliterator源码后发现,trySplit的分割是需要从[begin,end]返回一个以begin 为开始的Spliterator,例如分割为[begin,end1],将当前Spliterator的begin修改为end1+1,即分割为[end1+1,end].
原因应该是f/j的子线程fork和join有关,因为我们直到fork和join应该是相反序来写的。例如:

f1.fork();
f2.fork();
f2.join();
f1.join();

因此,从f/j的多线程栈来说,f2 在 f1的上面,f2.join会导致f2先执行。return的[begin,end1]保证了先执行,
而f1的[end1+1,end] 任务后执行,这样才是以encounter order顺序执行的并发。
因此,代码中trysplit应该这么写:

    public Spliterator&lt;Character&gt; trySplit() {
        int i = currentChar;
        int currentCharOld = currentChar;
        for(;canSplit &amp;&amp; i &lt;= end; ++i){
            if(!Character.isDigit(str[i])){
                int splitBeforeEnd = end;
                canSplit = false;
                if(i + 1 &lt;= splitBeforeEnd){
                    currentChar = i + 1;
                    return new NumCounterSpliterator3(currentCharOld,i,str,true);
                }else{
                    return null;
                }
            }
        }
        canSplit = false;
        return null;
    }

而不能写成

    @Override
    public Spliterator&lt;Character&gt; trySplit() {
        int i = currentChar;
        for(;canSplit &amp;&amp; i &lt;= end; ++i){
            if(!Character.isDigit(str[i])){
                int splitBeforeEnd = end;
                end = i ;
                canSplit = false;
                if(i + 1 &lt;= splitBeforeEnd){
                    return new NumCounterSpliterator2(i+1,splitBeforeEnd,str,true);
                }else{
                    return null;
                }
            }
        }

        canSplit = false;
        return null;
    }

。当然如上文说,如果不是并行流,不涉及trysplit方法,则getComparator返回null,直接就返回的原始流

3.2.2 3.2.1讲了如果返回的是null的情况,那么如果返回的不是null呢?很不幸,那设置了sorted和没设置没有任何区别,
即使你用hasCharacteristics(Spliterator.SORTED)方法,的确返回true。
为什么?
看下源码:

java.util.stream.StreamOpFlag.fromCharacteristics(Spliterator)
        if ((characteristics &amp; Spliterator.SORTED) != 0 &amp;&amp; spliterator.getComparator() != null) {
            // Do not propagate the SORTED characteristic if it does not correspond
            // to a natural sort order
            return characteristics &amp; SPLITERATOR_CHARACTERISTICS_MASK &amp; ~Spliterator.SORTED;
        }
        else {
            return characteristics &amp; SPLITERATOR_CHARACTERISTICS_MASK;
        }

如果具有SORTED属性,同时getComparator()返回的不为null,则& ~Spliterator.SORTED会将sorted属性抹去,
则此stream不具有sorted属性。不具有sorted属性,则stream的sorted方法,就直接按照字典序排序了。

3.2.3 sorted()方法还有一个可以传入Comparator的重写方法,如果使用了传入Comparator的sorted方法,则以这个
Comparator进行排序,和原stream是否具有sorted属性无关。
源码如下:

java.util.stream.SortedOps.OfRef
        OfRef(AbstractPipeline&lt;?, T, ?&gt; upstream) {
            super(upstream, StreamShape.REFERENCE,
                  StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
            this.isNaturalSort = true;
            // Will throw CCE when we try to sort if T is not Comparable
            @SuppressWarnings(&quot;unchecked&quot;)
            Comparator&lt;? super T&gt; comp = (Comparator&lt;? super T&gt;) Comparator.naturalOrder();
            this.comparator = comp;
        }

        /**
         * Sort using the provided comparator.
         *
         * @param comparator The comparator to be used to evaluate ordering.
         */
        OfRef(AbstractPipeline&lt;?, T, ?&gt; upstream, Comparator&lt;? super T&gt; comparator) {
            super(upstream, StreamShape.REFERENCE,
                  StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
            this.isNaturalSort = false;
            this.comparator = Objects.requireNonNull(comparator);
        }

3.2.3 SIZED | SUBSIZED属性

SIZED | SUBSIZED可以和estimateSize返回Long.MAX_VALUE一起配合使用。
如果stream没有SIZED | SUBSIZED属性,则可以将estimateSize返回为Long.MAX_VALUE.这说明这个stream
的estimateSize计算很复杂或本身就是一个infinite的steam流。这样设置后,性能上会差一些,但是,不会对sorted
方法产生影响。2中提到的错误,也可也用这种方法处理。

最后附上全部代码:

NumCounterSpliterator3

import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;

public class NumCounterSpliterator3 implements Spliterator<Character> {

    private char[] str;
    private int currentChar = 0;
    private int end = Integer.MAX_VALUE;
    private boolean canSplit = true;

    public NumCounterSpliterator3(int currentChar,int end,char[] str,boolean canSplit) {
        this.str = str;
        this.currentChar = currentChar;
        this.canSplit = canSplit;
        this.end = end;
    }

    @Override
    public boolean tryAdvance(Consumer<? super Character> action) {
        action.accept( str[currentChar++] );
        return currentChar <= end;
    }

    @Override
    public Spliterator<Character> trySplit() {
        int i = currentChar;
        int currentCharOld = currentChar;
        for(;canSplit && i <= end; ++i){
            if(!Character.isDigit(str[i])){
                int splitBeforeEnd = end;
                canSplit = false;
                if(i + 1 &lt;= splitBeforeEnd){
                    currentChar = i + 1;
                    return new NumCounterSpliterator3(currentCharOld,i,str,true);
                }else{
                    return null;
                }
            }
        }

        canSplit = false;
        return null;
    }

    @Override
    public long estimateSize() {
        return end - currentChar + 1 /*Long.MAX_VALUE*/ ;
    }

    public Comparator<? super Character> getComparator() {
        return null;
    }

    @Override
    public int characteristics() {
        return ORDERED | SIZED | SUBSIZED | NONNULL | IMMUTABLE /*|SORTED*/;
    }
}

NumCounterTest2

import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class NumCounterTest2 {
    public static void main(String[] args) {
        String arr = "12%3 21sdas s34d dfsdz45   R3 jo34 sjkf8 3$1P 213ikflsd fdg55 kfd";
        System.out.println(arr);

        Spliterator<Character> spliterator = new NumCounterSpliterator3(0,arr.length()-1,arr.toCharArray(),true);
        // 传入true表示是并行流
        Stream<Character> parallelStream = StreamSupport.stream(spliterator, true);
        System.out.println("parallel total: " + countNum(parallelStream));
        }

    private static int countNum(Stream<Character> stream){
        NumCounter numCounter = stream.reduce(new NumCounter(0, 0, false), NumCounter::accumulate, NumCounter::combine);
        return numCounter.getSum();
    }
}

【1】 java 8 parallelStream() with sorted()

原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: JAVA8 stream 中Spliterator的使用(二)



章 筱虎

章筱虎,中国移动技术主管。
github:https://github.com/xiaohu-zhang/

Latest posts by 章 筱虎 (see all)

FavoriteLoading添加本文到我的收藏
  • Trackback are closed
  • Comments (0)
  1. No comments yet.

You must be logged in to post a comment.

return top