用谓词限制流

是否有Java 8流操作限制(可能是无限的)Stream,直到第一个元素未能匹配谓词?

在Java 9中,我们可以像下面的例子一样使用takeWhile来打印所有小于10的数字。

IntStream
.iterate(1, n -> n + 1)
.takeWhile(n -> n < 10)
.forEach(System.out::println);

因为在Java 8中没有这样的操作,那么以通用的方式实现它的最佳方法是什么呢?

88886 次浏览

这样的操作应该是可能的和Java 8 Stream,但它不一定能有效地完成——例如,你不一定能并行化这样的操作,因为你必须按顺序查看元素。

API并没有提供一种简单的方法,但最简单的方法可能是使用Stream.iterator(),将Iterator包装成“take-while”;实现,然后返回Spliterator,然后返回Stream。或者——可能——包装Spliterator,尽管在这个实现中它不能再被拆分了。

下面是takeWhileSpliterator上未经测试的实现:

static <T> Spliterator<T> takeWhile(
Spliterator<T> splitr, Predicate<? super T> predicate) {
return new Spliterators.AbstractSpliterator<T>(splitr.estimateSize(), 0) {
boolean stillGoing = true;
@Override public boolean tryAdvance(Consumer<? super T> consumer) {
if (stillGoing) {
boolean hadNext = splitr.tryAdvance(elem -> {
if (predicate.test(elem)) {
consumer.accept(elem);
} else {
stillGoing = false;
}
});
return hadNext && stillGoing;
}
return false;
}
};
}


static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) {
return StreamSupport.stream(takeWhile(stream.spliterator(), predicate), false);
}

更新:Java 9 Stream现在带有takeWhile方法。

不需要黑客或其他解决方案。就用它吧!


我相信这可以大大提高: (有人可能会使它线程安全)

Stream<Integer> stream = Stream.iterate(0, n -> n + 1);


TakeWhile.stream(stream, n -> n < 10000)
.forEach(n -> System.out.print((n == 0 ? "" + n : "," + n)));

肯定是黑客……不是很优雅,但是很有用~:D

class TakeWhile<T> implements Iterator<T> {


private final Iterator<T> iterator;
private final Predicate<T> predicate;
private volatile T next;
private volatile boolean keepGoing = true;


public TakeWhile(Stream<T> s, Predicate<T> p) {
this.iterator = s.iterator();
this.predicate = p;
}


@Override
public boolean hasNext() {
if (!keepGoing) {
return false;
}
if (next != null) {
return true;
}
if (iterator.hasNext()) {
next = iterator.next();
keepGoing = predicate.test(next);
if (!keepGoing) {
next = null;
}
}
return next != null;
}


@Override
public T next() {
if (next == null) {
if (!hasNext()) {
throw new NoSuchElementException("Sorry. Nothing for you.");
}
}
T temp = next;
next = null;
return temp;
}


public static <T> Stream<T> stream(Stream<T> s, Predicate<T> p) {
TakeWhile tw = new TakeWhile(s, p);
Spliterator split = Spliterators.spliterator(tw, Integer.MAX_VALUE, Spliterator.ORDERED);
return StreamSupport.stream(split, false);
}


}

这是在int上做的一个版本-正如问题中所问的那样。

用法:

StreamUtil.takeWhile(IntStream.iterate(1, n -> n + 1), n -> n < 10);

下面是StreamUtil的代码:

import java.util.PrimitiveIterator;
import java.util.Spliterators;
import java.util.function.IntConsumer;
import java.util.function.IntPredicate;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;


public class StreamUtil
{
public static IntStream takeWhile(IntStream stream, IntPredicate predicate)
{
return StreamSupport.intStream(new PredicateIntSpliterator(stream, predicate), false);
}


private static class PredicateIntSpliterator extends Spliterators.AbstractIntSpliterator
{
private final PrimitiveIterator.OfInt iterator;
private final IntPredicate predicate;


public PredicateIntSpliterator(IntStream stream, IntPredicate predicate)
{
super(Long.MAX_VALUE, IMMUTABLE);
this.iterator = stream.iterator();
this.predicate = predicate;
}


@Override
public boolean tryAdvance(IntConsumer action)
{
if (iterator.hasNext()) {
int value = iterator.nextInt();
if (predicate.test(value)) {
action.accept(value);
return true;
}
}


return false;
}
}
}

takeWhileprotonpack图书馆提供的函数之一。

Stream<Integer> infiniteInts = Stream.iterate(0, i -> i + 1);
Stream<Integer> finiteInts = StreamUtils.takeWhile(infiniteInts, i -> i < 10);


