JAVA8 stream 中Spliterator的使用(一)

java8 stream大家用的比较多,但是发现,其实stream的底层构造函数中,还需要传入Spliterator。查了一下,竟然发现网上对这个类讲的几乎没有。唯一一篇文章写得Spliterator使用有问题的,其实他的并行流是没有用到的。因为

[code lang=”text”]
for (int pos = currentSize/2 + currentSize; pos < str.length(); pos++){
…..
[/code]

这段逻辑没有执行 , pos < str.length() 为false
因此,这里面的Spliterator返回的是null,返回null说明不用进行分割,因此原文中的代码也就是单线程的,并没有用到多线程。

以此问中的例子,给出两种写法,两种写法,都可以使用到多线程。

写stream的代码越复杂,对技术的要求其实是越高的。需要对递归,分治有一定的理解,不然无法有效的进行stream代码的debug。

来看下Spliterator接口方法,需要实现以下一些方法:

  • boolean tryAdvance(Consumer action); 该方法会处理每个元素,如果没有元素处理,则应该返回false,否则返回true。
  • default void forEachRemaining(Consumer action) 该方法有默认实现,功能后面会介绍。
  • Spliterator trySplit(); 将一个Spliterator分割成多个Spliterator。分割的Spliterator被用于每个子线程进行处理,从而达到并发处理的效果。
  • long estimateSize(); 该方法返回值并不会对代码正确性产生影响,但是会影响代码的执行线程数,后续会介绍一下
  • int characteristics(); 给出stream流具有的特性,不同的特性,不仅是会对流的计算有优化作用,更可能对计算结果会产生影响,后续会稍作介绍。
  • default Comparator getComparator() 对sorted的流,给出比较器。后续给出研究代码。

1.先来看下forEachRemaining的实现。

[code lang=”text”]
default void forEachRemaining(Consumer&amp;amp;lt;? super T&amp;amp;gt; action) {
do { } while (tryAdvance(action));
}
[/code]

该方法循环遍历调用tryAdvance方法,直到返回false。因为tryAdvance是必须实现的方法,因此重写forEachRemaining
只有对优化代码有作用,无法做到不写tryAdvance方法实现。
2.estimateSize的使用场景场景:
在estimateSize处打断点,跟踪线程栈信息,可以看出estimateSize在这里用到:

[code]
java.util.stream.AbstractTask&amp;amp;lt;P_IN, P_OUT, R, K&amp;amp;gt;的
public void compute() {
Spliterator&amp;amp;lt;P_IN&amp;amp;gt; rs = spliterator, ls; // right, left spliterators
long sizeEstimate = rs.estimateSize();
long sizeThreshold = getTargetSize(sizeEstimate);
boolean forkRight = false;
@SuppressWarnings(&amp;amp;quot;unchecked&amp;amp;quot;) K task = (K) this;
while (sizeEstimate &amp;lt; sizeThreshold &amp;amp; (ls = rs.trySplit()) != null) {
K leftChild, rightChild, taskToFork;
task.leftChild = leftChild = task.makeChild(ls);
task.rightChild = rightChild = task.makeChild(rs);
task.setPendingCount(1);
if (forkRight) {
[/code]

如果sizeEstimate < sizeThreshold, 则线程是不会再调用trySplit()方法,则就不会再细分子线程了。
可以将estimateSize返回结果固定为1,将只会用到主线程在跑任务,没有子线程。

当Spliterator的trySplit返回null的时候,说明当前这段分割不能再进行分割了,就会调用到
forEachRemaining方法。

仿照oricle源码示例,即可写出示例代码。该示例代码较好,原因是通过f/j的代码,大致阐述了
stream底层使用Spliterator的方式,是如何使用Spliterator中各个接口的。

这里面需要说明的是,tryAdvance方法中的Consumer.accept调用,最终将调用到reduce 操作的ccumulate方法。
也就是说,我们看到Consumer.accept返回一个void,其实就是对变量T做一个操作。这个操作将直接影响到stream的内部状态,但是不会有返回值。

附上分别两种方式实现的源码:

[code lang=”text”]
公用类:
public class NumCounter {
private int num;
private int sum;
// 是否当前是个完整的数字
private boolean isWholeNum;

public NumCounter(int num, int sum, boolean isWholeNum) {
this.num = num;
this.sum = sum;
this.isWholeNum = isWholeNum;
}

public NumCounter accumulate(Character c){
System.out.println(Thread.currentThread().getName());
if (Character.isDigit(c)){
return isWholeNum ? new NumCounter(Integer.parseInt("" + c), sum, false) : new NumCounter(Integer.parseInt("" + num + c), sum, false);
}else {
return new NumCounter(0, sum + num, true);
}
}

public NumCounter combine(NumCounter numCounter){
return new NumCounter(0, this.getSum() + numCounter.getSum(), numCounter.isWholeNum);
}

public int getSum() {
return sum + num;
}
}
[/code]

方法1:

[code lang=”text”]
NumCounterSpliterator

public class NumCounterSpliterator implements Spliterator&amp;lt;Character&amp;gt; {

private String str;
private int currentChar = 0;
private boolean canSplit = true;

public NumCounterSpliterator(int currentChar,String str,boolean canSplit) {
this.str = str;
this.currentChar = currentChar;
this.canSplit = canSplit;
}

public void forEachRemaining(Consumer&amp;lt;? super Character&amp;gt; action) {
do {
} while (tryAdvance(action));
}

@Override
public boolean tryAdvance(Consumer&amp;lt;? super Character&amp;gt; action) {
if(str.equals(&amp;amp;quot;&amp;amp;quot;)){
return false;
}
action.accept(str.charAt(currentChar++));
return currentChar &amp;amp;lt; str.length();
}

@Override
public Spliterator&amp;lt;Character&amp;gt; trySplit() {
int i = currentChar;
for(;canSplit &amp;amp;amp;&amp;amp;amp; i &amp;amp;lt; str.length(); ++i){

//第一个不是数字的pos,进行分割
if(!Character.isDigit(str.charAt(i))){
String str1 = str;
this.str = str1.substring(currentChar, i);
canSplit = false;
if(i + 1 &amp;amp;lt; str1.length()){
return new NumCounterSpliterator(0,str1.substring(i+1, str1.length()),true);
}else{
return null;
}
}
}

canSplit = false;
return null;
}

@Override
public long estimateSize() {
return str.length() – currentChar;
}

@Override
public int characteristics() {
return ORDERED | SIZED | SUBSIZED | NONNULL | IMMUTABLE;
}
}

public class NumCounterTest {
public static void main(String[] args) {
String arr = &amp;amp;quot;12%3 21sdas s34d dfsdz45 R3 jo34 sjkf8 3$1P 213ikflsd fdg55 kfd&amp;amp;quot;;
Spliterator&lt;Character&gt; stream = IntStream.range(0, arr.length()).mapToObj(arr::charAt);
System.out.println("ordered total: " + countNum(stream));

Spliterator&lt;Character&gt; spliterator = new NumCounterSpliterator(0,arr,true);
// 传入true表示是并行流
Stream&lt;Character&gt; parallelStream = StreamSupport.stream(spliterator, true);
System.out.println("parallel total: " + countNum(parallelStream));
}

private static int countNum(Stream&lt;Character&gt; stream){
NumCounter numCounter = stream.reduce(new NumCounter(0, 0, false), NumCounter::accumulate, NumCounter::combine);
return numCounter.getSum();
}
}
[/code]

该方法使用的是string,string在不同子线程间传递时候,采用了substring方法,效率不高。
方法二,改为char数组:

[code lang=”text”]
public class NumCounterSpliterator2 implements Spliterator&lt;Character&gt; {

private char[] str;
private int currentChar = 0;
private int end = Integer.MAX_VALUE;
private boolean canSplit = true;

public NumCounterSpliterator2(int currentChar,int end,char[] str,boolean canSplit) {
this.str = str;
this.currentChar = currentChar;
this.canSplit = canSplit;
this.end = end;
}

@Override
public boolean tryAdvance(Consumer&amp;amp;lt;? super Character&amp;amp;gt; action) {
action.accept( str[currentChar++] );
return currentChar &amp;amp;lt; end;
}

@Override
public Spliterator<Character> trySplit() {
int i = currentChar;
for(;canSplit ; i < end; ++i){
if(!Character.isDigit(str[i])){
int splitBeforeEnd = end;
end = i ;
canSplit = false;
if(i + 1 &amp;amp;lt; splitBeforeEnd){
return new NumCounterSpliterator2(i+1,splitBeforeEnd,str,true);
}else{
return null;
}
}
}

canSplit = false;
return null;
}

@Override
public long estimateSize() {
return end – currentChar;
}

@Override
public int characteristics() {
return ORDERED | SIZED | SUBSIZED | NONNULL | IMMUTABLE;
}
}

public class NumCounterTest2 {
public static void main(String[] args) {
String arr = &amp;amp;quot;12%3 21sdas s34d dfsdz45 R3 jo34 sjkf8 3$1P 213ikflsd fdg55 kfd&amp;amp;quot;;

Spliterator&amp;amp;lt;Character&amp;amp;gt; spliterator = new NumCounterSpliterator2(0,arr.length(),arr.toCharArray(),true);
// 传入true表示是并行流
Stream&amp;amp;lt;Character&amp;amp;gt; parallelStream = StreamSupport.stream(spliterator, true);
System.out.println(&amp;amp;quot;parallel total: &amp;amp;quot; + countNum(parallelStream));
}

private static int countNum(Stream&amp;amp;lt;Character&amp;amp;gt; stream){
NumCounter numCounter = stream.reduce(new NumCounter(0, 0, false), NumCounter::accumulate, NumCounter::combine);
return numCounter.getSum();
}
}
[/code]

感谢您看到这里,但是不幸的是该代码并不是最合适的代码,会有一些问题。请移步到java8 stream 中Spliterator的使用(二) 更深入的讨论。

原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: JAVA8 stream 中Spliterator的使用(一)

  • Trackback 关闭
  • 评论 (2)
    • qwop69
    • 2017/10/27 9:57上午

    &lt; 没有被转义;本地看了jquery 文件404 ,估计是被墙了

    • 黑烨
    • 2018/03/15 3:03下午

    能把&amp;quot;&amp;quot; 这些转移就更好了

return top