Spark >= 2.2
There is a DataFrame-based ml API that provides AssociationRules:
from pyspark.ml.fpm import FPGrowth
data = ...
fpm = FPGrowth(minSupport=0.3, minConfidence=0.9).fit(data)
associationRules = fpm.associationRules
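For illustration, here is a minimal sketch of that API, assuming data is a DataFrame with an array-typed items column (the toy transactions below are made up):

from pyspark.sql import SparkSession
from pyspark.ml.fpm import FPGrowth

spark = SparkSession.builder.getOrCreate()

# Toy transactions; FPGrowth reads the "items" column by default
data = spark.createDataFrame(
    [(["a", "b", "c"],), (["a", "b"],), (["a", "c"],)],
    ["items"],
)

fpm = FPGrowth(minSupport=0.3, minConfidence=0.9).fit(data)

# A DataFrame with antecedent, consequent and confidence columns
fpm.associationRules.show()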
Spark < 2.2
Here PySpark doesn't support extracting association rules (a DataFrame-based FPGrowth API with Python support is a work in progress, SPARK-1450), but we can easily address that.
First you'll have to install SBT (follow the installation instructions for your operating system).
Next you'll have to create a simple Scala project with just two files:
.
├── AssociationRulesExtractor.scala
└── build.sbt
You can adjust it later to follow the established directory structure.
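For reference, the conventional sbt layout for this project (a sketch, mirroring the com.example.fpm package used below) would look like:

.
├── build.sbt
└── src
    └── main
        └── scala
            └── com
                └── example
                    └── fpm
                        └── AssociationRulesExtractor.scala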
Add the following to build.sbt (adjust the Scala version and Spark version to match the ones you use):
name := "fpm"
version := "1.0"
scalaVersion := "2.10.6"
val sparkVersion = "1.6.2"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-mllib" % sparkVersion
)
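For example, assuming a Spark 2.1 build against Scala 2.11 (hypothetical versions, substitute your own), the corresponding lines would be:

scalaVersion := "2.11.8"

val sparkVersion = "2.1.0"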
and the following to AssociationRulesExtractor.scala:
package com.example.fpm

import org.apache.spark.mllib.fpm.AssociationRules.Rule
import org.apache.spark.rdd.RDD

object AssociationRulesExtractor {
  // Flatten each rule into an array of (confidence, antecedent, consequent),
  // using the Java-friendly accessors so the values convert cleanly to Python
  def apply(rdd: RDD[Rule[String]]) = {
    rdd.map(rule => Array(
      rule.confidence, rule.javaAntecedent, rule.javaConsequent
    ))
  }
}
Open a terminal emulator of your choice, go to the root directory of the project, and run:
sbt package
It will generate a jar file in the target directory. For example, with Scala 2.10 it will be:
target/scala-2.10/fpm_2.10-1.0.jar
Start the PySpark shell or use spark-submit and pass the path to the generated jar with --driver-class-path:
bin/pyspark --driver-class-path /path/to/fpm_2.10-1.0.jar
In non-local mode:
bin/pyspark --driver-class-path /path/to/fpm_2.10-1.0.jar --jars /path/to/fpm_2.10-1.0.jar
In cluster mode the jar should be present on all nodes.
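The same flags work with spark-submit; a sketch, where your_script.py stands in for your application:

bin/spark-submit --driver-class-path /path/to/fpm_2.10-1.0.jar --jars /path/to/fpm_2.10-1.0.jar your_script.py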
Next, define some convenience wrappers:
from pyspark import SparkContext
from pyspark.mllib.fpm import FPGrowthModel
from pyspark.mllib.common import _java2py
from collections import namedtuple

rule = namedtuple("Rule", ["confidence", "antecedent", "consequent"])

def generateAssociationRules(model, minConfidence):
    # Get the active context
    sc = SparkContext.getOrCreate()
    # Retrieve the extractor object from the JVM
    extractor = sc._gateway.jvm.com.example.fpm.AssociationRulesExtractor
    # Compute the rules on the JVM side
    java_rules = model._java_model.generateAssociationRules(minConfidence)
    # Convert the result to a Python RDD of Rule namedtuples
    return _java2py(sc, extractor.apply(java_rules)).map(lambda x: rule(*x))
Finally, you can use these helpers as a function:
generateAssociationRules(model, 0.9)
or as a monkey patch:
FPGrowthModel.generateAssociationRules = generateAssociationRules
model.generateAssociationRules(0.9)
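To put it all together, here is a minimal end-to-end sketch, assuming the jar is on the class path as described above and the monkey patch has been applied; the transactions and thresholds are made up for illustration:

from pyspark import SparkContext
from pyspark.mllib.fpm import FPGrowth

sc = SparkContext.getOrCreate()

# Toy baskets, purely illustrative
transactions = sc.parallelize([
    ["a", "b", "c"],
    ["a", "b"],
    ["a", "c"],
])

model = FPGrowth.train(transactions, minSupport=0.5, numPartitions=2)

# Each result is a Rule namedtuple defined above
for r in model.generateAssociationRules(0.9).collect():
    print(r.antecedent, "=>", r.consequent, r.confidence)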
Keep in mind that this solution depends on internal PySpark methods, so it is not guaranteed to be forward compatible and can break between Spark versions.