assertThat(finiteInts.collect(Collectors.toList()),
hasSize(10));

我有另一个快速的解决方案来实现这个(实际上是不干净的,但你知道的):

public static void main(String[] args) {
System.out.println(StreamUtil.iterate(1, o -> o + 1).terminateOn(15)
.map(o -> o.toString()).collect(Collectors.joining(", ")));
}


static interface TerminatedStream<T> {
Stream<T> terminateOn(T e);
}


static class StreamUtil {
static <T> TerminatedStream<T> iterate(T seed, UnaryOperator<T> op) {
return new TerminatedStream<T>() {
public Stream<T> terminateOn(T e) {
Builder<T> builder = Stream.<T> builder().add(seed);
T current = seed;
while (!current.equals(e)) {
current = op.apply(current);
builder.add(current);
}
return builder.build();
}
};
}
}

allMatch()是一个短路函数,所以你可以用它来停止处理。主要的缺点是您必须进行两次测试:一次是查看是否应该处理它,另一次是查看是否继续进行。

IntStream
.iterate(1, n -> n + 1)
.peek(n->{if (n<10) System.out.println(n);})
.allMatch(n->n < 10);

你可以使用java8 + rxjava

import java.util.stream.IntStream;
import rx.Observable;




// Example 1)
IntStream intStream  = IntStream.iterate(1, n -> n + 1);
Observable.from(() -> intStream.iterator())
.takeWhile(n ->
{
System.out.println(n);
return n < 10;
}
).subscribe() ;




// Example 2
IntStream intStream  = IntStream.iterate(1, n -> n + 1);
Observable.from(() -> intStream.iterator())
.takeWhile(n -> n < 10)
.forEach( n -> System.out.println(n));

下面是我使用Java流库的尝试。

        IntStream.iterate(0, i -> i + 1)
.filter(n -> {
if (n < 10) {
System.out.println(n);
return false;
} else {
return true;
}
})
.findAny();

JDK 9中已经添加了takeWhiledropWhile操作。示例代码

IntStream
.iterate(1, n -> n + 1)
.takeWhile(n -> n < 10)
.forEach(System.out::println);

在JDK 9下编译和运行时,它的行为将完全符合您的预期。

JDK 9已经发布。它可以在这里下载:JDK 9发布

作为@StuartMarks回答的后续。我的StreamEx库具有与当前JDK-9实现兼容的takeWhile操作。当在JDK-9下运行时,它只会委托给JDK实现(通过MethodHandle.invokeExact,这非常快)。在JDK-8下运行时,将使用“polyfill”实现。所以使用我的库可以像这样解决问题:

IntStreamEx.iterate(1, n -> n + 1)
.takeWhile(n -> n < 10)
.forEach(System.out::println);

去获取库AbacusUtil。它提供了你想要的API和更多:

IntStream.iterate(1, n -> n + 1).takeWhile(n -> n < 10).forEach(System.out::println);

声明:我是AbacusUtil的开发者。

除非通过短路终端操作,否则不能中止流,这将使一些流值无论其值如何都未得到处理。但是如果你只是想避免流上的操作,你可以添加一个转换和过滤器到流:

import java.util.Objects;


class ThingProcessor
{
static Thing returnNullOnCondition(Thing thing)
{    return( (*** is condition met ***)? null : thing);    }


void processThings(Collection<Thing> thingsCollection)
{
thingsCollection.stream()
*** regular stream processing ***
.map(ThingProcessor::returnNullOnCondition)
.filter(Objects::nonNull)
*** continue stream processing ***
}
} // class ThingProcessor

它会在满足某些条件时将数据流转换为null,然后过滤掉null。如果您愿意考虑副作用,可以在遇到某些事情时将条件值设置为true,这样所有后续事情都会被过滤掉,而不管它们的值是多少。但即使不是这样,您也可以通过从流中过滤不想处理的值来节省大量(如果不是全部)处理。

实际上,在Java 8中有两种方法可以做到这一点,不需要任何额外的库或使用Java 9。

如果你想在控制台上打印2到20的数字,你可以这样做:

IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).allMatch(i -> i < 20);

IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).anyMatch(i -> i >= 20);

两种情况下的输出都是:

2
4
6
8
10
12
14
16
18
20

还没有人提到anyMatch。这就是我写这篇文章的原因。

如果你有不同的问题,可能需要不同的解决方案,但对于你当前的问题,我只想说:

IntStream
.iterate(1, n -> n + 1)
.limit(10)
.forEach(System.out::println);

