Zipping streams using JDK8 with lambda (java.util.stream.Streams.zip)

In JDK 8 with lambda b93, there was a method java.util.stream.Streams.zip that could be used to zip streams (this is illustrated in the tutorial Exploring Java8 Lambdas. Part 1 by Dhananjay Nene). This function:

Creates a lazy and sequential combined Stream whose elements are the result of combining the elements of two streams.

However, in b98 this has disappeared. In fact, the Streams class is not even accessible in java.util.stream in b98.

Has this functionality been moved? If so, how do I zip streams concisely using b98?

The application I have in mind is in this Java implementation of Shen, where I replaced the zip functionality in the

  • static <T> boolean every(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred)
  • static <T> T find(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred)

functions with rather verbose code (which doesn't use functionality from b98).


The Lazy-Seq library provides zip functionality.

https://github.com/nurkiewicz/lazyseq

This library is inspired by scala.collection.immutable.Stream and aims to provide an immutable, thread-safe and easy-to-use lazy sequence implementation, possibly infinite.

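The methods of the class you mentioned have been moved to the Stream interface itself, in favor of default methods. But it seems that the zip method has been removed, perhaps because it is not clear what the default behavior for streams of different sizes should be. Implementing the desired behavior is straightforward, though: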

static <T> boolean every(
        Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred) {
    // The lambda pulls from a shared Iterator, so c1.stream() must stay sequential
    Iterator<T> it = c2.iterator();
    return c1.stream().allMatch(x -> !it.hasNext() || pred.test(x, it.next()));
}

static <T> T find(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred) {
    Iterator<T> it = c2.iterator();
    return c1.stream().filter(x -> it.hasNext() && pred.test(x, it.next()))
            .findFirst().orElse(null);
}
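
The two helpers above can be checked on small inputs with a self-contained sketch like the following; the wrapper class ZipPredicates and the sample lists are illustrative only. Note that every treats elements of c1 beyond the end of c2 as matching, and find returns null when no pair matches.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiPredicate;

class ZipPredicates {
    // Walks c2 with an external iterator while streaming c1 (sequential only).
    static <T> boolean every(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred) {
        Iterator<T> it = c2.iterator();
        // Once c2 runs out, the remaining elements of c1 count as matching.
        return c1.stream().allMatch(x -> !it.hasNext() || pred.test(x, it.next()));
    }

    // Returns the first element of c1 whose partner in c2 satisfies pred, else null.
    static <T> T find(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred) {
        Iterator<T> it = c2.iterator();
        return c1.stream().filter(x -> it.hasNext() && pred.test(x, it.next()))
                .findFirst().orElse(null);
    }

    public static void main(String[] args) {
        List<Integer> xs = Arrays.asList(1, 2, 3);
        List<Integer> ys = Arrays.asList(1, 5, 3, 9);
        System.out.println(every(xs, ys, Integer::equals)); // false, because 2 != 5
        System.out.println(find(xs, ys, (x, y) -> x < y));   // 2, from the pair (2, 5)
    }
}
```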

I needed this as well, so I just took the source code from b93 and put it in a "util" class. I had to modify it slightly to work with the current API.

For reference, here's the working code (use at your own risk...):

public static <A, B, C> Stream<C> zip(Stream<? extends A> a,
                                      Stream<? extends B> b,
                                      BiFunction<? super A, ? super B, ? extends C> zipper) {
    Objects.requireNonNull(zipper);
    Spliterator<? extends A> aSpliterator = Objects.requireNonNull(a).spliterator();
    Spliterator<? extends B> bSpliterator = Objects.requireNonNull(b).spliterator();

    // Zipping loses the DISTINCT and SORTED characteristics, and NONNULL cannot be
    // guaranteed because the zipper may return null
    int characteristics = aSpliterator.characteristics() & bSpliterator.characteristics() &
            ~(Spliterator.DISTINCT | Spliterator.SORTED | Spliterator.NONNULL);

    long zipSize = ((characteristics & Spliterator.SIZED) != 0)
            ? Math.min(aSpliterator.getExactSizeIfKnown(), bSpliterator.getExactSizeIfKnown())
            : -1;

    Iterator<A> aIterator = Spliterators.iterator(aSpliterator);
    Iterator<B> bIterator = Spliterators.iterator(bSpliterator);
    Iterator<C> cIterator = new Iterator<C>() {
        @Override
        public boolean hasNext() {
            return aIterator.hasNext() && bIterator.hasNext();
        }

        @Override
        public C next() {
            return zipper.apply(aIterator.next(), bIterator.next());
        }
    };

    // Spliterators.spliterator(iterator, size, ...) reports SIZED (unless CONCURRENT is set),
    // so use an unknown-size spliterator when the zipped size is not known
    Spliterator<C> split = (zipSize >= 0)
            ? Spliterators.spliterator(cIterator, zipSize, characteristics)
            : Spliterators.spliteratorUnknownSize(cIterator, characteristics);
    return StreamSupport.stream(split, a.isParallel() || b.isParallel());
}
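
The code above drops DISTINCT and SORTED from the combined characteristics. A small index-based sketch (not the b93 code itself; the class name and sample values are illustrative) shows why: both inputs are sorted and distinct, yet the element-wise differences are neither.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class CharacteristicsDemo {
    // Zips two lists by index, combining each pair with subtraction.
    static List<Integer> differences(List<Integer> xs, List<Integer> ys) {
        int n = Math.min(xs.size(), ys.size());
        return IntStream.range(0, n)
                .mapToObj(i -> xs.get(i) - ys.get(i))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> xs = Arrays.asList(10, 20, 30); // sorted and distinct
        List<Integer> ys = Arrays.asList(1, 12, 21);  // sorted and distinct
        // The result is neither sorted (9 > 8) nor distinct (9 appears twice).
        System.out.println(differences(xs, ys)); // [9, 8, 9]
    }
}
```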

Zip is one of the functions provided by the protonpack library.

Stream<String> streamA = Stream.of("A", "B", "C");
Stream<String> streamB = Stream.of("Apple", "Banana", "Carrot", "Doughnut");

List<String> zipped = StreamUtils.zip(streamA,
                                      streamB,
                                      (a, b) -> a + " is for " + b)
                                 .collect(Collectors.toList());

assertThat(zipped,
           contains("A is for Apple", "B is for Banana", "C is for Carrot"));

public class Tuple<S, T> {
    private final S object1;
    private final T object2;

    public Tuple(S object1, T object2) {
        this.object1 = object1;
        this.object2 = object2;
    }

    public S getObject1() {
        return object1;
    }

    public T getObject2() {
        return object2;
    }
}

public class StreamUtils {

    private StreamUtils() {
    }

    // The shared Iterator makes this correct only for sequential streams,
    // and only for streams of at most Integer.MAX_VALUE elements
    public static <T> Stream<Tuple<Integer, T>> zipWithIndex(Stream<T> stream) {
        Stream<Integer> integerStream = IntStream.range(0, Integer.MAX_VALUE).boxed();
        Iterator<Integer> integerIterator = integerStream.iterator();
        return stream.map(x -> new Tuple<>(integerIterator.next(), x));
    }
}
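
Pulling indices from a shared Iterator inside map, as zipWithIndex above does, is only safe on sequential streams. When the source is a random-access List, one alternative is to stream over the indices instead, which stays correct in parallel. This is a sketch under that assumption; the class name IndexedZip is hypothetical and it uses Map.Entry rather than the Tuple class.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class IndexedZip {
    // Pairs each element with its index by streaming the indices themselves.
    // Assumes a random-access List: get(i) on a LinkedList would be O(n).
    static <T> Stream<Map.Entry<Integer, T>> zipWithIndex(List<T> list) {
        return IntStream.range(0, list.size())
                .<Map.Entry<Integer, T>>mapToObj(i -> new AbstractMap.SimpleImmutableEntry<>(i, list.get(i)));
    }

    public static void main(String[] args) {
        List<String> indexed = zipWithIndex(Arrays.asList("x", "y", "z"))
                .parallel() // no shared mutable state, so parallel is fine
                .map(e -> e.getKey() + ":" + e.getValue())
                .collect(Collectors.toList()); // encounter order is preserved
        System.out.println(indexed); // [0:x, 1:y, 2:z]
    }
}
```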

Zipping two streams using JDK8 with lambda (gist).

public static <A, B, C> Stream<C> zip(Stream<A> streamA, Stream<B> streamB, BiFunction<A, B, C> zipper) {
    final Iterator<A> iteratorA = streamA.iterator();
    final Iterator<B> iteratorB = streamB.iterator();
    final Iterator<C> iteratorC = new Iterator<C>() {
        @Override
        public boolean hasNext() {
            return iteratorA.hasNext() && iteratorB.hasNext();
        }

        @Override
        public C next() {
            return zipper.apply(iteratorA.next(), iteratorB.next());
        }
    };
    final boolean parallel = streamA.isParallel() || streamB.isParallel();
    return iteratorToFiniteStream(iteratorC, parallel);
}

public static <T> Stream<T> iteratorToFiniteStream(Iterator<T> iterator, boolean parallel) {
    final Iterable<T> iterable = () -> iterator;
    return StreamSupport.stream(iterable.spliterator(), parallel);
}
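
Because this version only pulls elements through iterators, one of the inputs may be infinite as long as the other is finite: hasNext() becomes false as soon as the shorter input runs out. A self-contained sketch of that usage follows; the class name IteratorZipDemo and the sample data are illustrative, and the zip method is a condensed copy of the one above.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class IteratorZipDemo {
    // Iterator-based zip; the result ends when either input is exhausted.
    static <A, B, C> Stream<C> zip(Stream<A> streamA, Stream<B> streamB, BiFunction<A, B, C> zipper) {
        Iterator<A> iteratorA = streamA.iterator();
        Iterator<B> iteratorB = streamB.iterator();
        Iterator<C> iteratorC = new Iterator<C>() {
            @Override
            public boolean hasNext() {
                return iteratorA.hasNext() && iteratorB.hasNext();
            }

            @Override
            public C next() {
                return zipper.apply(iteratorA.next(), iteratorB.next());
            }
        };
        Iterable<C> iterable = () -> iteratorC;
        return StreamSupport.stream(iterable.spliterator(), streamA.isParallel() || streamB.isParallel());
    }

    public static void main(String[] args) {
        // The first input is infinite; the finite second input bounds the result.
        List<String> labels = zip(Stream.iterate(1, n -> n + 1), Stream.of("a", "b", "c"), (n, s) -> n + s)
                .collect(Collectors.toList());
        System.out.println(labels); // [1a, 2b, 3c]
    }
}
```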

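AOL's cyclops-react, to which I contribute, also provides zipping functionality, both via ReactiveSeq, an extended Stream implementation that also implements the reactive-streams interface, and via StreamUtils, which offers much of the same functionality to standard Java Streams through static methods.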

List<Tuple2<Integer, Integer>> list = ReactiveSeq.of(1, 2, 3, 4, 5, 6)
                                                .zip(Stream.of(100, 200, 300, 400));

List<Tuple2<Integer, Integer>> list = StreamUtils.zip(Stream.of(1, 2, 3, 4, 5, 6),
                                                      Stream.of(100, 200, 300, 400));

It also offers more generalized Applicative-based zipping, e.g.

ReactiveSeq.of("a", "b", "c")
           .ap3(this::concat)
           .ap(of("1", "2", "3"))
           .ap(of(".", "?", "!"))
           .toList();

//List("a1.","b2?","c3!");

private String concat(String a, String b, String c) {
    return a + b + c;
}
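
For comparison, without cyclops-react the same three-way result can be produced over lists by index. This is a plain-Java sketch, not part of any library: zip3 is a hypothetical helper that hard-codes string concatenation instead of taking a three-argument function, and it stops at the shortest list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class Zip3Demo {
    // Concatenates the i-th elements of three lists, up to the shortest length.
    static List<String> zip3(List<String> xs, List<String> ys, List<String> zs) {
        int n = Math.min(xs.size(), Math.min(ys.size(), zs.size()));
        return IntStream.range(0, n)
                .mapToObj(i -> xs.get(i) + ys.get(i) + zs.get(i))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(zip3(Arrays.asList("a", "b", "c"),
                Arrays.asList("1", "2", "3"),
                Arrays.asList(".", "?", "!"))); // [a1., b2?, c3!]
    }
}
```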

It can even pair every item in one stream with every item in another:

ReactiveSeq.of("a", "b", "c")
           .forEach2(str -> Stream.of(str + "!", "2"), a -> b -> a + "_" + b);

//ReactiveSeq("a_a!","a_2","b_b!","b_2","c_c!","c_2")
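
In plain Java streams, the same "pair each item with every item of a derived stream" pattern is a nested flatMap. The sketch below reproduces the output listed above; the class and method names are illustrative, not cyclops-react API.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class PairEveryDemo {
    // For each element a, expand it into a second stream and pair a with every element b of it.
    static List<String> pairAll(Stream<String> outer) {
        return outer
                .flatMap(a -> Stream.of(a + "!", "2").map(b -> a + "_" + b))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(pairAll(Stream.of("a", "b", "c")));
        // [a_a!, a_2, b_b!, b_2, c_c!, c_2]
    }
}
```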

This is great. I had to zip two streams into a Map, with one stream providing the keys and the other the values:

Stream<String> streamA = Stream.of("A", "B", "C");
Stream<String> streamB = Stream.of("Apple", "Banana", "Carrot", "Doughnut");
final Stream<Map.Entry<String, String>> s = StreamUtils.zip(streamA,
        streamB,
        (a, b) -> {
            final Map.Entry<String, String> entry = new AbstractMap.SimpleEntry<String, String>(a, b);
            return entry;
        });

System.out.println(s.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())));

Output: {A=Apple, B=Banana, C=Carrot}
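
If both sides are already Lists, the same map can be built without a library by zipping over indices. This is a sketch under that assumption (the class name is hypothetical); it collects into a LinkedHashMap so the keys keep their original order, and like the two-argument Collectors.toMap it fails with IllegalStateException on a duplicate key.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ZipToMapDemo {
    // Maps keys.get(i) to values.get(i); values past the end of keys are ignored.
    static Map<String, String> zipToMap(List<String> keys, List<String> values) {
        return IntStream.range(0, Math.min(keys.size(), values.size()))
                .boxed()
                .collect(Collectors.toMap(
                        i -> keys.get(i),
                        i -> values.get(i),
                        (v1, v2) -> { throw new IllegalStateException("Duplicate key"); },
                        LinkedHashMap::new));
    }

    public static void main(String[] args) {
        Map<String, String> m = zipToMap(Arrays.asList("A", "B", "C"),
                Arrays.asList("Apple", "Banana", "Carrot", "Doughnut"));
        System.out.println(m); // {A=Apple, B=Banana, C=Carrot}
    }
}
```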

Since I can't conceive of any use for zipping collections other than indexed ones (Lists), and I am a big fan of simplicity, this would be my solution:

<A, B, C> Stream<C> zipped(List<A> lista, List<B> listb, BiFunction<A, B, C> zipper) {
    int shortestLength = Math.min(lista.size(), listb.size());
    return IntStream.range(0, shortestLength)
            .mapToObj(i -> zipper.apply(lista.get(i), listb.get(i)));
}
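
A usage sketch of this index-based approach, with the method made static inside a hypothetical ListZipDemo class. Because it streams over an IntStream.range, which splits evenly, it is also friendlier to parallel execution than the iterator-based versions, assuming both Lists support fast random access.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class ListZipDemo {
    // Index-based zip; the shorter list bounds the result.
    static <A, B, C> Stream<C> zipped(List<A> lista, List<B> listb, BiFunction<A, B, C> zipper) {
        int shortestLength = Math.min(lista.size(), listb.size());
        return IntStream.range(0, shortestLength)
                .mapToObj(i -> zipper.apply(lista.get(i), listb.get(i)));
    }

    public static void main(String[] args) {
        List<Integer> sums = zipped(Arrays.asList(1, 2, 3), Arrays.asList(10, 20), Integer::sum)
                .collect(Collectors.toList());
        System.out.println(sums); // [11, 22]
    }
}
```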

If you have Guava in your project, you can use the Streams.zip method (added in Guava 21):

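Returns a stream in which each element is the result of passing the corresponding element of each of streamA and streamB to function. The resulting stream will only be as long as the shorter of the two input streams; if one stream is longer, its extra elements will be ignored. The resulting stream is not efficiently splittable. This may harm parallel performance.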

public class Streams {
    ...

    public static <A, B, R> Stream<R> zip(Stream<A> streamA,
            Stream<B> streamB, BiFunction<? super A, ? super B, R> function) {
        ...
    }
}

I humbly suggest this implementation. The resulting stream is truncated to the shorter of the two input streams.

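public static <L, R, T> Stream<T> zip(Stream<L> leftStream, Stream<R> rightStream, BiFunction<L, R, T> combiner) {
    Spliterator<L> lefts = leftStream.spliterator();
    Spliterator<R> rights = rightStream.spliterator();
    // SORTED and DISTINCT do not survive combining, and the combiner may return null
    int characteristics = lefts.characteristics() & rights.characteristics()
            & ~(Spliterator.SORTED | Spliterator.DISTINCT | Spliterator.NONNULL);
    return StreamSupport.stream(new AbstractSpliterator<T>(Long.min(lefts.estimateSize(), rights.estimateSize()), characteristics) {
        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            // Report progress only when both sides produced an element; otherwise an
            // infinite left stream zipped with a finite right one would never terminate
            boolean[] rightAdvanced = {false};
            boolean leftAdvanced = lefts.tryAdvance(left ->
                    rightAdvanced[0] = rights.tryAdvance(right -> action.accept(combiner.apply(left, right))));
            return leftAdvanced && rightAdvanced[0];
        }
    }, leftStream.isParallel() || rightStream.isParallel());
}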
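
The following self-contained sketch of the spliterator approach has tryAdvance return true only when both spliterators produced an element, which lets it zip an infinite stream with a finite one. The class name SpliteratorZipDemo and the sample streams are illustrative; it also masks NONNULL, on the assumption that the combiner may return null.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class SpliteratorZipDemo {
    static <L, R, T> Stream<T> zip(Stream<L> leftStream, Stream<R> rightStream, BiFunction<L, R, T> combiner) {
        Spliterator<L> lefts = leftStream.spliterator();
        Spliterator<R> rights = rightStream.spliterator();
        // Keep only characteristics that still hold after combining pairs.
        int characteristics = lefts.characteristics() & rights.characteristics()
                & ~(Spliterator.DISTINCT | Spliterator.SORTED | Spliterator.NONNULL);
        long size = Math.min(lefts.estimateSize(), rights.estimateSize());
        return StreamSupport.stream(new AbstractSpliterator<T>(size, characteristics) {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                boolean[] rightAdvanced = {false};
                boolean leftAdvanced = lefts.tryAdvance(left ->
                        rightAdvanced[0] = rights.tryAdvance(right -> action.accept(combiner.apply(left, right))));
                // An element was emitted only if both sides advanced.
                return leftAdvanced && rightAdvanced[0];
            }
        }, leftStream.isParallel() || rightStream.isParallel());
    }

    public static void main(String[] args) {
        // Infinite left side, finite right side: terminates after three pairs.
        List<String> out = zip(Stream.iterate(0, n -> n + 1), Stream.of("x", "y", "z"), (n, s) -> s + n)
                .collect(Collectors.toList());
        System.out.println(out); // [x0, y1, z2]
    }
}
```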

In case someone still needs this, there is the StreamEx.zipWith method in the StreamEx library:

StreamEx<String> givenNames = StreamEx.of("Leo", "Fyodor");
StreamEx<String> familyNames = StreamEx.of("Tolstoy", "Dostoevsky");
StreamEx<String> fullNames = givenNames.zipWith(familyNames, (gn, fn) -> gn + " " + fn);

fullNames.forEach(System.out::println);  // prints: "Leo Tolstoy\nFyodor Dostoevsky\n"

Using the latest Guava library (for the Streams class), you should be able to do:

final Map<String, String> result =
        Streams.zip(
                collection1.stream(),
                collection2.stream(),
                AbstractMap.SimpleEntry::new)
            .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));

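Would this work for you? It's a short function that lazily evaluates the streams it's zipping, so you can supply it with infinite streams (it doesn't need to know the sizes of the streams being zipped).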

If the streams are finite, it stops as soon as one of them runs out of elements.

import java.util.Objects;
import java.util.function.BiFunction;
import java.util.stream.Stream;

class StreamUtils {
    // Requires Java 9+ (takeWhile) and Java 10+ (var)
    static <ARG1, ARG2, RESULT> Stream<RESULT> zip(
            Stream<ARG1> s1,
            Stream<ARG2> s2,
            BiFunction<ARG1, ARG2, RESULT> combiner) {
        final var i2 = s2.iterator();
        // null marks "s2 is exhausted", so the combiner itself must not return null
        return s1.map(x1 -> i2.hasNext() ? combiner.apply(x1, i2.next()) : null)
                .takeWhile(Objects::nonNull);
    }
}
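
One consequence of using null as the end marker is that a combiner which legitimately returns null ends the zipped stream early, because takeWhile cannot tell the two cases apart. A small self-contained sketch demonstrating this (Java 10+ for var; the class name and data are illustrative):

```java
import java.util.List;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class TakeWhileZipDemo {
    // Same takeWhile-based zip as above.
    static <A, B, R> Stream<R> zip(Stream<A> s1, Stream<B> s2, BiFunction<A, B, R> combiner) {
        final var i2 = s2.iterator();
        return s1.map(x1 -> i2.hasNext() ? combiner.apply(x1, i2.next()) : null)
                .takeWhile(Objects::nonNull);
    }

    public static void main(String[] args) {
        // The combiner returns null for the second pair, so the result stops after the first.
        List<Integer> out = zip(Stream.of(1, 2, 3), Stream.of(10, 20, 30),
                (a, b) -> a == 2 ? null : a + b)
                .collect(Collectors.toList());
        System.out.println(out); // [11]
    }
}
```

If null results are possible, wrapping them (for example in Optional) before zipping avoids the ambiguity.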

Here is some unit test code (much longer than the code itself!):

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;

class StreamUtilsTest {
    @ParameterizedTest
    @MethodSource("shouldZipTestCases")
    <ARG1, ARG2, RESULT>
    void shouldZip(
            String testName,
            Stream<ARG1> s1,
            Stream<ARG2> s2,
            BiFunction<ARG1, ARG2, RESULT> combiner,
            Stream<RESULT> expected) {
        var actual = StreamUtils.zip(s1, s2, combiner);

        assertEquals(
                expected.collect(Collectors.toList()),
                actual.collect(Collectors.toList()),
                testName);
    }

    private static Stream<Arguments> shouldZipTestCases() {
        return Stream.of(
                Arguments.of(
                        "Two empty streams",
                        Stream.empty(),
                        Stream.empty(),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.empty()),
                Arguments.of(
                        "One singleton and one empty stream",
                        Stream.of(1),
                        Stream.empty(),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.empty()),
                Arguments.of(
                        "One empty and one singleton stream",
                        Stream.empty(),
                        Stream.of(1),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.empty()),
                Arguments.of(
                        "Two singleton streams",
                        Stream.of("blah"),
                        Stream.of(1),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("blah", 1))),
                Arguments.of(
                        "One singleton, one multiple stream",
                        Stream.of("blob"),
                        Stream.of(2, 3),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("blob", 2))),
                Arguments.of(
                        "One multiple, one singleton stream",
                        Stream.of("foo", "bar"),
                        Stream.of(4),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("foo", 4))),
                Arguments.of(
                        "Two multiple streams",
                        Stream.of("nine", "eleven"),
                        Stream.of(10, 12),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("nine", 10), pair("eleven", 12)))
        );
    }

    private static List<Object> pair(Object o1, Object o2) {
        return List.of(o1, o2);
    }

    private static <T1, T2> List<Object> combine(T1 o1, T2 o2) {
        return List.of(o1, o2);
    }

    @Test
    void shouldLazilyEvaluateInZip() {
        final var a = new AtomicInteger();
        final var b = new AtomicInteger();
        final var zipped = StreamUtils.zip(
                Stream.generate(a::incrementAndGet),
                Stream.generate(b::decrementAndGet),
                (xa, xb) -> xb + 3 * xa);

        assertEquals(0, a.get(), "Should not have evaluated a at start");
        assertEquals(0, b.get(), "Should not have evaluated b at start");

        final var takeTwo = zipped.limit(2);

        assertEquals(0, a.get(), "Should not have evaluated a at take");
        assertEquals(0, b.get(), "Should not have evaluated b at take");

        final var list = takeTwo.collect(Collectors.toList());

        assertEquals(2, a.get(), "Should have evaluated a after collect");
        assertEquals(-2, b.get(), "Should have evaluated b after collect");
        assertEquals(List.of(2, 4), list);
    }
}