Scala: List[Future] to Future[List] disregarding failed futures

I'm looking for a way to convert an arbitrary-length list of Futures to a Future of List. I'm using Play Framework, so ultimately what I really want is a Future[Result], but to keep things simple, let's just say Future[List[Int]]. The normal way to do this would be Future.sequence(...), but there's a twist: the list I'm given usually has around 10-20 futures in it, and it's not uncommon for one of those futures to fail (they are making external web service requests). Instead of having to retry all of them when one fails, I'd like to get at the ones that succeeded and return those.

For example, doing the following doesn't work:

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Success
import scala.util.Failure


val listOfFutures = Future.successful(1) :: Future.failed(new Exception("Failure")) ::
  Future.successful(3) :: Nil


val futureOfList = Future.sequence(listOfFutures)


futureOfList onComplete {
  case Success(x) => println("Success!!! " + x)
  case Failure(ex) => println("Failed !!! " + ex)
}


scala> Failed !!! java.lang.Exception: Failure

Instead of getting only the exception, I'd like to be able to pull the 1 and 3 out of there. I tried using Future.fold, but that apparently just calls Future.sequence behind the scenes.

Thanks in advance for the help!


The trick is to first make sure that none of the futures has failed. .recover is your friend here: you can combine it with map to convert all the Future[T] results to Future[Try[T]] instances, all of which are certain to be successful futures.

Note: you can use Option or Either here as well, but Try is the cleanest way if you specifically want to trap exceptions.

import scala.util.Try  // needed in addition to Success and Failure
def futureToFutureTry[T](f: Future[T]): Future[Try[T]] =
  f.map(Success(_)).recover { case x => Failure(x) }


val listOfFutures = ...
val listOfFutureTrys = listOfFutures.map(futureToFutureTry(_))

Then use Future.sequence as before, to give you a Future[List[Try[T]]]:

val futureListOfTrys = Future.sequence(listOfFutureTrys)

Then filter:

val futureListOfSuccesses = futureListOfTrys.map(_.filter(_.isSuccess))

You can even pull out the specific failures, if you need them:

val futureListOfFailures = futureListOfTrys.map(_.filter(_.isFailure))
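
Note that the filtered lists above still hold Try values. If what you want is the plain Future[List[Int]] from the question, one possible way to put the steps together is sketched below. The object name SuccessfulOnly and the helper successfulOnly are made up for illustration, and the execution context is taken as an implicit parameter rather than imported globally.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object SuccessfulOnly {
  // Lift a future so that it always completes successfully, carrying a Try.
  def futureToFutureTry[T](f: Future[T])(implicit ec: ExecutionContext): Future[Try[T]] =
    f.map(Success(_)).recover { case e => Failure(e) }

  // Hypothetical helper: sequence the lifted futures, then keep only the
  // successful values, unwrapped and in input order.
  def successfulOnly[T](fs: List[Future[T]])(implicit ec: ExecutionContext): Future[List[T]] =
    Future.sequence(fs.map(futureToFutureTry(_))).map(_.collect { case Success(v) => v })
}
```

With the listOfFutures from the question, Await.result(SuccessfulOnly.successfulOnly(listOfFutures), 1.second) should give List(1, 3). Using collect instead of filter(_.isSuccess) drops the failures and unwraps the values in one pass.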

I tried Kevin's answer and ran into a glitch on my version of Scala (2.11.5). I corrected that, and wrote a few additional tests in case anyone is interested. Here is my version:

implicit class FutureCompanionOps(val f: Future.type) extends AnyVal {

  /** Given a list of futures `fs`, returns the future holding the list of Try's of the futures from `fs`.
    * The returned future is completed only once all of the futures in `fs` have been completed.
    */
  def allAsTrys[T](fItems: /* future items */ List[Future[T]]): Future[List[Try[T]]] = {
    val listOfFutureTrys: List[Future[Try[T]]] = fItems.map(futureToFutureTry)
    Future.sequence(listOfFutureTrys)
  }

  def futureToFutureTry[T](f: Future[T]): Future[Try[T]] = {
    f.map(Success(_)).recover({ case x => Failure(x) })
  }

  def allFailedAsTrys[T](fItems: /* future items */ List[Future[T]]): Future[List[Try[T]]] = {
    allAsTrys(fItems).map(_.filter(_.isFailure))
  }

  def allSucceededAsTrys[T](fItems: /* future items */ List[Future[T]]): Future[List[Try[T]]] = {
    allAsTrys(fItems).map(_.filter(_.isSuccess))
  }
}

// Tests...
//
// allAsTrys tests
//
test("futureToFutureTry returns Success if no exception") {
  val future = Future.futureToFutureTry(Future { "mouse" })
  Await.ready(future, 1.second)  // wait for completion instead of sleeping
  val futureValue = future.value
  assert(futureValue == Some(Success(Success("mouse"))))
}
test("futureToFutureTry returns Failure if exception thrown") {
  val future = Future.futureToFutureTry(Future { throw new IllegalStateException("bad news") })
  Await.ready(future, 1.second)  // a fixed Thread.sleep was timing-sensitive in the failure case
  val futureValue = future.value

  assertResult(true) {
    futureValue match {
      case Some(Success(Failure(error: IllegalStateException))) => true
      case _ => false  // fail the assertion rather than throw a MatchError
    }
  }
}
test("Future.allAsTrys returns Nil given Nil list as input") {
  val future = Future.allAsTrys(Nil)
  assert(Await.result(future, 100.millis).isEmpty)  // a 100 nanosecond timeout is too tight to be reliable
}
test("Future.allAsTrys returns successful item even if preceded by failing item") {
  val future1 = Future { throw new IllegalStateException("bad news") }
  val future2 = Future { "dog" }

  val futureListOfTrys = Future.allAsTrys(List(future1, future2))
  val listOfTrys = Await.result(futureListOfTrys, 10 milli)
  System.out.println("successItem:" + listOfTrys)

  assert(listOfTrys(0).failed.get.getMessage.contains("bad news"))
  assert(listOfTrys(1) == Success("dog"))
}
test("Future.allAsTrys returns successful item even if followed by failing item") {
  val future1 = Future { "dog" }
  val future2 = Future { throw new IllegalStateException("bad news") }

  val futureListOfTrys = Future.allAsTrys(List(future1, future2))
  val listOfTrys = Await.result(futureListOfTrys, 10 milli)
  System.out.println("successItem:" + listOfTrys)

  assert(listOfTrys(1).failed.get.getMessage.contains("bad news"))
  assert(listOfTrys(0) == Success("dog"))
}
test("Future.allFailedAsTrys returns the failed item and only that item") {
  val future1 = Future { "dog" }
  val future2 = Future { throw new IllegalStateException("bad news") }

  val futureListOfTrys = Future.allFailedAsTrys(List(future1, future2))
  val listOfTrys = Await.result(futureListOfTrys, 10 milli)
  assert(listOfTrys(0).failed.get.getMessage.contains("bad news"))
  assert(listOfTrys.size == 1)
}
test("Future.allSucceededAsTrys returns the succeeded item and only that item") {
  val future1 = Future { "dog" }
  val future2 = Future { throw new IllegalStateException("bad news") }

  val futureListOfTrys = Future.allSucceededAsTrys(List(future1, future2))
  val listOfTrys = Await.result(futureListOfTrys, 10 milli)
  assert(listOfTrys(0) == Success("dog"))
  assert(listOfTrys.size == 1)
}

I just came across this question and have another solution to offer:

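// CanBuildFrom is the pre-2.13 collections API; Scala 2.13 replaced it with BuildFrom.
import scala.collection.generic.CanBuildFrom
import scala.concurrent.{ExecutionContext, Future}

def allSuccessful[A, M[X] <: TraversableOnce[X]](in: M[Future[A]])
    (implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]],
     executor: ExecutionContext): Future[M[A]] = {
  in.foldLeft(Future.successful(cbf(in))) {
    (fr, fa) ⇒ (for (r ← fr; a ← fa) yield r += a) fallbackTo fr
  } map (_.result())
}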

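The idea here is that within the fold, you wait for the next element in the list to complete (using for-comprehension syntax), and if that element fails, you simply fall back to what you already have.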
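
If you only ever pass a List, the same fold can be written without the builder machinery, which also keeps it independent of the collections API version. This is a simplified sketch of the idea, not the answer's generic method; the names FoldFallback and allSuccessfulList are invented for the example.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FoldFallback {
  // List-only variant of the fold-with-fallback idea (hypothetical name).
  def allSuccessfulList[A](in: List[Future[A]])(implicit ec: ExecutionContext): Future[List[A]] =
    in.foldLeft(Future.successful(Vector.empty[A])) { (acc, next) =>
      // Append the next result; if `next` failed, fall back to the
      // accumulator as it was, which is always a successful future.
      (for (r <- acc; a <- next) yield r :+ a).fallbackTo(acc)
    }.map(_.toList)
}
```

A Vector is used as the accumulator so that appending stays cheap; for the 10-20 futures mentioned in the question, the choice makes little practical difference.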

Scala 2.12 has an improvement to Future.transform that lends itself to an answer with less code.

val futures = Seq(Future{1},Future{throw new Exception})


// instead of `map` and `recover`, use `transform`
val seq = Future.sequence(futures.map(_.transform(Success(_))))


val successes = seq.map(_.collect{case Success(x)=>x})
successes
//res1: Future[Seq[Int]] = Future(Success(List(1)))


val failures = seq.map(_.collect{case Failure(x)=>x})
failures
//res2: Future[Seq[Throwable]] = Future(Success(List(java.lang.Exception)))

You can easily wrap the future result in an Option and then flatten the list:

def futureToFutureOption[T](f: Future[T]): Future[Option[T]] =
  f.map(Some(_)).recover {
    case e => None
  }

val listOfFutureOptions = listOfFutures.map(futureToFutureOption(_))

val futureListOfOptions = Future.sequence(listOfFutureOptions)

// Flatten the inner List[Option[T]]; calling flatten on the Future itself does not compile.
val futureListOfSuccesses = futureListOfOptions.map(_.flatten)

You can also collect the successful and unsuccessful results in separate lists:

def safeSequence[A](futures: List[Future[A]]): Future[(List[Throwable], List[A])] = {
  futures.foldLeft(Future.successful((List.empty[Throwable], List.empty[A]))) { (flist, future) =>
    flist.flatMap { case (elist, alist) =>
      future
        .map { success => (elist, alist :+ success) }
        .recover { case error: Throwable => (elist :+ error, alist) }
    }
  }
}
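
As a usage sketch, the pair returned by safeSequence can be consumed in a single map. The fold is repeated here (with the execution context as an implicit parameter) only so that the snippet compiles on its own; SafeSequenceDemo and describe are hypothetical names.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SafeSequenceDemo {
  // Same fold as above, repeated so that the sketch is self-contained.
  def safeSequence[A](futures: List[Future[A]])(implicit ec: ExecutionContext): Future[(List[Throwable], List[A])] =
    futures.foldLeft(Future.successful((List.empty[Throwable], List.empty[A]))) { (flist, future) =>
      flist.flatMap { case (elist, alist) =>
        future
          .map { success => (elist, alist :+ success) }
          .recover { case error: Throwable => (elist :+ error, alist) }
      }
    }

  // Hypothetical caller: report what succeeded and what failed.
  def describe[A](futures: List[Future[A]])(implicit ec: ExecutionContext): Future[String] =
    safeSequence(futures).map { case (errors, values) =>
      s"${values.size} ok: ${values.mkString(", ")}; " +
        s"${errors.size} failed: ${errors.map(_.getMessage).mkString(", ")}"
    }
}
```

For the three futures from the question, describe should produce "2 ok: 1, 3; 1 failed: Failure". Because each step flatMaps on the previous one, the results are gathered in list order, although the futures themselves are already running concurrently.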

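If you need to keep the failed futures for some reason, e.g. logging or conditional processing, this is possible with Scala 2.12+, which provides Future.transform over Try and Try.toEither. Note that the partitionMap call used below additionally requires Scala 2.13.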

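import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Success

val f1 = Future(1)
val f2 = Future(2)
val ff = Future.failed(new Exception())

// Turn every outcome into a successful future holding an Either.
val futures: Seq[Future[Either[Throwable, Int]]] =
  Seq(f1, f2, ff).map(_.transform(f => Success(f.toEither)))

val sum = Future
  .sequence(futures)
  .map { eithers =>
    // partitionMap needs Scala 2.13
    val (failures, successes) = eithers.partitionMap(identity)

    val fsum = failures.map(_ => 100).sum  // each failure counts as 100
    val ssum = successes.sum

    fsum + ssum
  }

assert(Await.result(sum, 1.second) == 103)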