Java8 Stream源码精讲中间操作原理详解

2022年6月4日10:16:41

通过分析创建Stream的过程,详细介绍了Spliterator接口定义,Spliterator子类的实现细节,Spliterator在Stream中的调用时机,以及代表源阶段Stream的Head类结构。本章将继续带着大家深入理解什么是Stream中间操作,进入每一个中间操作的源码了解我们定义的lambda表达式是如何在流上处理数据的。

中间操作

Stream是惰性流,中间操作只是将lambda表达式记录下来,返回一个新的Stream,只有终止操作被调用时,才会触发计算。这样可以保证数据被尽量少的遍历,这也是Stream高效的原因之一。中间操作分为无状态操作和有状态操作,无状态操作不会存储元素的中间状态,有状态操作会保存元素中间状态。

我看到有这样的说法:无状态操作是指元素的处理不受之前元素的影响;有状态操作是指该操作只有拿到所有元素之后才能继续下去。实际上真的是这样吗?我们后面会通过分析源码寻找答案。现在先来看看中间操作的划分吧:

操作分类 方法
无状态操作 filter() map() mapToInt() mapToLong() mapToDouble() flatMap() flatMapToInt() flatMapToLong() flatMapToDouble() peek() unordered()
有状态操作 distinct() sorted() limit() skip()

无状态操作

Java8 Stream源码精讲(一):从一个简单的例子入手 中提到,map()和filter()中间操作被调用之后,返回的是一个StatelessOp匿名子类的实例。通过类继承结构可以看到,它跟Head一样都是继承ReferencePipeline,不同的是它是一个抽象类,所以具体的逻辑还是放在子类中的。

实际上StatelessOp就代表无状态中间操作,它将操作声明的lambda函数保存在某个地方,在适当的时机调用。

abstract static class StatelessOp<E_IN, E_OUT>
        extends ReferencePipeline<E_IN, E_OUT> {
    
    //传入上一个阶段的Stream,构建双向链表
    StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
                StreamShape inputShape,
                int opFlags) {
        super(upstream, opFlags);
        assert upstream.getOutputShape() == inputShape;
    }

    //标识当前是一个无状态操作
    @Override
    final boolean opIsStateful() {
        return false;
    }
}
复制代码

可以看到StatelessOp并没有多少内容,只是可以通过构造器与上一个Stream连接成一个链表,然后实现了opIsStateful()方法。还得看一下它的父类的父类AbstractPipeline的构造函数:

AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    if (previousStage.linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    previousStage.linkedOrConsumed = true;
    //构建双向链表的过程
    previousStage.nextStage = this;

    this.previousStage = previousStage;
    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
    this.sourceStage = previousStage.sourceStage;
    //每一次中间操作,如果是有状态的,那么sourceStage的sourceAnyStateful会被标记为true
    if (opIsStateful())
        sourceStage.sourceAnyStateful = true;
    this.depth = previousStage.depth + 1;
}
复制代码

下面我们来看看每一个中间操作方法有哪些异同吧。

filter()方法

filter()方法大家都很熟悉了,返回一个匹配predicate函数的元素组成的Stream。

public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    //把当前阶段的Stream传入,将新返回的StatelessOp加入链表末尾
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                //predicate不会被立即被调用,会在恰当的时机触发
                @Override
                public void accept(P_OUT u) {
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}
复制代码

跟上面分析的一样,调用filter()会返回一个新的StatelessOp,与上一个Stream组成链表,lambda表达式不会被马上调用,只是保存在内部。 它被调用的地方是Sink.ChainedReference的accept()方法。

这里大家可能被绕晕,Sink是什么?ChainedReference又是什么,它内部的downstream又是什么?

Sink接口扩展自Consumer,用于在流管道的各个阶段传递元素,并使用其begin()、end()和cancellationRequested()方法来管理大小信息、控制流。

interface Sink<T> extends Consumer<T> {
    
    default void begin(long size) {}

    default void end() {}

