Java 8: Lambda-Streams,过滤方法与异常

我有一个问题,尝试Java 8的Lambda表达式。 通常它工作得很好,但现在我有抛出IOException的方法。 最好看看下面的代码:

class Bank{
....
public Set<String> getActiveAccountNumbers() throws IOException {
Stream<Account> s =  accounts.values().stream();
s = s.filter(a -> a.isActive());
Stream<String> ss = s.map(a -> a.getNumber());
return ss.collect(Collectors.toSet());
}
....
}


interface Account{
....
boolean isActive() throws IOException;
String getNumber() throws IOException;
....
}

问题是,它不能编译,因为我必须捕获isActive-和getNumber-Methods的可能异常。但是,即使我显式地使用如下所示的try-catch-Block,它仍然不能编译,因为我没有捕获异常。所以,要么是JDK有bug,要么是我不知道如何捕捉这些异常。

class Bank{
....
//Doesn't compile either
public Set<String> getActiveAccountNumbers() throws IOException {
try{
Stream<Account> s =  accounts.values().stream();
s = s.filter(a -> a.isActive());
Stream<String> ss = s.map(a -> a.getNumber());
return ss.collect(Collectors.toSet());
}catch(IOException ex){
}
}
....
}

我怎样才能让它工作呢?谁能给我点提示吗?

197580 次浏览

你必须捕获异常之前,它转义lambda:

