Streams: handle big data with laziness
TL;DR
Lazy data structures are a great tool for dealing with large quantities of data (think gigabytes or more). Here I show the basics of streams in Scala and then take a short dive into how the Akka community implements streams.
The Problem
At some point in your hacking career you will face a quantity of data that your computer cannot fit in its memory. Imagine a terabyte-long CSV file containing data about interesting events that happened today: what if you want to find only the ones near your location?
The theory
The complexity of this problem lies in the fact that you cannot just load the data in memory.
Programming languages usually offer something called an "iterator" in their standard libraries to deal with this kind of problem. Iterators provide a function called next to get the next element of a sequential collection. This interface abstracts over a lot of use cases: for example, you can call next on a string and it will return the next character, or you could call next on a Google search and it would return the next bunch of links related to your query.
This approach works because you load in memory just the current element and use the iterator to access the next one. The iterator will fail when you reach the end of the data.
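For example, Scala's standard library exposes this directly: reading a file line by line gives you an Iterator, so only one line needs to be in memory at a time. A minimal sketch (the file name events.csv is just a placeholder):
import scala.io.Source

val lines = Source.fromFile("events.csv").getLines() // Iterator[String]: the file contents are not loaded up front
while (lines.hasNext) {
  val line = lines.next() // only the current line lives in memory
  println(line)
}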
Another approach is to use a description of the data. This means that instead of taking a CSV file and trying to load it, I want to just say: this data structure represents a file, I need to apply some operations to it, apply them at the last possible moment and, to save effort, only to the parts I actually need.
Functional programmers call postponing evaluation "lazy evaluation". Think of lazy evaluation as the following:
val normalEvaluation = "someResult"
val lazyEvaluation = () => "someResult"
normalEvaluation: String = "someResult"
lazyEvaluation: () => String = ammonite.$sess.cmd15$$$Lambda$2276/0x0000000840a3b040@13e76a91
You can see that lazyEvaluation requires you to run it later to get the result, like this:
lazyEvaluation()
res16: String = "someResult"
This is so important that Scala provides it as a feature of the language:
lazy val lazyEvaluation = "someResult"
lazyEvaluation: String = <lazy>
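If you want to convince yourself that the body of a lazy val runs only when the value is first accessed, and only once, here is a tiny sketch:
lazy val expensive = { println("computing..."); 42 }
// nothing is printed yet
expensive // prints "computing..." and returns 42
expensive // returns 42 again without recomputing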
With this concept we have all we need to describe computation over data structures that are not concrete yet:
val lazyEvaluation = () => "someResult"
val someLazyEvaluationOverTheDescription = () => lazyEvaluation().toUpperCase
lazyEvaluation: () => String = ammonite.$sess.cmd18$$$Lambda$2285/0x0000000840a3ec40@1a0b51e7
someLazyEvaluationOverTheDescription: () => String = ammonite.$sess.cmd18$$$Lambda$2286/0x0000000840a3f840@c197f46
As you can see above, someLazyEvaluationOverTheDescription does not make the value concrete because no function application has happened yet. Essentially, I have only changed the description of a computation.
You will believe me if I say that this same concept applies to data structures (i.e., our huge-big-dataish CSV file).
A data structure that uses this concept in Scala is Stream.
println("\nList evaluation") 1.to(3).toList.map(i => {println(i); i}).map(_ * 5).map(println) println("\nStream evaluation") 1.to(3).toStream.map(i => {println(i); i}).map(_ * 5).map(println)
List evaluation
1
2
3
5
10
15

Stream evaluation
1
5
2
10
3
15
res19_1: List[Unit] = List((), (), ())
res19_3: Stream[Unit] = Stream((), (), ())
Can you spot the difference? Look at the order of the output above. One loops over the list once per map call, the other loops over it only once in total. Stream acts lazily! The side effect of this laziness is that while calling map repeatedly on a List requires one loop per call, the Stream's lazy behaviour allows the maps to be merged into a single loop! The functional community calls this "fusion".
I forced the evaluation of the Stream by adding the println calls to show you the difference: had I not done that, the map functions would not have run at all.
By avoiding unnecessary work, the laziness of this data structure lets you handle as much data as you like.
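You can even describe an infinite collection and only pay for the elements you actually ask for; a small sketch with the same Stream:
val naturals = Stream.from(1)   // a description of all natural numbers
val evens = naturals.map(_ * 2) // still only a description
evens.take(5).toList            // List(2, 4, 6, 8, 10): only five elements were ever produced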
A solution with Akka Streams
The Akka community offers a variant of the Stream laziness mechanism based on the actor model. Let's skip the details and let me show you how to load the CSV file with it (you can still do the same with basic Scala's Stream, but in the next section I will show how Akka gives you more control over the flow of large amounts of data).
First, a comma-separated CSV file:
Username,Identifier,First name,Last name
booker12,9012,Rachel,Booker
grey07,2070,Laura,Grey
johnson81,4081,Craig,Johnson
jenkins46,9346,Mary,Jenkins
smith79,5079,Jamie Smith
Then the code we need to have an Akka Stream take care of our CSV:
import $ivy.`com.typesafe.akka::akka-stream:2.6.9`
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream._
import akka.stream.scaladsl._
import akka.util.ByteString
import java.nio.file.Paths

implicit val actorSystem = ActorSystem()
import actorSystem.dispatcher
implicit val flowMaterializer = ActorMaterializer()

FileIO.fromPath(Paths.get("test.csv"))
  .via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String))
  .runForeach(println)
import $ivy.$
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream._
import akka.stream.scaladsl._
import akka.util.ByteString
import java.nio.file.Paths
actorSystem: ActorSystem = akka://default
import actorSystem.dispatcher
flowMaterializer: ActorMaterializer = PhasedFusingActorMaterializer(
  akka://default,
  ActorMaterializerSettings(4,16,akka.actor.default-dispatcher,<function1>,StreamSubscriptionTimeoutSettings(CancelTermination,5000 milliseconds),false,1000,1000,false,true,IoSettings(16384)),
  Attributes(
    List(
      InputBuffer(4, 16),
      CancellationStrategy(PropagateFailure),
      akka.stream.Attributes$NestedMaterializationCancellationPolicy@33602729,
      Dispatcher("akka.actor.default-dispatcher"),
      SupervisionStrategy(<function1>),
      DebugLogging(false),
      StreamSubscriptionTimeout(5000 milliseconds, CancelTermination),
      OutputBurstLimit(1000),
      FuzzingMode(false),
      MaxFixedBufferSize(1000000000),
      SyncProcessingLimit(1000)
    )
  ),
  akka.dispatch.Dispatchers@1e686da2,
  Actor[akka://default/system/Materializers/StreamSupervisor-3#-1429691641],
  false,
  akka.stream.impl.SeqActorNameImpl@3ce17e4
)
res15_10: concurrent.Future[akka.Done] = Future(<not completed>)
Username,Identifier,First name,Last name
booker12,9012,Rachel,Booker
grey07,2070,Laura,Grey
johnson81,4081,Craig,Johnson
jenkins46,9346,Mary,Jenkins
smith79,5079,Jamie Smith
Again, by using runForeach I force the evaluation of the stream. Until that point nothing has happened: everything is just a description of a computation over data.
By playing around with this basic example you are already well equipped for handling huge amounts of data.
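For instance, going back to the opening problem, filtering the events near your location is just one more step in the description. Here is a sketch reusing the imports and implicits from the snippet above; the file name events.csv and the assumption that the third column holds a city name are made up for illustration:
FileIO.fromPath(Paths.get("events.csv"))
  .via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String))
  .filter { line =>
    val cols = line.split(",")
    cols.length > 2 && cols(2) == "Berlin" // keep only rows whose third column matches our city
  }
  .runForeach(println)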
A bit more about Akka Streams
In the Akka code I showed in the previous section there are three concepts I have not explained yet:
- FileIO.fromPath is a Source of data.
- Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String) is a Flow applied to a Source (i.e., a data transformation that produces a new Source).
- runForeach(println) is a Sink and materializer of the Stream (i.e., it consumes the Source and makes concrete the description of the computation).
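Written out explicitly with those three building blocks, the same pipeline looks roughly like this (a sketch reusing the setup from the snippet above):
val source = FileIO.fromPath(Paths.get("test.csv"))                         // a Source of ByteString
val flow = Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String) // a Flow from ByteString to String
val sink = Sink.foreach[String](println)                                    // a Sink of String

source.via(flow).to(sink).run() // nothing happens until run() materializes the description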
Essentially Akka divides a Stream into a Source and a Sink, also known as producer and consumer. A good reason for this is the processing speed of sources and sinks: when you are handling a lot of data, there will always be a bottleneck. If the Source is the bottleneck, the Sink will have to wait on the Source to serve more data; if the Sink is the bottleneck, we risk breaking it by overflowing it with data; finally, if our Flow is the bottleneck, we will have both the Sink waiting and the Source overflowing our Flow. Luckily, Akka takes care of this for us because it protects these fundamental parts of its Stream with its backpressure feature. It basically has a mechanism to signal the Source and the Flow to slow down when the Flow and the Sink, respectively, cannot cope with the pressure.
Let's see a final code example about this:
import $ivy.`com.typesafe.akka::akka-stream:2.6.9`
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream._
import akka.stream.scaladsl._
import akka.util.ByteString
import java.nio.file.Paths
import scala.util.Random
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val actorSystem = ActorSystem()
import actorSystem.dispatcher
implicit val flowMaterializer = ActorMaterializer()

// a fast source of random strings and two sinks: one that prints and one that discards
val source = Source(1.to(100).map(Random.nextString(_)))
val sink = Sink.foreach[String](println(_))
val nullSink = Sink.ignore

// describe a graph (source connected to sink), keeping the sink's materialized Future
def graph(src: Source[String, Any], snk: Sink[String, scala.concurrent.Future[akka.Done]]) =
  src.toMat(snk)(Keep.right)

// slow variants that introduce a 100 ms delay per element
val slowSource = Source(1.to(100).map(x => {Thread.sleep(100); Random.nextString(x)}))
val slowSink = Sink.foreach[String](x => Thread.sleep(100))

def time[R](block: => R): R = {
  val t0 = System.nanoTime()
  val result = block // call-by-name
  val t1 = System.nanoTime()
  println("Elapsed time: " + (t1 - t0) + "ns, ie, " + ((t1 - t0) / 1000000000) + "s")
  result
}

time {Await.result(graph(source, nullSink).run(), 100.seconds)}
time {Await.result(graph(slowSource, nullSink).run(), 100.seconds)}
time {Await.result(graph(source, slowSink).run(), 100.seconds)}
time {Await.result(graph(slowSource, slowSink).run(), 100.seconds)}
In the code above I use the function graph to create a Stream, or really, in Akka terms, a Graph, that describes a computation.
Then I run this function with different combinations of fast and slow sources and sinks to demonstrate that the backpressure mechanism takes care of the speed mismatch for me, making sure that I do not overflow the sink.
Feel free to run these scripts in Ammonite to see how safe Akka keeps you while you play around with streams.
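If you want to be explicit about what should happen when a stage cannot keep up, Akka also lets you state it in the description itself. A small sketch reusing source and slowSink from the script above; the buffer size and rate are arbitrary:
source
  .buffer(16, OverflowStrategy.backpressure) // hold at most 16 elements, then slow the upstream down
  .throttle(10, 1.second)                    // let at most 10 elements per second through
  .runWith(slowSink)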
Conclusion
So to summarize: learn laziness! Through the concept of delaying evaluation, you can operate on descriptions of computations (skipping unnecessary work!). A side effect of this is lazy data structures, and so streams. Once you get the basics right, working with Akka is simple. Akka Streams are just streams specialized for the most common big-data use cases.