Reactive Streams

Who am I

Christopher Hunt



Reactive Streams - status

Oracle, Pivotal, Netflix, Twitter, Red Hat, Typesafe

SUNY Oswego, Applied Duality, Kaazing Corp

Reactive Streams - scope

onError | (onSubscribe onNext* (onError | onComplete)?)
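
The signal grammar above can be read as a small state check. The following is a minimal sketch, not part of the Reactive Streams API: the `Signal` type and the `SignalProtocol.isValid` helper are hypothetical names introduced only to illustrate which signal sequences a Subscriber may observe.

```scala
// Hypothetical model of the signals a Subscriber can receive (not the real API).
sealed trait Signal
case object OnSubscribe extends Signal
case object OnNext extends Signal
case object OnError extends Signal
case object OnComplete extends Signal

object SignalProtocol {
  /** True if `signals` matches: onError | (onSubscribe onNext* (onError | onComplete)?) */
  def isValid(signals: Seq[Signal]): Boolean = signals match {
    // A lone onError is allowed without a preceding onSubscribe.
    case Seq(OnError) => true
    // Otherwise the sequence must start with onSubscribe...
    case OnSubscribe +: rest =>
      // ...followed by any number of onNext signals...
      val afterNexts = rest.dropWhile(_ == OnNext)
      // ...and then at most one terminal signal.
      afterNexts.isEmpty || afterNexts == Seq(OnError) || afterNexts == Seq(OnComplete)
    case _ => false
  }
}
```

Under this sketch, `Seq(OnSubscribe, OnNext, OnNext, OnComplete)` is accepted, while `Seq(OnSubscribe, OnComplete, OnNext)` is rejected because nothing may follow a terminal signal.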

Akka Streams

Fundamental Types

map, filter, grouped, drop, ...
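
These operator names match the ones on the Scala collections. As a rough illustration only, the sketch below applies them to a plain `Iterator` from the standard library rather than to an Akka Streams `Source` or `Flow`; `OperatorSketch` and `pipeline` are hypothetical names, and an iterator has none of the back-pressure that a stream provides.

```scala
object OperatorSketch {
  // Standard-library analogue of a small stream pipeline (no Akka involved).
  def pipeline(input: Iterator[Int]): List[Seq[Int]] =
    input
      .map(_ * 2)          // transform each element
      .filter(_ % 3 != 0)  // keep elements that are not multiples of 3
      .drop(1)             // skip the first remaining element
      .grouped(2)          // batch into groups of at most 2
      .map(_.toSeq)
      .toList
}
```

For the input 1 to 6 this yields two groups: (4, 8) and (10).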

Real World Use-cases

Receive HTTP form data with a file, write to disk

Receive HTTP Stream

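import java.io.FileOutputStream

val f = new FileOutputStream("/tmp/a.tmp")
entity.dataBytes.  // assumption: the slide omits the byte source; entity.dataBytes is borrowed from a later slide
  foreach(bs => f.write(bs.toArray)).
  andThen {
    case _ => f.close()  // close the file whether the stream completed or failed
  }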

Receive HTTP Stream

0000: POST /bundles HTTP/1.1
0000: --------------------------4ab597ac920f42ce
002c: Content-Disposition: form-data; name="nrOfCpus"
005f: 2
0062: --------------------------4ab597ac920f42ce
008e: Content-Disposition: form-data; name="memory"
0199: --------------------------4ab597ac920f42ce
01c5: Content-Disposition: form-data; name="bundle"; filename="sbt-rea
0205: ctive-runtime-tester-1.0.0-a8c92f2f2c7902d2b76b818ada2526f7db7e8
0245: c25a9c7fb6ee7550965943fc98b.tgz"

Receive HTTP Stream

multiPartFormData <- Unmarshal(entity).to[FormData]
// => Source[BodyPart]
  // => Future[(Seq[BodyPart], Source[BodyPart])]
  .flatMap {
    case (requirementParts, otherParts) =>
      val requirementPartsMappings = { 
        requirementPart =>

Receive HTTP Stream - Map

def bundleAndConfiguration(
  bodyParts: Source[FormData.BodyPart]
): Source[_ <: VerifiableSource] =
  bodyParts.collect {
    // Keep only the "bundle" part that carries a file name
    case part @ FormData.BodyPart(name, entity, _, _)
      if name == "bundle" && part.filename.isDefined =>
        val filename = part.filename.get
        BundleSource(digestFrom(filename), entity.dataBytes)
  }

case class BundleSource(
  digest: Digest,
  source: Source[ByteString]) extends VerifiableSource

Re-publish HTTP Stream

val fileBodyPartsFlow = Flow[VerifiableSource]
val graph =
  FlowGraph { implicit builder =>
    // Fan each VerifiableSource out to both downstream flows
    val broadcast = Broadcast[VerifiableSource]
    request.sources ~> broadcast
    broadcast ~> bundlePublishedFlow
    broadcast ~> fileBodyPartsFlow
  }
val fileBodyPartsPub =[FormData.BodyPart])

Verify Stream & Write

val digestSink = Sink.fold(messageDigest)(updateDigest)

val graph = FlowGraph { implicit builder =>
  // Send every chunk to both the digest and the file writer
  val broadcast = Broadcast[ByteString]
  verifiableSource.source ~> broadcast
  broadcast ~> digestSink
  broadcast ~> writeSink
}

val materializedGraph =

val isDigestValid = materializedGraph.get(digestSink)
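
The verify-and-write graph above folds every chunk into a message digest while the same chunk is written out. A minimal stdlib-only sketch of that idea follows. It uses a plain `Iterator` in place of the Akka Streams graph, and several things in it are assumptions for illustration: the `VerifyAndWriteSketch` object, the `verifyAndWrite` signature, this body for `updateDigest`, and the choice of SHA-256 (the slides do not name the algorithm).

```scala
import java.io.OutputStream
import java.security.MessageDigest

object VerifyAndWriteSketch {
  // Assumed shape of the slide's updateDigest: fold one chunk into the digest.
  def updateDigest(md: MessageDigest, chunk: Array[Byte]): MessageDigest = {
    md.update(chunk)
    md
  }

  /** Writes every chunk to `out` and returns true if the SHA-256 of all chunks equals `expected`. */
  def verifyAndWrite(
      chunks: Iterator[Array[Byte]],
      out: OutputStream,
      expected: Array[Byte]): Boolean = {
    val md = chunks.foldLeft(MessageDigest.getInstance("SHA-256")) { (acc, chunk) =>
      out.write(chunk)          // plays the role of the "writeSink" branch
      updateDigest(acc, chunk)  // plays the role of the "digestSink" branch
    }
    MessageDigest.isEqual(md.digest(), expected)
  }
}
```

Unlike the broadcast in the graph, this sketch handles both branches sequentially in a single pass and has no back-pressure; it only shows why one traversal of the bytes is enough to both persist and verify them.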

