RDD transformations and actions can only be invoked by the driver

Error:

org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

def computeRatio(model: MatrixFactorizationModel, test_data: org.apache.spark.rdd.RDD[Rating]): Double = {
  val numDistinctUsers = test_data.map(x => x.user).distinct().count()
  val userRecs: RDD[(Int, Set[Int], Set[Int])] = test_data.groupBy(testUser => testUser.user).map(u => {
    (u._1, u._2.map(p => p.product).toSet, model.recommendProducts(u._1, 20).map(prec => prec.product).toSet)
  })
  val hitsAndMiss: RDD[(Int, Double)] = userRecs.map(x => (x._1, x._2.intersect(x._3).size.toDouble))

  val hits = hitsAndMiss.map(x => x._2).sum() / numDistinctUsers

  return hits
}

I am using the recommendProducts method from MatrixFactorizationModel.scala. I need to map over the users and then call that method to get the results for each user. Doing so introduces a nested mapping, which I believe is causing the problem.

I know that the problem actually occurs at:

val userRecs: RDD[(Int, Set[Int], Set[Int])] = test_data.groupBy(testUser => testUser.user).map(u => {
  (u._1, u._2.map(p => p.product).toSet, model.recommendProducts(u._1, 20).map(prec => prec.product).toSet)
})

because I call model.recommendProducts inside the map.


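MatrixFactorizationModel is a distributed model (its user and product factors are themselves stored as RDDs), so you cannot simply call it from inside an action or a transformation. The closest equivalent to what you are doing, using only driver-side calls, is something like this: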

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}

def computeRatio(model: MatrixFactorizationModel, testUsers: RDD[Rating]) = {
  val testData = testUsers.map(r => (r.user, r.product)).groupByKey
  val n = testData.count

  val recommendations = model
     .recommendProductsForUsers(20)
     .mapValues(_.map(r => r.product))

  val hits = testData
    .join(recommendations)
    .values
    .map{case (xs, ys) => xs.toSet.intersect(ys.toSet).size}
    .sum

  hits / n
}

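Notes:

  • distinct is an expensive operation and is unnecessary here, since the number of users can be obtained from the grouped data,
  • instead of groupBy followed by a projection (map), project first and group afterwards. There is no reason to shuffle full Rating objects when only the product IDs are needed.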
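
To make the arithmetic of computeRatio concrete, here is a minimal sketch that mirrors the same steps (group by user, join with recommendations, intersect, sum, divide) on plain Scala collections instead of RDDs, so it runs without Spark. The object name HitRatioSketch, the function hitRatio, and the sample data are all hypothetical and chosen only for illustration; they are not part of Spark or of the answer above.

```scala
// Plain Scala collections stand in for the RDDs; no Spark is required.
// All names and data below are hypothetical, for illustration only.
object HitRatioSketch {
  // Test data as (user, product) pairs, i.e. already projected from Rating.
  val testPairs: Seq[(Int, Int)] = Seq((1, 10), (1, 11), (2, 20), (3, 30))

  // Stand-in for the per-user output of recommendProductsForUsers.
  val recommendations: Map[Int, Seq[Int]] =
    Map(1 -> Seq(10, 12), 2 -> Seq(21, 22), 3 -> Seq(30, 31))

  def hitRatio(pairs: Seq[(Int, Int)], recs: Map[Int, Seq[Int]]): Double = {
    // Group the projected pairs by user: user -> set of products seen in the test data.
    val byUser: Map[Int, Set[Int]] =
      pairs.groupBy(_._1).map { case (user, ps) => user -> ps.map(_._2).toSet }

    // Number of distinct users comes from the grouped data; no separate distinct pass.
    val n = byUser.size

    // Inner join on user, then count the products present on both sides.
    val hits = byUser.toSeq.collect {
      case (user, seen) if recs.contains(user) => seen.intersect(recs(user).toSet).size
    }.sum

    hits.toDouble / n
  }
}
```

With the sample data, users 1 and 3 each have one recommended product that also appears in their test data and user 2 has none, so HitRatioSketch.hitRatio(HitRatioSketch.testPairs, HitRatioSketch.recommendations) evaluates to 2 hits over 3 users. As in the join-based version, a user with no recommendations contributes no hits but is still counted in n.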

Source: https://habr.com/ru/post/1615141/
