为什么 latMap()之后的 filter()在 Java 流中“不完全”惰性?

我有以下示例代码:

System.out.println(
"Result: " +
Stream.of(1, 2, 3)
.filter(i -> {
System.out.println(i);
return true;
})
.findFirst()
.get()
);
System.out.println("-----------");
System.out.println(
"Result: " +
Stream.of(1, 2, 3)
.flatMap(i -> Stream.of(i - 1, i, i + 1))
.flatMap(i -> Stream.of(i - 1, i, i + 1))
.filter(i -> {
System.out.println(i);
return true;
})
.findFirst()
.get()
);

产出如下:

1
Result: 1
-----------
-1
0
1
0
1
2
1
2
3
Result: -1

从这里我看到,在第一种情况下,stream确实表现懒惰-我们使用 findFirst(),所以一旦我们有了第一个元素,我们的过滤 lambda 就不会被调用。 然而,在使用 flatMap的第二种情况中,我们看到,尽管第一个元素满足过滤条件,但仍然可以找到(它只是任何第一个元素,因为 lambda 总是返回 true)流的进一步内容仍然通过过滤函数提供。

我试图理解它为什么会这样,而不是像第一种情况那样,在计算出第一个元素之后就放弃。 如有任何有用的信息将不胜感激。

9850 次浏览

输入流的元素被一个一个地懒惰地消耗掉。第一个元素 1由两个 flatMap转换为流 -1, 0, 1, 0, 1, 2, 1, 2, 3,因此整个流对应于第一个输入元素。嵌套流通过管道被急切地物化,然后被压平,然后被输送到 filter级。这解释了您的输出。

上面的内容并非源于基本的限制,但是对于嵌套流来说,全面的惰性可能会使事情变得更加复杂。我怀疑让它表演起来会是一个更大的挑战。

作为比较,Clojure 的惰性子集为每个这样的嵌套级别获得了另一层包装。由于这种设计,当嵌套运行到极致时,操作甚至可能失败。

DR,这已经在 JDK-8075939中解决,并在 Java10中修复(并在 JDK-8225328中向后移植到 爪哇8)。

当查看实现(ReferencePipeline.java)时,我们看到方法[ 链接]

@Override
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
}

它将被 findFirst操作调用。需要特别注意的是 sink.cancellationRequested(),它允许在第一次匹配时结束循环。比较[ 链接]

@Override
public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
Objects.requireNonNull(mapper);
// We can do better than this, by polling cancellationRequested when stream is infinite
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}


@Override
public void accept(P_OUT u) {
try (Stream<? extends R> result = mapper.apply(u)) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
if (result != null)
result.sequential().forEach(downstream);
}
}
};
}
};
}

推进一个项目的方法最终在子流上调用 forEach,而没有任何提前终止的可能性,而 flatMap方法开头的注释甚至告诉我们这个缺失的特性。

由于这不仅仅是一个优化问题,因为它意味着当子流是无限的时候代码就会中断,我希望开发人员很快证明他们“可以做得比这更好”..。


为了说明其含义,虽然 Stream.iterate(0, i->i+1).findFirst()按预期工作,但是 Stream.of("").flatMap(x->Stream.iterate(0, i->i+1)).findFirst()将以无限循环结束。

关于规范,大部分可以在

包规范 的“流操作和管道”一章:

...

中间操作返回一个新的流。它们总是 懒惰;

...

懒惰还可以避免在不必要的时候检查所有数据; 对于“查找第一个长于1000个字符的字符串”这样的操作,只需要检查足够多的字符串就可以找到一个具有所需特征的字符串,而不需要检查源代码中可用的所有字符串。(当输入流是无限的,而不仅仅是大的时候,这种行为变得更加重要。)

...

此外,一些操作被认为是 短路操作。中间操作是短路的,如果当遇到无穷大的输入时,它可能产生有限的流。一个终端操作是短路的,如果当遇到无穷大的输入时,它可能在有限的时间内终止。在管道中进行短路操作是无限流处理在有限时间内正常终止的必要条件,但不是充分条件。

