Process streaming data

  • data stream = Any data source that grows over time
  • New files landing in cloud storage
  • Updates to a database captured in a CDC feed
  • Events queued in a pub/sub messaging feed

Spark structured streaming

  • The streaming input source can be:
    • a directory of files in cloud storage
    • a messaging system such as Kafka
    • a Delta table
  • the stream must be created with PySpark (spark.readStream) before it can be queried with SQL

2 approaches to processing a data stream:

  • Reprocess the entire source dataset on each run (full refresh)
  • Process only the new data added since the last update, either with custom logic or with Structured Streaming (incremental refresh), as sketched below
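A minimal sketch contrasting the two approaches, assuming spark is the active SparkSession (predefined in Databricks notebooks) and Input_Table is a Delta table (hypothetical name):

# Full refresh: re-read and reprocess the entire source table on every run (batch read)
batchDF = spark.read.table("Input_Table")

# Incremental refresh with Structured Streaming: only records added since the last run
# are processed, with a checkpoint used to remember progress between runs
streamDF = spark.readStream.table("Input_Table")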

DataStreamReader - readStream()

  • use spark.readStream to query a Delta table as a streaming source and load it into a streaming DataFrame (streamDF)
  • this processes all of the data already present in the table as well as new data that arrives later
  • the resulting streaming DataFrame accepts the same transformations as a static DataFrame
  • readStream is lazy: the stream is not activated until it is queried or written out
streamDF = spark.readStream.table("Input_Table")

# optional, but useful: register a temp view so the stream can be queried with SQL
streamDF.createOrReplaceTempView("<streaming_view_name>")
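Once registered, the view can be transformed with SQL, and a CREATE TEMPORARY VIEW ... AS SELECT over it yields another streaming temp view. A hedged sketch, assuming the view above was registered as streaming_source_vw and has a value column (both hypothetical names); this produces the tmp_transformed_vw used in the writeStream example below:

spark.sql("""
    CREATE OR REPLACE TEMPORARY VIEW tmp_transformed_vw AS
    SELECT *
    FROM streaming_source_vw
    WHERE value IS NOT NULL   -- hypothetical filter
""")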

Tips / Intuition

A streaming query executes indefinitely; you can monitor its live performance with the streaming dashboard (e.g. batch duration, aggregation state, input rate).
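The same information is also available programmatically from the StreamingQuery handle returned when the stream is started (see writeStream below). A sketch, assuming hypothetical table and checkpoint names and that .table() starts the stream and returns the handle, as used in these notes:

# starting the stream returns a StreamingQuery handle
query = (spark.readStream.table("Input_Table")
         .writeStream
         .option("checkpointLocation", "/path/monitor_demo")
         .table("Output_Table"))

print(query.status)        # current state of the query (trigger active? data available?)
print(query.lastProgress)  # metrics for the most recent micro-batch (input rate, batch duration, ...)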

DataStreamWriter - writeStream()

  • write streamDF out to durable storage to persist the data
  • Structured Streaming assumes data is only ever appended upstream; once a source table is updated or overwritten, it is no longer valid as a streaming source
# if an intermediate transformation with SQL (via a temp view) is involved,
# convert the view back into a streaming DataFrame first
transformedDF = spark.table("tmp_transformed_vw")
transformedDF.writeStream
	......  # same trigger/outputMode/checkpointLocation/table options as below
	.awaitTermination()  # prevents execution of other cells until the incremental batch's write has succeeded

# otherwise, write the streaming DataFrame out directly
(streamDF.writeStream
    .trigger(processingTime="2 minutes")    # check every 2 minutes whether new data has arrived
    .outputMode("append")                   # append new rows to the target table
    .option("checkpointLocation", "/path")  # tracks the stream's progress
    .table("Output_Table"))                 # target table; this call starts the stream
  • trigger() (see the sketch after this list)
    • processingTime: check for new data at a fixed interval (micro-batch mode)
    • once=True: process all available data in a single batch, then stop
    • availableNow=True: process all available data in multiple micro-batches, then stop
  • outputMode()
    • append (default): only newly arrived rows are written to the target
    • complete: the full result table is recomputed and rewritten on each trigger
  • checkpointLocation option
    • stores the current state of the streaming job in cloud storage
    • cannot be shared between streams; each stream needs its own checkpoint location
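A hedged sketch of the three trigger styles applied to the same writeStream call (only one would be run at a time; streamDF is the streaming DataFrame from above, and the checkpoint paths and table name are hypothetical):

# fixed-interval micro-batches: look for new data every 2 minutes, run indefinitely
(streamDF.writeStream
    .trigger(processingTime="2 minutes")
    .option("checkpointLocation", "/path/processing_time")
    .table("Output_Table"))

# incremental batch: process everything available now in multiple micro-batches, then stop
(streamDF.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "/path/available_now")
    .table("Output_Table"))

# single batch: process everything available now in one batch, then stop
(streamDF.writeStream
    .trigger(once=True)
    .option("checkpointLocation", "/path/trigger_once")
    .table("Output_Table"))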
2 guarantees provided by Spark Structured Streaming
  1. Fault tolerance, via checkpointing + write-ahead logs
    • these record the offset range of the data being processed during each trigger interval
  2. Exactly-once guarantee
    • idempotent sinks: re-writing the same micro-batch after a failure does not produce duplicates
Unsupported operations on a streaming DataFrame
  • sorting
  • deduplication
  • these can be worked around with (see the sketch below):
    • windowing
    • watermarking
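A minimal sketch of watermarking and windowing on a stream, assuming the source table has event_time and event_id columns (hypothetical names):

from pyspark.sql.functions import window, count

events = (spark.readStream.table("Input_Table")
          # tolerate events arriving up to 10 minutes late before dedup/aggregation state is dropped
          .withWatermark("event_time", "10 minutes"))

# deduplication on a stream: drop duplicates within the watermark
deduped = events.dropDuplicates(["event_id", "event_time"])

# windowed aggregation: event counts per 5-minute event-time window
windowed_counts = (deduped
    .groupBy(window("event_time", "5 minutes"))
    .agg(count("*").alias("event_count")))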

To ensure no active streams are left running, we can execute the following loop:

for s in spark.streams.active:
    print("Stopping stream: " + s.id)
    s.stop()
    s.awaitTermination()