Calliope

Calliope provides a bridge between Cassandra and the Spark framework, allowing you to create those magical, real-time big data apps with ease!

PySpark Support

Alpha Component

Since the launch of Calliope, many people have requested PySpark support. There have been various challenges in implementing the transformers for vanilla Calliope and exposing an API for them in Python. But with the launch of CalliopeSQL (i.e., SparkSQL support in Calliope) we have a way to work reliably with Cassandra data in PySpark without the need to write transformers.

With CalliopeSQL support for PySpark, you can work with your data in Cassandra just as you would with data in any SparkSQL-supported datastore (HDFS, JSON files, Parquet, etc.) and at the same time take advantage of the access patterns and capabilities of Cassandra.

Download the Binaries

Download calliope-sql-assembly or calliope-hive-assembly

Download the Calliope Python Egg

Getting Started

PySpark Shell

Assuming you downloaded the required binaries to a folder called calliope in your SPARK_HOME, start the PySpark shell with calliope-sql support using the following command from the SPARK_HOME folder.

$ bin/pyspark --jars calliope/calliope-sql-assembly-1.1.0-CTP-U2.jar --driver-class-path calliope/calliope-sql-assembly-1.1.0-CTP-U2.jar --py-files calliope/calliope-0.0.1-py2.7.egg

Or, to start it with calliope-hive support, use this:

$ bin/pyspark --jars calliope/calliope-hive-assembly-1.1.0-CTP-U2.jar --driver-class-path calliope/calliope-hive-assembly-1.1.0-CTP-U2.jar --py-files calliope/calliope-0.0.1-py2.7.egg

Reading data from Cassandra

To read data from your Cassandra keyspace "test_ks" and table "test_cf", you first need to create the sqlContext,

from calliope import CalliopeSQLContext
sqlContext = CalliopeSQLContext(sc)

or to create the hive context,

from calliope import CalliopeHiveContext
sqlContext = CalliopeHiveContext(sc)

From here on, you can use it just as you would use the sqlContext in PySpark, with one advantage: all your C* keyspaces and tables are already available in the context. So, to read from the table,

srdd = sqlContext.sql("select * from test_ks.test_cf")
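
The result is a SchemaRDD, so both SQL-style and regular RDD operations apply. A minimal sketch, assuming test_cf has hypothetical columns user_id and score:

srdd.printSchema()                                  # inspect the schema inferred from the C* table
print srdd.count()                                  # total number of rows
pairs = srdd.map(lambda row: (row.user_id, row.score)).take(10)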

Configuring CalliopeSQL

By default, Calliope assumes that the driver application is running on the same node as C* and tries to connect to 127.0.0.1 as the initial contact node. If the driver application doesn't run on the same system as C*, you need to configure the contact point using Spark properties. Calliope also provides some additional properties that can be configured as per your requirements, as shown in the sketch after this list.

  • spark.cassandra.connection.host - Configures the initial contact point in the Cassandra cluster. Must be reachable from the driver application node. [Default: 127.0.0.1]
  • spark.cassandra.connection.native.port - The native protocol port that the contact node is listening on. [Default: 9042]
  • spark.cassandra.connection.rpc.port - The thrift protocol port that the contact node is listening on. [Default: 9160]
  • spark.cassandra.auth.username - Username for authenticating to the C* cluster
  • spark.cassandra.auth.password - Password for authenticating to the C* cluster
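
These are regular Spark properties, so they can be set anywhere Spark configuration is accepted, e.g. via --conf on the pyspark command line or on a SparkConf in a standalone driver. A minimal sketch for a standalone application, where the contact node 10.0.0.5 and the credentials are hypothetical:

from pyspark import SparkConf, SparkContext
from calliope import CalliopeSQLContext

conf = (SparkConf()
        .setAppName("calliope-app")
        .set("spark.cassandra.connection.host", "10.0.0.5")      # hypothetical contact node
        .set("spark.cassandra.connection.native.port", "9042")
        .set("spark.cassandra.auth.username", "cassandra")       # only needed if auth is enabled
        .set("spark.cassandra.auth.password", "cassandra"))

sc = SparkContext(conf=conf)
sqlContext = CalliopeSQLContext(sc)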

Programmatically loading Cassandra Tables

Sometimes you may need to connect to a different Cassandra cluster or need access to the underlying SchemaRDD. For this purpose, Calliope's SQLContext provides the cassandraTable method with several alternate signatures, including one where you can provide the host, port, keyspace, table, username, and password.

srdd = sqlContext.cassandraTable("cas.host", "cas.native.port", "keyspace", "table")
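
For example, a sketch connecting to a second, hypothetical cluster (the host, port, keyspace, and table below are illustrative, and the returned SchemaRDD is assumed to behave like any other PySpark SchemaRDD):

srdd = sqlContext.cassandraTable("10.0.0.7", "9042", "test_ks", "test_cf")
srdd.registerTempTable("test_cf_remote")      # expose the table to SQL queries in this context
rows = sqlContext.sql("select * from test_cf_remote limit 10").collect()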