Java 8 Stream with batch processing

I have a large file that contains a list of items.

I would like to create a batch of items, make an HTTP request with this batch (all of the items are needed as parameters in the HTTP request). I can do it very easily with a for loop, but as a Java 8 lover, I want to try writing this with Java 8's Stream framework (and reap the benefits of lazy processing).

Example:

List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
    batch.add(data.get(i));
    if (batch.size() == BATCH_SIZE) {
        process(batch);
        batch.clear(); // start a fresh batch once this one has been sent
    }
}

if (batch.size() > 0) process(batch);

I want to do something along the lines of lazyFileStream.group(500).map(processBatch).collect(toList())

What would be the best way to do this?


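Note! This solution reads the whole file before running the forEach.

You could do it with jOOλ, a library that extends Java 8 streams for single-threaded, sequential stream use cases: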

Seq.seq(lazyFileStream)              // Seq<String>
.zipWithIndex()                   // Seq<Tuple2<String, Long>>
.groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<Tuple2<String, Long>>>
.forEach((index, batch) -> {
process(batch);
});

Behind the scenes, zipWithIndex() is just:

static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) {
final Iterator<T> it = stream.iterator();


class ZipWithIndex implements Iterator<Tuple2<T, Long>> {
long index;


@Override
public boolean hasNext() {
return it.hasNext();
}


@Override
public Tuple2<T, Long> next() {
return tuple(it.next(), index++);
}
}


return seq(new ZipWithIndex());
}

groupBy() is a convenience API for:

default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) {
return collect(Collectors.groupingBy(classifier));
}

(Disclaimer: I work for the company behind jOOλ)
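For readers without jOOλ on the classpath, the same index-then-group idea can be sketched with JDK classes only. This is an illustration under assumptions, not jOOλ's implementation: `IndexedGrouping` and `groupIntoBatches` are hypothetical names, the counter only yields meaningful indices on a sequential stream, and, as with the snippet above, the whole input is collected before any batch becomes visible.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper: numbers the elements and groups them by index / batchSize.
class IndexedGrouping {

    static <T> Map<Long, List<T>> groupIntoBatches(Stream<T> stream, int batchSize) {
        AtomicLong index = new AtomicLong();
        // sequential() keeps the counter in encounter order;
        // the TreeMap keeps the batches sorted by their batch number.
        return stream.sequential()
                .collect(Collectors.groupingBy(
                        item -> index.getAndIncrement() / batchSize,
                        TreeMap::new,
                        Collectors.toList()));
    }

    public static void main(String[] args) {
        Map<Long, List<String>> batches =
                groupIntoBatches(Stream.of("a", "b", "c", "d", "e"), 2);
        // Prints "0 -> [a, b]", "1 -> [c, d]" and "2 -> [e]" on separate lines.
        batches.forEach((key, batch) -> System.out.println(key + " -> " + batch));
    }
}
```

Because this is a terminal collect, it is not lazy; it only makes the grouping step concrete.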

A pure Java 8 implementation is also possible:

int BATCH = 500;
IntStream.range(0, (data.size()+BATCH-1)/BATCH)
.mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH)))
.forEach(batch -> process(batch));

Note that unlike jOOλ, it can work nicely in parallel (provided that your data is a random-access list).
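As a rough, self-contained sketch of that idea, the index-based split can be run in parallel and the batches collected in order. The class and method names (`SubListBatches`, `batchesOf`) are made up for illustration, and the batches are `subList` views, so they share storage with the original list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class SubListBatches {

    // Splits a random-access list into consecutive views of at most batchSize elements.
    static <T> List<List<T>> batchesOf(List<T> data, int batchSize) {
        int batchCount = (data.size() + batchSize - 1) / batchSize; // ceiling division
        return IntStream.range(0, batchCount)
                .parallel()
                .mapToObj(i -> data.subList(i * batchSize,
                        Math.min(data.size(), (i + 1) * batchSize)))
                .collect(Collectors.toList()); // toList preserves encounter order, even in parallel
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        System.out.println(batchesOf(data, 3)); // prints [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```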

You could also take a look at cyclops-react; I am the author of this library. It implements the jOOλ interface (and by extension JDK 8 Streams), but unlike JDK 8 parallel Streams it focuses on asynchronous operations (such as potentially blocking async I/O calls). JDK parallel Streams, by contrast, focus on data parallelism for CPU-bound operations. It works by managing aggregates of Future-based tasks under the hood, but presents a standard extended Stream API to end users.

This sample code may help you get started:

LazyFutureStream.parallelCommonBuilder()
.react(data)
.grouped(BATCH_SIZE)
.map(this::process)
.run();

There is a tutorial on batching here

And a more general tutorial here

To use your own thread pool (which is probably more appropriate for blocking I/O), you could start processing with

LazyReact reactor = new LazyReact(40);


reactor.react(data)
.grouped(BATCH_SIZE)
.map(this::process)
.run();

You can also use RxJava:

RxJava v3:

int batchSize = 50;
List<Table> tables = new ArrayList<>();
Observable.fromIterable(_someStream_)
.buffer(batchSize)
.map(batch -> process(batch))
.blockingSubscribe(tables::addAll, t -> Log.warning("Error", t));

Previous version:

Observable.from(data).buffer(BATCH_SIZE).forEach((batch) -> process(batch));

or

Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList();

or

Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList();

For completeness, here is a Guava solution.

Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);

In the question the collection is available so a stream isn't needed and it can be written as,

Iterables.partition(data, batchSize).forEach(this::process);

Pure Java 8 solution:

We can create a custom collector to do this elegantly. It takes a batch size and a Consumer to process each batch:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.*;
import java.util.stream.Collector;


import static java.util.Objects.requireNonNull;




/**
* Collects elements in the stream and calls the supplied batch processor
* after the configured batch size is reached.
*
* In case of a parallel stream, the batch processor may be called with
* fewer elements than the batch size.
*
* The elements are not kept in memory, and the final result will be an
* empty list.
*
* @param <T> Type of the elements being collected
*/
class BatchCollector<T> implements Collector<T, List<T>, List<T>> {


private final int batchSize;
private final Consumer<List<T>> batchProcessor;




/**
* Constructs the batch collector
*
* @param batchSize the batch size after which the batchProcessor should be called
* @param batchProcessor the batch processor which accepts batches of records to process
*/
BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
batchProcessor = requireNonNull(batchProcessor);


this.batchSize = batchSize;
this.batchProcessor = batchProcessor;
}


public Supplier<List<T>> supplier() {
return ArrayList::new;
}


public BiConsumer<List<T>, T> accumulator() {
return (ts, t) -> {
ts.add(t);
if (ts.size() >= batchSize) {
batchProcessor.accept(ts);
ts.clear();
}
};
}


public BinaryOperator<List<T>> combiner() {
return (ts, ots) -> {
// process each parallel list without checking for batch size
// avoids adding all elements of one to another
// can be modified if a strict batching mode is required
batchProcessor.accept(ts);
batchProcessor.accept(ots);
return Collections.emptyList();
};
}


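public Function<List<T>, List<T>> finisher() {
return ts -> {
// skip the call when nothing is left over, so the processor never sees an empty batch
if (!ts.isEmpty()) {
batchProcessor.accept(ts);
}
return Collections.emptyList();
};
}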


public Set<Characteristics> characteristics() {
return Collections.emptySet();
}
}

Optionally, then create a helper utility class:

import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;


public class StreamUtils {


/**
* Creates a new batch collector
* @param batchSize the batch size after which the batchProcessor should be called
* @param batchProcessor the batch processor which accepts batches of records to process
* @param <T> the type of elements being processed
* @return a batch collector instance
*/
public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
return new BatchCollector<T>(batchSize, batchProcessor);
}
}

Example usage:

List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> output = new ArrayList<>();


int batchSize = 3;
Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs);


input.stream()
.collect(StreamUtils.batchCollector(batchSize, batchProcessor));

I've posted my code on GitHub as well, if anyone wants to take a look:

Link to GitHub
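One detail of the collector above is easy to miss: the accumulator calls `ts.clear()` right after the batch processor returns, so a processor that stores the list it receives (rather than copying it, as `output.addAll(xs)` does) will see it emptied. The sketch below is a hypothetical variant, not the author's code, built with `Collector.of`. It hands each batch over as a copy and is intended for sequential streams only; the names `CopyingBatchCollector`, `batching` and `demo` are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;

class CopyingBatchCollector {

    // Hypothetical variant: every batch is copied before it reaches the consumer,
    // so the consumer may keep a reference to the list it receives.
    static <T> Collector<T, List<T>, Void> batching(int batchSize, Consumer<List<T>> consumer) {
        return Collector.of(
                ArrayList::new,
                (buffer, item) -> {
                    buffer.add(item);
                    if (buffer.size() >= batchSize) {
                        consumer.accept(new ArrayList<>(buffer));
                        buffer.clear();
                    }
                },
                (left, right) -> {
                    // Only reached on parallel streams, which this sketch does not aim to support well.
                    left.addAll(right);
                    return left;
                },
                buffer -> {
                    if (!buffer.isEmpty()) {
                        consumer.accept(new ArrayList<>(buffer)); // flush the final partial batch
                    }
                    return null;
                });
    }

    static List<List<Integer>> demo() {
        List<List<Integer>> seen = new ArrayList<>();
        Collector<Integer, List<Integer>, Void> collector = batching(3, seen::add);
        Arrays.asList(1, 2, 3, 4, 5, 6, 7).stream().collect(collector);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

Copying costs one extra allocation per batch; whether that matters depends on the batch size and on how long the consumer holds on to the data.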

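I wrote a custom Spliterator for scenarios like this. It fills lists of a given size from the input Stream. The advantage of this approach is that it performs lazy processing and works with other stream functions.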

public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) {
return batchSize <= 0
? Stream.of(stream.collect(Collectors.toList()))
: StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel());
}


private static class BatchSpliterator<E> implements Spliterator<List<E>> {


private final Spliterator<E> base;
private final int batchSize;


public BatchSpliterator(Spliterator<E> base, int batchSize) {
this.base = base;
this.batchSize = batchSize;
}


@Override
public boolean tryAdvance(Consumer<? super List<E>> action) {
final List<E> batch = new ArrayList<>(batchSize);
for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++)
;
if (batch.isEmpty())
return false;
action.accept(batch);
return true;
}


@Override
public Spliterator<List<E>> trySplit() {
if (base.estimateSize() <= batchSize)
return null;
final Spliterator<E> splitBase = this.base.trySplit();
return splitBase == null ? null
: new BatchSpliterator<>(splitBase, batchSize);
}


@Override
public long estimateSize() {
final double baseSize = base.estimateSize();
return baseSize == 0 ? 0
: (long) Math.ceil(baseSize / (double) batchSize);
}


@Override
public int characteristics() {
return base.characteristics();
}


}
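To make the lazy-processing claim concrete, here is a simplified, sequential-only sketch of the same idea. It uses `Spliterators.AbstractSpliterator` instead of the full class above, and the class name `LazyBatches` is made up for this example. It is fed an infinite source, so it can only terminate if batches are built on demand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class LazyBatches {

    // Sequential-only simplification: wraps the source spliterator and
    // emits lists of up to batchSize elements, one list per tryAdvance call.
    static <T> Stream<List<T>> batches(Stream<T> source, int batchSize) {
        Spliterator<T> base = source.spliterator();
        Spliterator<List<T>> batching =
                new Spliterators.AbstractSpliterator<List<T>>(Long.MAX_VALUE, Spliterator.ORDERED) {
                    @Override
                    public boolean tryAdvance(Consumer<? super List<T>> action) {
                        List<T> batch = new ArrayList<>(batchSize);
                        while (batch.size() < batchSize && base.tryAdvance(batch::add)) {
                            // keep pulling until the batch is full or the source is exhausted
                        }
                        if (batch.isEmpty()) {
                            return false;
                        }
                        action.accept(batch);
                        return true;
                    }
                };
        return StreamSupport.stream(batching, false);
    }

    public static void main(String[] args) {
        // The source is infinite, so this only finishes because batches are produced lazily.
        List<List<Integer>> firstTwo = batches(Stream.iterate(1, i -> i + 1), 3)
                .limit(2)
                .collect(Collectors.toList());
        System.out.println(firstTwo); // prints [[1, 2, 3], [4, 5, 6]]
    }
}
```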

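We had a similar problem to solve. We wanted to take a stream that was larger than system memory (iterating through all objects in a database) and randomise the order as best as possible; we thought it would be fine to buffer 10,000 items and randomise them.

The target was a function which took in a stream.

Of the solutions proposed here, there seem to be a range of options:

  • Use various non-Java 8 additional libraries
  • Start with something that's not a stream, e.g. a random-access list
  • Have a stream which can be split easily in a spliterator

Our instinct was originally to use a custom collector, but this meant dropping out of streaming. The custom collector solution above is very good and we nearly used it.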

Here's a solution which cheats by using the fact that Streams can give you an Iterator, which you can use as an escape hatch to do something extra that streams don't support. The Iterator is converted back to a stream using another bit of Java 8 StreamSupport sorcery.

/**
* An iterator which returns batches of items taken from another iterator
*/
public class BatchingIterator<T> implements Iterator<List<T>> {
/**
* Given a stream, convert it to a stream of batches no greater than the
* batchSize.
* @param originalStream to convert
* @param batchSize maximum size of a batch
* @param <T> type of items in the stream
* @return a stream of batches taken sequentially from the original stream
*/
public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) {
return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize));
}


private static <T> Stream<T> asStream(Iterator<T> iterator) {
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(iterator,ORDERED),
false);
}


private int batchSize;
private List<T> currentBatch;
private Iterator<T> sourceIterator;


public BatchingIterator(Iterator<T> sourceIterator, int batchSize) {
this.batchSize = batchSize;
this.sourceIterator = sourceIterator;
}


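@Override
public boolean hasNext() {
// only build a new batch once the previous one has been handed out,
// so repeated hasNext() calls do not skip data
if (currentBatch == null) {
prepareNextBatch();
}
return !currentBatch.isEmpty();
}


@Override
public List<T> next() {
if (!hasNext()) {
throw new java.util.NoSuchElementException();
}
List<T> batch = currentBatch;
currentBatch = null;
return batch;
}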


private void prepareNextBatch() {
currentBatch = new ArrayList<>(batchSize);
while (sourceIterator.hasNext() && currentBatch.size() < batchSize) {
currentBatch.add(sourceIterator.next());
}
}
}

A simple example of using this would look like this:

@Test
public void getsBatches() {
BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
.forEach(System.out::println);
}

The above prints

[A, B, C]
[D, E, F]

For our use case, we wanted to shuffle the batches and then keep them as a stream. It looked like this:

@Test
public void howScramblingCouldBeDone() {
BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
// the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one
.map(list -> {
Collections.shuffle(list); return list; })
.flatMap(List::stream)
.forEach(System.out::println);
}

This outputs something like the following (it's randomised, so different every time)

A
C
B
E
D
F

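The secret sauce here is that there's always a stream, so you can either operate on a stream of batches, or do something to each batch and then flatMap it back to a stream. Even better, all of the above only runs as the final forEach, collect or other terminal operation pulls the data through the stream.

It turns out that iterator is a special type of terminal operation on a stream: it does not cause the whole stream to run and come into memory! Thanks to the Java 8 guys for a brilliant design!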
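A small way to check that behaviour for yourself is to count how many source elements are produced when only a few items are taken through the iterator. This is a sketch with made-up names (`IteratorLaziness`, `elementsPulled`), using `peek` purely as a counter on an infinite source.

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class IteratorLaziness {

    // Returns how many source elements were produced after taking `taken` items via the iterator.
    static int elementsPulled(int taken) {
        AtomicInteger pulled = new AtomicInteger();
        Iterator<Integer> it = Stream.iterate(0, i -> i + 1) // infinite source
                .peek(i -> pulled.incrementAndGet())         // counts elements as they flow through
                .iterator();
        for (int n = 0; n < taken; n++) {
            it.next();
        }
        return pulled.get();
    }

    public static void main(String[] args) {
        System.out.println(elementsPulled(3)); // prints 3
    }
}
```

Pipelines containing operations such as `flatMap` or `sorted` may buffer more than one element at a time, so the one-for-one count shown here should not be assumed for every pipeline.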

A simple example using Spliterator

    // read file into stream, try-with-resources
try (Stream<String> stream = Files.lines(Paths.get(fileName))) {
//skip header
Spliterator<String> split = stream.skip(1).spliterator();
Chunker<String> chunker = new Chunker<String>();
while(true) {
boolean more = split.tryAdvance(chunker::doSomething);
if (!more) {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}


static class Chunker<T> {
int ct = 0;
public void doSomething(T line) {
System.out.println(ct++ + " " + line.toString());
if (ct % 100 == 0) {
System.out.println("====================chunk=====================");
}
}
}

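Bruce's answer is more comprehensive, but I was looking for something quick and dirty to process a bunch of files.

A pure Java 8 example that works with parallel streams as well.

How to use: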

Stream<Integer> integerStream = IntStream.range(0, 45).parallel().boxed();
CsStreamUtil.processInBatch(integerStream, 10, batch -> System.out.println("Batch: " + batch));

The method declaration and implementation:

public static <ElementType> void processInBatch(Stream<ElementType> stream, int batchSize, Consumer<Collection<ElementType>> batchProcessor)
{
List<ElementType> newBatch = new ArrayList<>(batchSize);


stream.forEach(element -> {
List<ElementType> fullBatch;


synchronized (newBatch)
{
if (newBatch.size() < batchSize)
{
newBatch.add(element);
return;
}
else
{
fullBatch = new ArrayList<>(newBatch);
newBatch.clear();
newBatch.add(element);
}
}


batchProcessor.accept(fullBatch);
});


if (newBatch.size() > 0)
batchProcessor.accept(new ArrayList<>(newBatch));
}

Here is a pure Java solution that is evaluated lazily.

public static <T> Stream<List<T>> partition(Stream<T> stream, int batchSize){
List<List<T>> currentBatch = new ArrayList<List<T>>(); //just to make it mutable
currentBatch.add(new ArrayList<T>(batchSize));
return Stream.concat(stream
.sequential()
.map(new Function<T, List<T>>(){
public List<T> apply(T t){
currentBatch.get(0).add(t);
return currentBatch.get(0).size() == batchSize ? currentBatch.set(0,new ArrayList<>(batchSize)): null;
}
}), Stream.generate(()->currentBatch.get(0).isEmpty()?null:currentBatch.get(0))
.limit(1)
).filter(Objects::nonNull);
}

With Java 8 and com.google.common.collect.Lists, you can do something like:

public class BatchProcessingUtil {
public static <T,U> List<U> process(List<T> data, int batchSize, Function<List<T>, List<U>> processFunction) {
List<List<T>> batches = Lists.partition(data, batchSize);
return batches.stream()
.map(processFunction) // Send each batch to the process function
.flatMap(Collection::stream) // flat results to gather them in 1 stream
.collect(Collectors.toList());
}
}

Here T is the type of the items in the input list and U is the type of the items in the output list.

You can use it like this:

List<String> userKeys = [... list of user keys]
List<Users> users = BatchProcessingUtil.process(
userKeys,
10, // Batch Size
partialKeys -> service.getUsers(partialKeys)
);

You can use Apache Commons Collections (ListUtils):

ListUtils.partition(ListOfLines, 500).stream()
        .map(partition -> processBatch(partition))
        .collect(Collectors.toList());

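The partitioning itself is done non-lazily, but once the list is partitioned you get the benefits of working with streams (e.g. parallel streams, adding filters, etc.). Other answers suggested more elaborate solutions, but sometimes readability and maintainability matter more (and sometimes they don't :-) )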

In all fairness, take a look at the elegant Vavr solution:

Stream.ofAll(data).grouped(BATCH_SIZE).forEach(this::process);