Streams: handle big data with laziness
TL;DR
Lazy data structures are a great tool for dealing with large quantities of data (think gigabytes or more). Here I show the basics of streams in Scala and then take a short dive into how the Akka community implements them.
The Problem
At some point in your hacking career you will face a quantity of data that your computer cannot fit in memory. Imagine a terabyte-long CSV file containing data about interesting events that happened today: what if you want to find only the ones near your location?
The theory
The complexity of this problem hides in the fact that you cannot just load the data in memory.
Programming languages usually offer something called an "iterator" in their standard libraries to deal with this kind of problem. Iterators provide a function called next to get the next element of a sequential collection. This interface abstracts over a lot of use cases: for example, you can call next on a string and it will return the next character, or you could call next on a Google search and it would return the next batch of links related to your query.
This approach works because you load just the current element into memory and use the iterator to access the next one. The iterator will fail once you reach the end of the data.
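To make this concrete, here is a minimal sketch in Scala (the file name is just a placeholder): scala.io.Source.getLines returns an Iterator[String], so only the line you are currently looking at lives in memory.

import scala.io.Source

// "events.csv" is a placeholder for our huge file; getLines gives an Iterator[String].
val lines = Source.fromFile("events.csv").getLines()

while (lines.hasNext) {
  val line = lines.next()  // only the current line is held in memory
  // ... filter or process the line here ...
}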
Another approach is to work with a description of the data. This means that instead of taking a CSV file and trying to load it, I just want to say: this data structure represents a file, I need to apply some operations to it, apply them at the last possible moment and, to save effort, only to the parts I actually need.
Functional programmers call postponing evaluation "lazy evaluation". Think of lazy evaluation as the following:
val normalEvaluation = "someResult"
val lazyEvaluation = () => "someResult"

normalEvaluation: String = "someResult"
lazyEvaluation: () => String = ammonite.$sess.cmd15$$$Lambda$2276/0x0000000840a3b040@13e76a91
You can see that lazyEvaluation requires you to run it later to get the result, like this:
lazyEvaluation()
res16: String = "someResult"
This is so important that Scala provides this as a feature of the language:
lazy val lazyEvaluation = "someResult"
lazyEvaluation: String = <lazy>
With this concept we have all we need to describe computation over data structures that are not concrete yet:
val lazyEvaluation = () => "someResult"
val someLazyEvaluationOverTheDescription = () => lazyEvaluation().toUpperCase

lazyEvaluation: () => String = ammonite.$sess.cmd18$$$Lambda$2285/0x0000000840a3ec40@1a0b51e7
someLazyEvaluationOverTheDescription: () => String = ammonite.$sess.cmd18$$$Lambda$2286/0x0000000840a3f840@c197f46
As you can see above, someLazyEvaluationOverTheDescription does not make the value concrete because no function application has happened yet. Essentially, I have just built a new description of a computation.
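Only when I finally apply the function does the whole chain run and produce a concrete value:

someLazyEvaluationOverTheDescription()  // returns "SOMERESULT"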
You will believe me if I say that this same concept applies to data structures (i.e., our huge-big-dataish csv file).
A data structure that uses this concept in Scala is Stream.
println("\nList evaluation")
1.to(3).toList.map(i => {println(i); i}).map(_ * 5).map(println)
println("\nStream evaluation")
1.to(3).toStream.map(i => {println(i); i}).map(_ * 5).map(println)
List evaluation
1
2
3
5
10
15

Stream evaluation
1
5
2
10
3
15
res19_1: List[Unit] = List((), (), ())
res19_3: Stream[Unit] = Stream((), (), ())
Can you spot the difference? Look at the order of the output above.
One version loops over the list twice and the other loops over it only once. Stream acts lazily! The consequence of this laziness is that, while each map call on a List performs its own loop over the collection, the Stream's lazy behaviour merges the maps into a single pass! The functional community calls this "fusion".
I forced the evaluation of the Stream with the println calls to show you the difference: had I not done that, the maps would not have run at all.
By avoiding unnecessary work, the laziness of this data structure lets you handle amounts of data that would never fit in memory.
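To see that at work, here is a small sketch with the same Scala Stream: it describes an infinite sequence, yet only the handful of elements we actually pull out ever get evaluated.

val firstEvenSquares = Stream.from(1)                // an infinite, lazy sequence of naturals
  .map(n => { println(s"evaluating $n"); n * n })    // runs only for the elements we pull
  .filter(_ % 2 == 0)
  .take(3)
  .toList                                            // forces exactly what is needed

// prints "evaluating 1" through "evaluating 6" and nothing more
// firstEvenSquares: List[Int] = List(4, 16, 36)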
A solution with Akka Streams
The Akka community offers a variant of the Stream laziness mechanism based on the actor model. Let's skip the details and let me show you how to load the CSV file with it (you could still do the same with plain Scala Streams, but in the next section I will show how Akka gives you more control over the flow of large amounts of data).
First, a comma-separated CSV file:
Username,Identifier,First name,Last name
booker12,9012,Rachel,Booker
grey07,2070,Laura,Grey
johnson81,4081,Craig,Johnson
jenkins46,9346,Mary,Jenkins
smith79,5079,Jamie Smith
Then the code we need to have Akka Streams take care of our CSV:
import $ivy.`com.typesafe.akka::akka-stream:2.6.9`
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream._
import akka.stream.scaladsl._
import akka.util.ByteString
import java.nio.file.Paths
implicit val actorSystem = ActorSystem()
import actorSystem.dispatcher
implicit val flowMaterializer = ActorMaterializer()
FileIO.fromPath(Paths.get("test.csv"))
.via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String))
.runForeach(println)
import $ivy.$
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream._
import akka.stream.scaladsl._
import akka.util.ByteString
import java.nio.file.Paths
actorSystem: ActorSystem = akka://default
import actorSystem.dispatcher
flowMaterializer: ActorMaterializer = PhasedFusingActorMaterializer(
akka://default,
ActorMaterializerSettings(4,16,akka.actor.default-dispatcher,<function1>,StreamSubscriptionTimeoutSettings(CancelTermination,5000 milliseconds),false,1000,1000,false,true,IoSettings(16384)),
Attributes(
List(
InputBuffer(4, 16),
CancellationStrategy(PropagateFailure),
akka.stream.Attributes$NestedMaterializationCancellationPolicy@33602729,
Dispatcher("akka.actor.default-dispatcher"),
SupervisionStrategy(<function1>),
DebugLogging(false),
StreamSubscriptionTimeout(5000 milliseconds, CancelTermination),
OutputBurstLimit(1000),
FuzzingMode(false),
MaxFixedBufferSize(1000000000),
SyncProcessingLimit(1000)
)
),
akka.dispatch.Dispatchers@1e686da2,
Actor[akka://default/system/Materializers/StreamSupervisor-3#-1429691641],
false,
akka.stream.impl.SeqActorNameImpl@3ce17e4
)
res15_10: concurrent.Future[akka.Done] = Future(<not completed>)
Username,Identifier,First name,Last name
booker12,9012,Rachel,Booker
grey07,2070,Laura,Grey
johnson81,4081,Craig,Johnson
jenkins46,9346,Mary,Jenkins
smith79,5079,Jamie Smith
Again, by calling runForeach I force the evaluation of the stream. Until that point nothing has happened: everything is just a description of a computation over the data.
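A tiny sketch of that idea (reusing the implicit ActorSystem from above): the first line builds only a description, and nothing is printed until the stream is actually run.

// Nothing runs here: this is just a description of a computation.
val described = Source(1 to 3).map { i => println(s"processing $i"); i * 5 }

// Only now does the stream run; "processing 1" to "processing 3" and the results appear.
described.runForeach(println)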
By playing around with this basic example you are already well equipped for handling huge data.
A bit more of Akka Streams
In the Akka code I showed in the previous section there are three concepts I have not explained yet:
- FileIO.fromPath is a Source of data
- Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String) is a Flow applied to a Source (i.e., a data transformation that produces a new Source)
- runForeach(println) is a Sink and materializer of the Stream (i.e., it consumes the Source and makes the description of the computation concrete)
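To make those three roles explicit, here is a sketch of the same pipeline (same imports and implicit ActorSystem as before) with each piece bound to a name and wired together with via and runWith:

// Source: where the data comes from.
val csvSource = FileIO.fromPath(Paths.get("test.csv"))

// Flow: a transformation that turns one stream of elements into another (raw bytes into lines here).
val csvLines = Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String)

// Sink: where the data ends up; materializing it makes the description concrete.
val printSink = Sink.foreach[String](println)

// Wiring them together and running: equivalent to the snippet above.
csvSource.via(csvLines).runWith(printSink)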
Essentially Akka divides a Stream into a Source and a Sink, also known as producer and consumer. A good reason for this is the processing speed of sources and sinks: when you are handling a lot of data, there will always be a bottleneck. If the Source is the bottleneck, the Sink will have to wait on the Source to serve more data; if the Sink is the bottleneck, we risk breaking it by flooding it with data; finally, if our Flow is the bottleneck, we will have both the Sink waiting and the Source overflowing our Flow. Luckily, Akka takes care of this for us: it protects these fundamental parts of its Stream with its backpressure feature. It basically has a mechanism that signals the Source and the Flow to slow down when the Flow and the Sink, respectively, cannot cope with the pressure.
Let's see a final code example about this:
import $ivy.`com.typesafe.akka::akka-stream:2.6.9`
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream._
import akka.stream.scaladsl._
import akka.util.ByteString
import java.nio.file.Paths
import scala.util.Random
import scala.concurrent.Await
import scala.concurrent.duration._
implicit val actorSystem = ActorSystem()
import actorSystem.dispatcher
implicit val flowMaterializer = ActorMaterializer()
val source = Source(1.to(100).map(Random.nextString(_)))       // a fast source of 100 random strings
val sink = Sink.foreach[String](println(_))                     // a fast sink that prints each element
val nullSink = Sink.ignore                                      // a sink that discards everything
def graph(src: Source[String, Any], snk: Sink[String, scala.concurrent.Future[akka.Done]]) = src.toMat(snk)(Keep.right)
val slowSource = Source(1.to(100)).map(x => {Thread.sleep(100); Random.nextString(x)})  // emits one element every 100 ms
val slowSink = Sink.foreach[String](x => Thread.sleep(100))     // takes 100 ms to consume each element
def time[R](block: => R): R = {
val t0 = System.nanoTime()
val result = block // call-by-name
val t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns, ie, " + ((t1 - t0) / 1000000000) + "s")
result
}
time {Await.result(graph(source, nullSink).run(), 100.seconds)}
time {Await.result(graph(slowSource, nullSink).run(), 100.seconds)}
time {Await.result(graph(source, slowSink).run(), 100.seconds)}
time {Await.result(graph(slowSource, slowSink).run(), 100.seconds)}
In the code above I use the function graph to create a Stream, or really, in Akka terms, a Graph, that describes a computation.
Then I run it with different combinations of sources and sinks to demonstrate that the backpressure mechanism handles the speed mismatches for me, making sure that I do not overflow the sink.
Feel free to run these scripts in Ammonite to see how safe Akka keeps you while you play around with streams.
Conclusion
So, to summarize: learn laziness! Through the concept of delaying evaluation, you can operate on descriptions of computations (and skip worthless work!). A natural outcome of this idea is lazy data structures, and therefore streams. Once you get the basics right, working with Akka is simple: Akka Streams are just streams specialized for the most common big-data use cases.