从流中收集连续的对

给定像 { 0, 1, 2, 3, 4 }这样的流,

我怎样才能最优雅地把它变成既定的形式:

{ new Pair(0, 1), new Pair(1, 2), new Pair(2, 3), new Pair(3, 4) }

(假设,当然,我已经定义了类对) ?

编辑: 这不是严格关于 int 或者基本流的,对于任何类型的流,答案应该是通用的。

81295 次浏览

运行从流的0到 length-1for循环

for(int i = 0 ; i < stream.length-1 ; i++)
{
Pair pair = new Pair(stream[i], stream[i+1]);
// then add your pair to an array
}

这并不优雅,这是一个骇客式的解决方案,但适用于无限的流

Stream<Pair> pairStream = Stream.iterate(0, (i) -> i + 1).map( // natural numbers
new Function<Integer, Pair>() {
Integer previous;


@Override
public Pair apply(Integer integer) {
Pair pair = null;
if (previous != null) pair = new Pair(previous, integer);
previous = integer;
return pair;
}
}).skip(1); // drop first null

现在可以将流限制为所需的长度

pairStream.limit(1_000_000).forEach(i -> System.out.println(i));

我希望有更好的解决方案,比如 clojure (partition 2 1 stream)

在您的例子中,我将编写我的自定义 IntFunction,它跟踪传递的最后一个 int,并使用它映射原始 IntStream。

import java.util.function.IntFunction;
import java.util.stream.IntStream;


public class PairFunction implements IntFunction<PairFunction.Pair> {


public static class Pair {


private final int first;
private final int second;


public Pair(int first, int second) {
this.first = first;
this.second = second;
}


@Override
public String toString() {
return "[" + first + "|" + second + "]";
}
}


private int last;
private boolean first = true;


@Override
public Pair apply(int value) {
Pair pair = !first ? new Pair(last, value) : null;
last = value;
first = false;
return pair;
}


public static void main(String[] args) {


IntStream intStream = IntStream.of(0, 1, 2, 3, 4);
final PairFunction pairFunction = new PairFunction();
intStream.mapToObj(pairFunction)
.filter(p -> p != null) // filter out the null
.forEach(System.out::println); // display each Pair


}


}

我已经实现了一个 spliterator 包装器,它从原来的 spliterator 获取每个 n元素 T并生成 List<T>:

public class ConsecutiveSpliterator<T> implements Spliterator<List<T>> {


private final Spliterator<T> wrappedSpliterator;


private final int n;


private final Deque<T> deque;


private final Consumer<T> dequeConsumer;


public ConsecutiveSpliterator(Spliterator<T> wrappedSpliterator, int n) {
this.wrappedSpliterator = wrappedSpliterator;
this.n = n;
this.deque = new ArrayDeque<>();
this.dequeConsumer = deque::addLast;
}


@Override
public boolean tryAdvance(Consumer<? super List<T>> action) {
deque.pollFirst();
fillDeque();
if (deque.size() == n) {
List<T> list = new ArrayList<>(deque);
action.accept(list);
return true;
} else {
return false;
}
}


private void fillDeque() {
while (deque.size() < n && wrappedSpliterator.tryAdvance(dequeConsumer))
;
}


@Override
public Spliterator<List<T>> trySplit() {
return null;
}


@Override
public long estimateSize() {
return wrappedSpliterator.estimateSize();
}


@Override
public int characteristics() {
return wrappedSpliterator.characteristics();
}
}

可以使用下列方法创建连续的流:

public <E> Stream<List<E>> consecutiveStream(Stream<E> stream, int n) {
Spliterator<E> spliterator = stream.spliterator();
Spliterator<List<E>> wrapper = new ConsecutiveSpliterator<>(spliterator, n);
return StreamSupport.stream(wrapper, false);
}

使用方法:

consecutiveStream(Stream.of(0, 1, 2, 3, 4, 5), 2)
.map(list -> new Pair(list.get(0), list.get(1)))
.forEach(System.out::println);

An elegant solution would be to use zip. Something like:

List<Integer> input = Arrays.asList(0, 1, 2, 3, 4);
Stream<Pair> pairStream = Streams.zip(input.stream(),
input.stream().substream(1),
(a, b) -> new Pair(a, b)
);

这非常简洁和优雅,但是它使用列表作为输入。无限流源不能以这种方式处理。

