How to write data to Elasticsearch from PySpark?

I have an ELK stack set up alongside PySpark.

I saved an RDD of ELK data to the local file system:

 rdd.saveAsTextFile("/tmp/ELKdata")
 logData = sc.textFile('/tmp/ELKdata/*')
 errors = logData.filter(lambda line: "raw1-VirtualBox" in line)
 errors.count()

The count I get is 35.

 errors.first()

I get this output:

 (u'AVI0UK0KZsowGuTwoQnN', {u'host': u'raw1-VirtualBox', u'ident': u'NetworkManager', u'pid': u'748', u'message': u"(eth0): device state change: ip-config -> secondaries (reason 'none') [70 90 0]", u'@timestamp': u'2016-01-12T10:59:48+05:30'})

When I try to write the data to Elasticsearch from PySpark, I get errors:

 errors.saveAsNewAPIHadoopFile(
     path='-',
     outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
     keyClass="org.apache.hadoop.io.NullWritable",
     valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
     conf={"es.resource": "logstash-2016.01.12/errors"})

Huge Java Errors

 org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used
     at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:113)
     at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:108)
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:921)
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:903)
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
     at org.apache.spark.scheduler.Task.run(Task.scala:54)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
     at java.lang.Thread.run(Thread.java:745)
 16/01/12 17:20:13 INFO TaskSetManager: Starting task 1.0 in stage 31.0 (TID 62, localhost, PROCESS_LOCAL, 1181 bytes)
 16/01/12 17:20:13 WARN TaskSetManager: Lost task 0.0 in stage 31.0 (TID 61, localhost): org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used
         org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:113)
         org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:108)
         scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
         scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
         org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:921)
         org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:903)
         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
         org.apache.spark.scheduler.Task.run(Task.scala:54)
         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
         java.lang.Thread.run(Thread.java:745)
 16/01/12 17:20:13 ERROR TaskSetManager: Task 0 in stage 31.0 failed 1 times; aborting job
 16/01/12 17:20:13 INFO TaskSchedulerImpl: Cancelling stage 31
 16/01/12 17:20:13 INFO TaskSchedulerImpl: Stage 31 was canceled
 16/01/12 17:20:13 INFO Executor: Executor is trying to kill task 1.0 in stage 31.0 (TID 62)
 16/01/12 17:20:13 INFO DAGScheduler: Failed to run saveAsNewAPIHadoopFile at PythonRDD.scala:665
 Traceback (most recent call last):
   File "", line 6, in 
   File "/opt/spark/python/pyspark/rdd.py", line 1213, in saveAsNewAPIHadoopFile
     keyConverter, valueConverter, jconf)
   File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
   File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
 py4j.protocol.Py4JJavaError
 16/01/12 17:20:13 INFO Executor: Running task 1.0 in stage 31.0 (TID 62)
 16/01/12 17:20:13 ERROR Executor: Exception in task 1.0 in stage 31.0 (TID 62)
 org.apache.spark.TaskKilledException
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:168)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
     at java.lang.Thread.run(Thread.java:745)
 16/01/12 17:20:13 WARN TaskSetManager: Lost task 1.0 in stage 31.0 (TID 62, localhost): org.apache.spark.TaskKilledException:
         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:168)
         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
         java.lang.Thread.run(Thread.java:745)
 16/01/12 17:20:13 INFO TaskSchedulerImpl: Removed TaskSet 31.0, whose tasks have all completed, from pool
 : An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 31.0 failed 1 times, most recent failure: Lost task 0.0 in stage 31.0 (TID 61, localhost): org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used
         org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:113)
         org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:108)
         scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
         scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
         org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:921)
         org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:903)
         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
         org.apache.spark.scheduler.Task.run(Task.scala:54)
         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
         java.lang.Thread.run(Thread.java:745)
 Driver stacktrace:
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
     at scala.Option.foreach(Option.scala:236)
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


If I build the records manually, I can write the data:

 errors = logData.filter(lambda line: "raw1-VirtualBox" in line)
 errors = errors.map(lambda item: ('AVI0UK0KZsowGuTwoQnP', {
     "host": "raw1-VirtualBox",
     "ident": "NetworkManager",
     "pid": "69",
     "message": " sucess <info> (eth0): device state change: ip-config -> secondaries (reason 'none') [70 90 0]",
     "@timestamp": "2016-01-12T10:59:48+05:30"
 }))

But I want to write the filtered and processed data to Elasticsearch.
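The `RDD element of type java.lang.String cannot be used` message is consistent with `sc.textFile` handing back each saved record as a plain string, while `saveAsNewAPIHadoopFile` expects (key, value) pairs like the manually built ones. A minimal sketch of turning the text lines back into pairs, assuming each line is the Python repr of an `(id, dict)` tuple as in the `errors.first()` output; `parse_line` is a hypothetical helper name, not part of PySpark or elasticsearch-hadoop:

```python
import ast


def parse_line(line):
    """Turn one saved text line back into a (doc_id, fields) pair.

    Assumption: the line is the repr of a 2-tuple (id string, dict of
    fields), i.e. what saveAsTextFile writes for such elements.
    """
    # literal_eval only evaluates Python literals, so it is safer than eval.
    doc_id, fields = ast.literal_eval(line)
    if not isinstance(fields, dict):
        raise ValueError("expected a dict of fields, got %r" % type(fields))
    return doc_id, fields


# Shortened sample line in the same shape as the errors.first() output.
sample = "(u'AVI0UK0KZsowGuTwoQnN', {u'host': u'raw1-VirtualBox', u'pid': u'748'})"
pair = parse_line(sample)
print(pair)
```

In the shell this would be applied as `pairs = errors.map(parse_line)`, and the write call would then be made on `pairs` rather than on `errors`.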

2 answers

I had a similar problem, and here is how I managed to solve it. First, I used a DataFrame instead of an RDD.

Once the data is in a DataFrame:

 from pyspark.sql import SQLContext
 df.write.format("org.elasticsearch.spark.sql").option("es.resource", "logstash-2016.01.12/errors").save()
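How the data gets into a DataFrame is not shown here. One possible route, sketched under the assumption that the records are available as an RDD of `(id, dict)` pairs: flatten each pair into a single record and let Spark infer the schema. `to_record`, `pairs_to_dataframe`, and the `doc_id` field name are hypothetical names chosen for illustration.

```python
def to_record(pair, id_field="doc_id"):
    """Flatten a (doc_id, fields) pair into one flat dict.

    The id is stored under id_field (a hypothetical name) so that it
    survives as an ordinary column.
    """
    doc_id, fields = pair
    record = dict(fields)  # copy, so the input dict is left untouched
    record[id_field] = doc_id
    return record


def pairs_to_dataframe(sql_context, pairs_rdd):
    """Build a DataFrame from an RDD of (doc_id, fields) pairs.

    Needs pyspark; the import sits inside the function so that
    to_record can be tried without a Spark installation.
    """
    from pyspark.sql import Row
    rows = pairs_rdd.map(lambda pair: Row(**to_record(pair)))
    return sql_context.createDataFrame(rows)


example = ("AVI0UK0KZsowGuTwoQnN", {"host": "raw1-VirtualBox", "pid": "748"})
print(to_record(example))
```

With a DataFrame built this way, the `df.write` call applies unchanged. Schema inference assumes every record carries the same set of fields.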

Like the author of the currently accepted answer, I was in the same boat, trying to write data as an RDD. The answer above is very close, but there are several configuration options that are also useful. If your Elasticsearch node is not on the default localhost, that answer will not work as written.

A DataFrame is the way to go; it is much simpler. If you use the pyspark shell, add the path to the jar when you start the shell.

From the CLI, start the shell with:

 $ pyspark2 --jars <pathtojar>/elasticsearch-hadoop-5.X.X.jar

You do not need the following line:

 from pyspark.sql import SQLContext 

Once you have your DataFrame, you just need the following, with the additional `es.nodes` option:

 df.write.format("org.elasticsearch.spark.sql") \
     .option("es.resource", "<index/type>") \
     .option("es.nodes", "<enter node address or name>") \
     .save()

If the index/type you specified does not already exist in Elasticsearch, it will be created.

You can add additional parameters that can be found here: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
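When several of these settings are needed, it can be tidier to collect them in a dict and pass them in one go through `DataFrameWriter.options`. A minimal sketch, not a definitive implementation: `es_options` and `write_to_es` are hypothetical helper names, `es-host.example` is a placeholder host, and `doc_id` is an assumed column name used with the connector's `es.mapping.id` setting.

```python
def es_options(resource, nodes="localhost", port=9200, extra=None):
    """Collect elasticsearch-hadoop settings into one dict.

    The defaults mirror the connector's own defaults for es.nodes and
    es.port; extra holds any further settings from the configuration page.
    """
    options = {
        "es.resource": resource,
        "es.nodes": nodes,
        "es.port": str(port),  # option values are passed as strings
    }
    options.update(extra or {})
    return options


def write_to_es(df, options):
    """Write a DataFrame to Elasticsearch using the given settings."""
    (df.write
       .format("org.elasticsearch.spark.sql")
       .options(**options)
       .save())


# Placeholder host and an assumed "doc_id" column used as the document id.
opts = es_options("logstash-2016.01.12/errors",
                  nodes="es-host.example",
                  extra={"es.mapping.id": "doc_id"})
print(opts)
```

In the shell, `write_to_es(df, opts)` would then replace the chained `.option(...)` calls.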


Source: https://habr.com/ru/post/1240896/

