The original Neo4j Spark connector's loadDataFrame gives an error

EDIT: I tried what was suggested in the comments with .toDF and got this error:

neo: org.neo4j.spark.Neo4j = org.neo4j.spark.Neo4j@5dfb65d5
warning: there was one deprecation warning; re-run with -deprecation for details
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@333e01c6
import sqlContext.implicits._
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = Neo4jRDD partitions Partitions(1,9223372036854775807,9223372036854775807,None) call spatial.withinDistance('geom', {lat:35.8954016,lon:41.5505458}, 500) yield node, distance WITH node, distance match (node:POINT) WHERE node.toDateFormatLong < 20170213 return node as n using Map()
<console>:48: error: value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
       val df = rdd.toDF()
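For context, this is the kind of conversion I would expect to need instead of toDF. It is only a sketch: it assumes the Rows coming out of loadRowRdd carry their schema (the serialization trace further down shows GenericRowWithSchema) and that the RDD is non-empty, and toDF itself needs an implicit Encoder, which Spark does not provide for a plain Row:

    // Sketch only: build the DataFrame from the Row RDD with an explicit schema,
    // since rdd.toDF() has no Encoder[Row] available.
    import org.apache.spark.sql.types.StructType

    val schema: StructType = rdd.first().schema   // assumes the connector's rows carry a schema
    val df = sqlContext.createDataFrame(rdd, schema)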

I am running this simple Scala code:

    import org.neo4j.spark._
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.SparkConf

    val conf = new SparkConf.setMaster("local").setAppName("neo4jspark")
    val sc = new SparkContext(conf)

    val neo = Neo4j(sc)

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    //val rdd = neo.cypher("call spatial.withinDistance('geom', {lat:35.8954016,lon:41.5505458}, 500) yield node, distance WITH node, distance match (node:POINT) WHERE node.toDateFormatLong < 20170213 return node as n").loadRowRdd
    val df = neo.cypher("call spatial.withinDistance('geom', {lat:35.8954016,lon:41.5505458}, 500) yield node, distance WITH node, distance match (node:POINT) WHERE node.toDateFormatLong < 20170213 return node as n").loadDataFrame

It gives a few errors: the conf line is one of them, but loading the RDD still seems to work. Even so I also get an error here; the call still gives me the number of elements, although I then get serialization errors as soon as I try to use the result. I am not sure if there are any steps or things missing compared to this example:

https://blog.knoldus.com/2016/10/05/neo4j-with-scala-awesome-experience-with-spark/
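One thing I can say about the conf error in the output below: Scala reads SparkConf.setMaster as a type selection on a value called SparkConf, which is why the shell reports "not found: value SparkConf". A sketch of how I understand that line should look; the spark.neo4j.bolt.* settings are placeholders for illustration, not taken from my actual setup:

    import org.apache.spark.{SparkConf, SparkContext}

    // Parentheses after SparkConf make it a constructor call instead of a type selection.
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("neo4jspark")
      .set("spark.neo4j.bolt.url", "bolt://localhost:7687")   // placeholder connection settings
      .set("spark.neo4j.bolt.user", "neo4j")                  // for the org.neo4j.spark connector
      .set("spark.neo4j.bolt.password", "password")
    val sc = new SparkContext(conf)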

Loading neo4jspark.scala...
import org.neo4j.spark._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkConf
<console>:38: error: not found: value SparkConf
       val conf = new SparkConf.setMaster("local").setAppName("neo4jspark")
                      ^
<console>:38: error: not found: value conf
       val sc = new SparkContext(conf)
                                 ^
neo: org.neo4j.spark.Neo4j = org.neo4j.spark.Neo4j@5dfb65d5
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = Neo4jRDD partitions Partitions(1,9223372036854775807,9223372036854775807,None) MATCH (p:POINT) RETURN p using Map()
res0: Long = 53118
17/08/25 14:31:15 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.io.NotSerializableException: org.neo4j.driver.internal.InternalNode
Serialization stack:
    - object not serializable (class: org.neo4j.driver.internal.InternalNode, value: node<5>)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: org.apache.spark.sql.catalyst.expressions.GenericRow, name: values, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema, [node<5>])
    - element of array (index: 0)
    - array (class [Lorg.apache.spark.sql.Row;, size 5)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:383)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/08/25 14:31:15 ERROR TaskSetManager: Task 0.0 in stage 1.0 (TID 1) had a not serializable result: org.neo4j.driver.internal.InternalNode
Serialization stack:
    - object not serializable (class: org.neo4j.driver.internal.InternalNode, value: node<5>)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: org.apache.spark.sql.catalyst.expressions.GenericRow, name: values, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema, [node<5>])
    - element of array (index: 0)
    - array (class [Lorg.apache.spark.sql.Row;, size 5); not retrying
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 1.0 (TID 1) had a not serializable result: org.neo4j.driver.internal.InternalNode
Serialization stack:
    - object not serializable (class: org.neo4j.driver.internal.InternalNode, value: node<5>)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: org.apache.spark.sql.catalyst.expressions.GenericRow, name: values, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema, [node<5>])
    - element of array (index: 0)
    - array (class [Lorg.apache.spark.sql.Row;, size 5)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
    ... 79 elided
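If it helps narrow things down, my reading of that trace is that the returned Rows hold org.neo4j.driver.internal.InternalNode objects, which Spark's Java serializer cannot ship between tasks. Below is a sketch, not the connector's documented fix, of the same query rewritten to return plain properties instead of whole nodes, so every column is a serializable value; only toDateFormatLong and distance appear in the original query, any other property name would be a guess:

    // Sketch: return node properties rather than the node itself, so the resulting
    // Rows contain only serializable values instead of InternalNode objects.
    val df = neo.cypher(
      "call spatial.withinDistance('geom', {lat:35.8954016,lon:41.5505458}, 500) yield node, distance " +
      "WITH node, distance match (node:POINT) WHERE node.toDateFormatLong < 20170213 " +
      "return node.toDateFormatLong as toDateFormatLong, distance as distance"
    ).loadDataFrame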
