How to load sequence files from HDFS and process each of them in parallel with Spark?

I need to load sequence files from HDFS in parallel and process each file in parallel (read it and filter it on some criteria). The code below processes the files one after another, even though I launch the Spark application with three workers (4 cores each). I also tried setting the number-of-partitions parameter of the parallelize method, but saw no performance improvement. I am sure my cluster has enough resources for parallel tasks. What changes should I make so the files are processed in parallel?

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); sparkConf.set("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"); JavaSparkContext sparkContext = new JavaSparkContext(sparkConf); JavaRDD<String> files = sparkContext.parallelize(fileList); Iterator<String> localIterator = files.toLocalIterator(); while (localIterator.hasNext()) { String hdfsPath = localIterator.next(); long startTime = DateUtil.getCurrentTimeMillis(); JavaPairRDD<IntWritable, BytesWritable> hdfsContent = sparkContext.sequenceFile(hdfsPath, IntWritable.class, BytesWritable.class); try { JavaRDD<Message> logs = hdfsContent.map(new Function<Tuple2<IntWritable, BytesWritable>, Message>() { public Message call(Tuple2<IntWritable, BytesWritable> tuple2) throws Exception { BytesWritable value = tuple2._2(); BytesWritable tmp = new BytesWritable(); tmp.setCapacity(value.getLength()); tmp.set(value); return (Message) getProtos(logtype, tmp.getBytes()); } }); final JavaRDD<Message> filteredLogs = logs.filter(new Function<Message, Boolean>() { public Boolean call(Message msg) throws Exception { FieldDescriptor fd = msg.getDescriptorForType().findFieldByName("method"); String value = (String) msg.getField(fd); if (value.equals("POST")) { return true; } return false; } }); long timetaken = DateUtil.getCurrentTimeMillis() - startTime; LOGGER.log(Level.INFO, "HDFS: {0} Total Log Count : {1} Filtered Log Count : {2} TimeTaken : {3}", new Object[] { hdfsPath, logs.count(), filteredLogs.count(), timetaken }); } catch (Exception e) { LOGGER.log(Level.INFO, "Exception : ", e); } } 

Instead of iterating over the files RDD with a local iterator, I also tried Spark operations such as map and foreach on it. But that throws a SparkException: Task not serializable. No external variables are referenced inside the closure, and my class (OldLogAnalyzer) already implements the Serializable interface. Both KryoSerializer and JavaSerializer are configured in SparkConf. I am puzzled about what is not serializable in my code.

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1622)
    at org.apache.spark.rdd.RDD.map(RDD.scala:286)
    at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:81)
    at org.apache.spark.api.java.JavaRDD.map(JavaRDD.scala:32)
    at com.test.logs.spark.OldLogAnalyzer.main(OldLogAnalyzer.java:423)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.api.java.JavaSparkContext, value: org.apache.spark.api.java.JavaSparkContext@68f277a2)
    - field (class: com.test.logs.spark.OldLogAnalyzer$10, name: val$sparkContext, type: class org.apache.spark.api.java.JavaSparkContext)
    - object (class com.test.logs.spark.OldLogAnalyzer$10, com.test.logs.spark.OldLogAnalyzer$10@2f80b005)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 15 more
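
If I read the serialization stack correctly, the anonymous Function compiled as OldLogAnalyzer$10 holds a val$sparkContext field, i.e. the closure captures the JavaSparkContext (the val$ prefix suggests a captured local variable), and a SparkContext is never serializable no matter which serializer is configured. Below is a minimal sketch of that kind of capture and of a closure that only references serializable values; the names here are illustrative, not my real code.

// Illustrative sketch. The commented-out block shows the kind of capture the
// trace appears to describe: the anonymous class references the driver-side
// context, so Spark has to serialize the JavaSparkContext and fails.
//
// JavaRDD<Long> counts = files.map(new Function<String, Long>() {
//     public Long call(String path) throws Exception {
//         // sparkContext captured here -> Task not serializable
//         return sparkContext.sequenceFile(path, IntWritable.class, BytesWritable.class).count();
//     }
// });

// A closure shipped to executors should only capture serializable values,
// e.g. a plain String (or a broadcast variable):
final String wantedMethod = "POST";   // serializable, safe to capture
JavaRDD<Message> posts = logs.filter(new Function<Message, Boolean>() {
    public Boolean call(Message msg) throws Exception {
        FieldDescriptor fd = msg.getDescriptorForType().findFieldByName("method");
        return wantedMethod.equals(msg.getField(fd));
    }
});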


