What are the use cases of Promise in Scala concurrency?

I am reading SIP-14, and the concept of Future makes perfect sense and is easy to understand. But I have two questions about Promise:

  1. How can it be that p.future == p? Aren't Future and Promise two different types?

  2. When should we use a Promise? The example producer and consumer code:

    import scala.concurrent.{ future, promise }
    val p = promise[T]
    val f = p.future

    val producer = future {
      val r = produceSomething()
      p success r
      continueDoingSomethingUnrelated()
    }
    val consumer = future {
      startDoingSomething()
      f onSuccess {
        case r => doSomethingWithResult()
      }
    }

It is easy to read, but do we really need to write it like that? I tried to implement it with only Future and without Promise, like this:

    val f = future {
      produceSomething()
    }

    val producer = future {
      continueDoingSomethingUnrelated()
    }

    startDoingSomething()

    val consumer = future {
      f onSuccess {
        case r => doSomethingWithResult()
      }
    }

What is the difference between this and the given example, and what makes a Promise necessary?

Promise and Future are complementary concepts. A Future is a value that will become available, well, sometime in the future, and you can do stuff with it once that happens. It is, therefore, the read (or out) endpoint of a computation - it is something that you retrieve a value from.

A Promise is, by analogy, the writing side of the computation. You create a promise, which is the place where you'll put the result of the computation, and from that promise you get a future that will be used to read the result that was put into the promise. When you complete a Promise, either with a failure or with a success, you trigger all the behavior that was attached to the associated Future.
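
To make the two ends concrete, here is a minimal sketch. The object name ReadWriteSides and the values are only illustrative, and it assumes the global execution context is acceptable for running the callback:

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ReadWriteSides {
  // Returns whether the future was complete before the write, and the mapped result after it.
  def demo(): (Boolean, Int) = {
    val p = Promise[Int]()               // write side: nothing stored yet
    val f = p.future                     // read side of the same computation
    val doubled = f.map(_ * 2)           // behavior attached before any value exists
    val completedBefore = f.isCompleted  // still false at this point
    p.success(21)                        // writing the value triggers the attached map
    (completedBefore, Await.result(doubled, 1.second))
  }
}
```

Calling ReadWriteSides.demo() should show that the future is incomplete until the promise is written to, and that the mapped future only produces its value afterwards.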

Regarding your first question: how can it be that for a promise p we have p.future == p? You can imagine this as a single-item buffer - a container which is initially empty and in which you can afterwards store one value, which will become its content forever. Now, depending on your point of view, this is both a Promise and a Future. It is a promise for someone who intends to write the value into the buffer. It is a future for someone who waits for that value to be put into the buffer.
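
The "write once, keep forever" behavior of that buffer can be observed directly. This is a small sketch (SingleItemBuffer is an illustrative name) that uses trySuccess, which reports whether the write was accepted; calling success on an already completed promise throws an IllegalStateException instead:

```scala
import scala.concurrent.Promise
import scala.util.Try

object SingleItemBuffer {
  // Returns the outcome of two write attempts and the content that ends up stored.
  def demo(): (Boolean, Boolean, Option[Try[String]]) = {
    val p = Promise[String]()
    val firstAccepted = p.trySuccess("first")    // the buffer was empty, so this write wins
    val secondAccepted = p.trySuccess("second")  // the buffer is already full, so this is rejected
    (firstAccepted, secondAccepted, p.future.value)
  }
}
```

The reader of p.future only ever sees the first value, no matter how many writers try afterwards.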

Specifically, for the Scala concurrent API, if you take a look at the source of the Promise trait, you can see how the methods of the Promise companion object are implemented:

    object Promise {

      /** Creates a promise object which can be completed with a value.
       *
       *  @tparam T       the type of the value in the promise
       *  @return         the newly created `Promise` object
       */
      def apply[T](): Promise[T] = new impl.Promise.DefaultPromise[T]()

      /** Creates an already completed Promise with the specified exception.
       *
       *  @tparam T       the type of the value in the promise
       *  @return         the newly created `Promise` object
       */
      def failed[T](exception: Throwable): Promise[T] = new impl.Promise.KeptPromise[T](Failure(exception))

      /** Creates an already completed Promise with the specified result.
       *
       *  @tparam T       the type of the value in the promise
       *  @return         the newly created `Promise` object
       */
      def successful[T](result: T): Promise[T] = new impl.Promise.KeptPromise[T](Success(result))

    }

Now, those implementations of promises, DefaultPromise and KeptPromise, live in the impl package. They both extend a small base trait which happens to have the same name, Promise, but is located in a different package:

    private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with scala.concurrent.Future[T] {
      def future: this.type = this
    }

So you can see what they mean by p.future == p.

DefaultPromise is the buffer I was referring to above, while KeptPromise is a buffer that already holds its value from the moment it is created.
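
From the caller's side, the difference between the empty buffer and the pre-filled one looks roughly like this. The sketch only uses the public factory methods shown above; the object name and the values are illustrative, and it makes no assumption about which internal class backs each promise in a given Scala version:

```scala
import scala.concurrent.Promise

object KeptVsDefault {
  // An empty buffer: nothing has been written yet.
  val empty = Promise[Int]()

  // Buffers that are already full at creation time.
  val kept = Promise.successful(42)
  val broken = Promise.failed[Int](new RuntimeException("boom"))
}
```

Here KeptVsDefault.empty.isCompleted is false, while the futures of kept and broken already carry a Success and a Failure respectively.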

Regarding your example, the future block you use there actually creates a promise behind the scenes. Let's look at the definition of future:

    def future[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = Future[T](body)

By following the chain of methods you end up in the impl.Future:

    private[concurrent] object Future {
      class PromiseCompletingRunnable[T](body: => T) extends Runnable {
        val promise = new Promise.DefaultPromise[T]()

        override def run() = {
          promise complete {
            try Success(body) catch { case NonFatal(e) => Failure(e) }
          }
        }
      }

      def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T] = {
        val runnable = new PromiseCompletingRunnable(body)
        executor.execute(runnable)
        runnable.promise.future
      }
    }

So, as you can see, the result you get from your producer block gets poured into a promise.
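
The same idea can be written using only the public API. The following is a simplified sketch, not the library's actual code: myFuture is a hypothetical helper name, and Try(body) is used as a shorthand for the try/catch on NonFatal shown above:

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object HandRolledFuture {
  // Hypothetical re-implementation of a future block on top of a Promise.
  def myFuture[T](body: => T)(implicit ec: ExecutionContext): Future[T] = {
    val p = Promise[T]()
    ec.execute(new Runnable {
      // Evaluate the body on the executor and pour the outcome into the promise.
      def run(): Unit = p.complete(Try(body))
    })
    p.future // the caller only ever sees the read side
  }

  def demo(): Int = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    Await.result(myFuture(6 * 7), 1.second)
  }
}
```

So a future block is a promise whose writer happens to be the block itself; you need an explicit Promise only when the writer is some other piece of code.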

LATER EDIT:

Regarding real-world use: most of the time you won't deal with promises directly. If you use a library which performs asynchronous computation, then you'll just work with the futures returned by the library's methods. Promises are, in this case, created by the library - you're just working with the reading end of what those methods do.

But if you need to implement your own asynchronous API, you'll have to start working with them. Suppose you need to implement an async HTTP client on top of, let's say, Netty. Then your code will look somewhat like this:

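    def makeHTTPCall(request: Request): Future[Response] = {
      val p = Promise[Response]()
      // registerOnCompleteCallback and makeResponse stand in for the client library's API
      registerOnCompleteCallback(buffer => {
        val response = makeResponse(buffer)
        p success response   // complete the promise once the response arrives
      })
      p.future               // hand the read side back to the caller
    }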