FPgrowth computation combination in pyspark vs scala

Usage :

http://spark.apache.org/docs/1.6.1/mllib-frequent-pattern-mining.html

Python Code:

from pyspark.mllib.fpm import FPGrowth
model = FPGrowth.train(dataframe,0.01,10)

Scala:

import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD

val data = sc.textFile("data/mllib/sample_fpgrowth.txt")

val transactions: RDD[Array[String]] = data.map(s => s.trim.split(' '))

val fpg = new FPGrowth()
  .setMinSupport(0.2)
  .setNumPartitions(10)
val model = fpg.run(transactions)

model.freqItemsets.collect().foreach { itemset =>
  println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
}

val minConfidence = 0.8
model.generateAssociationRules(minConfidence).collect().foreach { rule =>
  println(
    rule.antecedent.mkString("[", ",", "]")
      + " => " + rule.consequent .mkString("[", ",", "]")
      + ", " + rule.confidence)
}

From the code here, it shows that the scala part does not have minimal confidence.

def trainFPGrowthModel(
      data: JavaRDD[java.lang.Iterable[Any]],
      minSupport: Double,
      numPartitions: Int): FPGrowthModel[Any] = {
    val fpg = new FPGrowth()
      .setMinSupport(minSupport)
      .setNumPartitions(numPartitions)

    val model = fpg.run(data.rdd.map(_.asScala.toArray))
    new FPGrowthModelWrapper(model)
  }

How to add minConfidence to generate association rules in case of pyspark? We can see that scala has an example, but python has no example.

+4
source share
2 answers

Spark> = 2.2

There is a DataFramebase mlAPI that provides AssociationRules:

from pyspark.ml.fpm import FPGrowth

data = ...

fpm = FPGrowth(minSupport=0.3, minConfidence=0.9).fit(data)
associationRules = fpm.associationRules.

Spark & โ€‹โ€‹lt; 2.2

PySpark (DataFrame FPGrowth API Python - SPARK-1450), .

SBT ( ) .

Scala :

.
โ”œโ”€โ”€ AssociationRulesExtractor.scala
โ””โ”€โ”€ build.sbt

, .

build.sbt ( Scala Spark , ):

name := "fpm"

version := "1.0"

scalaVersion := "2.10.6"

val sparkVersion = "1.6.2"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion
)

AssociationRulesExtractor.scala:

package com.example.fpm

import org.apache.spark.mllib.fpm.AssociationRules.Rule
import org.apache.spark.rdd.RDD

object AssociationRulesExtractor {
  def apply(rdd: RDD[Rule[String]]) = {
    rdd.map(rule => Array(
      rule.confidence, rule.javaAntecedent, rule.javaConsequent
    ))
  }
}

, :

sbt package

jar . , Scala 2.10 :

target/scala-2.10/fpm_2.10-1.0.jar

PySpark spark-submit jar --driver-class-path:

bin/pyspark --driver-class-path /path/to/fpm_2.10-1.0.jar

:

bin/pyspark --driver-class-path /path/to/fpm_2.10-1.0.jar --jars /path/to/fpm_2.10-1.0.jar

.

:

from pyspark import SparkContext
from pyspark.mllib.fpm import FPGrowthModel
from pyspark.mllib.common import _java2py
from collections import namedtuple


rule = namedtuple("Rule", ["confidence", "antecedent", "consequent"])

def generateAssociationRules(model, minConfidence):
    # Get active context
    sc = SparkContext.getOrCreate()

    # Retrieve extractor object
    extractor = sc._gateway.jvm.com.example.fpm.AssociationRulesExtractor

    # Compute rules
    java_rules = model._java_model.generateAssociationRules(minConfidence)

    # Convert rules to Python RDD
    return _java2py(sc, extractor.apply(java_rules)).map(lambda x:rule(*x))

, :

generateAssociationRules(model, 0.9)

:

FPGrowthModel.generateAssociationRules = generateAssociationRules
model.generateAssociationRules(0.9)

PySpark, , .

+4

PySpark, Spark & โ€‹โ€‹lt; 2.2 py4j:

# model was produced by FPGrowth.train() method
rules = sorted(model._java_model.generateAssociationRules(0.9).collect(), 
    key=lambda x: x.confidence(), reverse=True)
for rule in rules[:200]:
    # rule variable has confidence(), consequent() and antecedent() 
    # methods for individual value access.
    print rule
0

Source: https://habr.com/ru/post/1658085/


All Articles