这是从JDK 9 java.util.stream.Stream.takeWhile(Predicate)中复制的源代码。为了使用JDK 8,有一点不同。

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> p) {
class Taking extends Spliterators.AbstractSpliterator<T> implements Consumer<T> {
private static final int CANCEL_CHECK_COUNT = 63;
private final Spliterator<T> s;
private int count;
private T t;
private final AtomicBoolean cancel = new AtomicBoolean();
private boolean takeOrDrop = true;


Taking(Spliterator<T> s) {
super(s.estimateSize(), s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED));
this.s = s;
}


@Override
public boolean tryAdvance(Consumer<? super T> action) {
boolean test = true;
if (takeOrDrop &&               // If can take
(count != 0 || !cancel.get()) && // and if not cancelled
s.tryAdvance(this) &&   // and if advanced one element
(test = p.test(t))) {   // and test on element passes
action.accept(t);           // then accept element
return true;
} else {
// Taking is finished
takeOrDrop = false;
// Cancel all further traversal and splitting operations
// only if test of element failed (short-circuited)
if (!test)
cancel.set(true);
return false;
}
}


@Override
public Comparator<? super T> getComparator() {
return s.getComparator();
}


@Override
public void accept(T t) {
count = (count + 1) & CANCEL_CHECK_COUNT;
this.t = t;
}


@Override
public Spliterator<T> trySplit() {
return null;
}
}
return StreamSupport.stream(new Taking(stream.spliterator()), stream.isParallel()).onClose(stream::close);
}

甚至我也有类似的需求——调用web服务,如果失败,重试3次。如果在多次尝试后仍然失败,请发送电子邮件通知。在谷歌了很多之后,anyMatch()成了救世主。我的示例代码如下。在下面的例子中,如果webServiceCall方法在第一次迭代中返回true,则stream不会继续迭代,因为我们已经调用了anyMatch()。我相信,这就是你想要的。

import java.util.stream.IntStream;


import io.netty.util.internal.ThreadLocalRandom;


class TrialStreamMatch {


public static void main(String[] args) {
if(!IntStream.range(1,3).anyMatch(integ -> webServiceCall(integ))){
//Code for sending email notifications
}
}


public static boolean webServiceCall(int i){
//For time being, I have written a code for generating boolean randomly
//This whole piece needs to be replaced by actual web-service client code
boolean bool = ThreadLocalRandom.current().nextBoolean();
System.out.println("Iteration index :: "+i+" bool :: "+bool);


//Return success status -- true or false
return bool;
}

可能有点偏离主题,但这是我们为List<T>而不是Stream<T>所拥有的。

首先你需要有一个take util方法。该方法接受第一个n元素:

static <T> List<T> take(List<T> l, int n) {
if (n <= 0) {
return newArrayList();
} else {
int takeTo = Math.min(Math.max(n, 0), l.size());
return l.subList(0, takeTo);
}
}

它就像scala.List.take一样工作

    assertEquals(newArrayList(1, 2, 3), take(newArrayList(1, 2, 3, 4, 5), 3));
assertEquals(newArrayList(1, 2, 3), take(newArrayList(1, 2, 3), 5));


assertEquals(newArrayList(), take(newArrayList(1, 2, 3), -1));
assertEquals(newArrayList(), take(newArrayList(1, 2, 3), 0));

现在基于take编写takeWhile方法就相当简单了

static <T> List<T> takeWhile(List<T> l, Predicate<T> p) {
return l.stream().
filter(p.negate()).findFirst(). // find first element when p is false
map(l::indexOf).        // find the index of that element
map(i -> take(l, i)).   // take up to the index
orElse(l);  // return full list if p is true for all elements
}

它是这样工作的:

    assertEquals(newArrayList(1, 2, 3), takeWhile(newArrayList(1, 2, 3, 4, 3, 2, 1), i -> i < 4));

这个实现部分迭代列表几次,但它不会添加add O(n^2)操作。希望你能接受。

如果您知道将要执行的重复的确切数量,您就可以执行

IntStream
.iterate(1, n -> n + 1)
.limit(10)
.forEach(System.out::println);
    IntStream.iterate(1, n -> n + 1)
.peek(System.out::println) //it will be executed 9 times
.filter(n->n>=9)
.findAny();

您可以使用mapToObj来返回最终对象或消息,而不是peak

    IntStream.iterate(1, n -> n + 1)
.mapToObj(n->{   //it will be executed 9 times
if(n<9)
return "";
return "Loop repeats " + n + " times";});
.filter(message->!message.isEmpty())
.findAny()
.ifPresent(System.out::println);