另一个(更麻烦的)问题是,最近从 API 中删除了包含整个 Streams 类的 zip。上面的代码只适用于 B95或更老的版本。因此,对于最新的 JDK,我认为没有优雅的 FP 风格的解决方案,现在我们只能希望在某种程度上将 zip 重新引入到 API 中。

Java 8流库主要面向将流分割成更小的块进行并行处理,因此有状态流水线阶段非常有限,并且不支持获取当前流元素的索引和访问相邻流元素等操作。

当然,解决这些问题的一个典型方法是通过索引驱动流,并依赖于在随机访问数据结构中处理值,比如可以从中检索元素的数组列表(ArrayList)。如果这些值在 arrayList中,那么可以按照请求生成这些对,方法如下:

    IntStream.range(1, arrayList.size())
.mapToObj(i -> new Pair(arrayList.get(i-1), arrayList.get(i)))
.forEach(System.out::println);

当然,限制在于输入不能是无限的流。但是,这个管道可以并行运行。

操作本质上是有状态的,所以并不是真正意味着流要解决什么问题——参见 Javadoc中的“无状态行为”部分:

最好的方法是完全避免使用有状态行为参数来进行流操作

这里的一个解决方案是通过外部计数器在流中引入状态,尽管它只能对顺序流起作用。

public static void main(String[] args) {
Stream<String> strings = Stream.of("a", "b", "c", "c");
AtomicReference<String> previous = new AtomicReference<>();
List<Pair> collect = strings.map(n -> {
String p = previous.getAndSet(n);
return p == null ? null : new Pair(p, n);
})
.filter(p -> p != null)
.collect(toList());
System.out.println(collect);
}




static class Pair<T> {
private T left, right;
Pair(T left, T right) { this.left = left; this.right = right; }
@Override public String toString() { return "{" + left + "," + right + '}'; }
}

质子包图书馆质子包图书馆提供窗口功能。给定一个 Pair 类和一个 Stream,你可以这样做:

Stream<Integer> st = Stream.iterate(0 , x -> x + 1);
Stream<Pair<Integer, Integer>> pairs = StreamUtils.windowed(st, 2, 1)
.map(l -> new Pair<>(l.get(0), l.get(1)))
.moreStreamOps(...);

Now the pairs stream contains:

(0, 1)
(1, 2)
(2, 3)
(3, 4)
(4, ...) and so on

您可以使用滑动运算符在 剑水蚤-反应中完成此操作(我为此库做出了贡献)。

  LazyFutureStream.of( 0, 1, 2, 3, 4 )
.sliding(2)
.map(Pair::new);

或者

   ReactiveSeq.of( 0, 1, 2, 3, 4 )
.sliding(2)
.map(Pair::new);

假设 Pair 构造函数可以接受包含2个元素的 Collection。

如果希望按4分组,并按2递增,也支持这种方法。

     ReactiveSeq.rangeLong( 0L,Long.MAX_VALUE)
.sliding(4,2)
.forEach(System.out::println);

用于在 java.util.Stream. Stream 上创建滑动视图的等价静态方法也在 cyclops-Stream StreamUtils类中提供。

       StreamUtils.sliding(Stream.of(1,2,3,4),2)
.map(Pair::new);

注意:-对于单线程操作 ReactiveSeq 更合适。LazyFutureStream 扩展了 ReactiveSeq,但主要用于并发/并行使用(它是未来流)。

LazyFutureStream 扩展了 ReactiveSeq,它从令人敬畏的 jOOλ 扩展了 Seq (它扩展了 java.util.stream)。Stream) ,因此卢卡斯提供的解决方案也可以用于 Stream 类型。对于任何感兴趣的人来说,窗口/滑动操作符之间的主要区别在于明显的相对功耗/复杂性权衡,以及对于无限流的适用性(滑动操作不会消耗流,而是在流的过程中消耗缓冲区)。

我同意@aepurniet 但是你必须使用 mapToObj 来代替 map

range(0, 100).mapToObj((i) -> new Pair(i, i+1)).forEach(System.out::println);

我的扩展了文本数据流的 StreamEx库为所有流类型提供了一个 pairMap方法。对于基本流,它不更改流类型,但可用于进行一些计算。最常见的用法是计算差异:

int[] pairwiseDiffs = IntStreamEx.of(input).pairMap((a, b) -> (b-a)).toArray();

