Scala Futures in Detail – Part 2

In the previous post, Scala Futures in Detail – Part 1, we discussed synchronous and asynchronous execution in detail. In this post, we deepen our understanding of futures, which play an important role in writing clean and crisp asynchronous code.

From the Scala docs:

Futures provide a way to reason about performing many operations in parallel– in an efficient and non-blocking way. A Future is a placeholder object for a value that may not yet exist. Generally, the value of the Future is supplied concurrently and can subsequently be used. Composing concurrent tasks in this way tends to result in faster, asynchronous, non-blocking parallel code.

How do we create Futures

That is the first question a reader of this post is likely to ask. Whenever we submit a runnable task to an ExecutorService, a future is returned to the caller.

[Figure: Submitting a task to the ExecutorService]
[Figure: The caller gets the Future and the task is scheduled on a thread]
[Figure: The thread puts the result into the future container while the caller blocks on the future's result]

So whenever we submit a task to an ExecutorService, the task is picked up by one of its threads, which executes the instructions enclosed in the task. We can think of a Future as a container into which that thread puts the result once the computation is done. You can pass this container from one function to another, but beware: if you try to look inside it, you will block until the result has been placed in the future.

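  • This code returns a Scala Future. It is created when we run a block of code within some ExecutionContext.
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// ThreadPoolExecutor has no zero-argument constructor, so a fixed-size pool is used here (size 4 is arbitrary)
val executorService = Executors.newFixedThreadPool(4)
implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(executorService)
val dbCallFut = Future {
    makeDBCallSync()
}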
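  • This code returns a Java Future. It is created when we submit a Runnable to some ExecutorService.
import java.util.concurrent.Executors

// ThreadPoolExecutor has no zero-argument constructor, so a fixed-size pool is used here (size 4 is arbitrary)
val executorService = Executors.newFixedThreadPool(4)
val future = executorService.submit(new Runnable {
    override def run(): Unit = makeDBCallSync()
})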
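
To make the "container" idea concrete, here is a minimal sketch of passing work to a pool and then blocking on the Java Future. It assumes a stand-in makeDBCallSync() that sleeps briefly and returns a string, and it uses a Callable rather than a Runnable so that the future carries a value; both are illustrative choices, not part of the original example.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

object JavaFutureDemo {
  // Hypothetical stand-in for the blocking makeDBCallSync() used above
  def makeDBCallSync(): String = {
    Thread.sleep(100) // simulate I/O latency
    "row-42"
  }

  def run(): String = {
    val pool = Executors.newFixedThreadPool(2)
    try {
      // A Callable, unlike a Runnable, lets the Java Future carry a result
      val future = pool.submit(new Callable[String] {
        override def call(): String = makeDBCallSync()
      })
      // The future can be passed around freely; get() blocks until the value is there
      // (the 5 second timeout is an arbitrary choice for this sketch)
      future.get(5, TimeUnit.SECONDS)
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The only way to read the value is the blocking get(), which is the limitation the next paragraphs contrast with Scala Futures.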

We show both examples so that the difference between Scala Futures and Java Futures is clear and one can judge when to use which.

When working in Scala, prefer Scala Futures over Java Futures. One of the simplest and most important reasons is code readability.

One of the core differences is that a java.util.concurrent.Future supports almost no operations: we can pass it between functions, check whether it is done, and block on its result. With a Scala Future, one can work with the enclosed value and chain further operations on it even though the value has not been computed yet. Read this for more details

def prepareDrinkAsync(): Future[Drink] = {
  getMeAsync("soda")
    .flatMap(item => getMeAsync("whisky + " + item))
    .flatMap(item => getMeAsync("ice + " + item))
    .map(item => Drink(Seq(item)))
}
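
The chain above can be made self-contained with a few assumptions. In this sketch, Drink is a hypothetical case class, getMeAsync is a hypothetical helper that simply completes with the item name on the global execution context, and the chain is written as a for-comprehension, which the compiler rewrites into flatMap and map calls and which yields the same result here.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object DrinkDemo {
  // Hypothetical result type for the example
  final case class Drink(ingredients: Seq[String])

  // Hypothetical async fetch: completes with the item name on a pool thread
  def getMeAsync(item: String): Future[String] = Future(item)

  // Same steps as the flatMap/map chain, written as a for-comprehension
  def prepareDrinkAsync(): Future[Drink] =
    for {
      soda   <- getMeAsync("soda")
      whisky <- getMeAsync("whisky + " + soda)
      all    <- getMeAsync("ice + " + whisky)
    } yield Drink(Seq(all))

  // Blocking is only for the demo; 5 seconds is an arbitrary timeout
  def run(): Drink = Await.result(prepareDrinkAsync(), 5.seconds)

  def main(args: Array[String]): Unit = println(run())
}
```

No step blocks a thread while waiting for the previous one; each step is registered as a continuation on the future before it.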

How are the operations on a Scala Future Implemented

The following questions popped into my head when I started using Scala Futures.

  • Do the .map or .flatMap operations happen on the same thread on which the task was submitted?
  • If the .map or .flatMap operation happens on a different thread, how does the handoff happen?
  • And other similar questions

It turns out the answers are fairly simple. Let's understand this by taking .map on a Future as an example.

def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = { // transform(f, identity)
  val p = Promise[S]()
  onComplete { v => p complete (v map f) }
  p.future
}

From this code segment, we can make the following observations:

  • Calling map on a Future A first creates a Promise and then completes that Promise from inside a callback registered with A's onComplete. In other words, when the computation for Future A completes, the function f is triggered. f runs on a thread chosen by the ExecutionContext (executor) that is passed implicitly to .map, which need not be the thread that completed Future A.
  • map returns the Promise's Future, which will hold the result of applying f once the result of Future A is available.

One more question remains: how is the completion of Future A linked to the computation defined by f? For this, we need to look deeper into the onComplete function.
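
The same mechanism can be reproduced with only the public API. The sketch below defines a hypothetical helper, mapViaPromise, that mirrors the shape of the library code: create a Promise, complete it from an onComplete callback, and hand back the Promise's future. It is an illustration of the idea, not the library's implementation.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object MapViaPromise {
  // Hypothetical helper mirroring the shape of Future.map using only the public API
  def mapViaPromise[T, S](fa: Future[T])(f: T => S)(implicit ec: ExecutionContext): Future[S] = {
    val p = Promise[S]()
    // When fa completes, apply f inside the Try and complete the promise with the outcome;
    // a failed fa (or an exception thrown by f) yields a failed promise
    fa.onComplete(result => p.complete(result.map(f)))
    p.future
  }

  def run(): Int = {
    import ExecutionContext.Implicits.global
    val doubled = mapViaPromise(Future(21))(_ * 2)
    Await.result(doubled, 5.seconds) // arbitrary timeout for the demo
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The caller receives p.future immediately; the value arrives later, when the callback completes the promise.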

def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
  val preparedEC = executor.prepare()
  val runnable = new CallbackRunnable[T](preparedEC, func)
  dispatchOrAddCallback(runnable)
}
@tailrec
private def dispatchOrAddCallback(runnable: CallbackRunnable[T]): Unit = {
  getState match {
    case r: Try[_]          => runnable.executeWithValue(r.asInstanceOf[Try[T]])
    case _: DefaultPromise[_] => compressedRoot().dispatchOrAddCallback(runnable)
    case listeners: List[_] => if (updateState(listeners, runnable :: listeners)) () else dispatchOrAddCallback(runnable)
  }
}

In this code segment, we can see the following:

  • In onComplete, we create a Runnable (a CallbackRunnable) that wraps the function func. For .map, func is the callback that applies the user-supplied f and completes the new Promise.
  • If Future A is still pending, this runnable is added to the list of listeners waiting for it to complete; if A is already complete, the runnable is dispatched immediately with A's value. Future A is the original future on which we called .map.
  • When Future A completes, all listeners attached to the Promise backing Future A are executed. Each one runs on the executor that was passed when .map or .flatMap was called. For more details see this file

So far we have a basic understanding of how simple operations like .map are implemented on a Scala Future. Now let's go one step further and understand how a primitive like Future itself works in Scala.
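
One way to observe which thread runs the callback is to give the future body and the .map call different executors. The sketch below assumes two single-thread pools with hypothetical thread names ("task-thread" and "map-thread") and records the thread name seen by each stage.

```scala
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object CallbackThreadDemo {
  // Hypothetical helper: a single-thread ExecutionContext whose thread has a known name
  private def namedContext(name: String): (ExecutorService, ExecutionContext) = {
    val pool = Executors.newSingleThreadExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, name)
        t.setDaemon(true)
        t
      }
    })
    (pool, ExecutionContext.fromExecutor(pool))
  }

  // Returns (thread that ran the future body, thread that ran the map function)
  def run(): (String, String) = {
    val (taskPool, taskEc) = namedContext("task-thread")
    val (mapPool, mapEc) = namedContext("map-thread")
    try {
      // The body runs on taskEc; the function given to map runs on mapEc
      val fut = Future(Thread.currentThread().getName)(taskEc)
        .map(bodyThread => (bodyThread, Thread.currentThread().getName))(mapEc)
      Await.result(fut, 5.seconds) // arbitrary timeout for the demo
    } finally { taskPool.shutdown(); mapPool.shutdown() }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

With a single implicit ExecutionContext in scope, both stages are scheduled on the same pool, so the two names may or may not coincide depending on which pool thread picks up each runnable.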

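import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

val executionContext = ExecutionContext.fromExecutor(executorService)
val dbCallFut = Future {
    makeDBCallSync()
}(executionContext)

// Await.result requires a timeout; 5 seconds is an arbitrary choice for illustration
Await.result(dbCallFut, 5.seconds)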

In this example, we submit a task to an ExecutionContext and then wait for the result of the computation in Await.result.

Submitting a task to an ExecutionContext involves these steps:

  • The ExecutionContext here wraps an ExecutorService, so whenever we submit a task or runnable to the ExecutionContext, it forwards it to the ExecutorService after some bookkeeping about state.
  • The ExecutorService takes the task and creates a new Worker runnable that encapsulates the original task.
  • After creating the worker, the ExecutorService starts it on a new thread. If a worker already exists, it instead takes the task from the task queue to which the ExecutorService submitted it.
  • Once the worker has the task, it executes the task on the thread associated with the worker. See ThreadPoolExecutor for details.

You can understand the execution of a runnable or task via this simple code segment
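
A minimal sketch of that worker loop, assuming a hypothetical single-worker pool (TinyPool) backed by a LinkedBlockingQueue. This is not the ThreadPoolExecutor source; real pools also handle sizing, shutdown, and task failures.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object WorkerLoopDemo {
  // Hypothetical single-worker pool, for illustration only
  final class TinyPool {
    private val queue = new LinkedBlockingQueue[Runnable]()
    private val worker = new Thread(new Runnable {
      // The worker's run loop: block on the queue, then run each task on this thread
      override def run(): Unit = while (true) queue.take().run()
    }, "tiny-pool-worker")
    worker.setDaemon(true) // lets the JVM exit while the worker is parked on take()
    worker.start()

    // Submitting a task just enqueues it; the worker picks it up later
    def execute(task: Runnable): Unit = queue.put(task)
  }

  def run(): Int = {
    val pool = new TinyPool
    val counter = new AtomicInteger(0)
    val done = new CountDownLatch(3)
    (1 to 3).foreach { i =>
      pool.execute(new Runnable {
        override def run(): Unit = { counter.addAndGet(i); done.countDown() }
      })
    }
    done.await(5, TimeUnit.SECONDS) // wait for the worker to drain the queue
    counter.get()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The caller's thread only enqueues; every task body executes on the worker thread, which is the handoff described in the steps above.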

Await.result simply waits for the result to be available in the Promise

val l = new CompletionLatch[T]()
onComplete(l)(InternalCallbackExecutor)
l.tryAcquireSharedNanos(1, f.toNanos)
  • Calling Await.result creates a CompletionLatch.
  • The calling thread then tries to acquire the shared latch. Because of how CompletionLatch is implemented, the acquire succeeds only if the future has already completed; otherwise the thread waits, up to the given timeout.
  • The thread calling Await.result also registers the CompletionLatch as a listener on the Promise, so when the Future completes, the latch is released (as per the implementation of CompletionLatch), which in turn unblocks the thread calling Await.result.
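
CompletionLatch is internal to the library, but the same pattern can be sketched with public classes. The example below assumes a hypothetical helper, blockingResult, that uses a java.util.concurrent.CountDownLatch as a stand-in for the latch: it registers a listener that stores the result and releases the latch, then parks the calling thread until that happens or the timeout elapses.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit, TimeoutException}
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Try

object LatchAwaitDemo {
  // Hypothetical helper: block the caller until fut completes or the timeout elapses
  def blockingResult[T](fut: Future[T], atMost: FiniteDuration)(implicit ec: ExecutionContext): T = {
    val latch = new CountDownLatch(1)
    val outcome = new AtomicReference[Try[T]]()
    // Register a listener that stores the result and releases the latch
    fut.onComplete { r => outcome.set(r); latch.countDown() }
    // Park the calling thread until the listener fires or time runs out
    if (!latch.await(atMost.toNanos, TimeUnit.NANOSECONDS))
      throw new TimeoutException(s"Future not completed within $atMost")
    outcome.get().get // rethrows the failure if the future failed
  }

  def run(): Int = {
    import ExecutionContext.Implicits.global
    val slow = Future { Thread.sleep(50); 7 }
    blockingResult(slow, 5.seconds) // arbitrary timeout for the demo
  }

  def main(args: Array[String]): Unit = println(run())
}
```

In application code, Await.result itself is the right tool; the sketch only shows why the calling thread stays blocked until the completing thread releases the latch.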
