With pyspark 1.4, I'm trying to use RegressionMetrics() to evaluate predictions produced by LinearRegressionWithSGD.
All the examples for RegressionMetrics() in the pyspark mllib documentation use artificial predictions and observations, like
predictionAndObservations = sc.parallelize([ (2.5, 3.0), (0.0, -0.5), (2.0, 2.0), (8.0, 7.0)])
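For reference, the rest of that documentation-style usage is roughly the following (metric names as in pyspark.mllib.evaluation):
from pyspark.mllib.evaluation import RegressionMetrics
# Hand-built (prediction, observation) pairs of plain Python floats
metrics = RegressionMetrics(predictionAndObservations)
print(metrics.meanSquaredError)
print(metrics.rootMeanSquaredError)
print(metrics.r2)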
For such an "artificial" RDD (generated with sc.parallelize), everything works fine. However, when I do the same with an RDD generated in a different way, I get
TypeError: DoubleType can not accept object in type <type 'numpy.float64'>
What could be the problem? The following is a brief reproducible example.
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import LinearRegressionWithSGD, LinearRegressionModel
from pyspark.mllib.evaluation import RegressionMetrics
dataRDD = sc.parallelize([LabeledPoint(1, [1,1]), LabeledPoint(2, [2,2]), LabeledPoint(3, [3,3])])
lrModel = LinearRegressionWithSGD.train(dataRDD)
prediObserRDD = dataRDD.map(lambda p: (lrModel.predict(p.features), p.label)).cache()
Let's check that the RDD really contains (prediction, observation) pairs:
prediObserRDD.take(4)
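To also see the element types of the pairs, not just their values, a quick check like this can be used (plain Python type introspection, not part of the documentation example):
prediObserRDD.map(lambda t: (type(t[0]).__name__, type(t[1]).__name__)).take(4)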
Now try to compute the metrics:
metrics = RegressionMetrics(prediObserRDD)
It gives the following error:
TypeError Traceback (most recent call last)
<ipython-input-1-ca9ad8e9faf1> in <module>()
7 prediObserRDD = dataRDD.map(lambda p: (lrModel.predict(p.features), p.label)).cache()
8 prediObserRDD.take(4)
10
11
/usr/local/spark-1.4.0-bin-hadoop2.6/python/pyspark/mllib/evaluation.py in __init__(self, predictionAndObservations)
99 df = sql_ctx.createDataFrame(predictionAndObservations, schema=StructType([
100 StructField("prediction", DoubleType(), nullable=False),
102 java_class = sc._jvm.org.apache.spark.mllib.evaluation.RegressionMetrics
103 java_model = java_class(df._jdf)
/usr/local/spark-1.4.0-bin-hadoop2.6/python/pyspark/sql/context.py in createDataFrame(self, data, schema, samplingRatio)
337
338 for row in rows:
340
341
/usr/local/spark-1.4.0-bin-hadoop2.6/python/pyspark/sql/types.py in _verify_type(obj, dataType)
1027 "length of fields (%d)" % (len(obj), len(dataType.fields)))
1028 for v, f in zip(obj, dataType.fields):
-> 1029 _verify_type(v, f.dataType)
1030
1031 _cached_cls = weakref.WeakValueDictionary()
/usr/local/spark-1.4.0-bin-hadoop2.6/python/pyspark/sql/types.py in _verify_type(obj, dataType)
1011 if type(obj) not in _acceptable_types[_type]:
1012 raise TypeError("%s can not accept object in type %s"
-> 1013 % (dataType, type(obj)))
1014
1015 if isinstance(dataType, ArrayType):
TypeError: DoubleType can not accept object in type <type 'numpy.float64'>
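If the root cause is simply that lrModel.predict() returns a numpy.float64 rather than a built-in Python float (which is what the error message suggests), then I would expect an explicit cast when building the pairs to avoid the type check, something like:
prediObserRDD = dataRDD.map(lambda p: (float(lrModel.predict(p.features)), p.label)).cache()
metrics = RegressionMetrics(prediObserRDD)
But is such a cast really the intended way to use RegressionMetrics with LinearRegressionWithSGD predictions, or am I constructing the RDD incorrectly?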
The same problem occurs with BinaryClassificationMetrics.
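By "the same problem" I mean the analogous pattern: the documentation-style usage with hand-written Python-float scores, like the sketch below, is fine, but the error appears once the scores come from an actual model (and are therefore numpy.float64):
from pyspark.mllib.evaluation import BinaryClassificationMetrics
# Hand-built (score, label) pairs of plain Python floats, as in the docs
scoreAndLabels = sc.parallelize([(0.1, 0.0), (0.4, 0.0), (0.6, 1.0), (0.8, 1.0)])
bcMetrics = BinaryClassificationMetrics(scoreAndLabels)
print(bcMetrics.areaUnderROC)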