Spark Dataset equivalent of the Scala collections' collect taking a partial function

Regular Scala collections have a nice collect method that lets me perform a filter-map operation in a single pass using a partial function. Is there an equivalent operation on Spark Datasets?

I would like it for two reasons:

  • syntactic simplicity
  • it reduces filter-map style operations to a single pass (although in Spark I assume there are optimizations that spot these things for you).

Here is an example to show what I mean. Suppose I have a sequence of options, and I want to extract and double only the defined integers (those inside a Some):

val input = Seq(Some(3), None, Some(-1), None, Some(4), Some(5)) 

Method 1 - collect

input.collect {
  case Some(value) => value * 2
} 
// List(6, -2, 8, 10)

collect does it pretty neatly syntactically and does one pass.

Method 2 - filter-map

input.filter(_.isDefined).map(_.get * 2)

I can carry this over to Spark, because Datasets and RDDs have both filter and map. What I dislike are the isDefined and the get: they assume I already know the contents of the Somes. It also takes two passes, although both operations are lazy in Spark, so perhaps that does not matter.
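For example, a sketch of the same chain on a Dataset (assuming a spark-shell session with spark.implicits._ in scope, and the input defined above):

// Sketch only: the same filter-map carried over to a Dataset[Option[Int]]
val datasetOfOptions = input.toDS()
datasetOfOptions.filter(_.isDefined).map(_.get * 2)   // Dataset[Int]; both steps are lazy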

Method 3 - fold*

input.foldRight[List[Int]](Nil) {
  case (nextOpt, acc) => nextOpt match {
    case Some(next) => next*2 :: acc
    case None => acc
  }
}

I have not used Spark enough to know how well a fold like this translates, or whether it would preserve the ordering of the elements.

In any case it is more verbose than I would like for such a simple operation.
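For reference, a sketch of how that fold-style logic could be written against an RDD with aggregate (assuming an rdd: RDD[Option[Int]] built from the input above); it only shows the shape of the translation, and unlike foldRight it gives no guarantee about element order across partitions:

// Sketch: fold-style accumulation on an RDD via aggregate
// (element order across partitions is not preserved)
rdd.aggregate(List.empty[Int])(
  (acc, nextOpt) => nextOpt match {
    case Some(next) => next * 2 :: acc
    case None       => acc
  },
  (left, right) => left ::: right
)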


So, is there anything in Spark analogous to collect, or do I just have to use the filter-map combination?

+4

The collect method defined over RDDs and Datasets is a different thing: it is used to materialize the data in the driver program.

Despite not having something akin to the Collections API collect method, your intuition is right: since both operations are evaluated lazily, the engine has the opportunity to optimize and chain them so that they are performed in a single pass over the data.

For the use case you describe in particular, I would suggest you consider flatMap, which works on both RDDs and Datasets:

// Assumes the usual spark-shell environment
// sc: SparkContext, spark: SparkSession
val collection = Seq(Some(1), None, Some(2), None, Some(3))
val rdd = sc.parallelize(collection)
val dataset = spark.createDataset(rdd)

// Both operations will yield `Array(2, 4, 6)`
rdd.flatMap(_.map(_ * 2)).collect
dataset.flatMap(_.map(_ * 2)).collect

// You can also express the operation in terms of a for-comprehension
(for (option <- rdd; n <- option) yield n * 2).collect
(for (option <- dataset; n <- option) yield n * 2).collect

// The same approach is valid for traditional collections as well
collection.flatMap(_.map(_ * 2))
for (option <- collection; n <- option) yield n * 2
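If you want to check that the chained operations really are planned as a single pass, one option (a sketch, reusing the spark-shell session above) is to inspect the physical plan; the filter and map appear chained within a single stage rather than as separate scans:

// Sketch: look at the physical plan of the chained filter-map
dataset.filter(_.isDefined).map(_.get * 2).explain()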

Bear in mind that calling collect on an RDD or Dataset retrieves the entire result to the driver, so it is only appropriate when the data is small enough to fit in the driver's memory. Since Spark has no partial-function collect for Datasets, the lazy flatMap above is the closest single-pass equivalent for this use case.

+2

As an addition: the RDD API does have a collect that takes a partial function, so if you are willing to convert your Dataset/DataFrame to an RDD you can use it directly:

// Assumes import spark.implicits._ (as in a spark-shell session) for toDS()
val dataset = Seq(Some(1), None, Some(2)).toDS()
val dsResult = dataset.rdd.collect { case Some(i) => i * 2 }.toDS()

This comes at the cost of converting between Dataset and RDD and back, and you lose the Dataset optimizations along the way (a point also made by @stefanobaghino).

For DataFrames (as opposed to Datasets of Option) there is no Option at the column level; a DataFrame represents a missing value as null rather than None, so the analogous operation would be:

// Assumes import spark.implicits._ (for $ and toDF)
// and import org.apache.spark.sql.functions.{isnull, not}
val dataframe = Seq(Some(1), None, Some(2)).toDF("opt")
dataframe.withColumn("opt", $"opt".multiply(2)).filter(not(isnull($"opt")))

This works, but it still assumes you know which values can be "missing" and requires you to filter out the nulls explicitly.
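A sketch of an equivalent formulation (same dataframe and imports as above) that filters the nulls first and then transforms, which reads a little closer to the collect intent:

// Sketch: drop the nulls first, then double the surviving values
dataframe.filter($"opt".isNotNull).select(($"opt" * 2).as("opt"))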

+3

I just want to extend the other answers by pointing into the Spark source code.

The RDD class does have a collect that takes a partial function; it is an overload of the familiar no-argument .collect(). Here is its definition, from RDD.scala @line 955:

/**
 * Return an RDD that contains all matching values by applying `f`.
 */
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  filter(cleanF.isDefinedAt).map(cleanF)
}

It is distinct from the .collect() that materializes the RDD on the driver, defined in RDD.scala @line 923:

/**
 * Return an array that contains all of the elements in this RDD.
 */
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}

So the overload

def collect[U](f: PartialFunction[T, U]): RDD[U]

exists and is documented here:

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD@collect[U](f:PartialFunction[T,U])(implicitevidence$29:scala.reflect.ClassTag[U]):org.apache.spark.rdd.RDD[U]

So the partial-function collect does exist in Spark, on the RDD API, and as the source shows it is simply a filter followed by a map.
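As an aside, nothing stops you from adding the same two-line sugar to Datasets yourself. A sketch (the collectPartial name and the wrapper object are made up here, not part of any Spark API):

// Sketch: a hypothetical extension that mirrors RDD.collect(f) for Datasets;
// it needs an Encoder for the result type, which spark.implicits._ provides
import org.apache.spark.sql.{Dataset, Encoder}

object DatasetCollectSyntax {
  implicit class CollectPartialOps[T](ds: Dataset[T]) {
    def collectPartial[U: Encoder](pf: PartialFunction[T, U]): Dataset[U] =
      ds.filter((t: T) => pf.isDefinedAt(t)).map((t: T) => pf(t))
  }
}

// Usage, assuming the spark-shell session from the accepted answer:
//   import DatasetCollectSyntax._
//   dataset.collectPartial { case Some(value) => value * 2 }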


Thanks! It is a shame this lives on the RDD API rather than on Datasets, but it is good to know it exists.

Bear in mind that the Spark documentation warns about collect: "this method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver's memory".

Right! The no-argument .collect() is the one that warning applies to. .collect(partial_function) is just syntactic sugar for a .filter() followed by a .map(), so it is safe.
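To make the distinction concrete, a quick sketch reusing the rdd of options from the accepted answer:

// collect with a partial function stays distributed and lazy...
val doubled: org.apache.spark.rdd.RDD[Int] = rdd.collect { case Some(v) => v * 2 }
// ...whereas the no-argument collect() eagerly pulls everything to the driver
val local: Array[Option[Int]] = rdd.collect()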

+3

I wanted to extend stefanobaghino's answer with an example using a for comprehension over a case class, since the case pattern is what many of us will actually be matching on.

Also, options are monads, which makes the accepted answer very simple in this case, since the for comprehension neatly drops the None values, but that approach does not carry over to non-monads such as case classes:

case class A(b: Boolean, i: Int, d: Double = 0.0)   // default for d so the sample values below compile

val collection = Seq(A(true, 3), A(false, 10), A(true, -1))
val rdd = sc.parallelize(collection)                 // as in the accepted answer's spark-shell session
val dataset = spark.createDataset(rdd)

// Select out and double all the 'i' values where 'b' is true:
for {
  A(b, i, _) <- dataset
  if b
} yield i * 2
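For reference, a sketch of roughly what the comprehension above means when written out with explicit filter and map (same dataset as above):

// Keep the rows where b is true, then double the i values
dataset.filter(_.b).map(_.i * 2)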
+1

Source: https://habr.com/ru/post/1667745/

