Thursday, December 27, 2018

Spark Structured Streaming Examples


 Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write-Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
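To make the "same code for batch and streaming" point concrete, here is a minimal sketch rather than a definitive implementation. It assumes a local Spark session and uses Spark's built-in rate source as a stand-in for a real stream. The names BatchVsStreaming and countPerWindow are hypothetical and exist only for this illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, window}

object BatchVsStreaming {
  // One transformation, written once, applied to static and streaming DataFrames alike.
  // Expects a column named "timestamp" of timestamp type.
  def countPerWindow(events: DataFrame): DataFrame =
    events.groupBy(window(col("timestamp"), "10 seconds")).count()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("BatchVsStreaming")
      .master("local[2]") // assumption: local run, for illustration only
      .getOrCreate()
    import spark.implicits._

    // Batch: a small static DataFrame with three events.
    val staticEvents = Seq("2018-12-27 10:00:01", "2018-12-27 10:00:04", "2018-12-27 10:00:12")
      .toDF("ts")
      .select(col("ts").cast("timestamp").as("timestamp"))
    countPerWindow(staticEvents).show(false)

    // Streaming: the rate source emits (timestamp, value) rows continuously.
    val streamingEvents = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()

    // Complete mode reprints the full table of window counts after every micro-batch.
    val query = countPerWindow(streamingEvents).writeStream
      .outputMode("complete")
      .format("console")
      .option("truncate", "false")
      .start()

    query.awaitTermination(30000L) // let the demo run for about 30 seconds
    query.stop()
    spark.stop()
  }
}
```

The only difference between the two runs is how the input is obtained (a static Seq versus readStream) and how the result is consumed (show versus writeStream).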

Read Stream:
// Subscribe to a topic and read it from the earliest available offset
val df = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("group.id", "mapr")
  .option("subscribe", "/tmp/kafka:test")
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", false)
  .option("maxOffsetsPerTrigger", 1000) // cap on the number of offsets processed per trigger
  .load()
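The Kafka source delivers key and value as binary columns, so they usually need a cast before they are readable. The following is a small sketch of that step, assuming the payloads are UTF-8 text. KafkaDecode and decode are hypothetical names introduced only for this illustration.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object KafkaDecode {
  // Casts the binary key/value columns to strings and keeps the
  // topic, partition, and offset columns so each record stays traceable.
  def decode(records: DataFrame): DataFrame =
    records.select(
      col("key").cast("string").as("key"),
      col("value").cast("string").as("value"),
      col("topic"),
      col("partition"),
      col("offset"))
}
```

With the df defined above, KafkaDecode.decode(df) gives a streaming DataFrame that can be passed to writeStream in the usual way.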

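Write Stream:
// Print each micro-batch to the console.
// The original snippet called format("kafka") and then format("console");
// the last format call wins, so only the console sink is kept here.
val query = df.writeStream.format("console")
  .option("checkpointLocation", "/tmp/checkpoint")
  .outputMode("append")
  .start()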

Note: When working with MapR Streams, you do not need to provide real broker addresses in kafka.bootstrap.servers.


Example of reading from and writing to MapR Streams:

val topicsSrc = "/tmp/kafka:test"
val topicsDst = "/tmp/kafka1:test"
val checkPointLocation = "/tmp/checkpoint"

// Read from the source stream
val stream = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", topicsSrc)
  .option("startingOffsets", "earliest")
  .load()

// Write to the destination stream; key and value are cast from binary to string first
val query1 = stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream.format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", topicsDst)
  .option("checkpointLocation", checkPointLocation)
  .start()
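start() returns immediately, so a standalone application has to block on the query or it will exit before any data is copied. Below is a hedged sketch of one way to do that for a bounded test run. QueryLifecycle and runFor are hypothetical helpers, not part of Spark; the StreamingQuery methods they call (awaitTermination, lastProgress, stop) are the standard ones.

```scala
import org.apache.spark.sql.streaming.StreamingQuery

object QueryLifecycle {
  // Waits until the query stops on its own or the timeout elapses,
  // prints the most recent progress report, and stops the query if it is still running.
  // Returns true if the query terminated by itself within the timeout.
  def runFor(query: StreamingQuery, timeoutMs: Long): Boolean = {
    val finished = query.awaitTermination(timeoutMs)
    // lastProgress is null until the first micro-batch has completed.
    Option(query.lastProgress).foreach { p =>
      println(s"batch=${p.batchId} inputRows=${p.numInputRows}")
    }
    if (!finished) query.stop()
    finished
  }
}
```

For the example above this could be called as QueryLifecycle.runFor(query1, 60000L). A long-running job would more typically call query1.awaitTermination() with no timeout.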


