复制一个流以避免“流已被操作或关闭”

我想复制一个 Java8流,这样我就可以处理它两次。我可以 collect作为一个列表,并从中获得新的流;

// doSomething() returns a stream
List<A> thing = doSomething().collect(toList());
thing.stream()... // do stuff
thing.stream()... // do other stuff

但我觉得应该有更高效更优雅的方法。

有没有办法在不将流转换为集合的情况下复制流?

我实际上是在处理一组 Either所以我想先用一种方法处理左边的投影然后再用另一种方法处理右边的投影。有点像这样(到目前为止,我不得不使用 toList技巧)。

List<Either<Pair<A, Throwable>, A>> results = doSomething().collect(toList());


Stream<Pair<A, Throwable>> failures = results.stream().flatMap(either -> either.left());
failures.forEach(failure -> ... );


Stream<A> successes = results.stream().flatMap(either -> either.right());
successes.forEach(success -> ... );
93334 次浏览

您可以创建一个 runnables 流(例如) :

results.stream()
.flatMap(either -> Stream.<Runnable> of(
() -> failure(either.left()),
() -> success(either.right())))
.forEach(Runnable::run);

其中 failuresuccess是应用的操作。然而,这将创建相当多的临时对象,而且可能不会比从集合开始并对其进行两次流化/迭代更有效。

我觉得你对效率的假设有点落后。如果只使用一次数据,就会获得巨大的效率回报,因为不需要存储数据,而且流提供了强大的“循环融合”优化,让整个数据有效地通过管道流动。

如果您想重用相同的数据,那么根据定义,您要么必须(确定性地)生成它两次,要么必须存储它。如果它碰巧已经在一个集合中,那么很好; 那么迭代它两次是便宜的。

我们在“分叉流”的设计中进行了实验。我们发现,支持这种方法是有实际成本的; 它使普通用例(一次性使用)负担更重,而牺牲了不常见用例。最大的问题是处理“当两个管道不以相同的速率消耗数据时会发生什么”现在你又回到了缓冲阶段。这是一个明显没有承载其重量的功能。

如果希望重复操作相同的数据,可以将其存储,或者将操作结构设置为“消费者”,然后执行以下操作:

stream()...stuff....forEach(e -> { consumerA(e); consumerB(e); });

您还可以查看 RxJava 库,因为它的处理模型更适合这种“流分叉”。

我们在 JoOλ中为流实现了一个 duplicate()方法,这是一个开源库,我们创建它是为了改进 JooQ的集成测试。基本上,你可以这样写:

Tuple2<Seq<A>, Seq<A>> duplicates = Seq.seq(doSomething()).duplicate();

在内部,存在一个缓冲区,存储从一个流消耗但不从另一个流消耗的所有值。如果您的两个流以相同的速率 如果你能忍受线程安全性的缺乏消耗,那么这可能是最有效的。

下面是这个算法的工作原理:

static <T> Tuple2<Seq<T>, Seq<T>> duplicate(Stream<T> stream) {
final List<T> gap = new LinkedList<>();
final Iterator<T> it = stream.iterator();


@SuppressWarnings("unchecked")
final Iterator<T>[] ahead = new Iterator[] { null };


class Duplicate implements Iterator<T> {
@Override
public boolean hasNext() {
if (ahead[0] == null || ahead[0] == this)
return it.hasNext();


return !gap.isEmpty();
}


@Override
public T next() {
if (ahead[0] == null)
ahead[0] = this;


if (ahead[0] == this) {
T value = it.next();
gap.offer(value);
return value;
}


return gap.poll();
}
}


return tuple(seq(new Duplicate()), seq(new Duplicate()));
}

这里有更多源代码

Tuple2可能类似于您的 Pair类型,而 SeqStream,并有一些增强。

您可以使用带有 Supplier的局部变量来设置流管道的公共部分。

来自 http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/:

重用流

Java8流无法重用。一旦调用任何终端操作,流就会关闭:

Stream<String> stream = Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> s.startsWith("a"));
stream.anyMatch(s -> true);    // ok
stream.noneMatch(s -> true);   // exception