    default boolean cancellationRequested() {
        return false;
    }
}
复制代码
  • begin()方法:在Stream中的元素传递给sink之前会调用这个方法,对于有状态操作,通常会做一些初始化工作,参数是元素大小,如果大小不确定,传递-1。
  • accept()方法:Stream中的每一个元素都会经过accept()方法进行逻辑处理。
  • end()方法:当所有元素都被sink处理了,会调用这个方法表示结束,对于有状态操作,会做清理工作以及将结果发送给下一个sink。
  • cancellationRequested():如果返回true,表示sink不再处理Stream中后续的元素,用于短路操作。

ChainedReference实现了Sink接口,通过一个downstream变量来连接下游的sink形成一个sink链。begin()、end()、cancellationRequested()方法都直接向下游传递。

static abstract class ChainedReference<T, E_OUT> implements Sink<T> {
    protected final Sink<? super E_OUT> downstream;

    //downstream为传入下游的sink对象,形成链表
    public ChainedReference(Sink<? super E_OUT> downstream) {
        this.downstream = Objects.requireNonNull(downstream);
    }

    @Override
    public void begin(long size) {
        downstream.begin(size);
    }

    @Override
    public void end() {
        downstream.end();
    }

    @Override
    public boolean cancellationRequested() {
        return downstream.cancellationRequested();
    }
}
复制代码

前两章讲过,调用终止操作时会从Pipeline链表的尾部向前遍历直到Head头节点(不包含),每遍历到一个节点,就会调用它的opWrapSink()方法,通过downstream连接传入的后一个节点的sink对象,返回一个新的sink对象。最终形成一个从第一个中间操作到终止操作(终止操作就是一个特殊的sink)的sink链。

比如调用stream.map().filter().forEach()就会形成下面这样一个结构:

我们再来看filter()方法中的sink:

return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
    @Override
    public void begin(long size) {
        downstream.begin(-1);
    }

    @Override
    public void accept(P_OUT u) {
        if (predicate.test(u))
            downstream.accept(u);
    }
};
复制代码
  • 重写了begin()方法,因为经过filter过滤,传递给下游sink的元素大小是未知的,所以这里传入的是-1。
  • 实现了accept()方法,元素只有匹配predicate,才会传递给下游sink。

map()方法

map()方法,返回一个原Stream中的元素经过参数mapper函数转换的结果组成的Stream。

public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    Objects.requireNonNull(mapper);
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                //其它的都跟filter()类似,重点关注这里
                @Override
                public void accept(P_OUT u) {
                    downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}
复制代码
  • 实现了accept()方法,将元素经过mapper函数转化之后再传递给下游的sink处理。

flatMap()方法

flatMap()方法,返回一个Stream,这个Stream中的元素是原Stream中的每一个元素经过mapper函数转换为一个新的Stream中的元素合并的结果。这样说很拗口,我们还是来看源码吧。

public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
    Objects.requireNonNull(mapper);
    // We can do better than this, by polling cancellationRequested when stream is infinite
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                //重写了begin()方法
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }
                
                //重点关注这里
                @Override
                public void accept(P_OUT u) {
                    //每一个元素都会经过mapper映射为一个Stream
                    try (Stream<? extends R> result = mapper.apply(u)) {
                        // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
                        if (result != null)
                            //然后遍历Stream中的元素,传递给下游sink处理
                            result.sequential().forEach(downstream);
                    }
                }
            };
        }
    };
}
复制代码

源码还是比较容易理解的。

  • 重写了begin()方法,因为每个元素都被转换成一个Stream,所以经过flatMap()处理之后的元素大小是未知的,所以传递的参数是-1。
  • 实现了accept()方法,每一个元素都会被mapper函数映射成一个新的Stream,然后遍历这个Stream,将其中的元素传递给下游的sink继续处理。

peek()方法

peek()方法,返回一个新的Stream,其中的元素就是原Stream中的元素,只是每一个元素都会被action额外的处理。这个方法大家应该用得少,主要是用于Stream调试,千万别理解成forEach()方法哈,这是一个中间操作,没有调用终止操作的话是不会被触发的。

