Pyspark: ERROR on org.apache.thrift.transport.TTransportException

I use Zeppelin Notebooks / Apache Spark and I often get the following error:

    org.apache.thrift.transport.TTransportException
        at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
        at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
        at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
        at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
        at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.recv_interpret(RemoteInterpreterService.java:249)
        at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:94)
        at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:279)
        at org.apache.zeppelin.scheduler.Job.run(Job.java:176)
        at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:328)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

If I try to run the same code again (just ignoring the error), I get this (top line only):

    java.net.SocketException: Broken pipe (Write failed)

Then, if I try to run it a third time (or anytime after that), I get this error:

    java.net.ConnectException: Connection refused (Connection refused)

If I restart the interpreter in Zeppelin, it works again at first, but eventually I end up with the same error.

This error has occurred at different stages of my process (data cleaning, vectorization, etc.), but at the moment it happens most often when I fit the model. To give you a better idea of what I'm actually doing and when this usually happens, I'll walk you through my process:

I am using Apache Spark ML and have done the standard vectorization and weighting (CountVectorizer, IDF), and then built a model on this data.
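For context, the TF-IDF step looks roughly like this (a minimal sketch rather than my exact code; the tokenized input column "title_tokens" and the vocabSize are placeholders, and the same pattern is repeated for the other *_tfidf columns):

    from pyspark.ml.feature import CountVectorizer, IDF

    # "title_tokens" is a placeholder name for the tokenized text column built upstream
    cv = CountVectorizer(inputCol="title_tokens", outputCol="title_tf", vocabSize=5000)
    cv_model = cv.fit(train_raw)
    train_tf = cv_model.transform(train_raw)

    idf = IDF(inputCol="title_tf", outputCol="title_tfidf")
    idf_model = idf.fit(train_tf)
    train_raw = idf_model.transform(train_tf)  # adds the "title_tfidf" column used below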

I used VectorAssembler to create my feature vector, converted it to a dense vector, and turned it back into a DataFrame:

    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml.linalg import DenseVector  # Spark 2.x ML vectors (not pyspark.mllib.linalg)
    from pyspark.sql import Row

    assembler = VectorAssembler(inputCols=["fileSize", "hour", "day", "month", "punct_title",
                                           "cap_title", "punct_excerpt", "title_tfidf", "ct_tfidf",
                                           "excerpt_tfidf", "regex_tfidf"],
                                outputCol="features")

    vector_train = assembler.transform(train_raw).select("Target", "features")
    vector_test = assembler.transform(test_raw).select("Target", "features")

    train_final = vector_train.rdd.map(lambda x: Row(label=x[0], features=DenseVector(x[1].toArray())))
    test_final = vector_test.rdd.map(lambda x: Row(label=x[0], features=DenseVector(x[1].toArray())))

    train_final_df = sqlContext.createDataFrame(train_final)
    test_final_df = sqlContext.createDataFrame(test_final)

So the training set that gets fed into the model looks like this (the actual data set has ~15,000 columns, and I cut it down to ~5,000 examples just to try to get it to run):

    [Row(features=DenseVector([7016.0, 9.0, 16.0, 2.0, 2.0, 4.0, 5.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.315, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..................... 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 7.235, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]), label=0)]
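(The row above is just the first element of the training DataFrame, pulled with something along these lines:)

    train_final_df.take(1)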

The next step is fitting the model, which is where the error usually shows up. I've tried both fitting a single model and running cross-validation (with a ParamGrid):

Single Model:

    from pyspark.ml.classification import GBTClassifier
    from pyspark.ml.evaluation import BinaryClassificationEvaluator

    gbt = GBTClassifier(labelCol="label", featuresCol="features", maxDepth=8, maxBins=16, maxIter=40)
    GBT_model = gbt.fit(train_final_df)

    predictions_GBT = GBT_model.transform(test_final_df)
    predictions_GBT.cache()

    evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
    auroc = evaluator.evaluate(predictions_GBT, {evaluator.metricName: "areaUnderROC"})
    aupr = evaluator.evaluate(predictions_GBT, {evaluator.metricName: "areaUnderPR"})

With CV / ParamGrid:

    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.ml.classification import GBTClassifier

    GBT_model = GBTClassifier()

    paramGrid = ParamGridBuilder() \
        .addGrid(GBT_model.maxDepth, [2, 4]) \
        .addGrid(GBT_model.maxBins, [2, 4]) \
        .addGrid(GBT_model.maxIter, [10, 20]) \
        .build()

    evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction", metricName="areaUnderPR")

    crossval = CrossValidator(estimator=GBT_model,
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              numFolds=5)
    cvModel = crossval.fit(train_final_df)

I know this has something to do with the interpreter, but I can't figure out (a) what I'm doing wrong, or (b) how to work around this failure.

UPDATE: I was asked about versions and memory configuration in the SO Apache Spark chat room, so I figured I'd post an update here.

Versions:

  • Spark: 2.0.1
  • Zeppelin: 0.6.2

Memory configuration:

  • I am spinning up an EMR cluster using a c1.xlarge EC2 instance (7 GiB) as my master node and r3.8xlarge instances (244 GiB) for my core nodes.
  • I went into Zeppelin and changed spark.driver.memory to 4g and spark.executor.memory to 128g
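For reference, the effective values can be double-checked from a notebook paragraph with something like this (just a sketch; sc is the SparkContext that Zeppelin provides):

    # Print the memory settings the running SparkContext actually picked up
    print(sc.getConf().get("spark.driver.memory", "not set"))
    print(sc.getConf().get("spark.executor.memory", "not set"))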

After setting these memory configurations in Zeppelin, I ran my code again and still got the same error.

I've only recently started using Spark. Are there any other memory settings that need to be set? Are the ones above reasonable?
