Spark Streaming and Calliope
Spark Streaming applications are similar to regular Spark applications. The central concept is the DStream, a continuous sequence of RDDs.
You can persist the RDDs of a DStream just as you would persist regular RDDs.
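For instance, here is a minimal sketch of persisting every RDD a DStream generates, assuming a local master and a socket source on localhost:9999 (the app name and endpoints are placeholders):

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext("local[2]", "PersistDemo", Seconds(10), System.getenv("SPARK_HOME"))
val lines = ssc.socketTextStream("localhost", 9999)
// persist() marks every RDD this DStream generates for caching,
// just like rdd.persist() on a regular RDD
lines.persist()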
A quick swim
- Set up your Spark Streaming
Set up your Spark Streaming context as you normally would. Here is an example that sets up a socket stream and processes it:
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._  // implicit pair-DStream operations (reduceByKey etc.)
import org.apache.spark.streaming.dstream.DStream

val ssc = new StreamingContext(args(0), "NetworkStreamToCassandra",
  Seconds(10), System.getenv("SPARK_HOME"))
val lines = ssc.socketTextStream(args(1), args(2).toInt)
val words = lines.flatMap(_.split(" "))
val wordCounts: DStream[(String, Int)] = words.map(x => (x, 1)).reduceByKey(_ + _)
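The snippet takes its settings from the command line; hypothetical values might be:

// args(0) = Spark master URL, e.g. "local[2]" or "spark://host:7077"
// args(1) = host of the socket source, e.g. "localhost"
// args(2) = port of the socket source, e.g. "9999"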
- Prepare C* configuration
Prepare the C* configuration specifying where the processed data should be saved.
val cas = CasBuilder.cql3.withColumnFamily("casdemo", "words")
.saveWithQuery("update casdemo.words set count = ?")
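The update query implies a words table in the casdemo keyspace. The keyspace and table names come from the builder above; the column names and types below are assumptions inferred from the query and the marshallers in the next step:

// Hypothetical C* schema the examples could run against:
//   CREATE TABLE casdemo.words (
//     word text PRIMARY KEY,
//     count int
//   )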
- Create the Marshallers
We then need marshallers to tell Calliope how to write the RDDs to C*. This requires two marshallers: one to build the row key and another for the row values.
implicit def keyMarshaller(x: (String, Int)): CQLRowKeyMap = Map("word" -> x._1)
implicit def rowMarshaller(x: (String, Int)): CQLRowValues = List(x._2)
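For these to compile, Calliope's type aliases and its implicit conversions to ByteBuffer need to be in scope. A sketch of the imports, assuming the package layout of the Calliope releases this walkthrough targets (verify against your version):

import com.tuplejump.calliope.CasBuilder
import com.tuplejump.calliope.Implicits._  // adds saveToCas / cql3SaveToCassandra
import com.tuplejump.calliope.Types.{CQLRowKeyMap, CQLRowValues}
import com.tuplejump.calliope.utils.RichByteBuffer._  // String/Int -> ByteBuffer conversions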
- Set up the result stream to persist
Once the processing pipeline is set up, you can persist the RDDs in the DStream to C* using the new API,
wordCounts.saveToCas(cas)
Or using the traditional approach,
wordCounts.foreach(_.cql3SaveToCassandra(cas))
- Start streaming
Start the streaming context:
ssc.start()
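If your Spark version provides it, you can also block the driver until the streaming job is stopped:

ssc.awaitTermination()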
SnackFS for checkpointing
One thing most other Cassandra Spark libraries miss is that streaming checkpoints in stateful operations such as updateStateByKey and reduceByKeyAndWindow require a distributed file system. Calliope utilizes SnackFS for this purpose. You can point the checkpoint directory at SnackFS:
ssc.checkpoint("snackfs://path/to/checkpoint/dir")
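For example, a running total across batches is exactly the kind of stateful operation that needs this checkpoint; a sketch building on the wordCounts stream from above:

// Cumulative count per word across batches; requires the
// checkpoint directory configured above.
val runningCounts = wordCounts.updateStateByKey[Int] {
  (newCounts: Seq[Int], state: Option[Int]) =>
    Some(newCounts.sum + state.getOrElse(0))
}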