public final Stream<P_OUT> peek(Consumer<? super P_OUT> action) {
    Objects.requireNonNull(action);
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                 0) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                //关注这里
                @Override
                public void accept(P_OUT u) {
                    action.accept(u);
                    downstream.accept(u);
                }
            };
        }
    };
}
复制代码
  • 每一个元素都会被action的accept()方法处理一下,然后继续传递给下游sink。

unordered()方法

unordered()方法,返回一个无序的Stream。

public Stream<P_OUT> unordered() {
    if (!isOrdered())
        return this;
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_ORDERED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return sink;
        }
    };
}
复制代码

有状态操作

调用有状态操作,返回的是StatefulOp的子类实例,它跟StatelessOp一样,都继承自ReferencePipeline。

abstract static class StatefulOp<E_IN, E_OUT>
        extends ReferencePipeline<E_IN, E_OUT> {

    StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
               StreamShape inputShape,
               int opFlags) {
        super(upstream, opFlags);
        assert upstream.getOutputShape() == inputShape;
    }

    @Override
    final boolean opIsStateful() {
        return true;
    }

    @Override
    abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
                                                   Spliterator<P_IN> spliterator,
                                                   IntFunction<E_OUT[]> generator);
}
复制代码

唯一不同的是opIsStateful()返回true。

distinct()方法

distinct()方法,返回一个原Stream中不同元素组成的Stream,也就是将元素去重。

public final Stream<P_OUT> distinct() {
    return DistinctOps.makeRef(this);
}
复制代码

distinct()调用了DistinctOps#makeRef()工厂方法,进入里面详细分析:

static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
    //省略了并行流处理相关的方法
    return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
                                                  StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {

        @Override
        Sink<T> opWrapSink(int flags, Sink<T> sink) {
            Objects.requireNonNull(sink);
            
            //1.如果原Stream中的元素已经不重复了,则直接返回下一个节点的sink
            if (StreamOpFlag.DISTINCT.isKnown(flags)) {
                return sink;
            //2.原Stream中的元素已经是有序的
            } else if (StreamOpFlag.SORTED.isKnown(flags)) {
               ...
            //3.原Stream中的元素是无序的
            } else {
               ...
            }
        }
    };
}
复制代码

opWrapSink()方法内部非常复杂,主要是根据原Stream的标记是否重复、是否有序来判断,然后返回不同的Sink,我们分开来看。

  1. 原Stream元素已经不重复,则直接返回下游的Sink对象,不再经过额外处理。
if (StreamOpFlag.DISTINCT.isKnown(flags)) {
        return sink;
    }
复制代码
  1. 原Stream中的元素是有序的。
return new Sink.ChainedReference<T, T>(sink) {
                    //是否已经有为null的元素被当前sink处理
                    boolean seenNull;
                    //最后被当前sink处理的元素
                    T lastSeen;
                    
                    //Stream中的元素被处理之前被调用
                    @Override
                    public void begin(long size) {
                        //表示还没有为null的元素被处理   
                        seenNull = false;
                        //没有最后被当前sink处理的元素
                        lastSeen = null;
                        //调用下一下sink的begin()方法,结果distinct()处理过后元素大小未知,所以传-1
                        downstream.begin(-1);
                    }
                    
                    //清理状态
                    @Override
                    public void end() {
                        seenNull = false;
                        lastSeen = null;
                        downstream.end();
                    }

                    @Override
                    public void accept(T t) {
                        //这里的逻辑是如果传入的元素是null,并且之前没有处理过,
                        //则向下游sink传递,并且更改seenNull和lastSeen的值;否则不向下游传递
                        if (t == null) {
                            if (!seenNull) {
                                seenNull = true;
                                downstream.accept(lastSeen = null);
                            }
                        //元素为非null的值,则判断是否跟最后一次被处理的元素相同,
                        //不同则表示不重复,向下游sink传递;否则元素重复,不再传递
                        } else if (lastSeen == null || !t.equals(lastSeen)) {
                            downstream.accept(lastSeen = t);
                        }
                    }
                };
复制代码

元素是按顺序被sink处理的,所以可以通过seenNull和lastSeen两个变量判断元素是否重复。

  1. 原Stream中的元素是无序的
