Most efficient way to get the last element of a stream

Stream doesn't have a last() method:

Stream<T> stream;
T last = stream.last(); // No such method

What's the most elegant and/or efficient way to get the last element (or null for an empty Stream)?

Do a reduction that simply returns the current value:

Stream<T> stream;
T last = stream.reduce((a, b) -> b).orElse(null);
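
To make the reduction concrete, here is a minimal, self-contained sketch. The class and method names (`ReduceLast`, `lastOrNull`) are made up for illustration. The accumulator discards its left operand, so whatever arrives last in encounter order wins. One caveat worth knowing: `Stream.reduce` is documented to throw `NullPointerException` if the reduction result is null, so this form is not suitable when the last element itself may be null.

```java
import java.util.stream.Stream;

class ReduceLast {
    // Keeps only the right-hand operand, so the final result is the last element.
    static <T> T lastOrNull(Stream<T> stream) {
        return stream.reduce((first, second) -> second).orElse(null);
    }

    public static void main(String[] args) {
        System.out.println(lastOrNull(Stream.of("a", "b", "c"))); // prints "c"
        System.out.println(lastOrNull(Stream.<String>empty()));   // prints "null"
    }
}
```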

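This heavily depends on the nature of the Stream. Keep in mind that "simple" doesn't necessarily mean "efficient". If you suspect the stream to be very large, to carry heavy operations, or to have a source that knows its size in advance, the following might be substantially more efficient than the simple solution: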

static <T> T getLast(Stream<T> stream) {
    Spliterator<T> sp=stream.spliterator();
    if(sp.hasCharacteristics(Spliterator.SIZED|Spliterator.SUBSIZED)) {
        for(;;) {
            Spliterator<T> part=sp.trySplit();
            if(part==null) break;
            if(sp.getExactSizeIfKnown()==0) {
                sp=part;
                break;
            }
        }
    }
    T value=null;
    for(Iterator<T> it=recursive(sp); it.hasNext(); )
        value=it.next();
    return value;
}

private static <T> Iterator<T> recursive(Spliterator<T> sp) {
    Spliterator<T> prev=sp.trySplit();
    if(prev==null) return Spliterators.iterator(sp);
    Iterator<T> it=recursive(sp);
    if(it!=null && it.hasNext()) return it;
    return recursive(prev);
}

You may illustrate the difference with the following example:

String s=getLast(
    IntStream.range(0, 10_000_000).mapToObj(i-> {
        System.out.println("potential heavy operation on "+i);
        return String.valueOf(i);
    }).parallel()
);
System.out.println(s);

It will print:

potential heavy operation on 9999999
9999999

In other words, it did not perform the operation on the first 9,999,999 elements but only on the last one.
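
The difference can also be measured rather than read off the console. The sketch below counts mapper invocations for a plain reduction and for a condensed variant of the splitting idea. The names `MapperCallCount`, `counted` and `lastBySplitting` are invented here, and the condensed method only takes the shortcut for SIZED|SUBSIZED spliterators; it is not a replacement for the full version above. How many mapper calls the splitting variant needs depends on how the JDK splits the source, so no exact figure is claimed for it.

```java
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class MapperCallCount {
    // Builds a parallel stream whose mapper bumps a counter on every call.
    static Stream<String> counted(int n, AtomicInteger calls) {
        return IntStream.range(0, n).mapToObj(i -> {
            calls.incrementAndGet();
            return String.valueOf(i);
        }).parallel();
    }

    // Condensed variant of the splitting idea; shortcut applies to SIZED|SUBSIZED sources only.
    static <T> T lastBySplitting(Stream<T> stream) {
        Spliterator<T> sp = stream.spliterator();
        if (sp.hasCharacteristics(Spliterator.SIZED | Spliterator.SUBSIZED)) {
            // trySplit() returns a prefix and leaves the suffix in sp.
            for (Spliterator<T> prefix = sp.trySplit(); prefix != null; prefix = sp.trySplit()) {
                if (sp.getExactSizeIfKnown() == 0) {
                    sp = prefix; // suffix is empty, so the last element lives in the prefix
                }
            }
        }
        AtomicReference<T> last = new AtomicReference<>();
        sp.forEachRemaining(last::set);
        return last.get();
    }

    public static void main(String[] args) {
        AtomicInteger reduceCalls = new AtomicInteger();
        String viaReduce = counted(1_000, reduceCalls).reduce((a, b) -> b).orElse(null);

        AtomicInteger splitCalls = new AtomicInteger();
        String viaSplit = lastBySplitting(counted(1_000, splitCalls));

        System.out.println(viaReduce + " after " + reduceCalls + " mapper calls");
        System.out.println(viaSplit + " after " + splitCalls + " mapper calls");
    }
}
```

The reduction has to map every element before it can discard it, whereas the splitting variant only maps what is left in the final suffix.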

Here is another solution (not that efficient):

List<String> list = Arrays.asList("abc","ab","cc");
long count = list.stream().count();
list.stream().skip(count-1).findFirst().ifPresent(System.out::println);
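
Two things about the count-then-skip approach are worth spelling out: it needs a source that can be streamed twice, and for an empty source `count - 1` is negative, which `skip` rejects with an `IllegalArgumentException`. A guarded sketch, with a hypothetical helper name (`lastBySkipping`) and the assumption that the source is a `Collection`:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;

class SkipToLast {
    // Streams the collection twice: once to count, once to skip to the final element.
    static <T> Optional<T> lastBySkipping(Collection<T> source) {
        long count = source.stream().count();
        if (count == 0) {
            return Optional.empty(); // skip(-1) would throw IllegalArgumentException
        }
        return source.stream().skip(count - 1).findFirst();
    }

    public static void main(String[] args) {
        System.out.println(lastBySkipping(Arrays.asList("abc", "ab", "cc"))); // prints "Optional[cc]"
        System.out.println(lastBySkipping(Collections.<String>emptyList())); // prints "Optional.empty"
    }
}
```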

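This is just a refactoring of Holger's answer, because the code, while fantastic, is a bit hard to read and understand, especially for people who were not C programmers before Java. Hopefully my refactored example class is a little easier to follow for those who are not familiar with spliterators, what they do, or how they work.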

public class LastElementFinderExample {
    public static void main(String[] args){
        String s = getLast(
            LongStream.range(0, 10_000_000_000L).mapToObj(i-> {
                System.out.println("potential heavy operation on "+i);
                return String.valueOf(i);
            }).parallel()
        );
        System.out.println(s);
    }

    public static <T> T getLast(Stream<T> stream){
        Spliterator<T> sp = stream.spliterator();
        if(isSized(sp)) {
            sp = getLastSplit(sp);
        }
        return getIteratorLastValue(getLastIterator(sp));
    }

    private static boolean isSized(Spliterator<?> sp){
        return sp.hasCharacteristics(Spliterator.SIZED|Spliterator.SUBSIZED);
    }

    // Keeps splitting and switches to the prefix whenever the remaining suffix is empty.
    private static <T> Spliterator<T> getLastSplit(Spliterator<T> sp){
        return splitUntil(sp, s->s.getExactSizeIfKnown() == 0);
    }

    // Caveat: with a null condition, splitUntil follows the prefix after every split,
    // whereas recursive() in the original answer tries the suffix first. The two only
    // agree when sp can no longer be split, as is the case after getLastSplit.
    private static <T> Iterator<T> getLastIterator(Spliterator<T> sp) {
        return Spliterators.iterator(splitUntil(sp, null));
    }

    private static <T> T getIteratorLastValue(Iterator<T> it){
        T result = null;
        while (it.hasNext()){
            result = it.next();
        }
        return result;
    }

    private static <T> Spliterator<T> splitUntil(Spliterator<T> sp, Predicate<Spliterator<T>> condition){
        Spliterator<T> result = sp;
        for (Spliterator<T> part = sp.trySplit(); part != null; part = result.trySplit()){
            if (condition == null || condition.test(result)){
                result = part;
            }
        }
        return result;
    }
}
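
For readers new to spliterators, the key fact all of these versions rely on is what `trySplit()` does: for an ORDERED spliterator it hands back a new spliterator covering a strict prefix of the elements, and the spliterator it was called on keeps the rest (the suffix). That is why the code above keeps working on `sp` after each split. A small sketch to observe this, with invented names (`TrySplitDemo`, `drain`, `splitOnce`); where exactly the split point falls is up to the source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

class TrySplitDemo {
    // Drains a spliterator into a list so its contents can be inspected.
    static <T> List<T> drain(Spliterator<T> sp) {
        List<T> out = new ArrayList<>();
        sp.forEachRemaining(out::add);
        return out;
    }

    // Splits once and returns [prefix elements, suffix elements].
    static <T> List<List<T>> splitOnce(List<T> source) {
        Spliterator<T> suffix = source.spliterator();
        Spliterator<T> prefix = suffix.trySplit(); // null if the source declines to split
        List<T> head = prefix == null ? new ArrayList<>() : drain(prefix);
        return Arrays.asList(head, drain(suffix));
    }

    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
        List<List<Integer>> parts = splitOnce(source);
        System.out.println("prefix: " + parts.get(0));
        System.out.println("suffix: " + parts.get(1)); // the last element is always in here
    }
}
```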

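Parallel unsized streams combined with skip() are tricky, and @Holger's implementation returns a wrong answer for them. @Holger's implementation is also a bit slower because it uses iterators.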
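
"Sized" and "unsized" here refer to the `Spliterator.SIZED` characteristic. A quick way to see the difference is to compare a range-based stream with the same elements routed through an `Iterator`, which discards the size information. The sketch below uses invented names (`SizedVersusUnsized`, `sized`, `unsized`) and only inspects the spliterators; it does not attempt to reproduce the skip() problem itself.

```java
import java.util.Spliterator;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class SizedVersusUnsized {
    // A range knows exactly how many elements it will produce.
    static Stream<Long> sized() {
        return LongStream.rangeClosed(1, 10).boxed();
    }

    // Going through Iterable/Iterator yields a spliterator of unknown size.
    static Stream<Long> unsized() {
        Stream<Long> source = sized();
        return StreamSupport.stream(((Iterable<Long>) source::iterator).spliterator(), false);
    }

    static String describe(Spliterator<?> sp) {
        return "SIZED=" + sp.hasCharacteristics(Spliterator.SIZED)
                + ", exactSize=" + sp.getExactSizeIfKnown();
    }

    public static void main(String[] args) {
        System.out.println(describe(sized().spliterator()));   // prints "SIZED=true, exactSize=10"
        System.out.println(describe(unsized().spliterator())); // prints "SIZED=false, exactSize=-1"
    }
}
```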

An optimization of @Holger's answer:

public static <T> Optional<T> last(Stream<? extends T> stream) {
    Objects.requireNonNull(stream, "stream");

    Spliterator<? extends T> spliterator = stream.spliterator();
    Spliterator<? extends T> lastSpliterator = spliterator;

    // Note that this method does not work well with unsized parallel
    // streams that use skip(): in that case it returns Optional.empty().

    // Find the last spliterator with an estimated size.
    // Meaningful only on unsized parallel streams.
    if(spliterator.estimateSize() == Long.MAX_VALUE) {
        for (Spliterator<? extends T> prev = spliterator.trySplit(); prev != null; prev = spliterator.trySplit()) {
            lastSpliterator = prev;
        }
    }

    // Find the last spliterator on sized streams.
    // Meaningful only on parallel streams (note that unsized was transformed into sized).
    for (Spliterator<? extends T> prev = lastSpliterator.trySplit(); prev != null; prev = lastSpliterator.trySplit()) {
        if (lastSpliterator.estimateSize() == 0) {
            lastSpliterator = prev;
            break;
        }
    }

    // Find the last element of the last spliterator.
    // Parallel streams only perform the operation on one element.
    AtomicReference<T> last = new AtomicReference<>();
    lastSpliterator.forEachRemaining(last::set);

    return Optional.ofNullable(last.get());
}

Unit tests with JUnit 5:

@Test
@DisplayName("last sequential sized")
void last_sequential_sized() throws Exception {
long expected = 10_000_000L;
AtomicLong count = new AtomicLong();
Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed();
stream = stream.skip(50_000).peek(num -> count.getAndIncrement());


assertThat(Streams.last(stream)).hasValue(expected);
assertThat(count).hasValue(9_950_000L);
}


@Test
@DisplayName("last sequential unsized")
void last_sequential_unsized() throws Exception {
long expected = 10_000_000L;
AtomicLong count = new AtomicLong();
Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed();
stream = StreamSupport.stream(((Iterable<Long>) stream::iterator).spliterator(), stream.isParallel());
stream = stream.skip(50_000).peek(num -> count.getAndIncrement());


assertThat(Streams.last(stream)).hasValue(expected);
assertThat(count).hasValue(9_950_000L);
}


@Test
@DisplayName("last parallel sized")
void last_parallel_sized() throws Exception {
long expected = 10_000_000L;
AtomicLong count = new AtomicLong();
Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed().parallel();
stream = stream.skip(50_000).peek(num -> count.getAndIncrement());


assertThat(Streams.last(stream)).hasValue(expected);
assertThat(count).hasValue(1);
}


@Test
@DisplayName("last parallel unsized")
void last_parallel_unsized() throws Exception {
long expected = 10_000_000L;
AtomicLong count = new AtomicLong();
Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed().parallel();
stream = StreamSupport.stream(((Iterable<Long>) stream::iterator).spliterator(), stream.isParallel());
stream = stream.peek(num -> count.getAndIncrement());


assertThat(Streams.last(stream)).hasValue(expected);
assertThat(count).hasValue(1);
}


@Test
@DisplayName("last parallel unsized with skip")
void last_parallel_unsized_with_skip() throws Exception {
long expected = 10_000_000L;
AtomicLong count = new AtomicLong();
Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed().parallel();
stream = StreamSupport.stream(((Iterable<Long>) stream::iterator).spliterator(), stream.isParallel());
stream = stream.skip(50_000).peek(num -> count.getAndIncrement());


// Unfortunately unsized parallel streams do not work very well with skip
//assertThat(Streams.last(stream)).hasValue(expected);
//assertThat(count).hasValue(1);


// @Holger implementation gives wrong answer!!
//assertThat(Streams.getLast(stream)).hasValue(9_950_000L); //!!!
//assertThat(count).hasValue(1);


// This is also not a very good answer
assertThat(Streams.last(stream)).isEmpty();
assertThat(count).hasValue(0);
}

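The only solution that supports both scenarios is to avoid looking for the last spliterator on unsized parallel streams. The consequence is that the solution performs the operations on all elements, but it always gives the right answer.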

Note that on sequential streams it performs the operations on all elements anyway.

public static <T> Optional<T> last(Stream<? extends T> stream) {
    Objects.requireNonNull(stream, "stream");

    Spliterator<? extends T> spliterator = stream.spliterator();

    // Find the last spliterator with estimate size (sized parallel streams)
    if(spliterator.hasCharacteristics(Spliterator.SIZED|Spliterator.SUBSIZED)) {
        // Find the last spliterator on sized streams (parallel streams)
        for (Spliterator<? extends T> prev = spliterator.trySplit(); prev != null; prev = spliterator.trySplit()) {
            if (spliterator.getExactSizeIfKnown() == 0) {
                spliterator = prev;
                break;
            }
        }
    }

    // Find the last element of the spliterator
    //AtomicReference<T> last = new AtomicReference<>();
    //spliterator.forEachRemaining(last::set);

    //return Optional.ofNullable(last.get());

    // A better one that supports native parallel streams
    return (Optional<T>) StreamSupport.stream(spliterator, stream.isParallel())
            .reduce((a, b) -> b);
}

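As for the unit tests of that implementation, the first three tests are exactly the same (sequential, and sized parallel). The tests for unsized parallel streams are: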

@Test
@DisplayName("last parallel unsized")
void last_parallel_unsized() throws Exception {
long expected = 10_000_000L;
AtomicLong count = new AtomicLong();
Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed().parallel();
stream = StreamSupport.stream(((Iterable<Long>) stream::iterator).spliterator(), stream.isParallel());
stream = stream.peek(num -> count.getAndIncrement());


assertThat(Streams.last(stream)).hasValue(expected);
assertThat(count).hasValue(10_000_000L);
}


@Test
@DisplayName("last parallel unsized with skip")
void last_parallel_unsized_with_skip() throws Exception {
long expected = 10_000_000L;
AtomicLong count = new AtomicLong();
Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed().parallel();
stream = StreamSupport.stream(((Iterable<Long>) stream::iterator).spliterator(), stream.isParallel());
stream = stream.skip(50_000).peek(num -> count.getAndIncrement());


assertThat(Streams.last(stream)).hasValue(expected);
assertThat(count).hasValue(9_950_000L);
}
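
The reason the reduce-based fallback can be trusted on parallel streams is that `(a, b) -> b` is associative and `reduce` respects encounter order on ordered streams, so the right-most element wins however the work is divided. A tiny sketch with a made-up class name (`ParallelReduceLast`), using only the plain JDK rather than the test setup above:

```java
import java.util.Optional;
import java.util.stream.IntStream;

class ParallelReduceLast {
    // Reduces a parallel, ordered range to its final element.
    static Optional<Integer> lastOfRange(int endExclusive) {
        return IntStream.range(0, endExclusive)
                .boxed()
                .parallel()
                .reduce((a, b) -> b); // associative: always keeps the right-most operand
    }

    public static void main(String[] args) {
        System.out.println(lastOfRange(100_000)); // prints "Optional[99999]"
        System.out.println(lastOfRange(0));       // prints "Optional.empty"
    }
}
```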

Guava has Streams.findLast:

Stream<T> stream;
Optional<T> last = Streams.findLast(stream);

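We needed the last element of a Stream in production. I am still not sure we really did, but various members of my team said we did, for various "reasons". I ended up writing something like this: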

private static class Holder<T> implements Consumer<T> {

    T t = null;
    // needed for null elements that could be valid
    boolean set = false;

    @Override
    public void accept(T t) {
        this.t = t;
        set = true;
    }
}

/**
 * when a Stream is SUBSIZED, it means that all children (direct or not) are also SIZED and SUBSIZED;
 * meaning we know their size "always" no matter how many splits are there from the initial one.
 * <p>
 * when a Stream is SIZED, it means that we know its current size, but nothing about its "children",
 * a Set for example.
 */
private static <T> Optional<Optional<T>> last(Stream<T> stream) {

    Spliterator<T> suffix = stream.spliterator();
    // nothing left to do here
    if (suffix.getExactSizeIfKnown() == 0) {
        return Optional.empty();
    }

    Holder<T> holder = new Holder<>();
    T result = compute(suffix, holder);
    // an empty stream of unknown size never touches the holder
    return holder.set ? Optional.of(Optional.ofNullable(result)) : Optional.empty();
}

private static <T> T compute(Spliterator<T> sp, Holder<T> holder) {

    Spliterator<T> s;
    while (true) {
        Spliterator<T> prefix = sp.trySplit();
        // we can't split any further
        // BUT don't look at: prefix.getExactSizeIfKnown() == 0 because this
        // does not mean that suffix can't be split even more further down
        if (prefix == null) {
            s = sp;
            break;
        }

        // if prefix is known to have no elements, just drop it and continue with suffix
        if (prefix.getExactSizeIfKnown() == 0) {
            continue;
        }

        // if suffix has no elements, try to split prefix further
        if (sp.getExactSizeIfKnown() == 0) {
            sp = prefix;
        }

        // after a split, a stream that is not SUBSIZED can give birth to a spliterator that is
        if (sp.hasCharacteristics(Spliterator.SUBSIZED)) {
            return compute(sp, holder);
        } else {
            // if we don't know the size of suffix or prefix, just try to walk them individually
            // starting from suffix and see if we find our "last" there
            T suffixResult = compute(sp, holder);
            if (!holder.set) {
                return compute(prefix, holder);
            }
            return suffixResult;
        }
    }

    s.forEachRemaining(holder::accept);
    return holder.t;
}

And some usages of it:

Stream<Integer> st = Stream.concat(Stream.of(1, 2), Stream.empty());
System.out.println(2 == last(st).get().get());


st = Stream.concat(Stream.empty(), Stream.of(1, 2));
System.out.println(2 == last(st).get().get());


st = Stream.concat(Stream.iterate(0, i -> i + 1), Stream.of(1, 2, 3));
System.out.println(3 == last(st).get().get());


st = Stream.concat(Stream.iterate(0, i -> i + 1).limit(0), Stream.iterate(5, i -> i + 1).limit(3));
System.out.println(7 == last(st).get().get());


st = Stream.concat(Stream.iterate(5, i -> i + 1).limit(3), Stream.iterate(0, i -> i + 1).limit(0));
System.out.println(7 == last(st).get().get());


String s = last(
IntStream.range(0, 10_000_000).mapToObj(i -> {
System.out.println("potential heavy operation on " + i);
return String.valueOf(i);
}).parallel()
).get().get();


System.out.println(s.equalsIgnoreCase("9999999"));


st = Stream.empty();
System.out.println(last(st).isEmpty());


st = Stream.of(1, 2, 3, 4, null);
System.out.println(last(st).get().isEmpty());


st = Stream.of((Integer) null);
System.out.println(last(st).isPresent());


IntStream is = IntStream.range(0, 4).filter(i -> i != 3);
System.out.println(last(is.boxed()));

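First, the return type of Optional<Optional<T>>: it looks weird, I agree. If the outer Optional is empty, there are no elements in the Stream; if the inner Optional is empty, the last element was actually null, e.g. Stream.of(1, 2, 3, null) (unlike Guava's Streams::findLast, which throws an exception in that case).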
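
To show how a caller would tell the three outcomes apart, here is a small sketch. Note that `lastNullable` below is a deliberately simplified, sequential stand-in written for this illustration, not the spliterator-based `last` from this answer; `NestedOptionalDemo` and `describe` are likewise made-up names.

```java
import java.util.Optional;
import java.util.stream.Stream;

class NestedOptionalDemo {
    // Simplified sequential stand-in: walks every element and remembers the final one.
    static <T> Optional<Optional<T>> lastNullable(Stream<T> stream) {
        Object[] holder = new Object[1];
        boolean[] seen = {false};
        stream.sequential().forEachOrdered(e -> {
            holder[0] = e;
            seen[0] = true;
        });
        if (!seen[0]) {
            return Optional.empty(); // outer empty: the stream had no elements
        }
        @SuppressWarnings("unchecked")
        T last = (T) holder[0];
        return Optional.of(Optional.ofNullable(last)); // inner empty: last element was null
    }

    // Turns the nested result into a readable message.
    static <T> String describe(Optional<Optional<T>> result) {
        if (!result.isPresent()) {
            return "stream was empty";
        }
        return result.get().map(v -> "last = " + v).orElse("last element was null");
    }

    public static void main(String[] args) {
        System.out.println(describe(lastNullable(Stream.<Integer>empty()))); // prints "stream was empty"
        System.out.println(describe(lastNullable(Stream.of(1, 2, 3))));      // prints "last = 3"
        System.out.println(describe(lastNullable(Stream.of(1, 2, null))));   // prints "last element was null"
    }
}
```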

I admit I was inspired mainly by Holger's answer to a similar question of mine and by Guava's Streams::findLast.