Spark: SAXParseException when writing parquet on s3

I am trying to read in JSON, infer a schema, and write it out again as parquet to S3 (s3a). For some reason, about a third of the way through the writing portion of the run, Spark always fails with the error below. I cannot find any obvious reason for the problem: it is not out of memory, and there are no long GC pauses. There are no additional error messages in the logs of the individual executors.

The script works fine on another dataset that I have, which has a very similar structure but is several orders of magnitude smaller.

I am running spark 2.0.1-hadoop-2.7 and using the FileOutputCommitter. The committer algorithm version does not seem to matter.

Edit: This does not seem to be caused by malformed JSON or corrupted files. I unzipped and read in each file individually without errors.

Here's a simplified version of the script:

    object Foo {
      def parseJson(json: String): Option[Map[String, Any]] = {
        if (json == null)
          Some(Map())
        else
          parseOpt(json).map((j: JValue) => j.values.asInstanceOf[Map[String, Any]])
      }
    }

    // read in as text and parse json using json4s
    val jsonRDD: RDD[String] = sc.textFile(inputPath)
      .map(row => Foo.parseJson(row))

    // infer a schema that will encapsulate the most rows in a sample of size sampleRowNum
    val schema: StructType = Infer.getMostCommonSchema(sc, jsonRDD, sampleRowNum)

    // get documents compatibility with schema
    val jsonWithCompatibilityRDD: RDD[(String, Boolean)] = jsonRDD
      .map(js => (js, Infer.getSchemaCompatibility(schema, Infer.inferSchema(js)).toBoolean))
      .repartition(partitions)

    val jsonCompatibleRDD: RDD[String] = jsonWithCompatibilityRDD
      .filter { case (js: String, compatible: Boolean) => compatible }
      .map { case (js: String, _: Boolean) => js }

    // create a dataframe from documents with compatible schema
    val dataFrame: DataFrame = spark.read.schema(schema).json(jsonCompatibleRDD)

It completes the earlier schema inference step successfully. The error itself occurs on the last line, but I suppose that could encompass at least the immediately preceding statement, if not earlier:

    org.apache.spark.SparkException: Task failed while writing rows
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.RuntimeException: Failed to commit task
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$commitTask$1(WriterContainer.scala:275)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:257)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1345)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)
        ... 8 more
        Suppressed: java.lang.NullPointerException
            at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:147)
            at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:113)
            at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
            at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetFileFormat.scala:569)
            at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$abortTask$1(WriterContainer.scala:282)
            at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$2.apply$mcV$sp(WriterContainer.scala:258)
            at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1354)
            ... 9 more
    Caused by: com.amazonaws.AmazonClientException: Unable to unmarshall response (Failed to parse XML document with handler class com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser$ListBucketHandler). Response Code: 200, Response Text: OK
        at com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:738)
        at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:399)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3480)
        at com.amazonaws.services.s3.AmazonS3Client.listObjects(AmazonS3Client.java:604)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:962)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.deleteUnnecessaryFakeDirectories(S3AFileSystem.java:1147)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.finishedWrite(S3AFileSystem.java:1136)
        at org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:142)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
        at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:400)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
        at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetFileFormat.scala:569)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$commitTask$1(WriterContainer.scala:267)
        ... 13 more
    Caused by: com.amazonaws.AmazonClientException: Failed to parse XML document with handler class com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser$ListBucketHandler
        at com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.parseXmlInputStream(XmlResponsesSaxParser.java:150)
        at com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.parseListBucketObjectsResponse(XmlResponsesSaxParser.java:279)
        at com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsUnmarshaller.unmarshall(Unmarshallers.java:75)
        at com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsUnmarshaller.unmarshall(Unmarshallers.java:72)
        at com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:62)
        at com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:31)
        at com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:712)
        ... 29 more
    Caused by: org.xml.sax.SAXParseException; lineNumber: 1; columnNumber: 2; XML document structures must start and end within the same entity.
        at org.apache.xerces.util.ErrorHandlerWrapper.createSAXParseException(Unknown Source)
        at org.apache.xerces.util.ErrorHandlerWrapper.fatalError(Unknown Source)
        at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
        at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
        at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
        at org.apache.xerces.impl.XMLScanner.reportFatalError(Unknown Source)
        at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.endEntity(Unknown Source)
        at org.apache.xerces.impl.XMLDocumentScannerImpl.endEntity(Unknown Source)
        at org.apache.xerces.impl.XMLEntityManager.endEntity(Unknown Source)
        at org.apache.xerces.impl.XMLEntityScanner.load(Unknown Source)
        at org.apache.xerces.impl.XMLEntityScanner.skipChar(Unknown Source)
        at org.apache.xerces.impl.XMLDocumentScannerImpl$PrologDispatcher.dispatch(Unknown Source)
        at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.scanDocument(Unknown Source)
        at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
        at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
        at org.apache.xerces.parsers.XMLParser.parse(Unknown Source)
        at org.apache.xerces.parsers.AbstractSAXParser.parse(Unknown Source)
        at com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.parseXmlInputStream(XmlResponsesSaxParser.java:141)
        ... 35 more

Here is my conf:

    spark.executor.extraJavaOptions -XX:+UseG1GC -XX:MaxPermSize=1G -XX:+HeapDumpOnOutOfMemoryError
    spark.executor.memory 16G
    spark.executor.uri https://s3.amazonaws.com/foo/spark-2.0.1-bin-hadoop2.7.tgz
    spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
    spark.hadoop.fs.s3a.buffer.dir /raid0/spark
    spark.hadoop.fs.s3n.buffer.dir /raid0/spark
    spark.hadoop.fs.s3a.connection.timeout 500000
    spark.hadoop.fs.s3n.multipart.uploads.enabled true
    spark.hadoop.parquet.block.size 2147483648
    spark.hadoop.parquet.enable.summary-metadata false
    spark.jars.packages com.databricks:spark-avro_2.11:3.0.1
    spark.local.dir /raid0/spark
    spark.mesos.coarse false
    spark.mesos.constraints priority:1
    spark.network.timeout 600
    spark.rpc.message.maxSize 500
    spark.speculation false
    spark.sql.parquet.mergeSchema false
    spark.sql.planner.externalSort true
    spark.submit.deployMode client
    spark.task.cpus 1
+6
3 answers

I can think of three possible reasons for this problem.

+2

A SAXParseException may indicate a badly formatted XML file. Since the job consistently fails about a third of the way through, it is probably failing in the same place every time (a file whose partition is roughly a third of the way through the list of partitions).

Can you paste your script? It may be possible to wrap the Spark step in a try/catch block that prints the file when this error occurs, which would let you easily zoom in on the problem.
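
The try/catch idea can be sketched as follows, in plain Python rather than Spark. This is only an illustration: `find_failing_inputs` and `process_file` are hypothetical names, with `process_file` standing in for whatever per-file step you actually run. The helper records which inputs raise instead of stopping at the first failure.

```python
from typing import Callable, Iterable, List, Tuple


def find_failing_inputs(
    paths: Iterable[str],
    process_file: Callable[[str], object],
) -> List[Tuple[str, str]]:
    """Run process_file on each path and collect the paths that raise."""
    failures = []
    for path in paths:
        try:
            process_file(path)
        except Exception as exc:
            # Deliberately broad: the goal is only to locate the bad input,
            # so keep the path together with a short description of the error.
            failures.append((path, f"{type(exc).__name__}: {exc}"))
    return failures


# Tiny demonstration with int() standing in for the real processing step.
for path, error in find_failing_inputs(["1", "oops", "3"], int):
    print(path, error)
```

In a real job the same pattern would go around the per-file read, so that the failing path ends up in the log next to the exception.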

+1

From the logs:

Caused by: org.xml.sax.SAXParseException; lineNumber: 1; columnNumber: 2; XML document structures must start and end within the same entity.

and

Caused by: com.amazonaws.AmazonClientException: Failed to parse XML document with handler class com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser$ListBucketHandler

It looks like you have a corrupted or incorrectly formatted file, and your error actually occurs during the read portion of the job. You can confirm this by trying another operation that forces the read, such as count().

If this is confirmed, the goal is then to find the corrupted file. You can do this by listing the S3 files, passing that list to sc.parallelize(), and then trying to read the files in a custom function using map().

    import boto3
    from pyspark.sql import Row

    def scanKeys(startKey, endKey):
        # list keys under the prefix, starting after startKey and stopping at endKey
        bucket = boto3.resource('s3').Bucket('bucketName')
        for obj in bucket.objects.filter(Prefix='prefix', Marker=startKey):
            if obj.key < endKey:
                yield obj.key
            else:
                return

    def testFile(s3Path):
        s3obj = boto3.resource('s3').Object(bucket_name='bucketName', key=s3Path)
        body = s3obj.get()['Body']
        # ... logic to test file format, or use a try/except and attempt to parse it ...
        # (that logic must set fileFormatedCorrectly)
        if fileFormatedCorrectly:
            return Row(status='Good', key=s3Path)
        else:
            return Row(status='Fail', key=s3Path)

    keys = list(scanKeys(startKey, endKey))
    keyListRdd = sc.parallelize(keys, 1000)
    keyListRdd.map(testFile).filter(lambda x: x.asDict().get('status') == 'Fail').collect()

This will return the S3 paths of the files that are formatted incorrectly.
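
The placeholder for the format check in testFile could be filled in along these lines. This is a sketch under an assumption that is not stated in the answer: each object holds newline-delimited JSON, optionally gzip-compressed. The function name `is_well_formed_json_lines` is hypothetical, and it works on the raw bytes of the object.

```python
import gzip
import json
import zlib


def is_well_formed_json_lines(raw: bytes) -> bool:
    """Return True if every non-empty line of raw parses as JSON.

    Assumes newline-delimited JSON, optionally gzip-compressed.
    """
    try:
        # gzip streams start with the magic bytes 0x1f 0x8b
        if raw[:2] == b"\x1f\x8b":
            raw = gzip.decompress(raw)
        for line in raw.split(b"\n"):
            if line.strip():
                json.loads(line)
    except (OSError, EOFError, zlib.error, ValueError):
        # OSError/EOFError/zlib.error: broken or truncated gzip data
        # ValueError: invalid JSON or undecodable bytes
        return False
    return True
```

Inside testFile this might be called as `fileFormatedCorrectly = is_well_formed_json_lines(body.read())`, which reads the whole object into memory, so it only suits files that fit comfortably on an executor.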

+1

Source: https://habr.com/ru/post/1011729/

