Spark ML Evaluation Method

I have a Spark data frame as shown below:

    predictions.show(5)
    +------+----+------+-----------+
    |  user|item|rating| prediction|
    +------+----+------+-----------+
    |379433|  31|     1| 0.08203495|
    |  1834|  31|     1|  0.4854447|
    |422635|  31|     1|0.017672742|
    |   839|  31|     1| 0.39273006|
    | 51444|  31|     1| 0.09795039|
    +------+----+------+-----------+
    only showing top 5 rows

The prediction column holds the predicted ratings, and rating is an implicit rating (a count).

Now I want to check the AUC of my recommendation algorithm.
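For reference, AUC can be read as the probability that a randomly chosen positive example is scored above a randomly chosen negative one. The following is a minimal pure-Python sketch of that definition, not Spark's implementation; the function name `auc_from_pairs` and the sample data are hypothetical, and labels are assumed to be 0 or 1.

```python
def auc_from_pairs(pairs):
    """Area under the ROC curve from (score, label) pairs with labels 0 or 1.

    Computed as the fraction of (positive, negative) pairs in which the
    positive has the higher score, counting ties as one half.
    This is quadratic in the input size, so it is only meant for small checks.
    """
    positives = [score for score, label in pairs if label == 1]
    negatives = [score for score, label in pairs if label == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative label")

    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Hypothetical (score, label) pairs, not taken from the question's data.
sample = [(0.9, 1), (0.8, 0), (0.4, 1), (0.3, 0), (0.1, 0)]
auc = auc_from_pairs(sample)
```

Note that the five rows shown above all have rating 1; the metric is only defined if the full data set also contains negative examples.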

First I tried BinaryClassificationEvaluator from pyspark.ml.evaluation, since it works directly on the data frame.

    # getting the evaluation metric
    from pyspark.ml.evaluation import BinaryClassificationEvaluator

    evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
    print evaluator.evaluate(predictions)

This gives me the following error:

    ---------------------------------------------------------------------------
    IllegalArgumentException                  Traceback (most recent call last)
    <ipython-input-65-c642ea9c2cf5> in <module>()
          4
          5 evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
    ----> 6 print evaluator.evaluate(predictions)
          7
          8 #print evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})

    /Users/i854319/spark/python/pyspark/ml/evaluation.py in evaluate(self, dataset, params)
         67                 return self.copy(params)._evaluate(dataset)
         68             else:
    ---> 69                 return self._evaluate(dataset)
         70         else:
         71             raise ValueError("Params must be a param map but got %s." % type(params))

    /Users/i854319/spark/python/pyspark/ml/evaluation.py in _evaluate(self, dataset)
         97         """
         98         self._transfer_params_to_java()
    ---> 99         return self._java_obj.evaluate(dataset._jdf)
        100
        101     def isLargerBetter(self):

    /Users/i854319/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
        811         answer = self.gateway_client.send_command(command)
        812         return_value = get_return_value(
    --> 813             answer, self.gateway_client, self.target_id, self.name)
        814
        815         for temp_arg in temp_args:

    /Users/i854319/spark/python/pyspark/sql/utils.pyc in deco(*a, **kw)
         51                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
         52             if s.startswith('java.lang.IllegalArgumentException: '):
    ---> 53                 raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
         54             raise
         55     return deco

    IllegalArgumentException: u'requirement failed: Column prediction must be of type org.apache.spark.mllib.linalg.VectorUDT@f71b0bce but was actually FloatType.'

So I tried BinaryClassificationMetrics from pyspark.mllib.evaluation instead.

For this I need an RDD of (score, label) pairs. So, from the same predictions data frame, I mapped the last two columns into an RDD of tuples, using the following map function:

    ### Creating an RDD of scores and prediction values from the validation dataset
    def getScoresnLabels(x):
        """
        This function takes the validation or test dataset and maps
        the raw and actual scores together as one RDD
        """
        data_row=x.asDict()
        ret_tuple=(data_row['prediction'],data_row['rating'])
        return ret_tuple

    scoresnLabels=predictions.map(getScoresnLabels)

The result looks like this:

    scoresnLabels.take(5)
    Out[81]:
    [(0.08203495293855667, 1),
     (0.48544469475746155, 1),
     (0.017672741785645485, 1),
     (0.39273005723953247, 1),
     (0.09795039147138596, 1)]
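In this output the scores are floats but the labels are Python ints. One possible factor, offered as an assumption rather than a confirmed diagnosis, is that the evaluator expects both fields as doubles; the `getDouble` call and the "Value at index 1 in null" message in the traceback at the end of the question would be consistent with the label field arriving as null. The sketch below shows the same mapping on plain dicts with both fields coerced to float. The helper name `to_score_label` and the rule "any rating above zero counts as positive" are hypothetical choices for illustration.

```python
def to_score_label(row_dict, score_col="prediction", label_col="rating"):
    """Map one row (as a dict) to a (score, label) tuple of two floats.

    Assumption: any rating greater than zero is treated as a positive (1.0),
    everything else as a negative (0.0).
    """
    score = float(row_dict[score_col])
    label = 1.0 if row_dict[label_col] > 0 else 0.0
    return (score, label)


# The first two rows mirror the question's output; the third is hypothetical.
rows = [
    {"user": 379433, "item": 31, "rating": 1, "prediction": 0.08203495},
    {"user": 1834, "item": 31, "rating": 1, "prediction": 0.4854447},
    {"user": 7, "item": 31, "rating": 0, "prediction": 0.02},
]
pairs = [to_score_label(r) for r in rows]
```

On the data frame from the question, the same helper could be applied in place of `getScoresnLabels`, for example as `predictions.map(lambda r: to_score_label(r.asDict()))`.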

Then I pass this RDD to the evaluator:

    ### Using the mllib evaluation metric
    from pyspark.mllib.evaluation import BinaryClassificationMetrics

    metrics=BinaryClassificationMetrics(scoresnLabels)
    metrics.areaUnderROC

But I get an error; the full traceback is shown below, after my questions.

I am very confused now.

1) First, why are there two ML packages in Spark, and which one should I use? They have different syntax, and neither offers a consistent sequence of method calls like the one in scikit-learn.

2) Second, why do I get an error with each of the two packages?

    Exception AttributeError: "'BinaryClassificationMetrics' object has no attribute '_sc'" in <bound method BinaryClassificationMetrics.__del__ of <pyspark.mllib.evaluation.BinaryClassificationMetrics object at 0x126483d50>> ignored
    ---------------------------------------------------------------------------
    Py4JJavaError                             Traceback (most recent call last)
    <ipython-input-82-81c08d4e6f1d> in <module>()
          3 from pyspark.mllib.evaluation import BinaryClassificationMetrics
          4 metrics=BinaryClassificationMetrics(scoresnLabels)
    ----> 5 metrics.areaUnderROC

    /Users/i854319/spark/python/pyspark/mllib/evaluation.py in areaUnderROC(self)
         60         (ROC) curve.
         61         """
    ---> 62         return self.call("areaUnderROC")
         63
         64     @property

    /Users/i854319/spark/python/pyspark/mllib/common.pyc in call(self, name, *a)
        144     def call(self, name, *a):
        145         """Call method of java_model"""
    --> 146         return callJavaFunc(self._sc, getattr(self._java_model, name), *a)
        147
        148

    /Users/i854319/spark/python/pyspark/mllib/common.pyc in callJavaFunc(sc, func, *args)
        121     """ Call Java Function """
        122     args = [_py2java(sc, a) for a in args]
    --> 123     return _java2py(sc, func(*args))
        124
        125

    /Users/i854319/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
        811         answer = self.gateway_client.send_command(command)
        812         return_value = get_return_value(
    --> 813             answer, self.gateway_client, self.target_id, self.name)
        814
        815         for temp_arg in temp_args:

    /Users/i854319/spark/python/pyspark/sql/utils.pyc in deco(*a, **kw)
         43     def deco(*a, **kw):
         44         try:
    ---> 45             return f(*a, **kw)
         46         except py4j.protocol.Py4JJavaError as e:
         47             s = e.java_exception.toString()

    /Users/i854319/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
        306                 raise Py4JJavaError(
        307                     "An error occurred while calling {0}{1}{2}.\n".
    --> 308                     format(target_id, ".", name), value)
        309             else:
        310                 raise Py4JError(

    Py4JJavaError: An error occurred while calling o562.areaUnderROC.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1505.0 failed 1 times, most recent failure: Lost task 2.0 in stage 1505.0 (TID 9224, localhost): java.lang.NullPointerException: Value at index 1 in null
        at org.apache.spark.sql.Row$class.getAnyValAs(Row.scala:475)
        at org.apache.spark.sql.Row$class.getDouble(Row.scala:243)
        at org.apache.spark.sql.catalyst.expressions.GenericRow.getDouble(rows.scala:192)
        at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics$$anonfun$$init$$1.apply(BinaryClassificationMetrics.scala:61)
        at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics$$anonfun$$init$$1.apply(BinaryClassificationMetrics.scala:61)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
        at org.apache.spark.RangePartitioner$.sketch(Partitioner.scala:264)
        at org.apache.spark.RangePartitioner.<init>(Partitioner.scala:126)
        at org.apache.spark.rdd.OrderedRDDFunctions$$anonfun$sortByKey$1.apply(OrderedRDDFunctions.scala:62)
        at org.apache.spark.rdd.OrderedRDDFunctions$$anonfun$sortByKey$1.apply(OrderedRDDFunctions.scala:61)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.OrderedRDDFunctions.sortByKey(OrderedRDDFunctions.scala:61)
        at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.x$4$lzycompute(BinaryClassificationMetrics.scala:153)
        at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.x$4(BinaryClassificationMetrics.scala:144)
        at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.confusions$lzycompute(BinaryClassificationMetrics.scala:146)
        at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.confusions(BinaryClassificationMetrics.scala:146)
        at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.createCurve(BinaryClassificationMetrics.scala:222)
        at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.roc(BinaryClassificationMetrics.scala:85)
        at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.areaUnderROC(BinaryClassificationMetrics.scala:96)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:209)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.NullPointerException: Value at index 1 in null
        at org.apache.spark.sql.Row$class.getAnyValAs(Row.scala:475)
        at org.apache.spark.sql.Row$class.getDouble(Row.scala:243)
        at org.apache.spark.sql.catalyst.expressions.GenericRow.getDouble(rows.scala:192)
        at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics$$anonfun$$init$$1.apply(BinaryClassificationMetrics.scala:61)
        at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics$$anonfun$$init$$1.apply(BinaryClassificationMetrics.scala:61)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        ... 1 more

UPDATE:

ALS code used to generate the predictions:

    from pyspark.ml.recommendation import ALS

    # Build the recommendation model using ALS on the training data
    als = ALS(rank=120, maxIter=15, regParam=0.01, implicitPrefs=True)
    model = als.fit(train)

    predictions=model.transform(validation)

Source: https://habr.com/ru/post/1011923/

