Stream
Tags: Java Basics
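Coming back to Alibaba, I was struck by how much had changed. First, service resources have been Dockerized: the formidable Alibaba DB team has even put databases into containers, and in production at that. Second, the whole group now runs Java 8 (when I left it was still 1.6 and 1.5, with some 1.4 around). While startups outside were still stuck on 1.7, Alibaba moved to Java 8 early and even built a high-performance customized version, so Alibaba engineers get to enjoy the silky smoothness of Java 8 in production. This post starts with the Java 8 feature that has had the biggest impact: Stream.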
Introduction
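If the Runnable interface separated execution logic from Thread, then Stream separates data-computation logic from Collection, leaving Collection free to focus on storing data without being distracted by computation.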
Open the Collection API and you will see a new default method, stream():
/**
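Stream lets you process collections, and any other data that can be converted to a Stream<T>, in a declarative way. It has several notable characteristics: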
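Internal iteration
Unlike the original Iterator, Stream builds the iteration (the equivalent of for/for-each) into the API implementation. You only pass in lambda expressions that express the computation (think of them as implementations of @FunctionalInterface types such as Supplier and Function), and Stream iterates over the data, applies the logic, and produces the result. Internal iteration addresses two problems: it removes the boilerplate and obscurity of hand-written collection processing, and it lets the library apply multi-core parallel optimizations internally.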
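To make the contrast concrete, here is a minimal sketch that computes the same result with external iteration (a for-each loop) and with internal iteration (a Stream). The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class InternalIterationDemo {

    // External iteration: the caller drives the loop and manages the result list.
    static List<Integer> evenSquaresWithLoop(List<Integer> numbers) {
        List<Integer> result = new ArrayList<>();
        for (Integer n : numbers) {
            if (n % 2 == 0) {
                result.add(n * n);
            }
        }
        return result;
    }

    // Internal iteration: the caller only supplies the logic as lambdas;
    // the Stream implementation decides how to traverse the data.
    static List<Integer> evenSquaresWithStream(List<Integer> numbers) {
        return numbers.stream()
                .filter(n -> n % 2 == 0)
                .map(n -> n * n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(evenSquaresWithLoop(numbers));   // [4, 16, 36]
        System.out.println(evenSquaresWithStream(numbers)); // [4, 16, 36]
    }
}
```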
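Pipelining
Many Stream operations return another Stream, so several operations can be chained into one large pipeline that reads like a database-style query over the data source. This is also what makes automatic optimization, such as implicit parallelism, possible.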
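Implicit parallelism
For example, replace .stream() with .parallelStream() and Stream will use the Fork/Join framework to split the data into chunks, run the pipeline on those chunks in parallel, and merge the partial results automatically.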
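As a small illustration of that swap, the sketch below computes the same sum once with stream() and once with parallelStream(). The class and method names are assumptions for the example; whether the parallel version is actually faster depends on data size and workload, which this sketch does not measure.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelStreamDemo {

    // Sequential pipeline: elements are processed on the calling thread.
    static long sumOfSquaresSequential(List<Integer> numbers) {
        return numbers.stream()
                .mapToLong(Integer::longValue)
                .map(x -> x * x)
                .sum();
    }

    // Same pipeline, but the source is asked for a parallel stream.
    // The work is split and the partial sums are merged by the library.
    static long sumOfSquaresParallel(List<Integer> numbers) {
        return numbers.parallelStream()
                .mapToLong(Integer::longValue)
                .map(x -> x * x)
                .sum();
    }

    public static void main(String[] args) {
        List<Integer> numbers = IntStream.rangeClosed(1, 1000)
                .boxed()
                .collect(Collectors.toList());
        System.out.println(sumOfSquaresSequential(numbers)); // 333833500
        System.out.println(sumOfSquaresParallel(numbers));   // 333833500
    }
}
```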
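Lazy evaluation
Most Stream operations (filter(), generate(), map(), …) take a lambda expression, which works much like an interface implementation (think of it as a callback). The lambda is therefore not run at the point where it is passed in: intermediate operations only describe the pipeline, and nothing is processed until a terminal operation is triggered on it.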
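Short-circuit evaluation
Some operations can produce their result without processing the whole stream. Operations such as anyMatch(), allMatch(), and limit() stop as soon as the outcome is known (anyMatch() on the first matching element, allMatch() on the first non-matching one, limit() once enough elements have been taken), so the remaining elements never go through the pipeline. If the later stages are expensive, this can save a great deal of work.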
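Lazy evaluation and short-circuiting are easy to observe with peek(). The sketch below, using hypothetical helper names, records which elements a sequential pipeline actually visits: none when there is no terminal operation, and only the elements up to the first match when anyMatch() is used.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class LazyShortCircuitDemo {

    // Builds a pipeline but never calls a terminal operation,
    // so peek() is never invoked and the list stays empty.
    static List<Integer> visitedWithoutTerminalOp(List<Integer> numbers) {
        List<Integer> visited = new ArrayList<>();
        numbers.stream()
                .peek(visited::add)
                .map(n -> n * 2);
        return visited;
    }

    // anyMatch() is a short-circuiting terminal operation: on a sequential
    // stream it stops pulling elements once the predicate matches.
    static List<Integer> visitedUntilFirstMatch(List<Integer> numbers, int threshold) {
        List<Integer> visited = new ArrayList<>();
        numbers.stream()
                .peek(visited::add)
                .anyMatch(n -> n > threshold);
        return visited;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(visitedWithoutTerminalOp(numbers));   // []
        System.out.println(visitedUntilFirstMatch(numbers, 2));  // [1, 2, 3]
    }
}
```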
The following example gives a feel for the convenience Stream brings:
public void joiningList() { // generate the sequence [0, 20)
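Only the first line of that listing survives here. As a stand-in, the following is a minimal sketch in the same spirit, assuming the goal is to generate the integers in [0, 20) and join them into one string. The class name, method name, and delimiter are assumptions, not the original code.

```java
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class JoiningDemo {

    // Generates [startInclusive, endExclusive) and joins the numbers
    // into a bracketed, comma-separated string.
    static String joinRange(int startInclusive, int endExclusive) {
        return IntStream.range(startInclusive, endExclusive)
                .mapToObj(i -> String.valueOf(i))
                .collect(Collectors.joining(", ", "[", "]"));
    }

    public static void main(String[] args) {
        System.out.println(joinRange(0, 5)); // [0, 1, 2, 3, 4]
        System.out.println(joinRange(0, 20));
    }
}
```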
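Anatomy of a Stream
A stream pipeline usually consists of three parts: source (Source) -> intermediate operations / transformation (Transforming) -> terminal operation / execution (Operations). A Stream is created from a data source, passes through a chain of intermediate operations that form the pipeline, and is finally executed by a terminal operation that produces the result.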
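Source - creating the Stream: how a Stream is produced;
Transforming - transforming the Stream: operations such as map(), filter(), and limit(), which turn the original Stream into another form;
Operations - executing the Stream: this is what actually triggers the preceding chain of Transforming steps and produces a result (such as a List, an Array, or an Optional<T>) or a side effect.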
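The three parts can be labeled directly on a small pipeline. This is only an illustrative sketch; the class name, method name, and sample data are assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class PipelineAnatomyDemo {

    static List<String> firstTwoShortNamesUpperCased(List<String> names) {
        return names.stream()                        // Source: the list produces the Stream
                .filter(name -> name.length() <= 5)  // Transforming: keep short names
                .map(String::toUpperCase)            // Transforming: convert each element
                .limit(2)                            // Transforming: truncate the stream
                .collect(Collectors.toList());       // Operations: run the pipeline, build a List
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("hello", "alibaba", "world", "java");
        System.out.println(firstTwoShortNamesUpperCased(names)); // [HELLO, WORLD]
    }
}
```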
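Let's look at each of these parts in turn:
Source: Creating a Stream
Besides the collection.stream() introduced earlier, there are many ways to create a stream. They fall roughly into three groups: general-purpose streams, numeric streams, and others. General-purpose streams are the most commonly used; numeric streams are an optimization Java provides for the three primitive types int, long, and double to avoid the cost of boxing and unboxing:
1. General-purpose streams
API | Description |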
---|---|
Arrays.stream(T[] array) | Returns a sequential Stream with the specified array as its source. |
Stream.empty() | Returns an empty sequential Stream. |
Stream.generate(Supplier<T> s) | Returns an infinite sequential unordered stream where each element is generated by the provided Supplier<T> . |
Stream.iterate(T seed, UnaryOperator<T> f) | Returns an infinite sequential ordered Stream produced by iterative application of a function f to an initial element seed, producing a Stream consisting of seed, f(seed), f(f(seed)), etc. |
Stream.of(T... values) | Returns a sequential ordered stream whose elements are the specified values. |
Stream.concat(Stream<? extends T> a, Stream<? extends T> b) | Creates a lazily concatenated stream whose elements are all the elements of the first stream followed by all the elements of the second stream. |
StreamSupport.stream(Spliterator<T> spliterator, boolean parallel) | Creates a new sequential or parallel Stream from a Spliterator. |
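
A few of the factory methods from the table, sketched with made-up helper names. Note that Stream.iterate() and Stream.generate() are infinite, so the sketch bounds them with limit().

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class GeneralStreamSources {

    // Stream.iterate: seed, f(seed), f(f(seed)), ... truncated with limit().
    static List<Integer> powersOfTwo(int count) {
        return Stream.iterate(1, n -> n * 2)
                .limit(count)
                .collect(Collectors.toList());
    }

    // Stream.generate: every element comes from the Supplier.
    static List<String> repeat(String value, int count) {
        return Stream.generate(() -> value)
                .limit(count)
                .collect(Collectors.toList());
    }

    // Arrays.stream and Stream.of both wrap an array; Stream.concat joins two streams.
    static List<String> concatArrays(String[] first, String[] second) {
        return Stream.concat(Arrays.stream(first), Stream.of(second))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(powersOfTwo(5));   // [1, 2, 4, 8, 16]
        System.out.println(repeat("ab", 3));  // [ab, ab, ab]
        System.out.println(concatArrays(new String[] {"a", "b"}, new String[] {"c"})); // [a, b, c]
    }
}
```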
2. Numeric streams
API | Description |
---|---|
Arrays.stream(Xxx[] array) | Returns a sequential Int/Long/DoubleStream with the specified array as its source. |
XxxStream.empty() | Returns an empty sequential Int/Long/DoubleStream . |
XxxStream.generate(XxxSupplier s) | Returns an infinite sequential unordered stream where each element is generated by the provided Int/Long/DoubleSupplier . |
XxxStream.iterate(Xxx seed, XxxUnaryOperator f) | Returns an infinite sequential ordered Int/Long/DoubleStream, analogous to Stream.iterate(T seed, UnaryOperator<T> f). |
XxxStream.of(Xxx... values) | Returns a sequential ordered stream whose elements are the specified values. |
XxxStream.concat(XxxStream a, XxxStream b) | Creates a lazily concatenated stream whose elements are all the elements of the first stream followed by all the elements of the second stream. |
Int/LongStream.range(startInclusive, endExclusive) | Returns a sequential ordered Int/LongStream from startInclusive (inclusive) to endExclusive (exclusive) by an incremental step of 1. |
Int/LongStream.rangeClosed(startInclusive, endInclusive) | Returns a sequential ordered Int/LongStream from startInclusive (inclusive) to endInclusive (inclusive) by an incremental step of 1. |
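
The difference between range() and rangeClosed() is easiest to see side by side. The helper names below are assumptions for the sketch; all values stay as primitive int, so no boxing takes place.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

class NumericStreamSources {

    // range: end is exclusive.
    static int sumRange(int startInclusive, int endExclusive) {
        return IntStream.range(startInclusive, endExclusive).sum();
    }

    // rangeClosed: end is inclusive.
    static int sumRangeClosed(int startInclusive, int endInclusive) {
        return IntStream.rangeClosed(startInclusive, endInclusive).sum();
    }

    // IntStream.iterate is infinite, so it is bounded with limit().
    static int[] firstOddNumbers(int count) {
        return IntStream.iterate(1, n -> n + 2).limit(count).toArray();
    }

    public static void main(String[] args) {
        System.out.println(sumRange(0, 20));         // 190
        System.out.println(sumRangeClosed(1, 100));  // 5050
        System.out.println(Arrays.toString(firstOddNumbers(4))); // [1, 3, 5, 7]
    }
}
```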
3. Others
I/O Stream
BufferedReader.lines()
File Stream
Files.lines(Path path)
Files.find(Path start, int maxDepth, BiPredicate<Path,BasicFileAttributes> matcher, FileVisitOption... options)
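Files.newDirectoryStream(Path dir) (returns a DirectoryStream<Path>, which is an Iterable rather than a java.util.stream.Stream; Files.list(Path dir) returns a Stream<Path>)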
Files.walk(Path start, FileVisitOption... options)
Jar
JarFile.stream()
Random
Random.ints()
Random.longs()
Random.doubles()
Pattern
Pattern.splitAsStream(CharSequence input)
…
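A short sketch of three of these sources that need no files on disk: Pattern.splitAsStream(), BufferedReader.lines() (fed from an in-memory StringReader here), and Random.ints(). The helper names and sample inputs are assumptions.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Random;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class OtherStreamSources {

    // Pattern.splitAsStream: split lazily instead of building a String[] first.
    static List<String> splitOnCommas(String input) {
        return Pattern.compile(",")
                .splitAsStream(input)
                .map(String::trim)
                .collect(Collectors.toList());
    }

    // BufferedReader.lines: one stream element per line of text.
    static long countNonBlankLines(String text) {
        try (BufferedReader reader = new BufferedReader(new StringReader(text))) {
            return reader.lines()
                    .filter(line -> !line.trim().isEmpty())
                    .count();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Random.ints(streamSize, origin, bound): bound is exclusive, so values are 1..6.
    static int[] rollDice(long seed, int count) {
        return new Random(seed).ints(count, 1, 7).toArray();
    }

    public static void main(String[] args) {
        System.out.println(splitOnCommas("a, b ,c"));            // [a, b, c]
        System.out.println(countNonBlankLines("one\n\n two\n")); // 2
        System.out.println(rollDice(42L, 5).length);             // 5
    }
}
```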
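In addition, the three numeric streams can be converted into one another, and numeric streams and general-purpose streams can be converted into each other as well:
1. Numeric stream to numeric stream: doubleStream.mapToInt(DoubleToIntFunction mapper), intStream.asLongStream(), …
2. Numeric stream to general-purpose stream: longStream.boxed(), intStream.mapToObj(IntFunction<? extends U> mapper), …
3. General-purpose stream to numeric stream: stream.flatMapToInt(Function<? super T,? extends IntStream> mapper), stream.mapToDouble(ToDoubleFunction<? super T> mapper), …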
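One example per direction, as a minimal sketch with hypothetical helper names:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class StreamConversions {

    // Numeric -> numeric: widen to long before summing to avoid int overflow.
    static long sumAsLong(int... values) {
        return IntStream.of(values).asLongStream().sum();
    }

    // Numeric -> general-purpose: boxed() turns IntStream into Stream<Integer>.
    static List<Integer> toBoxedList(int... values) {
        return IntStream.of(values).boxed().collect(Collectors.toList());
    }

    // Numeric -> general-purpose with a mapping function.
    static List<String> toLabels(int... values) {
        return IntStream.of(values)
                .mapToObj(i -> "#" + i)
                .collect(Collectors.toList());
    }

    // General-purpose -> numeric: mapToInt() yields an IntStream with sum().
    static int totalLength(List<String> words) {
        return words.stream().mapToInt(String::length).sum();
    }

    public static void main(String[] args) {
        System.out.println(sumAsLong(Integer.MAX_VALUE, Integer.MAX_VALUE)); // 4294967294
        System.out.println(toBoxedList(1, 2, 3));                            // [1, 2, 3]
        System.out.println(toLabels(1, 2));                                  // [#1, #2]
        System.out.println(totalLength(Arrays.asList("hello", "world")));    // 10
    }
}
```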
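Intermediate Operations: Transforming a Stream
Every intermediate operation returns another Stream, so multiple operations can be chained into a pipeline. Their defining trait is the lazy evaluation mentioned earlier: unless a terminal operation is triggered on the pipeline, the intermediate operations do no processing at all.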
API | Description |
---|---|
filter(Predicate<? super T> predicate) | Returns a stream consisting of the elements of this stream that match the given predicate. |
distinct() | Returns a stream consisting of the distinct elements (according to Object.equals(Object) ) of this stream. |
limit(long maxSize) | Returns a stream consisting of the elements of this stream, truncated to be no longer than maxSize in length. |
skip(long n) | Returns a stream consisting of the remaining elements of this stream after discarding the first n elements of the stream. |
sorted(Comparator<? super T> comparator) | Returns a stream consisting of the elements of this stream, sorted according to the provided Comparator . |
map(Function<? super T,? extends R> mapper) | Returns a stream consisting of the results of applying the given function to the elements of this stream. |
flatMap(Function<? super T,? extends Stream<? extends R>> mapper) | Returns a stream consisting of the results of replacing each element of this stream with the contents of a mapped stream produced by applying the provided mapping function to each element. |
peek(Consumer<? super T> action) | Returns a stream consisting of the elements of this stream, additionally performing the provided action on each element as elements are consumed from the resulting stream. |
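
Several of these operations chained together, as a sketch with an assumed method name and sample data. The comments trace what each stage lets through for the input used in main().

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class IntermediateOpsDemo {

    static List<Integer> process(List<Integer> numbers) {
        return numbers.stream()
                .filter(n -> n % 2 == 1)            // 5, 3, 9, 3, 1, 7, 5
                .distinct()                         // 5, 3, 9, 1, 7
                .sorted(Comparator.reverseOrder())  // 9, 7, 5, 3, 1
                .skip(1)                            // 7, 5, 3, 1
                .limit(3)                           // 7, 5, 3
                .map(n -> n * 10)                   // 70, 50, 30
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(5, 3, 9, 3, 1, 7, 5, 2, 8);
        System.out.println(process(numbers)); // [70, 50, 30]
    }
}
```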
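flatMap() deserves a closer look, because I did not understand what it was for the first time I ran into it. Suppose we have this list of strings: List<String> strs = Arrays.asList("hello", "alibaba", "world");
How do we list the distinct characters in it?
The first thing that comes to mind is that String has a split() method that breaks a string into substrings, so we write: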
Stream<Stream<String>> streamStream = strs.stream()
        .map(str -> Arrays.stream(str.split("")));
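We split each String into a String[] and then use Arrays.stream() to turn that String[] into a Stream<String>. But the result is not what we wanted: we were after a Stream<String> and got a Stream<Stream<String>> instead, with the elements we want wrapped inside inner Streams. This is where flatMap() comes in: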
Stream<String> stringStream = strs.stream()
        .flatMap(str -> Arrays.stream(str.split("")));
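flatMap() flattens the nested structure of the Stream: it pulls the elements out of each inner Stream, so the resulting Stream no longer contains inner Streams.
In short: flatMap() lets you replace each value in a stream with another Stream, and then concatenates all of those Streams into a single Stream.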
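To finish answering the original question, the flattened stream still needs distinct() and a terminal operation. A minimal sketch, assuming Java 8 or later (where split("") yields no leading empty string) and a made-up class name:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FlatMapDemo {

    // Splits every word into one-character strings, flattens the per-word
    // streams into a single stream, and keeps the first occurrence of each.
    static List<String> distinctCharacters(List<String> words) {
        return words.stream()
                .flatMap(word -> Arrays.stream(word.split("")))
                .distinct()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> strs = Arrays.asList("hello", "alibaba", "world");
        System.out.println(distinctCharacters(strs)); // [h, e, l, o, a, i, b, w, r, d]
    }
}
```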
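Terminal Operations: Executing a Stream
A terminal operation not only triggers execution of the pipeline, it also delivers the pipeline's result. The result can be any non-stream value, such as a List, an Array, a boolean, an Optional<T>, or even void (forEach()):
API | Description |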
---|---|
count() | Returns the count of elements in this stream. |
max(Comparator<? super T> comparator) | Returns the maximum element of this stream according to the provided Comparator . |
min(Comparator<? super T> comparator) | Returns the minimum element of this stream according to the provided Comparator . |
allMatch(Predicate<? super T> predicate) | Returns whether all elements of this stream match the provided predicate . |
anyMatch(Predicate<? super T> predicate) | Returns whether any elements of this stream match the provided predicate . |
noneMatch(Predicate<? super T> predicate) | Returns whether no elements of this stream match the provided predicate . |
findAny() | Returns an Optional describing some element of the stream, or an empty Optional if the stream is empty. |
findFirst() | Returns an Optional describing the first element of this stream, or an empty Optional if the stream is empty. |
reduce(BinaryOperator<T> accumulator) | Performs a reduction on the elements of this stream, using an associative accumulation function, and returns an Optional describing the reduced value, if any. |
toArray() | Returns an array containing the elements of this stream. |
forEach(Consumer<? super T> action) | Performs an action for each element of this stream. |
forEachOrdered(Consumer<? super T> action) | Performs an action for each element of this stream, in the encounter order of the stream if the stream has a defined encounter order. |
collect(Collector<? super T,A,R> collector) | Performs a mutable reduction operation on the elements of this stream using a Collector . |
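
A few terminal operations in action, each returning a different kind of non-stream result. The helper names and sample data are assumptions for this sketch.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

class TerminalOpsDemo {

    // reduce(BinaryOperator) returns an Optional because the stream may be empty.
    static int sum(List<Integer> numbers) {
        return numbers.stream().reduce(Integer::sum).orElse(0);
    }

    // max(Comparator) also returns an Optional.
    static Optional<String> longest(List<String> words) {
        return words.stream().max(Comparator.comparingInt(String::length));
    }

    // allMatch returns a plain boolean.
    static boolean allPositive(List<Integer> numbers) {
        return numbers.stream().allMatch(n -> n > 0);
    }

    // collect with a Collector performs a mutable reduction, here into a String.
    static String joined(List<String> words) {
        return words.stream().collect(Collectors.joining("-"));
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("hello", "alibaba", "world");
        System.out.println(sum(Arrays.asList(1, 2, 3, 4)));      // 10
        System.out.println(longest(words));                      // Optional[alibaba]
        System.out.println(allPositive(Arrays.asList(1, -2)));   // false
        System.out.println(joined(words));                       // hello-alibaba-world
    }
}
```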
IntStream, LongStream, and DoubleStream additionally provide operations such as average(), sum(), and summaryStatistics(), which return a summary of the Stream's contents.
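For instance, summaryStatistics() gathers count, sum, min, max, and average in a single pass. A minimal sketch with assumed helper names:

```java
import java.util.IntSummaryStatistics;
import java.util.stream.IntStream;

class SummaryDemo {

    // One pass over the values yields all summary figures at once.
    static IntSummaryStatistics summarize(int... values) {
        return IntStream.of(values).summaryStatistics();
    }

    // average() returns an OptionalDouble because the stream may be empty.
    static double average(int... values) {
        return IntStream.of(values).average().orElse(Double.NaN);
    }

    public static void main(String[] args) {
        IntSummaryStatistics stats = summarize(3, 1, 4, 1, 5);
        System.out.println(stats.getCount()); // 5
        System.out.println(stats.getSum());   // 14
        System.out.println(stats.getMin());   // 1
        System.out.println(stats.getMax());   // 5
        System.out.println(average(3, 1, 4, 1, 5));
    }
}
```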
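Other
The java.util.stream.Stream interface extends the java.util.stream.BaseStream interface, and BaseStream also provides a number of utility methods (such as parallel(), which turns a sequential stream into a parallel one):
API | Description |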
---|---|
S onClose(Runnable closeHandler) | Returns an equivalent stream with an additional close handler. |
void close() | Closes this stream, causing all close handlers for this stream pipeline to be called. |
S unordered() | Returns an equivalent stream that is unordered. |
Iterator<T> iterator() | Returns an iterator for the elements of this stream. |
Spliterator<T> spliterator() | Returns a spliterator for the elements of this stream. |
S sequential() | Returns an equivalent stream that is sequential. |
S parallel() | Returns an equivalent stream that is parallel. |
boolean isParallel() | Returns whether this stream, if a terminal operation were to be executed, would execute in parallel. |
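
Two of these methods sketched with hypothetical helper names: onClose() together with close() (invoked here through try-with-resources, since BaseStream is AutoCloseable), and parallel()/sequential() together with isParallel().

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class BaseStreamDemo {

    // Registers a close handler and reports whether it ran when the stream was closed.
    static boolean closeHandlerRuns() {
        AtomicBoolean closed = new AtomicBoolean(false);
        try (Stream<String> stream = Stream.of("a", "b").onClose(() -> closed.set(true))) {
            stream.forEach(s -> { });
        }
        return closed.get();
    }

    // parallel() and sequential() only switch the execution mode;
    // isParallel() reports the mode without consuming the stream.
    static boolean isParallelAfter(boolean requestParallel) {
        IntStream stream = IntStream.range(0, 10);
        stream = requestParallel ? stream.parallel() : stream.sequential();
        return stream.isParallel();
    }

    public static void main(String[] args) {
        System.out.println(closeHandlerRuns());     // true
        System.out.println(isParallelAfter(true));  // true
        System.out.println(isParallelAfter(false)); // false
    }
}
```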
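Putting It All Together
Below, we pose a series of questions about a set of transactions to put the APIs listed above into practice:
DO definition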
/**
Stream operations
/**