I am desperately trying to connect Cassandra to pyspark, but I cannot get it to work. I am completely new to Spark and Cassandra, so I might be missing something pretty simple.
I am a little confused by all the different explanations on the Internet, but from what I understand, the easiest way would be to use "Spark Packages"? ( http://spark-packages.org/package/TargetHolding/pyspark-cassandra )
So, with the following command:
```
./bin/spark-submit
```
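To be concrete, I mean an invocation along these lines (I copied the package coordinates from the spark-packages page above; `<version>` is just a placeholder, since I am not sure which release to pin):

```
./bin/spark-submit \
  --packages TargetHolding:pyspark-cassandra:<version> \
  myPysparkFile.py
```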
Do I understand correctly that I do not need to download any packages if I use spark packages as described above?
In myPysparkFile.py I tried the following two versions, neither of which worked for me:
Version 1, which I got from slide 14 of http://www.slideshare.net/JonHaddad/intro-to-py-spark-and-cassandra :
"SparkCassandraTest.py" from pyspark import SparkContext, SparkConf from pyspark_cassandra import CassandraSparkContext,Row conf = SparkConf() conf.setMaster("local[4]") conf.setAppName("Spark Cassandra") conf.set("spark.cassandra.connection.host","http://127.0.0.1") sc = CassandraSparkContext(conf=conf) rdd = sc.cassandraTable("test", "words")
As an error, I get:
```
ImportError: No module named pyspark_cassandra
```
Version 2 (inspired by: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/15_python.md ):
"SparkCassandraTest.py" from pyspark import SparkContext, SparkConf from pyspark.sql import SQLContext conf = SparkConf() conf.setMaster("local[4]") conf.setAppName("Spark Cassandra") conf.set("spark.cassandra.connection.host","http://127.0.0.1") sc = SparkContext(conf=conf) sqlContext = SQLContext(sc) sqlContext.read\ .format("org.apache.spark.sql.cassandra")\ .options(table="kv", keyspace="test")\ .load().show()
which gives me the following error:
```
py4j.protocol.Py4JJavaError: An error occurred while calling o28.load.
: java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
	at org.apache.spark.sql.cassandra.DefaultSource$.<init>(DefaultSource.scala:138)
	at org.apache.spark.sql.cassandra.DefaultSource$.<clinit>(DefaultSource.scala)
	at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:56)
	at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:125)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:207)
	at java.lang.Thread.run(Thread.java:745)
```
I really don't know what I'm doing wrong, and I would be grateful for any help. Also, what is the difference between version 1 and version 2? Are there any advantages or disadvantages to either approach?
In addition, any further links on how to best integrate and use Spark with Cassandra would be greatly appreciated.
Btw, Cassandra is running on my machine with the default configuration, on port 7000.
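I am not even sure whether 7000 is the port the connector talks to; as far as I understand, 7000 is Cassandra's inter-node port and clients normally connect on 9042. If the port needs to be set explicitly, I believe it would be something like this (option name taken from the connector docs, untested on my side):

```python
# Possibly needed: point the connector at Cassandra's native client
# port (9042 by default; 7000 is the inter-node/storage port).
conf.set("spark.cassandra.connection.port", "9042")
```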
Thanks.