PySpark Support
Alpha Component
Since the launch of Calliope, many people have requested PySpark support for it. There were various challenges in implementing the transformers for vanilla Calliope and exposing a Python API for them. But with the launch of CalliopeSQL (i.e. SparkSQL support in Calliope) we have a way to reliably work with Cassandra data in PySpark without the need to write transformers.
With CalliopeSQL support for PySpark, you can work with your data in Cassandra just as you would with data in any SparkSQL-supported datastore (HDFS, JSON files, Parquet, etc.) and at the same time take advantage of the access patterns and capabilities of Cassandra.
Download the Binaries
Download calliope-sql-assembly or calliope-hive-assembly
Download the Calliope Python Egg
Getting Started
PySpark Shell
Assuming you downloaded the required binaries to a folder called calliope in your SPARK_HOME, start the PySpark shell with calliope-sql support using the following command from the SPARK_HOME folder.
$ bin/pyspark --jars calliope/calliope-sql-assembly-1.1.0-CTP-U2.jar --driver-class-path calliope/calliope-sql-assembly-1.1.0-CTP-U2.jar --py-files calliope/calliope-0.0.1-py2.7.egg
Or, to start it with calliope-hive support, use this:
$ bin/pyspark --jars calliope/calliope-hive-assembly-1.1.0-CTP-U2.jar --driver-class-path calliope/calliope-hive-assembly-1.1.0-CTP-U2.jar --py-files calliope/calliope-0.0.1-py2.7.egg
Reading data from Cassandra
To read data from your Cassandra keyspace "test_ks" and table "test_cf", first you need to create the sqlContext,
from calliope import CalliopeSQLContext
sqlContext = CalliopeSQLContext(sc)
or to create the hive context,
from calliope import CalliopeHiveContext
sqlContext = CalliopeHiveContext(sc)
From here on you can use it as you would use the sqlContext in PySpark, with one advantage: all your C* keyspaces and tables are already available in the context. So, to read from the table,
srdd = sqlContext.sql("select * from test_ks.test_cf")
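Putting the steps above together, a minimal shell session might look like the following sketch. It assumes the PySpark shell was started with the Calliope jars and egg as shown earlier, that `sc` is the shell's SparkContext, and that a keyspace `test_ks` with a table `test_cf` already exists in your Cassandra cluster.

```python
# Sketch: querying a Cassandra table from the PySpark shell via CalliopeSQL.
# Assumes `sc` (the shell's SparkContext) and the test_ks.test_cf table exist.
from calliope import CalliopeSQLContext

sqlContext = CalliopeSQLContext(sc)

# All C* keyspaces and tables are already registered in the context,
# so they can be queried directly by "keyspace.table" name.
srdd = sqlContext.sql("select * from test_ks.test_cf")

srdd.printSchema()          # inspect the schema inferred from the C* table
for row in srdd.take(10):   # pull a few rows to the driver
    print(row)
```

The result is a regular SchemaRDD, so the usual SparkSQL operations (further `sql` queries, `map`, `filter`, `collect`, and so on) apply to it unchanged.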
Configuring CalliopeSQL
By default Calliope assumes that the driver application is running on the same node as C* and tries to connect to 127.0.0.1 as the contact node. If the driver application doesn't run on the same system as C*, you need to configure the contact point using Spark properties. Calliope also provides some additional properties that can be configured as per your requirements.
- spark.cassandra.connection.host - Configures the initial contact point in the Cassandra cluster. Must be reachable from the driver application node. [Default: 127.0.0.1]
- spark.cassandra.connection.native.port - The native protocol port that the contact node is listening to. [Default: 9042]
- spark.cassandra.connection.rpc.port - The thrift protocol port that the contact node is listening to. [Default: 9160]
- spark.cassandra.auth.username - Username for authenticating to C*
- spark.cassandra.auth.password - Password for authenticating to C*
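These properties can be set like any other Spark property, for example on the SparkConf used to create the SparkContext of a standalone driver application. The sketch below is a configuration fragment; the host and credentials are placeholders for your own cluster's values.

```python
# Sketch: setting the Calliope/Cassandra connection properties on a
# SparkConf before creating the SparkContext. All values are placeholders.
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("calliope-example")
        .set("spark.cassandra.connection.host", "10.0.0.5")        # contact node
        .set("spark.cassandra.connection.native.port", "9042")     # native protocol port
        .set("spark.cassandra.connection.rpc.port", "9160")        # thrift port
        .set("spark.cassandra.auth.username", "cassandra")
        .set("spark.cassandra.auth.password", "cassandra"))

sc = SparkContext(conf=conf)
```

Properties set this way apply to every keyspace and table that Calliope exposes through the context created from this SparkContext.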
Programmatically loading Cassandra Tables
Sometimes you may need to connect to a different Cassandra cluster, or need access to the underlying SchemaRDD. For this purpose, Calliope's SQL context provides the cassandraTable method with some alternate signatures, including one where you can provide the host, port, keyspace, table, username and password.
srdd = sqlContext.cassandraTable("cas.host", "cas.native.port", "keyspace", "table")
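Since cassandraTable returns a SchemaRDD, one common pattern is to register it as a temporary table and then query it with SQL alongside the tables of the default cluster. The sketch below assumes the signature shown above (host, native port, keyspace, table, passed as strings) and uses placeholder connection values; the temp-table name `other_cf` is chosen for illustration.

```python
# Sketch: loading a table from a second Cassandra cluster and querying it.
# Host, port, keyspace, and table values are placeholders.
srdd = sqlContext.cassandraTable("10.0.0.9", "9042", "other_ks", "other_cf")

# Register the SchemaRDD so it can be referenced in SQL queries.
srdd.registerTempTable("other_cf")

count = sqlContext.sql("select count(*) from other_cf").collect()[0]
```

This lets a single driver application join or compare data across clusters, since the temp table lives in the same sqlContext as the automatically registered keyspaces.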