你能把一条溪流分成两条溪流吗?

我有一个由 Java8流表示的数据集:

Stream<T> stream = ...;

我可以看到如何过滤它得到一个随机子集-例如

Random r = new Random();
PrimitiveIterator.OfInt coin = r.ints(0, 2).iterator();
Stream<T> heads = stream.filter((x) -> (coin.nextInt() == 0));

我还可以看到如何减少这个流以获得,例如,表示数据集的两个随机部分的两个列表,然后将它们返回到流中。 但是,有没有一种直接的方法可以从最初的一个生成两个流呢

(heads, tails) = stream.[some kind of split based on filter]

谢谢你的建议。

153305 次浏览

不完全是。您无法从一个 Stream中获得两个; 这没有意义——如何在不需要同时生成另一个 Stream的情况下迭代一个 Stream?一条流只能操作一次。

但是,如果你想把它们转储到一个列表或者其他什么东西中,你可以做什么

stream.forEach((x) -> ((x == 0) ? heads : tails).add(x));

这违背了 Stream 的一般机制。假设你可以像你想要的那样把 S0流分成 Sa 和 Sb。在 Sa 上执行任何终端操作(比如 count())都必然会“消耗”S0中的所有元素。因此,某人丢失了它的数据源。

以前,Stream 有一个 tee()方法,我想,它将一个流复制到两个。现在已经删除了。

Stream 有一个 eek ()方法,您可以使用它来实现您的需求。

不幸的是,你的要求在 流媒体的 JavaDoc中直接遭到了反对:

应该操作流(调用中间件或终端) 流操作)一次,这就排除了“分叉”的可能性 流,其中相同的源为两个或多个管道提供信息,或者 同一流的多次遍历。

您可以使用 peek或其他方法来解决这个问题,如果您真的希望这种类型的行为。在这种情况下,您应该做的不是尝试使用分叉过滤器来支持来自同一原始 Stream 源的两个流,而是复制您的流并适当地过滤每个重复。

但是,您可能希望重新考虑 Stream是否适合您的用例。

不完全是,但是您可以通过调用 Collectors.groupingBy()来完成您需要的任务。创建一个新的集合,然后可以在该新集合上实例化流。

收藏家可以用于此。

  • 对于两个类别,使用 Collectors.partitioningBy()工厂。

这将创建一个 Map<Boolean, List>,并根据 Predicate将项目放入一个或另一个列表中。

注意: 由于流需要被整个消费,这不能在无限的流上工作。而且因为流无论如何都会被使用,所以这个方法只是将它们放在 List 中,而不是创建一个带内存的新流。如果需要流作为输出,总是可以对这些列表进行流处理。

而且,不需要迭代器,即使在您提供的只有头部的示例中也不需要。

  • 二进制分解看起来像这样:
Random r = new Random();


Map<Boolean, List<String>> groups = stream
.collect(Collectors.partitioningBy(x -> r.nextBoolean()));


System.out.println(groups.get(false).size());
System.out.println(groups.get(true).size());
  • 对于更多类别,请使用 Collectors.groupingBy()工厂。
Map<Object, List<String>> groups = stream
.collect(Collectors.groupingBy(x -> r.nextInt(3)));
System.out.println(groups.get(0).size());
System.out.println(groups.get(1).size());
System.out.println(groups.get(2).size());

如果流不是 Stream,而是像 IntStream这样的基本流之一,则此 .collect(Collectors)方法不可用。没有收集器工厂,你只能手工操作。它的实现方式如下:

[例2.0自2020-04-16]

    IntStream    intStream = IntStream.iterate(0, i -> i + 1).limit(100000).parallel();
IntPredicate predicate = ignored -> r.nextBoolean();


Map<Boolean, List<Integer>> groups = intStream.collect(
() -> Map.of(false, new ArrayList<>(100000),
true , new ArrayList<>(100000)),
(map, value) -> map.get(predicate.test(value)).add(value),
(map1, map2) -> {
map1.get(false).addAll(map2.get(false));
map1.get(true ).addAll(map2.get(true ));
});

在这个示例中,我使用初始集合的完整大小初始化 ArrayList (如果已知的话)。这甚至可以防止在绝境求生手册中调整事件的大小,但是可能会占用2NT 空间(N = 元素的初始数量,T = 线程的数量)。为了在空间和速度之间做出权衡,你可以省略它,或者使用你最好的猜测,比如一个分区中预期的最高元素数(平衡分割通常刚好超过 N/2)。

我希望使用 Java9方法不会冒犯到任何人。对于 Java8版本,查看编辑历史记录。

我偶然发现了这个问题,我觉得一个分叉的流有一些可以证明有效的用例。我以使用者的身份编写了下面的代码,因此它不做任何事情,但是您可以将它应用于函数和可能遇到的任何其他事情。

