Spark Structured Streaming: Multiple Queries

Note

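If you want to write the same dataset to several different sinks, consider the foreachBatch sink instead. Separate queries, like the two below, each read and process the source independently and keep their own checkpoints. foreachBatch instead lets a single query write every micro-batch to several sinks.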
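A minimal sketch of the foreachBatch alternative, assuming Spark 2.4 or later (foreachBatch was added in 2.4) and the same input as the example that follows: text files with one integer per line. The object name `MultiSinkForeachBatch`, the helper `writeBothSinks`, the default path `/tmp/spark-multi-query` and the local master are illustrative assumptions, not part of the original example. Each micro-batch is cached, written to both JSON sinks, then released, so one query with one checkpoint feeds both outputs.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.col

object MultiSinkForeachBatch {

  // Hypothetical helper: writes one micro-batch to two sinks.
  // The sink layout under basePath mirrors the multi-query example.
  def writeBothSinks(basePath: String)(batch: Dataset[Int], batchId: Long): Unit = {
    // Cache so the batch is computed once even though it is written twice
    batch.persist()
    try {
      batch.select((col("value") * 2).as("value"))
        .write.mode("append").json(s"$basePath/output/sink-1")
      batch.select((col("value") * 3).as("value"))
        .write.mode("append").json(s"$basePath/output/sink-2")
    } finally {
      batch.unpersist()
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical default location; pass another base path as the first argument
    val basePath = args.headOption.getOrElse("/tmp/spark-multi-query")

    // Local master only so the sketch runs on its own
    val sparkSession = SparkSession.builder()
      .appName("multi-sink-foreachBatch")
      .master("local[2]")
      .getOrCreate()
    import sparkSession.implicits._

    // One integer per line; the text source yields a string column named "value"
    val filesWithNumbers: Dataset[Int] = sparkSession.readStream
      .text(s"$basePath/data")
      .as[String]
      .map(_.trim.toInt)

    // A typed function value avoids the Scala/Java foreachBatch overload
    // ambiguity that a bare lambda can trigger on Scala 2.12
    val sink: (Dataset[Int], Long) => Unit =
      (batch, batchId) => writeBothSinks(basePath)(batch, batchId)

    // A single query and a single checkpoint feed both sinks
    val query = filesWithNumbers.writeStream
      .option("checkpointLocation", s"$basePath/checkpoint/foreach-batch")
      .foreachBatch(sink)
      .start()

    query.awaitTermination()
  }
}
```

Keeping the per-batch logic in a plain method means it can be exercised with a static Dataset, without starting a stream. By default foreachBatch gives at-least-once output; the batchId argument can be used to deduplicate writes if exactly-once output matters.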

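// Assumes `sparkSession: SparkSession` and `basePath: String` are already defined
import sparkSession.implicits._ // encoders for .as[...] and .map(...)

// The text source yields one string column named "value", so parse each line explicitly
val filesWithNumbers = sparkSession.readStream.text(s"${basePath}/data")
    .as[String]
    .map(line => line.trim.toInt)

// Query 1: doubles each number and writes JSON to its own path and checkpoint
val multipliedBy2 = filesWithNumbers.map(nr => nr * 2)
val multipliedBy2Writer = multipliedBy2.writeStream.format("json")
    .option("path", s"${basePath}/output/sink-1")
    .option("checkpointLocation", s"${basePath}/checkpoint/sink-1")
    .start()

// Query 2: triples each number; it reads the source independently of query 1
val multipliedBy3 = filesWithNumbers.map(nr => nr * 3)
val multipliedBy3Writer = multipliedBy3.writeStream.format("json")
    .option("path", s"${basePath}/output/sink-2")
    .option("checkpointLocation", s"${basePath}/checkpoint/sink-2")
    .start()

// Blocks until either query stops; rethrows the exception if that query failed
sparkSession.streams.awaitAnyTermination()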
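awaitAnyTermination() returns, or rethrows the failure, as soon as one query ends, but the other query keeps running. The StreamingQueryManager API docs recommend stopping all queries after any of them terminates with an exception. A minimal sketch of that pattern; the object `MultiQueryShutdown` and the helper `awaitAnyThenStopAll` are hypothetical names.

```scala
import org.apache.spark.sql.SparkSession

object MultiQueryShutdown {

  // Hypothetical helper: waits for the first query to end, then stops every
  // query that is still running. If the terminated query failed,
  // awaitAnyTermination() rethrows its exception, and the finally block
  // still runs, so no query is left running on its own.
  def awaitAnyThenStopAll(sparkSession: SparkSession): Unit = {
    try {
      sparkSession.streams.awaitAnyTermination()
    } finally {
      sparkSession.streams.active.foreach(_.stop())
    }
  }
}
```

With this helper, the last line of the example above would become `MultiQueryShutdown.awaitAnyThenStopAll(sparkSession)`.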