Scala futures allow you to easily parallelize operations by creating a series of tasks and running them in a thread pool.
Let's take a simple example. We've defined two functions that require some I/O: a function that retrieves information about the SSL certificate chain for a website, and a function that publishes that to a database (Solr).
Both of these require I/O, and can take some time, so we’d like to run tasks in parallel.
def saveToSolr(data: List[Map[String, Any]]): Unit = {
  ...
}

def getSSLCertificates(domain: String): List[Map[String, Any]] = {
  ...
}
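If you want to experiment with the snippets below without doing real I/O, hypothetical stand-in stubs (not the real implementations, which are elided above) might look like:

```scala
// Hypothetical stubs standing in for the real I/O functions
def getSSLCertificates(domain: String): List[Map[String, Any]] =
  List(Map("domain" -> domain, "issuer" -> "example-ca"))

def saveToSolr(data: List[Map[String, Any]]): Unit =
  println(s"would save ${data.size} record(s)")
```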
To really appreciate this problem we need a large list of domains (say, 1,000,000). If we’ve obtained a list as a CSV file, we can read these all in:
import com.github.tototoshi.csv._

val reader = CSVReader.open(csvFile)
val domains =
  reader.iterator.map(
    (x: Seq[String]) => x(1).trim
  )
We can create a collection of futures like so:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val tasks =
  for (domain <- domains) yield Future {
    saveToSolr(getSSLCertificates(domain))
  }
Now, we could also have easily parallelized the operation with "par", so you may be asking why we'd want the more verbose futures:
domains.par.map(
  domain => saveToSolr(getSSLCertificates(domain))
)
The value of futures is that you can do map/reduce type things on them, and the follow-on operations will get put back into the thread pool:
val tasks =
  for (domain <- domains) yield Future {
    getSSLCertificates(domain)
  }.map(saveToSolr)
Note that at each step we're creating functions/objects that wrap the previous step. Because each step can stream its output to the next, we should be allocating very little memory (i.e. we're not allocating new collections with a million items).
We also haven't started any threads, and nothing we've written so far will run any of our code. (Scala Futures normally start running as soon as they're created; the laziness here comes from "domains" being an iterator, so no futures are actually built until we consume it.)
To do that, we can aggregate the sequence into a single future, and run them all. One thing that confuses people here is that "Future.sequence" does not imply sequential operation - it just means that it takes a collection ("sequence") of Futures and treats them as an individual future.
import scala.concurrent.Await
import scala.concurrent.duration.Duration

val aggr = Future.sequence(tasks)
Await.result(aggr, Duration.Inf)
This takes a "duration", so you can have it time out. In my testing it looked like this applied to the individual futures (not the entire thing). Calling "Await.result" evaluates the entire structure we've built, so at this point all the code we've written actually runs.
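Putting the pieces together, here's a minimal, self-contained sketch of the whole pipeline, using a toy doubling function in place of the SSL/Solr work (the function and the three-item list are assumptions for illustration):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// A toy stand-in for the real I/O work
def slowDouble(n: Int): Int = { Thread.sleep(10); n * 2 }

// Build the futures, collapse them with Future.sequence, then wait
val tasks = for (n <- List(1, 2, 3)) yield Future { slowDouble(n) }
val aggr = Future.sequence(tasks)
val results = Await.result(aggr, Duration.Inf)
// results is List(2, 4, 6)
```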
At this point, you may be asking yourself whether running a million concurrent operations is a good idea - it isn't. If it were possible to run a utility like that at home, your ISP would almost immediately rate limit your DNS lookups. Fortunately, behind the scenes Scala is using a thread pool, which limits the amount of concurrent activity to a reasonable level.
You can trivially override this thread pool with your own, which is really awesome because it lets you increase or decrease the rate at which the application runs.
Before you set up the futures, all you need to add is this:
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

implicit val ec = new ExecutionContext {
  val threadPool = Executors.newFixedThreadPool(50)
  def execute(runnable: Runnable) { threadPool.submit(runnable) }
  def reportFailure(t: Throwable) {}
}
Because this is an implicit, it automatically gets passed to the futures when they are created (you can also pass it manually if you don't trust it).
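Passing the context manually just means supplying it as the second (curried) argument list to Future. A minimal sketch, with a hypothetical small pool (the name and size are illustrative):

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// A hypothetical, explicitly named pool
val smallPool: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

// Pass the context explicitly instead of relying on an implicit
val f = Future { 1 + 1 }(smallPool)
val result = Await.result(f, Duration.Inf)
// result is 2
```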
This also gives you a convenient error handling mechanism at the thread pool level (you can also set onFailure / onSuccess / onComplete callbacks on the individual futures).
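For the per-future callbacks, onComplete receives a Try and runs on the pool once the future finishes. A minimal sketch (the computation is a toy example):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success}

val f = Future { 21 * 2 }

// The callback fires with Success or Failure once the future completes
f.onComplete {
  case Success(v) => println(s"got $v")
  case Failure(t) => println(s"failed: ${t.getMessage}")
}

val answer = Await.result(f, Duration.Inf)
// answer is 42
```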