很明显,短路操作并不能保证有限时间终止,例如,当一个过滤器不匹配处理无法完成的任何项目时,但是一个简单地忽略操作的短路性质而不支持任何有限时间终止的实现远远不符合规范。

关于无限子流的中断,当引入 中级(而不是终端)短路操作时,latMap 的行为变得更加令人惊讶。

虽然下面的工作与预期的一样,打印出无限的整数序列

Stream.of("x").flatMap(_x -> Stream.iterate(1, i -> i + 1)).forEach(System.out::println);

下面的代码只打印出“1”,但仍然终止 没有:

Stream.of("x").flatMap(_x -> Stream.iterate(1, i -> i + 1)).limit(1).forEach(System.out::println);

我无法想象在阅读规范的过程中不存在这样的错误。

在我的免费 StreamEx库中,我介绍了短路收集器。当使用短路收集器(如 MoreCollectors.first())收集顺序流时,从源中只消耗一个元件。在内部,它是以一种非常肮脏的方式实现的: 使用自定义异常来中断控制流。使用我的库,您的示例可以这样重写:

System.out.println(
"Result: " +
StreamEx.of(1, 2, 3)
.flatMap(i -> Stream.of(i - 1, i, i + 1))
.flatMap(i -> Stream.of(i - 1, i, i + 1))
.filter(i -> {
System.out.println(i);
return true;
})
.collect(MoreCollectors.first())
.get()
);

结果如下:

-1
Result: -1

我同意其他人这是一个错误打开在 JDK-8075939。而且因为一年多以后还没有修好。我想向您推荐: AbacusUtil

N.println("Result: " + Stream.of(1, 2, 3).peek(N::println).first().get());


N.println("-----------");


N.println("Result: " + Stream.of(1, 2, 3)
.flatMap(i -> Stream.of(i - 1, i, i + 1))
.flatMap(i -> Stream.of(i - 1, i, i + 1))
.peek(N::println).first().get());


// output:
// 1
// Result: 1
// -----------
// -1
// Result: -1

披露: 我是 AbacusUtil 的开发者。

不幸的是,.flatMap()并不懒惰。但是,这里有一个定制的 flatMap解决方案: 为什么.latMap ()在 java 8和 java 9中如此低效(非惰性)

今天我也偶然发现了这个 bug。行为没有那么拘谨,导致简单的情况下,如下,工作正常,但类似的生产代码不工作。

 stream(spliterator).map(o -> o).flatMap(Stream::of).flatMap(Stream::of).findAny()

对于那些不能再等几年才迁移到 JDK-10的人来说,还有另一种真正的懒惰流。它不支持并行。它专门用于 JavaScript 翻译,但对我来说很有用,因为界面是一样的。

StreamHelper 是基于集合的,但是很容易适应 Spliterator。

Https://github.com/yaitskov/j4ts/blob/stream/src/main/java/javaemul/internal/stream/streamhelper.java

虽然 JDK-8075939在 Java11中已经被修复,并且被回移到了10和8u222,但仍然存在一个边缘情况,即在使用仍然存在于 Java17中的 Stream.iterator(): JDK-8267359时,flatMap()并没有真正懒惰。

这个

Iterator<Integer> it =
Stream.of("a", "b")
.flatMap(s -> Stream
.of(1, 2, 3, 4)
.filter(i -> { System.out.println(i); return true; }))
.iterator();


it.hasNext(); // This consumes the entire flatmapped stream
it.next();

指纹

1
2
3
4

而这个:

Iterator<Integer> it =
Stream.of("a", "b")
.flatMap(s -> Stream
.iterate(1, i -> i)
.filter(i -> { System.out.println(i); return true; }))
.iterator();


it.hasNext();
it.next();

永远不会终止