Process streaming data
- data stream = Any data source that grows over time
- New files landing in cloud storage
- Updates to a database captured in a CDC feed
- Events queued in a pub/sub messaging feed
Spark structured streaming
- Input streaming source can be:
- a directory of files (e.g. in cloud storage)
- a messaging system like Kafka
- a Delta table
- must use PySpark to create the stream first before it can be queried with SQL
2 approaches to process a data stream:
- Reprocess the entire source dataset each time (full refresh)
- Process only the new data added since the last update, either with custom logic or with Structured Streaming (incremental refresh); see the sketch below
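A minimal sketch contrasting the two approaches, assuming a Delta source named Input_Table and hypothetical target/checkpoint names:
# Full refresh: re-read the entire source and overwrite the target on every run
(spark.read.table("Input_Table")
    .write.mode("overwrite")
    .saveAsTable("Output_Table_Full"))

# Incremental refresh: Structured Streaming only picks up data added since the last checkpoint
(spark.readStream.table("Input_Table")
    .writeStream
    .option("checkpointLocation", "/path/to/checkpoint")
    .outputMode("append")
    .table("Output_Table_Incremental"))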
DataStreamReader - readStream()
- spark.readStream.table() queries a Delta table as a streaming source into a streamDF
- This processes all of the data already present in the table as well as new data that arrives later
- It creates a streaming DataFrame on which we can apply any transformation as if it were a static table (see the SQL sketch below the snippet)
- readStream doesn't activate the stream until the stream is queried or written out
streamDF = spark.readStream.table("Input_Table")
streamDF.createOrReplaceTempView("<streaming_view_name>")  # optional, but good to have so we can view/query the stream with SQL
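A sketch of that idea, assuming the view above was registered as streaming_view_name and the source has a device_id column; SQL against the view yields another streaming DataFrame:
transformedDF = spark.sql("""
    SELECT device_id, COUNT(*) AS event_count
    FROM streaming_view_name
    GROUP BY device_id
""")
transformedDF.isStreaming  # True: the result is itself a streaming DataFrame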
Tips / Intuition
A streaming query executes infinitely (until stopped); you can monitor the live streaming performance with the dashboard (e.g. batch duration, aggregation state, input rate, ...)
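Besides the dashboard, the same metrics are available programmatically on each StreamingQuery handle; a minimal sketch over the currently active streams:
for q in spark.streams.active:
    print(q.name, q.status)  # e.g. whether a trigger is active or the query is waiting for data
    print(q.lastProgress)    # metrics of the most recent micro-batch (input rate, batch duration, ...)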
DataStreamWriter - writeStream()
- write the streamDF out to durable storage to persist the data
- Structured Streaming assumes data is only being appended upstream; once a source table is updated or overwritten, it is no longer valid as a streaming source
# if an intermediate transformation with SQL is involved, convert the resulting view back into a DataFrame first
transformedDF = spark.table("tmp_transformed_vw")  # still a streaming DataFrame
(transformedDF.writeStream
    ......
    .awaitTermination())  # prevents execution of other cells until the incremental batch's write has succeeded
# otherwise
(streamDF.writeStream
    .trigger(processingTime="2 minutes")    # check every 2 minutes whether new data has arrived
    .outputMode("append")                   # append to the target table
    .option("checkpointLocation", "/path")  # for tracking the stream processing
    .table("Output_Table"))                 # output table
- trigger()
- once ⇒ process all available data in a single batch, then stop; availableNow ⇒ process all available data in multiple micro-batches, then stop (see the sketch after this list)
- outputMode()
- append (default) ⇒ only newly received rows are appended to the target; complete ⇒ the result table is recomputed and overwritten on each trigger
- checkpointing (option "checkpointLocation")
- stores the current state of your streaming job to cloud storage
- cannot be shared between separate streams ⇒ each stream needs its own checkpoint location
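A sketch of the batch-style trigger, assuming a recent runtime that supports availableNow and hypothetical table/path names:
# process everything currently available in micro-batches, then stop automatically
(streamDF.writeStream
    .trigger(availableNow=True)
    .outputMode("append")
    .option("checkpointLocation", "/path/to/checkpoint_batch")
    .table("Output_Table"))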
2 guarantees provided by Spark Structured Streaming
- Fault tolerance with Checkpointing + Write-ahead logs
- record the offset range of data being processed during each trigger interval.
- Exactly-once guarantee
- Idempotent sinks (see the foreachBatch sketch below)
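One common way to make a sink idempotent is a merge inside foreachBatch; this is only a sketch, assuming a Delta target named target_table keyed on id and PySpark 3.3+ (for DataFrame.sparkSession):
def upsert_to_delta(microBatchDF, batch_id):
    # MERGE makes the write idempotent: re-processing the same micro-batch does not duplicate rows
    microBatchDF.createOrReplaceTempView("updates")
    microBatchDF.sparkSession.sql("""
        MERGE INTO target_table t
        USING updates s
        ON t.id = s.id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

(streamDF.writeStream
    .foreachBatch(upsert_to_delta)
    .option("checkpointLocation", "/path/to/checkpoint_merge")
    .outputMode("update")
    .start())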
Unsupported operations on a streaming DF
- sorting
- deduplication
- can be overcome by (see the sketch below):
- windowing
- watermarking
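A sketch of both workarounds on a hypothetical event stream with event_time and user_id columns:
from pyspark.sql import functions as F

deduped_counts = (streamDF
    .withWatermark("event_time", "10 minutes")     # tolerate up to 10 minutes of late data
    .dropDuplicates(["user_id", "event_time"])     # stateful dedup; state is bounded by the watermark
    .groupBy(F.window("event_time", "5 minutes"))  # aggregate into 5-minute tumbling windows
    .count())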
To ensure no active streams are running, we can execute the following loop:
for s in spark.streams.active:
    print("Stopping stream: " + s.id)
    s.stop()
    s.awaitTermination()