How to use PySpark MLlib RegressionMetrics with real predictions

With PySpark 1.4, I'm trying to use RegressionMetrics() on predictions produced by LinearRegressionWithSGD.

All the examples for RegressionMetrics() in the PySpark MLlib documentation use artificial predictions and observations, like

predictionAndObservations = sc.parallelize([ (2.5, 3.0), (0.0, -0.5), (2.0, 2.0), (8.0, 7.0)])

For such an "artificial" RDD (built with sc.parallelize), everything works fine. However, when I do the same with an RDD produced in a different way, I get

TypeError: DoubleType can not accept object in type <type 'numpy.float64'>

Below is a brief reproducible example. What could be the problem?

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import LinearRegressionWithSGD, LinearRegressionModel
from pyspark.mllib.evaluation import RegressionMetrics

dataRDD = sc.parallelize([LabeledPoint(1, [1,1]), LabeledPoint(2, [2,2]), LabeledPoint(3, [3,3])])
lrModel = LinearRegressionWithSGD.train(dataRDD)
prediObserRDD = dataRDD.map(lambda p: (lrModel.predict(p.features), p.label)).cache()

Let's check that the RDD really contains (prediction, observation) pairs:

prediObserRDD.take(4) # looks OK

Now try to compute the metrics:

metrics = RegressionMetrics(prediObserRDD)

It gives the following error:

TypeError                                 Traceback (most recent call last)
<ipython-input-1-ca9ad8e9faf1> in <module>()
      7 prediObserRDD = dataRDD.map(lambda p: (lrModel.predict(p.features), p.label)).cache()
      8 prediObserRDD.take(4)
----> 9 metrics = RegressionMetrics(prediObserRDD)
     10 #metrics.explainedVariance
     11 #metrics.meanAbsoluteError

/usr/local/spark-1.4.0-bin-hadoop2.6/python/pyspark/mllib/evaluation.py in __init__(self, predictionAndObservations)
     99         df = sql_ctx.createDataFrame(predictionAndObservations, schema=StructType([
    100             StructField("prediction", DoubleType(), nullable=False),
--> 101             StructField("observation", DoubleType(), nullable=False)]))
    102         java_class = sc._jvm.org.apache.spark.mllib.evaluation.RegressionMetrics
    103         java_model = java_class(df._jdf)

/usr/local/spark-1.4.0-bin-hadoop2.6/python/pyspark/sql/context.py in createDataFrame(self, data, schema, samplingRatio)
    337 
    338         for row in rows:
--> 339             _verify_type(row, schema)
    340 
    341         # convert python objects to sql data

/usr/local/spark-1.4.0-bin-hadoop2.6/python/pyspark/sql/types.py in _verify_type(obj, dataType)
   1027                              "length of fields (%d)" % (len(obj), len(dataType.fields)))
   1028         for v, f in zip(obj, dataType.fields):
-> 1029             _verify_type(v, f.dataType)
   1030 
   1031 _cached_cls = weakref.WeakValueDictionary()

/usr/local/spark-1.4.0-bin-hadoop2.6/python/pyspark/sql/types.py in _verify_type(obj, dataType)
   1011     if type(obj) not in _acceptable_types[_type]:
   1012         raise TypeError("%s can not accept object in type %s"
-> 1013                         % (dataType, type(obj)))
   1014 
   1015     if isinstance(dataType, ArrayType):

TypeError: DoubleType can not accept object in type <type 'numpy.float64'>

P.S. The same problem also occurs with BinaryClassificationMetrics, as the sketch below illustrates.
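For illustration, a minimal sketch with made-up (score, label) pairs; any RDD whose values are numpy.float64 rather than Python floats hits the same type check:

import numpy as np
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# hypothetical toy scores and labels; the numpy.float64 scores
# trigger the same TypeError inside BinaryClassificationMetrics
scoreAndLabels = sc.parallelize([(np.float64(0.9), 1.0), (np.float64(0.1), 0.0)])
binMetrics = BinaryClassificationMetrics(scoreAndLabels)  # raises TypeError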


As the error message says, TypeError: DoubleType can not accept object in type <type 'numpy.float64'>: a numpy.float64 is not a plain Python float, so it is rejected where the DataFrame schema expects a Double.

To avoid the TypeError, convert the prediction to a standard Python float:

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import LinearRegressionWithSGD, LinearRegressionModel
from pyspark.mllib.evaluation import RegressionMetrics

dataRDD = sc.parallelize([LabeledPoint(1, [1,1]), LabeledPoint(2, [2,2]), LabeledPoint(3, [3,3])])
lrModel = LinearRegressionWithSGD.train(dataRDD)
prediObserRDD = dataRDD.map(lambda p: (float(lrModel.predict(p.features)), p.label)).cache()

The only change from the code in the question is that the prediction is wrapped in float(), which turns the numpy.float64 returned by predict() into a plain Python float.
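A quick sanity check (run in the same session) confirms that both elements of each pair are now plain Python floats; LabeledPoint already stores its label as a Python float:

>>> pair = prediObserRDD.first()
>>> type(pair[0]), type(pair[1])
(<type 'float'>, <type 'float'>)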

Now the metrics can be computed without error:

>>> metrics = RegressionMetrics(prediObserRDD)
>>> metrics.explainedVariance
1.0
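For reference, the other accessors that RegressionMetrics exposes in Spark 1.4 work as well once the input types are correct; their exact values depend on how the SGD run converged:

metrics.meanAbsoluteError      # MAE
metrics.meanSquaredError       # MSE
metrics.rootMeanSquaredError   # RMSE
metrics.r2                     # R-squared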