对于对象流,您可以创建任何其他对象类型。我的库不提供任何新的用户可见的数据结构,如 Pair(这是库概念的一部分)。但是,如果您有自己的 Pair类并想使用它,您可以执行以下操作:

Stream<Pair> pairs = IntStreamEx.of(input).boxed().pairMap(Pair::new);

或者如果你已经有了一些 Stream:

Stream<Pair> pairs = StreamEx.of(stream).pairMap(Pair::new);

此功能是使用 定制分裂器实现的。它具有相当低的开销,并且可以很好地并行化。当然,它适用于任何流源,而不像其他许多解决方案那样只适用于随机访问列表/数组。在许多测试中,它表现得非常好。给你一个 JMH 基准,我们使用不同的方法找到所有输入值之前的较大值(参见 这个问题)。

寻找连续的对

If you're willing to use a third party library and don't need parallelism, then JoOλ offers SQL-style window functions as follows

System.out.println(
Seq.of(0, 1, 2, 3, 4)
.window()
.filter(w -> w.lead().isPresent())
.map(w -> tuple(w.value(), w.lead().get())) // alternatively, use your new Pair() class
.toList()
);

屈服

[(0, 1), (1, 2), (2, 3), (3, 4)]

The lead() function accesses the next value in traversal order from the window.

查找连续的三元组/四元组/n 元组

评论中的一个问题是寻求一个更通用的解决方案,在这个解决方案中,应该收集的不是对而是 n 元组(或者可能是列表)。这里有一个替代方法:

int n = 3;


System.out.println(
Seq.of(0, 1, 2, 3, 4)
.window(0, n - 1)
.filter(w -> w.count() == n)
.map(w -> w.window().toList())
.toList()
);

列出名单

[[0, 1, 2], [1, 2, 3], [2, 3, 4]]

如果没有 filter(w -> w.count() == n),结果将是

[[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4], [4]]

免责声明: 我为 joOOλ 背后的公司工作

We can use RxJava (very powerful reactive extension library)

IntStream intStream  = IntStream.iterate(1, n -> n + 1);


Observable<List<Integer>> pairObservable = Observable.from(intStream::iterator).buffer(2,1);


pairObservable.take(10).forEach(b -> {
b.forEach(n -> System.out.println(n));
System.out.println();
});

缓冲器 接线员转换为可观察到的 将项发出到一个可观察的,该观察的发出缓冲的 那些东西。

这是一个有趣的问题。我的 hybrid尝试低于任何好?

public static void main(String[] args) {
List<Integer> list = Arrays.asList(1, 2, 3);
Iterator<Integer> first = list.iterator();
first.next();
if (first.hasNext())
list.stream()
.skip(1)
.map(v -> new Pair(first.next(), v))
.forEach(System.out::println);
}

我认为它不适合并行处理,因此可能被取消资格。

正如其他人所观察到的,由于问题的本质,需要一些有状态。

我也遇到过类似的问题,我想要的是 Oracle SQL 函数 LEAD。下面是我实现这一点的尝试。

/**
* Stream that pairs each element in the stream with the next subsequent element.
* The final pair will have only the first item, the second will be null.
*/
<T> Spliterator<Pair<T>> lead(final Stream<T> stream)
{
final Iterator<T> input = stream.sequential().iterator();


final Iterable<Pair<T>> iterable = () ->
{
return new Iterator<Pair<T>>()
{
Optional<T> current = getOptionalNext(input);


@Override
public boolean hasNext()
{
return current.isPresent();
}


@Override
public Pair<T> next()
{
Optional<T> next = getOptionalNext(input);
final Pair<T> pair = next.isPresent()
? new Pair(current.get(), next.get())
: new Pair(current.get(), null);
current = next;


return pair;
}
};
};


return iterable.spliterator();
}


private <T> Optional<T> getOptionalNext(final Iterator<T> iterator)
{
return iterator.hasNext()
? Optional.of(iterator.next())
: Optional.empty();
}

可以通过使用有界队列来存储流过流的元素来实现这一点(这是基于我在这里详细描述的想法: 是否有可能在流中获得下一个元素?)

下面的例子首先定义了 BoundedQueue 类的实例,它将存储流中的元素(如果您不喜欢扩展 LinkedList 的想法,请参考上面提到的链接,以获得其他更通用的方法)。之后,您只需将两个后续元素组合到 Pair 的实例中:

