Spark / Neo4j error: RuntimeException: java.util.Collections $ UnmodifiableRandomAccessList is not a valid external type for string schema

Exact request:

call spatial.bbox('geom', {lat:37.5,lon:43.4}, {lat:37.6,lon:43.5}) yield node return node.altitude as altitude, node.detect_type as detect_type, node.gtype as gtype, node.toDateFormatLong as toDateFormatLong, node.change_area as change_area, node.latitude as latitude, node.longitude as longitude, node.fromDateFormatLong as fromDateFormatLong, node.iids as iids, node.detect_strength as detect_strength, node.fromDate as fromDate, node.bbox as bbox ORDER BY node.toDateFormatLong DESC 

Dataset example:

 ╒══════════╀═════════════╀═══════╀══════════════════╀═════════════╀══════════╀═══════════╀════════════════════╀═════════════════════════════════════════════════════════════════════╀═════════════════╀══════════╀═════════════════════════════════════════════╕ β”‚"altitude"β”‚"detect_type"β”‚"gtype"β”‚"toDateFormatLong"β”‚"change_area"β”‚"latitude"β”‚"longitude"β”‚"fromDateFormatLong"β”‚"iids" β”‚"detect_strength"β”‚"fromDate"β”‚"bbox" β”‚ β•žβ•β•β•β•β•β•β•β•β•β•β•ͺ═════════════β•ͺ═══════β•ͺ══════════════════β•ͺ═════════════β•ͺ══════════β•ͺ═══════════β•ͺ════════════════════β•ͺ═════════════════════════════════════════════════════════════════════β•ͺ═════════════════β•ͺ══════════β•ͺ═════════════════════════════════════════════║ β”‚-1 β”‚"Arrival" β”‚1 β”‚20161104 β”‚16981 β”‚37.5608649β”‚43.4297988 β”‚20161023 β”‚"23OCT16S1A89377_09_IW1_09_pp_1231_04NOV16S1A90776_09_123_31_TT_QQQQ"β”‚7.2 β”‚"23OCT16" β”‚[43.4297988,37.5608649,43.4297988,37.5608649]β”‚ β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€ β”‚-1 β”‚"Arrival" β”‚1 β”‚20161104 β”‚3123 β”‚37.56749 β”‚43.4807208 β”‚20161023 β”‚"23OCT16S1A89377_09_IW1_09_pp_1231_04NOV16S1A90776_09_124_32_TT_QQQQ"β”‚7.5 β”‚"23OCT16" β”‚[43.4807208,37.56749,43.4807208,37.56749] β”‚ β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€ 

which i call

 try { val initialDf2 = neo.cypher(query).loadDataFrame val someVal = initialDf2.collectAsList() } catch { case e: Exception => e.printStackTrace } 

I get this error:

  17/09/18 08:44:48 ERROR TaskSetManager: Task 0 in stage 298.0 failed 1 times; aborting job org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 298.0 failed 1 times, most recent failure: Lost task 0.0 in stage 298.0 (TID 298, localhost, executor driver): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.util.Collections$UnmodifiableRandomAccessList is not a valid external type for schema of string if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, altitude), DoubleType) AS altitude#1678 if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, detect_type), StringType), true) AS detect_type#1679 if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, gtype), LongType) AS gtype#1680L if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, toDateFormatLong), LongType) AS toDateFormatLong#1681L if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, change_area), LongType) AS change_area#1682L if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, latitude), DoubleType) AS latitude#1683 if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, longitude), DoubleType) AS longitude#1684 if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, fromDateFormatLong), LongType) AS fromDateFormatLong#1685L if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, iids), StringType), true) AS iids#1686 if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, detect_strength), DoubleType) AS detect_strength#1687 if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 10, fromDate), StringType), true) AS fromDate#1688 if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 11, bbox), StringType), true) AS bbox#1689 at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290) at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:573) 

Data is returned if I do not enable bbox.

In neo4j browser, I can run the problem request and the results will return:

 -1 "Detected" 1 20161104 3318 37.5049815 43.4171031 20161023 "filename.val" 9.2 "23OCT16" [43.4171031, 37.5049815, 43.4171031, 37.5049815] 

This is a secondary list, I may have to return node.bbox.somevalue1 as bbbox1, but I don’t know what kind of syntax will be ....

I think this is a similar problem with what I had ...

Exceptional Neo4j connector loadDataFrame gives error

and is solved by:

https://github.com/neo4j-contrib/neo4j-spark-connector/issues/40

He seems to want more from what I am returning.

+5
source share
1 answer

loadDataFrame you need to remove the dataframe schema using (fieldName and fieldtype)

like this:

  val rawGraphnode=neo.cypher("MATCH (n:person)where (n.duration <>0) RETURN n.user as user,n.other as other,n.direction as direction,n.duration as duration,n.timestamp as timestamp") .loadDataFrame(schema = ("user","object"),("other","object"),("direction","string"),("duration","String"),("timestamp","String")) rawGraphnode.printSchema() rawGraphnode.show(10) 
-1
source

Source: https://habr.com/ru/post/1271960/


All Articles