return new Sink.ChainedReference<T, T>(sink) {
                    Set<T> seen;

                    //元素被处理之前,做初始化工作
                    @Override
                    public void begin(long size) {
                        //初始化一个HashSet
                        seen = new HashSet<>();
                        //去重之后元素大小未知,所以传递-1
                        downstream.begin(-1);
                    }

                    //清理工作
                    @Override
                    public void end() {
                        seen = null;
                        downstream.end();
                    }

                    @Override
                    public void accept(T t) {
                        //Set中没有相同的元素,则添加,向下游sink传递
                        if (!seen.contains(t)) {
                            seen.add(t);
                            downstream.accept(t);
                        }
                    }
                };
复制代码

对于无序的元素,是通过使用一个HashSet来去重的。

sorted()方法

sorted()方法,返回一个将元素排序之后的Stream,有两个重载方法,一个表示按照自然顺序排序,一个利用比较器排序。

我们来分析带比较器的方法:

public final Stream<P_OUT> sorted(Comparator<? super P_OUT> comparator) {
    return SortedOps.makeRef(this, comparator);
}
复制代码

SortedOps#makeRef():

static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
                            Comparator<? super T> comparator) {
    return new OfRef<>(upstream, comparator);
}
复制代码

返回OfRef对象,OfRef继承自ReferencePipeline.StatefulOp:

private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> 
复制代码

我们还是看它的opWrapSink()方法:

public Sink<T> opWrapSink(int flags, Sink<T> sink) {
    Objects.requireNonNull(sink);
    
    //1.原Stream是有序的,并且排序规则是自然顺序
    // If the input is already naturally sorted and this operation
    // also naturally sorted then this is a no-op
    if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
        return sink;
    //2.元素大小确定
    else if (StreamOpFlag.SIZED.isKnown(flags))
        return new SizedRefSortingSink<>(sink, comparator);
    //3.元素大小不确定
    else
        return new RefSortingSink<>(sink, comparator);
}
复制代码

还是根据原Stream中元素的标记情况走不同的逻辑,我们分开看:

  1. 原Stream是有序的,并且需要排序的规则是自然顺序,则直接返回下游的sink节点,不做操作
if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
    return sink;
复制代码
  1. 元素大小确定的情况,返回SizedRefSortingSink对象,我们进入SizedRefSortingSink类分析:
private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> {
    //临时保存元素的数组
    private T[] array;
    //数组下标偏移量
    private int offset;

    SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
        super(sink, comparator);
    }

    @Override
    @SuppressWarnings("unchecked")
    public void begin(long size) {
        if (size >= Nodes.MAX_ARRAY_SIZE)
            throw new IllegalArgumentException(Nodes.BAD_SIZE);
        //初始化用于排序的数组
        array = (T[]) new Object[(int) size];
    }

    @Override
    public void end() {
        //利用比较器排序数组元素
        Arrays.sort(array, 0, offset, comparator);
        //排序完毕,调用下游sink
        downstream.begin(offset);
        //判断是否短路,没有取消请求则非短路,将排序之后的元素依次发送给下游sink
        if (!cancellationWasRequested) {
            for (int i = 0; i < offset; i++)
                downstream.accept(array[i]);
        }
        //短路,可能发送部分元素就结束
        else {
            for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
                downstream.accept(array[i]);
        }
        downstream.end();
        array = null;
    }

    @Override
    public void accept(T t) {
        //每一个元素经过排序sink处理,只是将元素缓存到数组上
        array[offset++] = t;
    }
}
复制代码
  • 对于已知元素大小的sorted()操作,会利用一个数组来排序,在元素被发送给排序sink之前,先初始化一个数组,然后每一个元素发送给sink都保存到数组上,这个时候不会传递给下游sink,当sink#end()被调用时,表示再没有元素了,开始排序,然后按顺序将数组元素传递给下游的sink。

3.元素大小不确定的情况,返回的是RefSortingSink对象。

