I have an ELK stack and I'm working with PySpark.
I saved an RDD of ELK data to the local file system:
rdd.saveAsTextFile("/tmp/ELKdata")
logData = sc.textFile('/tmp/ELKdata/*')
errors = logData.filter(lambda line: "raw1-VirtualBox" in line)
errors.count()
The count I get is 35. Then I ran:
errors.first()
and got this output:
(u'AVI0UK0KZsowGuTwoQnN', {u'host': u'raw1-VirtualBox', u'ident': u'NetworkManager', u'pid': u'748', u'message': u"(eth0): device state change: ip-config -> secondaries (reason 'none') [70 90 0]", u'@timestamp': u'2016-01-12T10:59:48+05:30'})
When I try to write the data to Elasticsearch from PySpark, I get errors:
errors.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={"es.resource": "logstash-2016.01.12/errors"})
The result is a huge Java stack trace:
org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used
at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:113)
at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:108)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:921)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:903)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
16/01/12 17:20:13 INFO TaskSetManager: Starting task 1.0 in stage 31.0 (TID 62, localhost, PROCESS_LOCAL, 1181 bytes)
16/01/12 17:20:13 WARN TaskSetManager: Lost task 0.0 in stage 31.0 (TID 61, localhost): org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used
org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:113)
org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:108)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:921)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:903)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
16/01/12 17:20:13 ERROR TaskSetManager: Task 0 in stage 31.0 failed 1 times; aborting job
16/01/12 17:20:13 INFO TaskSchedulerImpl: Cancelling stage 31
16/01/12 17:20:13 INFO TaskSchedulerImpl: Stage 31 was canceled
16/01/12 17:20:13 INFO Executor: Executor is trying to kill task 1.0 in stage 31.0 (TID 62)
16/01/12 17:20:13 INFO DAGScheduler: Failed to run saveAsNewAPIHadoopFile at PythonRDD.scala:665
Traceback (most recent call last):
File "", line 6, in
File "/opt/spark/python/pyspark/rdd.py", line 1213, in saveAsNewAPIHadoopFile
keyConverter, valueConverter, jconf)
File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError
16/01/12 17:20:13 INFO Executor: Running task 1.0 in stage 31.0 (TID 62)
16/01/12 17:20:13 ERROR Executor: Exception in task 1.0 in stage 31.0 (TID 62)
org.apache.spark.TaskKilledException
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:168)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
16/01/12 17:20:13 WARN TaskSetManager: Lost task 1.0 in stage 31.0 (TID 62, localhost): org.apache.spark.TaskKilledException:
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:168)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
16/01/12 17:20:13 INFO TaskSchedulerImpl: Removed TaskSet 31.0, whose tasks have all completed, from pool
: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 31.0 failed 1 times, most recent failure: Lost task 0.0 in stage 31.0 (TID 61, localhost): org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used
org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:113)
org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:108)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:921)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:903)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
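Reading the trace, the key line seems to be "RDD element of type java.lang.String cannot be used": textFile gives back an RDD of plain strings, but EsOutputFormat needs (key, value) pairs. So I think a parsing step is missing. A minimal sketch of what I have in mind (assuming each saved line is the repr of a (doc_id, dict) tuple, so ast.literal_eval can rebuild it):

import ast

# Each saved line is the text form of a (doc_id, field_dict) tuple;
# parse it back into a real Python pair before handing it to Hadoop.
pairs = errors.map(lambda line: ast.literal_eval(line))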
If I build the (key, document) pair by hand instead, the write works:
errors = logData.filter(lambda line: "raw1-VirtualBox" in line)
errors = errors.map(lambda item: ('AVI0UK0KZsowGuTwoQnP',
    {"host": "raw1-VirtualBox",
     "ident": "NetworkManager",
     "pid": "69",
     "message": "success <info> (eth0): device state change: ip-config -> secondaries (reason 'none') [70 90 0]",
     "@timestamp": "2016-01-12T10:59:48+05:30"}))
But I want to write the filtered and transformed data to Elasticsearch, not hand-written records.
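For example, I imagine an end-to-end version along these lines (an untested sketch: es.nodes, es.port, and es.input.json are elasticsearch-hadoop settings, and localhost:9200 is my assumption for where Elasticsearch runs):

import ast
import json

logData = sc.textFile('/tmp/ELKdata/*')
errors = logData.filter(lambda line: "raw1-VirtualBox" in line)

# Rebuild each (doc_id, dict) pair, then serialize the document as JSON
# so elasticsearch-hadoop can index the value string as-is.
docs = errors.map(lambda line: ast.literal_eval(line)) \
             .map(lambda pair: (pair[0], json.dumps(pair[1])))

docs.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={
        "es.nodes": "localhost",  # assumed ES host
        "es.port": "9200",  # assumed ES port
        "es.resource": "logstash-2016.01.12/errors",
        "es.input.json": "true"})  # value is already a JSON string

Is this the right way to go about it?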