Spark.ml StringIndexer throws "Unseen label" on fit()

I am preparing a toy spark.ml example. Spark version 1.6.0, running on top of Oracle JDK version 1.8.0_65, PySpark in an IPython notebook.

First of all, this hardly has anything to do with Spark, ML, or StringIndexer specifically: it is about handling unseen labels in general. However, the exception occurs while fitting a pipeline to a dataset, not while transforming it. And suppressing the exception may not be the solution here, since I'm afraid the dataset gets messed up in this case.

My dataset is about 800 MB uncompressed, so it may be hard to reproduce (smaller subsets do not seem to exhibit this problem).

The data set is as follows:

    +--------------------+-----------+-----+-------+-----+--------------------+
    |                 url|         ip|   rs|   lang|label|                 txt|
    +--------------------+-----------+-----+-------+-----+--------------------+
    |http://3d-detmold...|217.160.215|378.0|     de|  0.0|homwillkommskip c...|
    |   http://3davto.ru/| 188.225.16|891.0|     id|  1.0|                 ...|
    | http://404.szm.com/|  85.248.42| 58.0|     cs|  0.0|kliknite tu alebo...|
    |  http://404.xls.hu/| 212.52.166|168.0|     hu|  0.0|honlapkészítés404...|
    |http://a--m--a--t...|    66.6.43|462.0|     en|  0.0|back top archiv r...|
    |http://a-wrf.ru/c...|  78.108.80|126.0|unknown|  1.0|                    |
    |http://a-wrf.ru/s...|  78.108.80|214.0|     ru|  1.0|                 ...|
    +--------------------+-----------+-----+-------+-----+--------------------+

The value being predicted is label. Here is the whole pipeline applied to the data:

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, Tokenizer, HashingTF
    from pyspark.ml.classification import LogisticRegression

    train, test = munge(src_dataframe).randomSplit([70., 30.], seed=12345)

    pipe_stages = [
        StringIndexer(inputCol='lang', outputCol='lang_idx'),
        OneHotEncoder(inputCol='lang_idx', outputCol='lang_onehot'),
        Tokenizer(inputCol='ip', outputCol='ip_tokens'),
        HashingTF(numFeatures=2**10, inputCol='ip_tokens', outputCol='ip_vector'),
        Tokenizer(inputCol='txt', outputCol='txt_tokens'),
        HashingTF(numFeatures=2**18, inputCol='txt_tokens', outputCol='txt_vector'),
        VectorAssembler(inputCols=['lang_onehot', 'ip_vector', 'txt_vector'], outputCol='features'),
        LogisticRegression(labelCol='label', featuresCol='features')
    ]

    pipe = Pipeline(stages=pipe_stages)
    pipemodel = pipe.fit(train)

And here is the stacktrace:

    Py4JJavaError: An error occurred while calling o10793.fit.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 18 in stage 627.0 failed 1 times, most recent failure: Lost task 18.0 in stage 627.0 (TID 23259, localhost): org.apache.spark.SparkException: Unseen label: pl-PL.
        at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:157)
        at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:153)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr2$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
        at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:282)
        at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952)
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007)
        at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1136)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1113)
        at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:271)
        at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:159)
        at org.apache.spark.ml.Predictor.fit(Predictor.scala:90)
        at org.apache.spark.ml.Predictor.fit(Predictor.scala:71)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:209)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.spark.SparkException: Unseen label: pl-PL.
        at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:157)
        at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:153)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr2$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
        at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:282)
        at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        ... 1 more

The most interesting line:

 org.apache.spark.SparkException: Unseen label: pl-PL. 

I have no idea how pl-PL, which is a value from the lang column, could get mixed up in the label column, which is a float, not a string. Edit: some hasty conclusions below, fixed thanks to @zero323.

Upon further inspection I found that pl-PL is a value from the test part of the dataset, not the training one. So now I don't even know where to look for the culprit: it could easily be the randomSplit code rather than StringIndexer, and who knows what else.

How can I find out what is really going on here?
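For reference, one way to check which lang values ended up only in the test split is a quick set difference (a sketch added here for illustration, not from the original post; train and test are the splits from the snippet above, cached first so the comparison sees one stable split):

    # Hedged sketch: find lang values present in test but absent from train.
    # Caching pins the splits down; without it each action may recompute
    # randomSplit and produce a slightly different partition of the rows.
    train.cache()
    test.cache()

    train_langs = set(row['lang'] for row in train.select('lang').distinct().collect())
    test_langs = set(row['lang'] for row in test.select('lang').distinct().collect())
    print(test_langs - train_langs)  # e.g. set(['pl-PL'])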

2 answers

OK, I think I've got it. At least I got this working.

Caching the data frame (including the train/test splits) solves the problem. That is what I found in this JIRA issue: https://issues.apache.org/jira/browse/SPARK-12590 .
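In code, the workaround amounts to materializing the splits once (a sketch reusing the names from the question):

    # Sketch of the workaround: cache the splits so every downstream action
    # (StringIndexer.fit, LogisticRegression training, ...) reuses the same
    # materialized rows instead of re-running the lineage, which includes
    # the non-deterministic randomSplit.
    train, test = munge(src_dataframe).randomSplit([70., 30.], seed=12345)
    train.cache()
    test.cache()
    train.count()  # force evaluation so the cache is actually populated
    test.count()

    pipemodel = pipe.fit(train)  # no longer trips over "Unseen label"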

So this is not a bug, just the fact that randomSplit can yield a different result on the same, yet differently partitioned, dataset (it samples each partition independently, so the split is only reproducible when both the partitioning and the row order within partitions are stable). And apparently some of my munging functions (or the Pipeline) involve repartitioning, hence the result of recomputing train from its definition may differ.

What puzzles me is the reproducibility: it is always the pl-PL string that gets mixed into the wrong part of the dataset, i.e. it is not a random repartition. It is deterministic, just inconsistent. I wonder how exactly that happens.


Unseen label is a generic message which doesn't correspond to a specific column. Most likely the problem is with the following stage:

 StringIndexer(inputCol='lang', outputCol='lang_idx') 

with pl-PL present in test("lang") but not in train("lang") at the time the indexer is fitted.

You can fix this using setHandleInvalid with skip :

    from pyspark.ml.feature import StringIndexer

    train = sc.parallelize([(1, "foo"), (2, "bar")]).toDF(["k", "v"])
    test = sc.parallelize([(3, "foo"), (4, "foobar")]).toDF(["k", "v"])

    indexer = StringIndexer(inputCol="v", outputCol="vi")
    indexer.fit(train).transform(test).show()
    ## Py4JJavaError: An error occurred while calling o112.showString.
    ## : org.apache.spark.SparkException: Job aborted due to stage failure:
    ## ...
    ## org.apache.spark.SparkException: Unseen label: foobar.

    indexer.setHandleInvalid("skip").fit(train).transform(test).show()
    ## +---+---+---+
    ## |  k|  v| vi|
    ## +---+---+---+
    ## |  3|foo|1.0|
    ## +---+---+---+

or, in more recent versions (Spark 2.2+), keep:

    indexer.setHandleInvalid("keep").fit(train).transform(test).show()
    ## +---+------+---+
    ## |  k|     v| vi|
    ## +---+------+---+
    ## |  3|   foo|0.0|
    ## |  4|foobar|2.0|
    ## +---+------+---+
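For the pipeline in the question, the same fix is a one-parameter change to the first stage (a sketch; handleInvalid has been a StringIndexer parameter since Spark 1.6, while the keep value requires Spark 2.2+):

    # Sketch: apply the fix inside the question's pipeline. 'skip' silently
    # drops rows whose lang value was unseen at fit time; 'keep' (Spark 2.2+)
    # maps such values to an extra index instead of dropping the rows.
    StringIndexer(inputCol='lang', outputCol='lang_idx', handleInvalid='skip')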