private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {
    private ArrayList<T> list;

    RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
        super(sink, comparator);
    }

    @Override
    public void begin(long size) {
        if (size >= Nodes.MAX_ARRAY_SIZE)
            throw new IllegalArgumentException(Nodes.BAD_SIZE);
        list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
    }

    @Override
    public void end() {
        list.sort(comparator);
        downstream.begin(list.size());
        if (!cancellationWasRequested) {
            list.forEach(downstream::accept);
        }
        else {
            for (T t : list) {
                if (downstream.cancellationRequested()) break;
                downstream.accept(t);
            }
        }
        downstream.end();
        list = null;
    }

    @Override
    public void accept(T t) {
        list.add(t);
    }
}
复制代码
  • 可以看到它跟SizedRefSortingSink类似,不同的是由于元素大小未知,所以是利用一个ArrayList来缓存元素、排序的。

limit()方法

limit()方法,返回一个由原Stream中的元素组成的新的Stream,Stream中的元素是原Stream中从第一个元素开始,不超过maxSize的部分元素。

public final Stream<P_OUT> limit(long maxSize) {
    if (maxSize < 0)
        throw new IllegalArgumentException(Long.toString(maxSize));
    return SliceOps.makeRef(this, 0, maxSize);
}
复制代码

limit()方法校验了maxSize参数,然后还是利用工厂方法SliceOps#makeRef()来创建一个StatefulOp:

public static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
                                    long skip, long limit) {
    if (skip < 0)
        throw new IllegalArgumentException("Skip must be non-negative: " + skip);

    return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
                                                  flags(limit)) {
        //省略了方法
        ......
        };
}
复制代码

注意makeRef()的参数,upstream代表上一个Stream,skip代表跳过多少元素,limit代表传递给下游sink的元素最大值。我们还是来看ReferencePipeline.StatefulOp实现的opWrapSink()方法:

Sink<T> opWrapSink(int flags, Sink<T> sink) {
    return new Sink.ChainedReference<T, T>(sink) {
        //上面传递的参数赋值,对于limit()操作,skip是0
        long n = skip;
        long m = limit >= 0 ? limit : Long.MAX_VALUE;

        @Override
        public void begin(long size) {
            //计算经过limit()操作之后元素大小,传递给下游
            downstream.begin(calcSize(size, skip, m));
        }

        @Override
        public void accept(T t) {
            //limit()操作n是0,将元素传递给下游sink,同时m-1,直到m等于0,后面的元素不再传递
            if (n == 0) {
                if (m > 0) {
                    m--;
                    downstream.accept(t);
                }
            }
            //这是skip()的逻辑,开始的n个元素不传递,直到n等于0之后,再走上面的逻辑
            else {
                n--;
            }
        }

        @Override
        public boolean cancellationRequested() {
            //判断短路
            return m == 0 || downstream.cancellationRequested();
        }
    };
}
复制代码
  • begin()方法就是将经过limit()处理之后的元素大小传递给下游sink,看下它的计算逻辑:
private static long calcSize(long size, long skip, long limit) {
    //如果大小未知,直接返回-1,否则比较limit和size-skip的最小值
    return size >= 0 ? Math.max(-1, Math.min(size - skip, limit)) : -1;
}
复制代码
  • accept()方法的逻辑注释在代码上了。

skip()方法

skip()方法,返回一个丢弃了Stream中前n个元素之后,剩余的元素组成的新的Stream。

public final Stream<P_OUT> skip(long n) {
    if (n < 0)
        throw new IllegalArgumentException(Long.toString(n));
    if (n == 0)
        return this;
    else
        return SliceOps.makeRef(this, n, -1);
}
复制代码

可以看到跟limit()一样,返回的同样是由SliceOps#makeRef()创建的StatefulOp,注意传入参数的区别,不再展开讲解。

总结

本章先解释了Stream中间操作的概念,然后说明了无状态操作和有状态操作怎样划分,以及哪些方法属于无状态操作,哪些属于有状态操作。通过分析源码的方式,深入理解了分别代表无状态和有状态操作的StatelessOp和StatefulOp,最后分析了每一个中间操作方法是如何通过继承StatelessOp或StatefulOp而具备相应功能的。

  • 作者:JavaShark
  • 原文链接:https://blog.csdn.net/JavaShark/article/details/124990664
    更新时间:2022年6月4日10:16:41 ,共 13828 字。