Your browser doesn't support the features required by impress.js, so you are presented with a simplified version of this presentation.

For the best experience please use the latest Chrome, Safari or Firefox browser.

Introducing
Reactive Streams

Who am I

Christopher Hunt

@huntchr

Reactive

Reactive Streams

Reactive Streams - status

Oracle, Pivotal, Netflix

spray.io, Twitter, Redhat, Typesafe

SUNY Oswego, Applied Duality, Kaazing Corp

Reactive Streams - scope

onError | (onSubscribe onNext* (onError | onComplete)?

Akka Streams

Fundamental Types

map, filter, grouped, drop, ...

Real World Use-cases

Receive HTTP formdata with a file, write to disk

Receive Http Stream


val f = new FileOutputStream("/tmp/a.tmp")
byteSource.
  foreach(bs => f.write(bs.toArray)).
  andThen {
    case _ => f.close()
  }
        

Real World Use-cases

Receive Http Stream


0000: POST /bundles HTTP/1.1
...
0000: --------------------------4ab597ac920f42ce
002c: Content-Disposition: form-data; name="nrOfCpus"
005d: 
005f: 2
0062: --------------------------4ab597ac920f42ce
008e: Content-Disposition: form-data; name="memory"
...
0199: --------------------------4ab597ac920f42ce
01c5: Content-Disposition: form-data; name="bundle"; filename="sbt-rea
0205: ctive-runtime-tester-1.0.0-a8c92f2f2c7902d2b76b818ada2526f7db7e8
0245: c25a9c7fb6ee7550965943fc98b.tgz"
        

Receive Http Stream


multiPartFormData <- Unmarshal(entity).to[FormData]
// => Source[BodyPart]
...
multiPartFormData.parts
  .prefixAndTail(nrRequirementParts)
  .runWith(Sink.head)
  // => Future[(Seq[BodyPart], Source[BodyPart])]
  .flatMap {
    case (requirementParts, otherParts) =>
      val requirementPartsMappings = requirementParts.map { 
        requirementPart =>
        ...
      }
      ...
  }
        

Receive Http Stream - Map


def bundleAndConfiguration(
  bodyParts: Source[FormData.BodyPart]
  ): Source[_ <: VerifiableSource] =
  bodyParts.collect {
    case part @ FormData.BodyPart(name, entity, _, _) 
      if name == "bundle" && part.filename.isDefined =>
        val filename = part.filename.get
        BundleSource(digestFrom(filename), entity.dataBytes)
...
}
...
case class BundleSource(
  digest: Digest, 
  source: Source[ByteString]) extends VerifiableSource
        

Re-publish Http Stream


val fileBodyPartsFlow = Flow[VerifiableSource]
  .map(fileBodyParts)
  .to(Sink.publisher[FormData.BodyPart])
  
val graph =
  FlowGraph { implicit builder =>
    val broadcast = Broadcast[VerifiableSource]
    request.sources ~> broadcast
    broadcast ~> bundlePublishedFlow
    broadcast ~> fileBodyPartsFlow
  }
  
val fileBodyPartsPub = 
  graph.run().get(Sink.publisher[FormData.BodyPart])
Marshal(FormData(Source(fileBodyPartsPub))).to[HttpResponse]
        

Verify Stream & Write


val digestSink = Sink.fold(messageDigest)(updateDigest)

val graph = FlowGraph { implicit builder =>
  val broadcast = Broadcast[ByteString]
  verifiableSource.source ~> broadcast
  broadcast ~> digestSink
  broadcast ~> writeSink
}

val materializedGraph = graph.run()

val isDigestValid = materializedGraph.get(digestSink)
  .map(isValidDigest)
        

Making you look awesome

Trifecta (ish)

Follow me


christopher hunt software


@huntchr


github.com/typesafehub