public class TwoSubsequentElems {
public static void main(String[] args) {
List<Integer> input = new ArrayList<Integer>(asList(0, 1, 2, 3, 4));


class BoundedQueue<T> extends LinkedList<T> {
public BoundedQueue<T> save(T curElem) {
if (size() == 2) { // we need to know only two subsequent elements
pollLast(); // remove last to keep only requested number of elements
}


offerFirst(curElem);


return this;
}


public T getPrevious() {
return (size() < 2) ? null : getLast();
}


public T getCurrent() {
return (size() == 0) ? null : getFirst();
}
}


BoundedQueue<Integer> streamHistory = new BoundedQueue<Integer>();


final List<Pair<Integer>> answer = input.stream()
.map(i -> streamHistory.save(i))
.filter(e -> e.getPrevious() != null)
.map(e -> new Pair<Integer>(e.getPrevious(), e.getCurrent()))
.collect(Collectors.toList());


answer.forEach(System.out::println);
}
}

您可以使用 Stream.reduce ()方法完成这项工作(我还没有看到使用该技术的其他任何答案)。

public static <T> List<Pair<T, T>> consecutive(List<T> list) {
List<Pair<T, T>> pairs = new LinkedList<>();
list.stream().reduce((a, b) -> {
pairs.add(new Pair<>(a, b));
return b;
});
return pairs;
}

为了计算时间序列的连续差异(x 值) ,我使用 streamcollect(...)方法:

final List< Long > intervals = timeSeries.data().stream()
.map( TimeSeries.Datum::x )
.collect( DifferenceCollector::new, DifferenceCollector::accept, DifferenceCollector::combine )
.intervals();

差分收集器是这样的:

public class DifferenceCollector implements LongConsumer
{
private final List< Long > intervals = new ArrayList<>();
private Long lastTime;


@Override
public void accept( final long time )
{
if( Objects.isNull( lastTime ) )
{
lastTime = time;
}
else
{
intervals.add( time - lastTime );
lastTime = time;
}
}


public void combine( final DifferenceCollector other )
{
intervals.addAll( other.intervals );
lastTime = other.lastTime;
}


public List< Long > intervals()
{
return intervals;
}
}

您也许可以修改它以满足您的需要。

我最终找到了一种欺骗 Stream.reduce 的方法,使其能够巧妙地处理成对的值; 有大量的用例需要这种在 JDK 8中没有出现 很自然的功能:

public static int ArithGeo(int[] arr) {
//Geometric
List<Integer> diffList = new ArrayList<>();
List<Integer> divList = new ArrayList<>();
Arrays.stream(arr).reduce((left, right) -> {
diffList.add(right-left);
divList.add(right/left);
return right;
});
//Arithmetic
if(diffList.stream().distinct().count() == 1) {
return 1;
}
//Geometric
if(divList.stream().distinct().count() == 1) {
return 2;
}
return -1;
}

我使用的技巧是 返还权;语句。

Streams.zip(..)可在番石榴 ,为那些谁依赖它。

Example:

Streams.zip(list.stream(),
list.stream().skip(1),
(a, b) -> System.out.printf("%s %s\n", a, b));

这里的解决方案似乎有点复杂,或者依赖于第三方库。这个问题可以通过收集对的中间流来解决:

public static <T> Stream<List<T>> pairs(Stream<T> stream) {
Iterator<T> iterator = stream.iterator();
if (iterator.hasNext()) {
T first = iterator.next();
if (iterator.hasNext()) {
return Stream.iterate(
List.of(first, iterator.next()),
prev -> iterator.hasNext() ? List.of(prev.get(1), iterator.next()) : null)
.takeWhile(prev -> prev != null);
}
}
return Stream.empty();
}

例子:

pairs(Stream.of()).toList();      // []
pairs(Stream.of(1)).toList();     // []
pairs(Stream.of(1,2)).toList();   // [[1, 2]]
pairs(Stream.of(1,2,3)).toList(); // [[1, 2], [2, 3]]
pairs(Stream.of("a","b","c","d")).toList();  // [[a, b], [b, c], [c, d]]

在这个解决方案中,Stream.iterate使用累加器的方式与 reduce非常相似,只是它创建了一个中间流,而不是一个终端操作。因此支持懒惰和无限流。