I use Zeppelin Notebooks / Apache Spark and I often get the following error:
    org.apache.thrift.transport.TTransportException
        at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
        at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
        at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
        at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
        at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.recv_interpret(RemoteInterpreterService.java:249)
        at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:94)
        at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:279)
        at org.apache.zeppelin.scheduler.Job.run(Job.java:176)
        at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:328)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
If I try to run the same code again (just ignoring the error), I get this (top line only):
java.net.SocketException: Broken pipe (Write failed)
Then, if I try to run it a third time (or anytime after that), I get this error:
java.net.ConnectException: Connection refused (Connection refused)
If I restart the interpreter in Zeppelin, it works again at first, but eventually I end up with the same error.
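I have been restarting it by hand from the Interpreter page; for scripting the restart, Zeppelin also exposes a REST endpoint for restarting an interpreter setting. This is only a rough sketch: the URL and setting ID below are placeholders, and the endpoint may differ between Zeppelin versions.

    import requests

    # Restart a Zeppelin interpreter setting via the REST API (sketch only).
    # ZEPPELIN_URL and SETTING_ID are placeholders; the setting ID can be looked up
    # under /api/interpreter/setting. The exact API may vary by Zeppelin version.
    ZEPPELIN_URL = "http://localhost:8080"
    SETTING_ID = "<spark-interpreter-setting-id>"

    resp = requests.put("%s/api/interpreter/setting/restart/%s" % (ZEPPELIN_URL, SETTING_ID))
    print(resp.status_code)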
This error has occurred at different stages of my pipeline (data cleaning, vectorization, etc.), but the place it happens most often at the moment is when I fit the model. To give you a better idea of what I'm actually doing and when this usually happens, I'll walk you through my process:
I am using Apache Spark ML and have done standard vectorization, weighting, etc. (CountVectorizer, IDF), and then built a model for this data.
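For reference, the TF-IDF features mentioned above come from a step roughly like the following. This is only a sketch: the token and intermediate column names are illustrative, not my exact code; the real notebook produces title_tfidf, ct_tfidf, excerpt_tfidf, and regex_tfidf.

    from pyspark.ml.feature import CountVectorizer, IDF

    # Sketch of the CountVectorizer + IDF step (column names are illustrative).
    cv = CountVectorizer(inputCol="title_tokens", outputCol="title_tf")
    cv_model = cv.fit(train_raw)
    train_tf = cv_model.transform(train_raw)

    idf = IDF(inputCol="title_tf", outputCol="title_tfidf")
    idf_model = idf.fit(train_tf)
    train_raw = idf_model.transform(train_tf)  # train_raw then carries the *_tfidf columns used below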
I used VectorAssembler to create my feature vector, converted it to a dense vector, and turned it back into a DataFrame:
    from pyspark.sql import Row
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml.linalg import DenseVector

    assembler = VectorAssembler(
        inputCols=["fileSize", "hour", "day", "month", "punct_title", "cap_title",
                   "punct_excerpt", "title_tfidf", "ct_tfidf", "excerpt_tfidf", "regex_tfidf"],
        outputCol="features")

    vector_train = assembler.transform(train_raw).select("Target", "features")
    vector_test = assembler.transform(test_raw).select("Target", "features")

    # Convert the assembled vectors to dense vectors and rebuild DataFrames
    train_final = vector_train.rdd.map(lambda x: Row(label=x[0], features=DenseVector(x[1].toArray())))
    test_final = vector_test.rdd.map(lambda x: Row(label=x[0], features=DenseVector(x[1].toArray())))

    train_final_df = sqlContext.createDataFrame(train_final)
    test_final_df = sqlContext.createDataFrame(test_final)
So the training set that gets fed into the model looks like this (the actual data set has ~15,000 columns; I cut it down to 5,000 examples just to try to get it to run):
    [Row(features=DenseVector([7016.0, 9.0, 16.0, 2.0, 2.0, 4.0, 5.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.315, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..., 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 7.235, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]), label=0)]
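For scale, these are the kinds of sanity checks I can run on the assembled DataFrame (a sketch, shown only for context):

    # Sanity checks on the assembled training DataFrame (illustrative).
    print(train_final_df.count())                    # number of examples (~5,000 here)
    print(len(train_final_df.first()["features"]))   # length of the feature vector (~15,000)
    train_final_df.printSchema()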
The next step is fitting the model, which is where the error usually appears. I have tried both fitting a single model and running cross-validation (with a ParamGrid):
Single Model:
    from pyspark.ml.classification import GBTClassifier
    from pyspark.ml.evaluation import BinaryClassificationEvaluator

    gbt = GBTClassifier(labelCol="label", featuresCol="features", maxDepth=8, maxBins=16, maxIter=40)
    GBT_model = gbt.fit(train_final_df)

    predictions_GBT = GBT_model.transform(test_final_df)
    predictions_GBT.cache()

    evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
    auroc = evaluator.evaluate(predictions_GBT, {evaluator.metricName: "areaUnderROC"})
    aupr = evaluator.evaluate(predictions_GBT, {evaluator.metricName: "areaUnderPR"})
With CV / PG:
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.ml.classification import GBTClassifier

    GBT_model = GBTClassifier()

    paramGrid = ParamGridBuilder() \
        .addGrid(GBT_model.maxDepth, [2, 4]) \
        .addGrid(GBT_model.maxBins, [2, 4]) \
        .addGrid(GBT_model.maxIter, [10, 20]) \
        .build()

    evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction", metricName="areaUnderPR")

    crossval = CrossValidator(estimator=GBT_model, estimatorParamMaps=paramGrid,
                              evaluator=evaluator, numFolds=5)
    cvModel = crossval.fit(train_final_df)
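For completeness, once a CV run does finish, the results would be read off the fitted CrossValidator roughly like this (a sketch, not part of the failing code):

    best_gbt = cvModel.bestModel               # best GBTClassificationModel from the grid
    print(cvModel.avgMetrics)                  # average areaUnderPR for each ParamGrid entry
    predictions_cv = cvModel.transform(test_final_df)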
I know this has something to do with the interpreter, but I can't figure out (a) what I am doing wrong, or (b) how to work around this failure.
UPDATE: I was asked about versions and memory configuration in the SO Apache Spark chat room, so I'm posting that update here.
Versions:
- Spark: 2.0.1
- Zeppelin: 0.6.2
Memory configuration:
- I am spinning up an EMR cluster with a c1.xlarge EC2 instance (7 GiB) as my master node and r3.8xlarge instances (244 GiB) as my core nodes.
- I went into Zeppelin and changed spark.driver.memory to 4g and spark.executor.memory to 128g
After I went back in and applied these memory settings in Zeppelin, I ran my code again and still got the same error.
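One thing I can do from a notebook paragraph is print the configuration that the running SparkContext actually picked up, to confirm the new settings took effect (sc here is the SparkContext that Zeppelin provides):

    # Check which memory settings the running SparkContext actually has.
    print(sc.getConf().get("spark.driver.memory"))
    print(sc.getConf().get("spark.executor.memory"))

    # Or dump the full configuration:
    for key, value in sorted(sc.getConf().getAll()):
        print("%s = %s" % (key, value))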
I have only recently started using Spark; are there any other memory settings I need to configure? And are the settings above reasonable?