Oracle, Pivotal, Netflix
spray.io, Twitter, Redhat, Typesafe
SUNY Oswego, Applied Duality, Kaazing Corp
onError | (onSubscribe onNext* (onError | onComplete)?
map, filter, grouped, drop, ...
Receive HTTP formdata with a file, write to disk
val f = new FileOutputStream("/tmp/a.tmp")
byteSource.
foreach(bs => f.write(bs.toArray)).
andThen {
case _ => f.close()
}
0000: POST /bundles HTTP/1.1
...
0000: --------------------------4ab597ac920f42ce
002c: Content-Disposition: form-data; name="nrOfCpus"
005d:
005f: 2
0062: --------------------------4ab597ac920f42ce
008e: Content-Disposition: form-data; name="memory"
...
0199: --------------------------4ab597ac920f42ce
01c5: Content-Disposition: form-data; name="bundle"; filename="sbt-rea
0205: ctive-runtime-tester-1.0.0-a8c92f2f2c7902d2b76b818ada2526f7db7e8
0245: c25a9c7fb6ee7550965943fc98b.tgz"
multiPartFormData <- Unmarshal(entity).to[FormData]
// => Source[BodyPart]
...
multiPartFormData.parts
.prefixAndTail(nrRequirementParts)
.runWith(Sink.head)
// => Future[(Seq[BodyPart], Source[BodyPart])]
.flatMap {
case (requirementParts, otherParts) =>
val requirementPartsMappings = requirementParts.map {
requirementPart =>
...
}
...
}
def bundleAndConfiguration(
bodyParts: Source[FormData.BodyPart]
): Source[_ <: VerifiableSource] =
bodyParts.collect {
case part @ FormData.BodyPart(name, entity, _, _)
if name == "bundle" && part.filename.isDefined =>
val filename = part.filename.get
BundleSource(digestFrom(filename), entity.dataBytes)
...
}
...
case class BundleSource(
digest: Digest,
source: Source[ByteString]) extends VerifiableSource
val fileBodyPartsFlow = Flow[VerifiableSource]
.map(fileBodyParts)
.to(Sink.publisher[FormData.BodyPart])
val graph =
FlowGraph { implicit builder =>
val broadcast = Broadcast[VerifiableSource]
request.sources ~> broadcast
broadcast ~> bundlePublishedFlow
broadcast ~> fileBodyPartsFlow
}
val fileBodyPartsPub =
graph.run().get(Sink.publisher[FormData.BodyPart])
Marshal(FormData(Source(fileBodyPartsPub))).to[HttpResponse]
val digestSink = Sink.fold(messageDigest)(updateDigest)
val graph = FlowGraph { implicit builder =>
val broadcast = Broadcast[ByteString]
verifiableSource.source ~> broadcast
broadcast ~> digestSink
broadcast ~> writeSink
}
val materializedGraph = graph.run()
val isDigestValid = materializedGraph.get(digestSink)
.map(isValidDigest)