Calling `noneMatch` after `anyMatch` on the same stream results in the following exception:
java.lang.IllegalStateException: stream has already been operated upon or closed
at
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
at
java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
at com.winterbe.java8.Streams5.test7(Streams5.java:38)
at com.winterbe.java8.Streams5.main(Streams5.java:28)

为了克服这个限制,我们必须为我们想要执行的每个终端操作创建一个新的流链,例如,我们可以创建一个流供应商来构建一个新的流,所有的中间操作都已经建立:

Supplier<Stream<String>> streamSupplier =
() -> Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> s.startsWith("a"));


streamSupplier.get().anyMatch(s -> true);   // ok
streamSupplier.get().noneMatch(s -> true);  // ok

每次对 get()的调用都构造一个新的流,我们可以在其上保存调用所需的终端操作。

另一种多次处理元素的方法是使用 Pek (消费者):

doSomething().stream()
.peek(either -> handleFailure(either.left()))
.foreach(either -> handleSuccess(either.right()));

peek(Consumer)可以根据需要链接多次。

doSomething().stream()
.peek(element -> handleFoo(element.foo()))
.peek(element -> handleBar(element.bar()))
.peek(element -> handleBaz(element.baz()))
.foreach(element-> handleQux(element.qux()));

我参与的一个库 cyclops-response 有一个静态方法,允许您复制一个 Stream (并返回一个 jOOλ Streams 元组)。

    Stream<Integer> stream = Stream.of(1,2,3);
Tuple2<Stream<Integer>,Stream<Integer>> streams =  StreamUtils.duplicate(stream);

请参阅注释,在现有的 Stream 上使用重复服务会带来性能损失。一个更好的选择是使用 Streamable:-

还有一个(惰性) Streamable 类,可以从 Stream、 Iterable 或 Array 构造,并重播多次。

    Streamable<Integer> streamable = Streamable.of(1,2,3);
streamable.stream().forEach(System.out::println);
streamable.stream().forEach(System.out::println);

SynizedFromStream (stream)-可以用来创建一个 Streamable,它将延迟地填充它的后备集合,以一种可以跨线程共享的方式。From Stream (stream)不会引起任何同步开销。

对于这个特殊的问题,你也可以使用分区

     // Partition Eighters into left and right
List<Either<Pair<A, Throwable>, A>> results = doSomething();
Map<Boolean, Object> passingFailing = results.collect(Collectors.partitioningBy(s -> s.isLeft()));
passingFailing.get(true) <- here will be all passing (left values)
passingFailing.get(false) <- here will be all failing (right values)

使用 Supplier为每个终止操作生成流。

Supplier<Stream<Integer>> streamSupplier = () -> list.stream();

无论什么时候你需要这些收藏, 使用 streamSupplier.get()获取新的流。

例子:

  1. streamSupplier.get().anyMatch(predicate);
  2. streamSupplier.get().allMatch(predicate2);

我们可以在读取或迭代流时使用 Stream Builder。 这是 溪流建造者的文件。

Https://docs.oracle.com/javase/8/docs/api/java/util/stream/stream

用例

假设我们有员工流,我们需要使用这个流在 excel 文件中写入员工数据,然后更新员工集合/表 [这只是用来展示 Stream Builder 用法的用例] :

Stream.Builder<Employee> builder = Stream.builder();


employee.forEach( emp -> {
//store employee data to excel file
// and use the same object to build the stream.
builder.add(emp);
});


//Now this stream can be used to update the employee collection
Stream<Employee> newStream = builder.build();

我遇到过类似的问题,可以想出三种不同的中间结构来创建流的副本: List、数组和 Stream.Builder。我写了一个小的基准测试程序,从性能的角度来看,List比其他两个相当相似的程序慢了30% 。

转换为数组的唯一缺点是,如果您的元素类型是泛型类型(在我的例子中就是泛型类型) ,那么转换过程会很麻烦; 因此我更喜欢使用 Stream.Builder

最后,我编写了一个小函数来创建 Collector:

private static <T> Collector<T, Stream.Builder<T>, Stream<T>> copyCollector()
{
return Collector.of(Stream::builder, Stream.Builder::add, (b1, b2) -> {
b2.build().forEach(b1);
return b1;
}, Stream.Builder::build);
}

然后,我可以通过做 str.collect(copyCollector())来复制任何流 str,这感觉非常符合流的惯用做法。