class PredicateSplitterConsumer<T> implements Consumer<T>
{
private Predicate<T> predicate;
private Consumer<T>  positiveConsumer;
private Consumer<T>  negativeConsumer;


public PredicateSplitterConsumer(Predicate<T> predicate, Consumer<T> positive, Consumer<T> negative)
{
this.predicate = predicate;
this.positiveConsumer = positive;
this.negativeConsumer = negative;
}


@Override
public void accept(T t)
{
if (predicate.test(t))
{
positiveConsumer.accept(t);
}
else
{
negativeConsumer.accept(t);
}
}
}

现在,您的代码实现可以是这样的:

personsArray.forEach(
new PredicateSplitterConsumer<>(
person -> person.getDateOfBirth().isPresent(),
person -> System.out.println(person.getName()),
person -> System.out.println(person.getName() + " does not have Date of birth")));

这是我能想到的最好的答案了。

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;


public class Test {


public static <T, L, R> Pair<L, R> splitStream(Stream<T> inputStream, Predicate<T> predicate,
Function<Stream<T>, L> trueStreamProcessor, Function<Stream<T>, R> falseStreamProcessor) {


Map<Boolean, List<T>> partitioned = inputStream.collect(Collectors.partitioningBy(predicate));
L trueResult = trueStreamProcessor.apply(partitioned.get(Boolean.TRUE).stream());
R falseResult = falseStreamProcessor.apply(partitioned.get(Boolean.FALSE).stream());


return new ImmutablePair<L, R>(trueResult, falseResult);
}


public static void main(String[] args) {


Stream<Integer> stream = Stream.iterate(0, n -> n + 1).limit(10);


Pair<List<Integer>, String> results = splitStream(stream,
n -> n > 5,
s -> s.filter(n -> n % 2 == 0).collect(Collectors.toList()),
s -> s.map(n -> n.toString()).collect(Collectors.joining("|")));


System.out.println(results);
}


}

这需要一个整数流,并在5分割它们。对于大于5的数字,它只过滤偶数,并将它们放在一个列表中。剩下的就和他们一起了。

产出:

 ([6, 8],0|1|2|3|4|5)

它并不理想,因为它将所有内容收集到中间集合中,从而破坏了流(并且有太多的参数!)

这个怎么样:

Supplier<Stream<Integer>> randomIntsStreamSupplier =
() -> (new Random()).ints(0, 2).boxed();


Stream<Integer> tails =
randomIntsStreamSupplier.get().filter(x->x.equals(0));
Stream<Integer> heads =
randomIntsStreamSupplier.get().filter(x->x.equals(1));

我在寻找从流中过滤某些元素并将它们作为错误记录的方法时偶然发现了这个问题。因此,我实际上并不需要分割流,只需要将一个提前终止操作附加到一个语法不引人注目的谓词。这是我想到的:

public class MyProcess {
/* Return a Predicate that performs a bail-out action on non-matching items. */
private static <T> Predicate<T> withAltAction(Predicate<T> pred, Consumer<T> altAction) {
return x -> {
if (pred.test(x)) {
return true;
}
altAction.accept(x);
return false;
};


/* Example usage in non-trivial pipeline */
public void processItems(Stream<Item> stream) {
stream.filter(Objects::nonNull)
.peek(this::logItem)
.map(Item::getSubItems)
.filter(withAltAction(SubItem::isValid,
i -> logError(i, "Invalid")))
.peek(this::logSubItem)
.filter(withAltAction(i -> i.size() > 10,
i -> logError(i, "Too large")))
.map(SubItem::toDisplayItem)
.forEach(this::display);
}
}

使用龙目岛的简短版本

import java.util.function.Consumer;
import java.util.function.Predicate;


import lombok.RequiredArgsConstructor;


/**
* Forks a Stream using a Predicate into postive and negative outcomes.
*/
@RequiredArgsConstructor
@FieldDefaults(makeFinal = true, level = AccessLevel.PROTECTED)
public class StreamForkerUtil<T> implements Consumer<T> {
Predicate<T> predicate;
Consumer<T> positiveConsumer;
Consumer<T> negativeConsumer;


@Override
public void accept(T t) {
(predicate.test(t) ? positiveConsumer : negativeConsumer).accept(t);
}
}

你从一个中得到两个
从 Java 12到 teeing
在100次抛硬币中计算正反面

Random r = new Random();
PrimitiveIterator.OfInt coin = r.ints(0, 2).iterator();
List<Long> list = Stream.iterate(0, i -> coin.nextInt())
.limit(100).collect(teeing(
filtering(i -> i == 1, counting()),
filtering(i -> i == 0, counting()),
(heads, tails) -> {
return(List.of(heads, tails));
}));
System.err.println("heads:" + list.get(0) + " tails:" + list.get(1));

得到例如: heads:51 tails:49