s = s.filter(a -> {
try {
return a.isActive();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});

考虑到lambda不是在编写它的地方求值的,而是在JDK类中某个完全不相关的地方求值的。所以那将是被检查异常将被抛出的点,在那里它没有被声明。

你可以使用你的lambda的包装器来处理它,将检查异常转换为未检查异常:

public static <T> T uncheckCall(Callable<T> callable) {
try {
return callable.call();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

你的例子可以写成

return s.filter(a -> uncheckCall(a::isActive))
.map(Account::getNumber)
.collect(toSet());

在我的项目中,我不带包装地处理这个问题;相反,我使用一种方法,有效地化解编译器的异常检查。不用说,这应该小心处理,项目中的每个人都必须意识到,在未声明的地方可能会出现受控异常。这是管道代码:

public static <T> T uncheckCall(Callable<T> callable) {
try {
return callable.call();
} catch (Exception e) {
sneakyThrow(e);
return null; // Unreachable but needed to satisfy compiler
}
}


public static void uncheckRun(RunnableExc r) {
try {
r.run();
} catch (Exception e) {
sneakyThrow(e);
}
}


public interface RunnableExc {
void run() throws Exception;
}


@SuppressWarnings("unchecked")
private static <T extends Throwable> void sneakyThrow(Throwable t) throws T {
throw (T) t;
}

并且你可以期望得到一个IOException扔在你的脸上,即使collect没有声明它。在大部分,但不是全部现实生活中的情况下,无论如何,你只需要重新抛出异常,并将其作为一般失败处理。在所有这些情况下,在清晰度和正确性方面没有任何损失。只是要注意其他情况,在这些情况下,您实际上希望当场对异常做出反应。编译器不会让开发人员知道那里有一个IOException要捕捉,实际上,如果你试图捕捉它,编译器会报错,因为我们已经欺骗了它,让它相信没有这样的异常可以抛出。

你也可以用lambdas传播你的静态疼痛,这样整个东西看起来可读:

s.filter(a -> propagate(a::isActive))

这里的propagate接收java.util.concurrent.Callable作为参数,并将调用期间捕获的任何异常转换为RuntimeException。在Guava中有一个类似的转换方法Throwable #传播(Throwable)

这个方法对于lambda方法链接来说是必不可少的,所以我希望有一天它会被添加到一个流行的库中,或者这种传播行为将是默认的。

public class PropagateExceptionsSample {
// a simplified version of Throwables#propagate
public static RuntimeException runtime(Throwable e) {
if (e instanceof RuntimeException) {
return (RuntimeException)e;
}


return new RuntimeException(e);
}


// this is a new one, n/a in public libs
// Callable just suits as a functional interface in JDK throwing Exception
public static <V> V propagate(Callable<V> callable){
try {
return callable.call();
} catch (Exception e) {
throw runtime(e);
}
}


public static void main(String[] args) {
class Account{
String name;
Account(String name) { this.name = name;}


public boolean isActive() throws IOException {
return name.startsWith("a");
}
}




List<Account> accounts = new ArrayList<>(Arrays.asList(new Account("andrey"), new Account("angela"), new Account("pamela")));


Stream<Account> s = accounts.stream();


s
.filter(a -> propagate(a::isActive))
.map(a -> a.name)
.forEach(System.out::println);
}
}

为了正确地添加IOException(到RuntimeException)处理代码,你的方法看起来像这样:

Stream<Account> s =  accounts.values().stream();


s = s.filter(a -> { try { return a.isActive(); }
catch (IOException e) { throw new RuntimeException(e); }});


Stream<String> ss = s.map(a -> { try { return a.getNumber() }
catch (IOException e) { throw new RuntimeException(e); }});


return ss.collect(Collectors.toSet());

现在的问题是,IOException必须被捕获为RuntimeException,并转换回IOException——这将为上面的方法添加更多的代码。

为什么要使用Stream,因为它可以这样做——而且该方法抛出IOException,因此也不需要额外的代码:

Set<String> set = new HashSet<>();
for(Account a: accounts.values()){
if(a.isActive()){
set.add(a.getNumber());
}
}
return set;

你可以通过包装你的lambda来抛出一个未检查的异常,然后在终端操作中解开这个未检查的异常,从而滚动你自己的Stream变体:

@FunctionalInterface
public interface ThrowingPredicate<T, X extends Throwable> {
public boolean test(T t) throws X;
}


@FunctionalInterface
public interface ThrowingFunction<T, R, X extends Throwable> {
public R apply(T t) throws X;
}


@FunctionalInterface
public interface ThrowingSupplier<R, X extends Throwable> {
public R get() throws X;
}


public interface ThrowingStream<T, X extends Throwable> {
public ThrowingStream<T, X> filter(
ThrowingPredicate<? super T, ? extends X> predicate);


public <R> ThrowingStream<T, R> map(
ThrowingFunction<? super T, ? extends R, ? extends X> mapper);


public <A, R> R collect(Collector<? super T, A, R> collector) throws X;


// etc
}


class StreamAdapter<T, X extends Throwable> implements ThrowingStream<T, X> {
private static class AdapterException extends RuntimeException {
public AdapterException(Throwable cause) {
super(cause);
}
}


private final Stream<T> delegate;
private final Class<X> x;


StreamAdapter(Stream<T> delegate, Class<X> x) {
this.delegate = delegate;
this.x = x;
}


private <R> R maskException(ThrowingSupplier<R, X> method) {
try {
return method.get();
} catch (Throwable t) {
if (x.isInstance(t)) {
throw new AdapterException(t);
} else {
throw t;
}
}
}


@Override
public ThrowingStream<T, X> filter(ThrowingPredicate<T, X> predicate) {
return new StreamAdapter<>(
delegate.filter(t -> maskException(() -> predicate.test(t))), x);
}


@Override
public <R> ThrowingStream<R, X> map(ThrowingFunction<T, R, X> mapper) {
return new StreamAdapter<>(
delegate.map(t -> maskException(() -> mapper.apply(t))), x);
}


private <R> R unmaskException(Supplier<R> method) throws X {
try {
return method.get();
} catch (AdapterException e) {
throw x.cast(e.getCause());
}
}


@Override
public <A, R> R collect(Collector<T, A, R> collector) throws X {
return unmaskException(() -> delegate.collect(collector));
}
}

然后你可以像使用Stream一样使用它:

Stream<Account> s = accounts.values().stream();
ThrowingStream<Account, IOException> ts = new StreamAdapter<>(s, IOException.class);
return ts.filter(Account::isActive).map(Account::getNumber).collect(toSet());

这个解决方案需要相当多的样板文件,所以我建议你看看我已经创建的库,它所做的正是我在这里为整个Stream类所描述的(以及更多!)

使用#propagate()方法。来自由Sam Beran撰写的Java 8博客的非guava实现示例:

public class Throwables {
public interface ExceptionWrapper<E> {
E wrap(Exception e);
}


public static <T> T propagate(Callable<T> callable) throws RuntimeException {
return propagate(callable, RuntimeException::new);
}


public static <T, E extends Throwable> T propagate(Callable<T> callable, ExceptionWrapper<E> wrapper) throws E {
try {
return callable.call();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw wrapper.wrap(e);
}
}
}

这个UtilException帮助类允许你在Java流中使用任何受控异常,如下所示:

Stream.of("java.lang.Object", "java.lang.Integer", "java.lang.String")
.map(rethrowFunction(Class::forName))
.collect(Collectors.toList());

注意Class::forName抛出ClassNotFoundException,即检查。流本身也会抛出ClassNotFoundException和NOT一些未检查的包装异常。

public final class UtilException {


@FunctionalInterface
public interface Consumer_WithExceptions<T, E extends Exception> {
void accept(T t) throws E;
}


@FunctionalInterface
public interface BiConsumer_WithExceptions<T, U, E extends Exception> {
void accept(T t, U u) throws E;
}


@FunctionalInterface
public interface Function_WithExceptions<T, R, E extends Exception> {
R apply(T t) throws E;
}


@FunctionalInterface
public interface Supplier_WithExceptions<T, E extends Exception> {
T get() throws E;
}


@FunctionalInterface
public interface Runnable_WithExceptions<E extends Exception> {
void run() throws E;
}


/** .forEach(rethrowConsumer(name -> System.out.println(Class.forName(name)))); or .forEach(rethrowConsumer(ClassNameUtil::println)); */
public static <T, E extends Exception> Consumer<T> rethrowConsumer(Consumer_WithExceptions<T, E> consumer) throws E {
return t -> {
try { consumer.accept(t); }
catch (Exception exception) { throwAsUnchecked(exception); }
};
}


public static <T, U, E extends Exception> BiConsumer<T, U> rethrowBiConsumer(BiConsumer_WithExceptions<T, U, E> biConsumer) throws E {
return (t, u) -> {
try { biConsumer.accept(t, u); }
catch (Exception exception) { throwAsUnchecked(exception); }
};
}


/** .map(rethrowFunction(name -> Class.forName(name))) or .map(rethrowFunction(Class::forName)) */
public static <T, R, E extends Exception> Function<T, R> rethrowFunction(Function_WithExceptions<T, R, E> function) throws E {
return t -> {
try { return function.apply(t); }
catch (Exception exception) { throwAsUnchecked(exception); return null; }
};
}


/** rethrowSupplier(() -> new StringJoiner(new String(new byte[]{77, 97, 114, 107}, "UTF-8"))), */
public static <T, E extends Exception> Supplier<T> rethrowSupplier(Supplier_WithExceptions<T, E> function) throws E {
return () -> {
try { return function.get(); }
catch (Exception exception) { throwAsUnchecked(exception); return null; }
};
}


/** uncheck(() -> Class.forName("xxx")); */
public static void uncheck(Runnable_WithExceptions t)
{
try { t.run(); }
catch (Exception exception) { throwAsUnchecked(exception); }
}


/** uncheck(() -> Class.forName("xxx")); */
public static <R, E extends Exception> R uncheck(Supplier_WithExceptions<R, E> supplier)
{
try { return supplier.get(); }
catch (Exception exception) { throwAsUnchecked(exception); return null; }
}


/** uncheck(Class::forName, "xxx"); */
public static <T, R, E extends Exception> R uncheck(Function_WithExceptions<T, R, E> function, T t) {
try { return function.apply(t); }
catch (Exception exception) { throwAsUnchecked(exception); return null; }
}


@SuppressWarnings ("unchecked")
private static <E extends Throwable> void throwAsUnchecked(Exception exception) throws E { throw (E)exception; }


}

关于如何使用它的许多其他示例(在静态导入UtilException之后):

@Test
public void test_Consumer_with_checked_exceptions() throws IllegalAccessException {
Stream.of("java.lang.Object", "java.lang.Integer", "java.lang.String")
.forEach(rethrowConsumer(className -> System.out.println(Class.forName(className))));


Stream.of("java.lang.Object", "java.lang.Integer", "java.lang.String")
.forEach(rethrowConsumer(System.out::println));
}


@Test
public void test_Function_with_checked_exceptions() throws ClassNotFoundException {
List<Class> classes1
= Stream.of("Object", "Integer", "String")
.map(rethrowFunction(className -> Class.forName("java.lang." + className)))
.collect(Collectors.toList());


List<Class> classes2
= Stream.of("java.lang.Object", "java.lang.Integer", "java.lang.String")
.map(rethrowFunction(Class::forName))
.collect(Collectors.toList());
}


@Test
public void test_Supplier_with_checked_exceptions() throws ClassNotFoundException {
Collector.of(
rethrowSupplier(() -> new StringJoiner(new String(new byte[]{77, 97, 114, 107}, "UTF-8"))),
StringJoiner::add, StringJoiner::merge, StringJoiner::toString);
}


@Test
public void test_uncheck_exception_thrown_by_method() {
Class clazz1 = uncheck(() -> Class.forName("java.lang.String"));


Class clazz2 = uncheck(Class::forName, "java.lang.String");
}


@Test (expected = ClassNotFoundException.class)
public void test_if_correct_exception_is_still_thrown_by_method() {
Class clazz3 = uncheck(Class::forName, "INVALID");
}

但是,在了解以下优点、缺点和限制之前,不要使用它:

如果调用代码要处理检查异常,则必须将其添加到包含流的方法的throws子句中。 编译器不会强迫你再添加它,所以更容易忘记它 如果调用代码已经处理了检查异常,编译器会提醒你将throws子句添加到方法声明中 包含流(如果你不这样做,它会说:异常从未在相应的try语句体中抛出) 在任何情况下,你都不能环绕流本身来捕获包含流的方法内部的检查异常 (如果你尝试,编译器会说:异常从未在相应的try语句体中抛出) 如果你调用的方法字面上永远不能抛出它声明的异常,那么你不应该包含throws子句。 例如:new String(byteArr, "UTF-8")抛出UnsupportedEncodingException,但Java规范保证UTF-8始终存在。 在这里,throws声明是一个麻烦,任何用最少的样板文件来沉默它的解决方案都是受欢迎的 如果你讨厌受控异常,觉得不应该从一开始就把它添加到Java语言中(越来越多的人这样认为), 并且我不是其中之一),那么就不要将检查异常添加到包含流的方法的throws子句中。的检查

. exception将像一个未检查的异常一样

•如果您正在实现一个严格的接口,其中您没有添加throws声明的选项,但抛出异常 完全合适,然后包装一个异常只是为了获得抛出它的特权,结果是一个带有虚假异常的堆栈跟踪 不要提供关于实际出错的信息。一个很好的例子是Runnable.run(),它不会抛出任何受控异常。 在这种情况下,您可以决定不将检查异常添加到包含流的方法的throws子句 在任何情况下,如果你决定不添加(或忘记添加)检查异常到包含流的方法的throws子句中, 注意抛出CHECKED异常的两个后果:

1)调用代码将无法通过名称捕获它(如果你尝试,编译器会说:异常从未在相应的try体中抛出 声明)。它会冒泡,并可能在主程序循环中被一些“catch Exception”或“catch Throwable”捕获,这可能是你 希望不管怎样。< / p > 它违反了最小意外原则:它将不再足够捕捉RuntimeException能够确保捕获所有 可能的例外。基于这个原因,我认为这不应该在框架代码中完成,而应该在你完全控制的业务代码中完成

总之:我相信这里的限制并不严重,并且可以放心地使用UtilException类。然而,这取决于你!

扩展@marcg解决方案,你通常可以在Streams中抛出和捕获检查异常;也就是说,编译器会要求你捕获/重新抛出是你的外部流!!

@FunctionalInterface
public interface Predicate_WithExceptions<T, E extends Exception> {
boolean test(T t) throws E;
}


/**
* .filter(rethrowPredicate(t -> t.isActive()))
*/
public static <T, E extends Exception> Predicate<T> rethrowPredicate(Predicate_WithExceptions<T, E> predicate) throws E {
return t -> {
try {
return predicate.test(t);
} catch (Exception exception) {
return throwActualException(exception);
}
};
}


@SuppressWarnings("unchecked")
private static <T, E extends Exception> T throwActualException(Exception exception) throws E {
throw (E) exception;
}

然后,您的示例将如下所示(添加测试以更清楚地显示它):

@Test
public void testPredicate() throws MyTestException {
List<String> nonEmptyStrings = Stream.of("ciao", "")
.filter(rethrowPredicate(s -> notEmpty(s)))
.collect(toList());
assertEquals(1, nonEmptyStrings.size());
assertEquals("ciao", nonEmptyStrings.get(0));
}


private class MyTestException extends Exception { }


private boolean notEmpty(String value) throws MyTestException {
if(value==null) {
throw new MyTestException();
}
return !value.isEmpty();
}


@Test
public void testPredicateRaisingException() throws MyTestException {
try {
Stream.of("ciao", null)
.filter(rethrowPredicate(s -> notEmpty(s)))
.collect(toList());
fail();
} catch (MyTestException e) {
//OK
}
}

考虑到这个问题,我开发了一个小型库来处理受控异常和lambdas。自定义适配器允许您与现有的函数类型集成:

stream().map(unchecked(URI::new)) //with a static import

https://github.com/TouK/ThrowingFunction/

如果你不介意使用第三方库,AOL的cyclops-react库,disclosure::I是贡献者,有一个ExceptionSoftener类可以在这里提供帮助。

 s.filter(softenPredicate(a->a.isActive()));

你的例子可以写成:

import utils.stream.Unthrow;


class Bank{
....
public Set<String> getActiveAccountNumbers() {
return accounts.values().stream()
.filter(a -> Unthrow.wrap(() -> a.isActive()))
.map(a -> Unthrow.wrap(() -> a.getNumber()))
.collect(Collectors.toSet());
}
....
}

Unthrow类可以在这里https://github.com/SeregaLBN/StreamUnthrower

TLDR:尝试通过重构你的代码来避免这个问题:“安全”操作;在lambdas中只使用安全的操作。


细节:

这并没有直接回答问题(有很多其他的答案),但试图从一开始就避免这个问题:

根据我的经验,在Stream(或其他lambda表达式)中处理异常的需求通常来自这样一个事实,即异常被声明为从不应该抛出的方法抛出。这通常来自于将业务逻辑与输入和输出混合。你的Account接口就是一个完美的例子:

interface Account {
boolean isActive() throws IOException;
String getNumber() throws IOException;
}

与其在每个getter上抛出IOException,不如考虑以下设计:

interface AccountReader {
Account readAccount(…) throws IOException;
}


interface Account {
boolean isActive();
String getNumber();
}

方法AccountReader.readAccount(…)可以从数据库或文件中读取帐户,如果不成功则抛出异常。它构造了一个Account对象,该对象已经包含了所有值,可以随时使用。由于值已经被readAccount(…)加载,getter不会抛出异常。因此,你可以在lambdas中自由地使用它们,而不需要包装、屏蔽或隐藏异常。

注意,你仍然需要处理由readAccount(…)引发的异常。毕竟,这就是异常存在的首要原因。但假设readAccount(…)被“;在其他地方”使用,即在lambdas之外,在那里你可以使用“;normal"Java提供的异常处理机制,即try-catch处理它或throws让它“冒泡”。

当然,不可能总是按照我描述的方式来做,但通常是这样的,它会导致更干净的代码(恕我直言):

  • 更好的分离关注点和后续的单一责任原则
  • 更少的样板文件:你不必用throws IOException来混淆你的代码,而只是为了满足编译器
  • 错误处理:在错误发生的地方处理错误(从文件或数据库读取时),而不是在业务逻辑中间的某个地方处理错误,因为您想获得一个字段值
  • 你可以使Account 不可变的并从其优点中获益(例如线程安全)
  • 你不需要“肮脏的诡计”;或在lambdas中使用Account的变通方法(例如在Stream中)

它可以通过下面的简单代码来解决,在AbacusUtil中使用试一试:

Stream.of(accounts).filter(a -> Try.call(a::isActive)).map(a -> Try.call(a::getNumber)).toSet();

披露:我是AbacusUtil的开发者。

Java中的功能接口不声明任何已检查或未检查的异常。 我们需要更改方法的签名:

boolean isActive() throws IOException;
String getNumber() throwsIOException;

:

boolean isActive();
String getNumber();

或者用try-catch block处理它:

public Set<String> getActiveAccountNumbers() {
Stream<Account> s =  accounts.values().stream();
s = s.filter(a ->
try{
a.isActive();
}catch(IOException e){
throw new RuntimeException(e);
}
);
Stream<String> ss = s.map(a ->
try{
a.getNumber();
}catch(IOException e){
throw new RuntimeException(e);
}
);
return ss.collect(Collectors.toSet());
}
另一种选择是编写自定义包装器或使用像ThrowingFunction这样的库。 对于库,我们只需要将依赖项添加到pom.xml:

<dependency>
<groupId>pl.touk</groupId>
<artifactId>throwing-function</artifactId>
<version>1.3</version>
</dependency>

并使用特定的类,如ThrowingFunction, ThrowingConsumer, ThrowingPredicate, ThrowingRunnable, ThrowingSupplier。

代码的最后是这样的:

public Set<String> getActiveAccountNumbers() {
return accounts.values().stream()
.filter(ThrowingPredicate.unchecked(Account::isActive))
.map(ThrowingFunction.unchecked(Account::getNumber))
.collect(Collectors.toSet());
}

如果你想处理流中的异常并继续处理额外的异常,Brian Vermeer在DZone中有一篇很棒的文章,使用了Either的概念。这是一种处理这种情况的好方法。唯一缺少的是示例代码。这是我使用那篇文章中的概念进行探索的一个示例。

@Test
public void whenValuePrinted_thenPrintValue() {


List<Integer> intStream = Arrays.asList(0, 1, 2, 3, 4, 5, 6);
intStream.stream().map(Either.liftWithValue(item -> doSomething(item)))
.map(item -> item.isLeft() ? item.getLeft() : item.getRight())
.flatMap(o -> {
System.out.println(o);
return o.isPresent() ? Stream.of(o.get()) : Stream.empty();
})
.forEach(System.out::println);
}


private Object doSomething(Integer item) throws Exception {


if (item == 0) {
throw new Exception("Zero ain't a number!");
} else if (item == 4) {
return Optional.empty();
}


return item;
}

也可以使用一些外部(流)错误指示器在更高级别抛出异常:

List<String> errorMessages = new ArrayList<>(); // error indicator
//..
errorMessages.clear();


List<String> names = new ArrayList<>(Arrays.asList("andrey", "angela", "pamela"));


names.stream()
.map(name -> {
if (name != "pamela") {
errorMessages.add(name + " is wrong here!");
return null; // triggering the indicator
}
return name;
} )
.filter(elem -> (elem != null)) // bypassing propagation of only current unwanted data
//.filter(elem -> (errorMessages.size() == 0)) // or blocking any propagation once unwanted data detected
.forEach(System.out::println);


if (errorMessages.size() > 0) { // handling the indicator
throw  new RuntimeException(String,join(", ", errorMessages));
}