反应堆异常抛出的正确方法

我是一个新来的项目 ABc0和一般的响应式编程。

我目前正在编写一段类似的代码:

Mono.just(userId)
.map(repo::findById)
.map(user-> {
if(user == null){
throw new UserNotFoundException();
}
return user;
})
// ... other mappings

This example is probably silly and there are surely better ways of implementing this case, but the point is:

Is it wrong to use a throw new exception in a map block or should I replace this with a return Mono.error(new UserNotFoundException())?

这两种做法有什么实际的区别吗?

81520 次浏览

There are a couple of ways that could be considered as a convenient way of exception throwing:

使用 Flux/Mono.handle处理元素

操作符 handle可以简化处理可能导致错误或空流的元素的方法之一。

下面的代码显示了我们如何使用它来解决我们的问题:

Mono.just(userId)
.map(repo::findById)
.handle((user, sink) -> {
if(!isValid(user)){
sink.error(new InvalidUserException());
} else if (isSendable(user))
sink.next(user);
}
else {
//just ignore element
}
})

正如我们所看到的,.handle操作符需要传递 BiConsumer<T, SynchronousSink<>以处理元素。这里我们在 BiConsumer 中有两个参数。第一个是来自上游的元素,第二个是 SynchronousSink,它帮助我们同步地向下游供应元素。这种技术扩展了提供元素处理的不同结果的能力。例如,如果元件是无效的,我们可以提供错误的同一个 SycnchronousSync将取消上游和产生 onError信号下游。反过来,我们可以使用相同的 handle操作符“过滤”。一旦执行了句柄 BiConsumer并且没有提供任何元素,反应器就会将其视为一种过滤,并为我们请求额外的元素。最后,在元素有效的情况下,我们可以简单地调用 SynchronousSink#next并在下游传播我们的元素,或者在它上面应用一些映射,所以我们将在这里使用 handle作为 map操作符。此外,我们可以安全地使用这个操作符,它不会对性能造成影响,并提供复杂的元素验证,例如元素验证或向下游发送错误。

使用 #concatMap + Mono.error抛出

One of the options to throw an exception during mapping is to replace map with concatMap. In its essence, concatMap does almost the same flatMap does. The only difference is that concatMap allows only one substream at a time. Such behavior simplifies internal implementation a lot and does not impact performance. So we can use the following code in order to throw an exception in a more functional way:

Mono.just(userId)
.map(repo::findById)
.concatMap(user-> {
if(!isValid(user)){
return Mono.error(new InvalidUserException());
}
return Mono.just(user);
})

在上面的示例中,如果用户无效,我们使用 Mono.error返回异常。我们可以用 Flux.error对通量做同样的处理:

Flux.just(userId1, userId2, userId3)
.map(repo::findById)
.concatMap(user-> {
if(!isValid(user)){
return Flux.error(new InvalidUserException());
}
return Mono.just(user);
})

Note, in both cases we return 很冷 stream which has only one element. In Reactor, there is a couple of optimizations that improve performance in the case returned stream is a cold 标量 stream. Thus, it is recommended to use Flux/Mono concatMap + .just, empty, error as a result when we need more complex mapping, that could end up with return null or throw new ....

立正!不要检查传入元素是否为空。反应堆项目将永远不会为您发送 null值,因为这违反了反应流规范(参见 Rule 2.13) 因此,如果 repo.findById返回 null,反应堆将为您抛出 NullPointerException。

等等,为什么 concatMapflatMap好?

从本质上讲,flatMap被设计用来合并来自一次执行的多个子流的元素。这意味着 latMap 下面应该有异步流,这样它们就可以在多个线程上处理数据,或者可以是多个网络调用。随后,这样的预期对实现影响很大,因此 flatMap应该能够处理来自多个流(Thread)的数据(意味着并发数据结构的使用) ,如果从另一个流(意味着为每个子流为 Queue分配额外的内存)有一个排队元素,并且不违反反应流规范规则(意味着真正复杂的实现)。考虑到所有这些事实,我们将一个简单的 map操作(是同步的)替换为使用 Flux/Mono.error抛出异常的更方便的方式(它不改变执行的同步性) ,导致我们不需要这样一个复杂的操作符,我们可以使用更简单的 concatMap,它是为一次异步处理单个流而设计的,并且有几个优化以处理标量的冷流。

使用 switchIfEmpty引发异常

因此,在结果为空时引发异常的另一种方法是 switchIfEmpty运算符。下面的代码说明了我们如何使用这种方法:

Mono.just(userId)
.flatMap(repo::findById)
.switchIfEmpty(Mono.error(new UserNotFoundExeception()))

正如我们所看到的,在这种情况下,repo::findById应该将 MonoUser作为返回类型。因此,如果找不到 User实例,结果流将为空。因此,反应堆将调用一个替代的 Mono,指定为 switchIfEmpty参数。

按原样抛出异常(例如,在 mapfilter和其他类似操作符中)

它可以被认为是一个不太可读的代码或者不好的实践(我自己的观点) ,但是你可以抛出你的异常(例如 .map(v -> throw ...))。尽管这样做可能会违反 Reactive Streams 规范(从语义的角度来看,在这种情况下违反,因为你的操作符是 Subscriber链中的 Subscriber,因此——从语义上来说,在 lambda 中抛出异常可以映射为在 onNext方法中抛出异常,这违反了 spec's rule 2.13)。但是,由于 Reactor 会为您捕获抛出的异常,然后将其作为 onError信号传播到下游,因此不禁止这样做。

外卖

  1. 使用 .handle操作符以提供复杂的元素处理
  2. 当需要在映射过程中抛出异常时,使用 concatMap + Mono.error,但这种技术最适合异步元素处理的情况。
  3. 使用 flatMap + Mono.error时,我们已经有 flatMap到位
  4. Null作为返回类型是被禁止的,所以在你的下游 map中,你将会得到意想不到的 onErrorNullPointerException
  5. Use switchIfEmpty in all cases when you need to send an error signal if the result of calling some specific function finished with the empty stream