# Where parallels cross

Interesting bits of life

# Streams: handle big data with laziness

## TL;DR

Lazy data structures are a great tool for dealing with large quantities of data (think gigabytes or more). Here I show the basics of streams in Scala, and then I take a short dive into how the Akka community implements streams.

## The Problem

At some point in your hacking career you will face a quantity of data that does not fit in your computer's memory. Imagine a terabyte-long CSV file containing data about interesting events that happened today: what if you want to find only the ones near your location?

## The theory

The complexity of this problem hides in the fact that you cannot just load the data in memory.

Programming languages usually offer something called an "iterator" in their standard libraries to deal with this kind of problem. Iterators provide a function called next to get the next element in a sequential collection. This interface abstracts over a lot of use cases: for example, you can call next on a string and it will return the next character, or you could call next on a Google search and it would return the next batch of links related to your query.

This approach works because you load into memory just the current element and use the iterator to access the next one. The iterator fails if you call next after you have reached the end of the data.
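To make the iterator idea concrete, here is a minimal sketch using Scala's standard `Iterator`. The three strings are made-up sample data standing in for a file too big to load, and the value names are only illustrative.

```scala
// Hypothetical sample data: an in-memory stand-in for a huge CSV file.
val lines: Iterator[String] = Iterator("id,city", "1,Rome", "2,Oslo")

// Each call to next() hands over one element and moves forward.
val header = lines.next()
val firstRow = lines.next()

// hasNext lets us stop before running off the end of the data.
val remaining = scala.collection.mutable.ListBuffer.empty[String]
while (lines.hasNext) remaining += lines.next()

// Calling next() on an exhausted iterator fails with NoSuchElementException.
val failedAtEnd =
  try { lines.next(); false }
  catch { case _: NoSuchElementException => true }
```

Only one element is in play at a time, which is why the same loop would work on data that does not fit in memory.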

Another approach is to use a description of the data. This means that instead of taking a CSV file and trying to load it, I just say: this data structure represents a file, and I need to apply some operations to it; apply them at the last possible moment and, to save effort, only to the parts I actually need.

Functional programmers call postponing evaluation "lazy evaluation". Think of lazy evaluation as the following:

val normalEvaluation = "someResult"
val lazyEvaluation = () => "someResult"

normalEvaluation: String = "someResult"
lazyEvaluation: () => String = ammonite.$sess.cmd15$$$Lambda$2276/0x0000000840a3b040@13e76a91

You can see that lazyEvaluation requires you to run it later to get the result, like this:

lazyEvaluation()

res16: String = "someResult"

This is so important that Scala provides this as a feature of the language:

lazy val lazyEvaluation = "someResult"

lazyEvaluation: String = <lazy>

With this concept we have all we need to describe computation over data structures that are not concrete yet:

val lazyEvaluation = () => "someResult"
val someLazyEvaluationOverTheDescription = () => lazyEvaluation().toUpperCase

lazyEvaluation: () => String = ammonite.$sess.cmd18$$$Lambda$2285/0x0000000840a3ec40@1a0b51e7
someLazyEvaluationOverTheDescription: () => String = ammonite.$sess.cmd18$$$Lambda$2286/0x0000000840a3f840@c197f46


As you can see above, someLazyEvaluationOverTheDescription does not make the value concrete because there was no function application yet. Essentially, I have changed the description of a computation.
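One way to convince yourself that nothing runs until the function is applied is to count the evaluations. This is a small sketch of that idea; the counter and the value names are my own additions for illustration.

```scala
// Counts how many times the underlying computation actually runs.
var evaluations = 0

// A description of a computation: nothing runs until the function is applied.
val description: () => String = () => { evaluations += 1; "someResult" }

// Transforming the description yields another description, still unevaluated.
val upperCased: () => String = () => description().toUpperCase

val runsBeforeApplying = evaluations // nothing has been applied yet
val result = upperCased()            // function application makes the value concrete
val runsAfterApplying = evaluations
```

Building `upperCased` costs nothing; the work happens only at the `upperCased()` call.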

You will believe me if I say that this same concept applies to data structures (i.e., our huge-big-dataish csv file).

A data structure that uses this concept in Scala is Stream.

println("\nList evaluation")
1.to(3).toList.map(i => {println(i); i}).map(_ * 5).map(println)
println("\nStream evaluation")
1.to(3).toStream.map(i => {println(i); i}).map(_ * 5).map(println)

List evaluation
1
2
3
5
10
15

Stream evaluation
1
5
2
10
3
15
res19_1: List[Unit] = List((), (), ())
res19_3: Stream[Unit] = Stream((), (), ())


Can you spot the difference? Look at the order of the output above. The List runs each map over the whole collection before starting the next one, while the Stream passes each element through all the maps before moving on to the next element. Stream acts lazily! A side effect of this laziness is that while calling map twice on a List requires two loops, the lazy behaviour of Stream lets you merge the maps into a single loop! The functional community calls this "fusion".

The Stream was evaluated all the way only because its result got consumed when the REPL printed it: without a consumer, the maps would not have run past the first element, since Scala's Stream computes its head eagerly and the rest lazily.

By avoiding unnecessary work, the laziness of this data structure lets you handle as much data as you like.
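As a rough illustration of "avoiding unnecessary work", the sketch below builds a pipeline over an unbounded Stream and asks for only three results. The counter and the names are assumptions made for this example; I use `Stream` to match the code above (Scala 2.13 deprecates it in favour of `LazyList`, which behaves similarly here).

```scala
// Counts how many elements the map stage actually touches.
var processed = 0

// An unbounded source: a strict List could never hold it.
val events: Stream[Int] = Stream.from(1)

val firstThreeEven = events
  .map { i => processed += 1; i * 5 } // deferred for everything past the head
  .filter(_ % 2 == 0)                 // keeps 10, 20, 30, ...
  .take(3)                            // we only ever ask for three results
  .toList                             // consuming the stream forces just enough of it
```

Even though the source never ends, the pipeline terminates because only the elements needed for the three results are ever computed.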

## A solution with Akka Streams

The Akka community offers a variant of the Stream laziness mechanism based on the actor model. Let's skip the details and let me show you how to load the CSV file with it (you can do the same with plain Scala's Stream, but in the next section I will show how Akka gives you more control over the flow of large amounts of data).

First a comma-separated csv file:

Username,Identifier,First name,Last name
booker12,9012,Rachel,Booker
grey07,2070,Laura,Grey
johnson81,4081,Craig,Johnson
jenkins46,9346,Mary,Jenkins
smith79,5079,Jamie Smith


Then the code we need to have an Akka Stream take care of our csv:

import $ivy.`com.typesafe.akka::akka-stream:2.6.9`
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream._
import akka.stream.scaladsl._
import akka.util.ByteString
import java.nio.file.Paths

implicit val actorSystem = ActorSystem()
import actorSystem.dispatcher
implicit val flowMaterializer = ActorMaterializer()

FileIO.fromPath(Paths.get("test.csv"))
  .via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String))
  .runForeach(println)

import $ivy.$
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream._
import akka.stream.scaladsl._
import akka.util.ByteString
import java.nio.file.Paths
actorSystem: ActorSystem = akka://default
import actorSystem.dispatcher
flowMaterializer: ActorMaterializer = PhasedFusingActorMaterializer(
  akka://default,
  ActorMaterializerSettings(4,16,akka.actor.default-dispatcher,<function1>,StreamSubscriptionTimeoutSettings(CancelTermination,5000 milliseconds),false,1000,1000,false,true,IoSettings(16384)),
  Attributes(
    List(
      InputBuffer(4, 16),
      CancellationStrategy(PropagateFailure),
      akka.stream.Attributes$NestedMaterializationCancellationPolicy@33602729,
      Dispatcher("akka.actor.default-dispatcher"),
      SupervisionStrategy(<function1>),
      DebugLogging(false),
      StreamSubscriptionTimeout(5000 milliseconds, CancelTermination),
      OutputBurstLimit(1000),
      FuzzingMode(false),
      MaxFixedBufferSize(1000000000),
      SyncProcessingLimit(1000)
    )
  ),
  akka.dispatch.Dispatchers@1e686da2,
  Actor[akka://default/system/Materializers/StreamSupervisor-3#-1429691641],
  false,
  akka.stream.impl.SeqActorNameImpl@3ce17e4
)
res15_10: concurrent.Future[akka.Done] = Future(<not completed>)

booker12,9012,Rachel,Booker
grey07,2070,Laura,Grey
johnson81,4081,Craig,Johnson
jenkins46,9346,Mary,Jenkins
smith79,5079,Jamie Smith


Again, by using runForeach I force the evaluation of the stream. Until that point nothing has happened: everything is just a description of a computation over data.

By playing around with this basic example you are already well equipped for handling huge data.

## A bit more of Akka Streams

In the Akka code I showed in the previous section there are three concepts I did not explain:

1. FileIO.fromPath is a Source of data
2. Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String) is a Flow applied to a Source (i.e., a data transformation that produces a new Source)
3. .runForeach(println) is a Sink and materializer of the Stream (i.e., it consumes the Source and makes concrete the description of the computation).

Essentially Akka divides a Stream into a Source and a Sink, also known as producer and consumer. A good reason for this is the processing speed of sources and sinks: when you are handling a lot of data, there will always be a bottleneck. If the Source is the bottleneck, the Sink will have to wait for the Source to serve more data; if the Sink is the bottleneck, we risk breaking it by flooding it with data; finally, if our Flow is the bottleneck, we will have both the Sink waiting and the Source flooding our Flow. Luckily, Akka takes care of this for us because it protects these fundamental parts of its Stream with its backpressure feature. It basically has a mechanism to signal the Source and the Flow to slow down when the Flow and the Sink, respectively, cannot cope with the pressure.
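To get a feel for what backpressure buys you, here is a toy model built on a bounded blocking queue from the Java standard library. This is not how Akka implements it (Akka signals demand without blocking threads); it is only a stand-in showing a fast producer being held back to the pace of a slow consumer. The object name, the method, and the default sizes are all assumptions made for this sketch.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical helper, not part of Akka: a blocking stand-in for backpressure.
object BackpressureSketch {
  // Returns the elements in the order received and the largest buffer size observed.
  def run(elements: Int = 20, capacity: Int = 4): (List[Int], Int) = {
    val buffer = new ArrayBlockingQueue[Int](capacity)

    // Fast producer: put() blocks while the buffer is full, so the producer
    // is slowed down to the consumer's pace instead of flooding it.
    val producer = new Thread(() => (1 to elements).foreach(i => buffer.put(i)))
    producer.start()

    // Slow consumer: takes one element every few milliseconds.
    var maxBuffered = 0
    val received = (1 to elements).map { _ =>
      Thread.sleep(5)
      maxBuffered = math.max(maxBuffered, buffer.size())
      buffer.take()
    }.toList

    producer.join()
    (received, maxBuffered)
  }
}
```

Calling `BackpressureSketch.run()` returns every element in order, and the number of buffered elements never exceeds the capacity, however fast the producer is.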

import $ivy.`com.typesafe.akka::akka-stream:2.6.9`

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream._
import akka.stream.scaladsl._ // Source, Sink and Keep live here
import akka.util.ByteString
import java.nio.file.Paths
import scala.util.Random
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val actorSystem = ActorSystem()
import actorSystem.dispatcher
implicit val flowMaterializer = ActorMaterializer()

val source = Source(1.to(100).map(Random.nextString(_)))
val sink = Sink.foreach[String](println(_))
val nullSink = Sink.ignore
def graph(src: Source[String, Any], snk: Sink[String,scala.concurrent.Future[akka.Done]]) = src.toMat(snk)(Keep.right)

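// Sleep inside a map stage of the stream, so the delay happens while the graph runs
// (sleeping while building the collection would only slow down this definition)
val slowSource = Source(1.to(100)).map(x => {Thread.sleep(100); Random.nextString(x)})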

val slowSink = Sink.foreach[String](x => Thread.sleep(100))

def time[R](block: => R): R = {
  val t0 = System.nanoTime()
  val result = block    // call-by-name
  val t1 = System.nanoTime()
  println("Elapsed time: " + (t1 - t0) + "ns, ie, " + ((t1 - t0) / 1000000000) + "s")
  result
}

time {Await.result(graph(source, nullSink).run(), 100.seconds)}

time {Await.result(graph(slowSource, nullSink).run(), 100.seconds)}

time {Await.result(graph(source, slowSink).run(), 100.seconds)}

time {Await.result(graph(slowSource, slowSink).run(), 100.seconds)}


In the above I use the function graph to create a Stream, or really in Akka terms a Graph, that describes a computation.

Then I run this function with different combinations of fast and slow sources and sinks to demonstrate that the backpressure mechanism takes care of the speed mismatch for me, making sure that I do not overflow the sink.

Feel free to run these scripts in Ammonite to see how safe Akka keeps you while you play around with streams.

## Conclusion

So to summarize: learn laziness! Through the concept of delaying evaluation, you can operate on descriptions of computation (skipping worthless work!!). A consequence of this is lazy data structures, and so streams. Once you get the basics right, working with Akka is simple. Akka Streams are just streams specialized for the most common big data use cases.