Using Word2VecModel.transform() does not work in a map function

I created a Word2Vec model using Spark and saved it. Now I want to use it in other code as a standalone model. I loaded the model and used it to get the vector for a single word (e.g. "Hello"), and it works well. But I need to call it for many words in an RDD using map.

When I call model.transform() inside the map function, it throws this error:

"It looks like you are trying to reference a SparkContext from a broadcast" Exception: It looks like you are trying to reference a SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used in the driver, and not in the code that it runs on workers. See SPARK-5063 for more information.

Code:

from pyspark import SparkContext
from pyspark.mllib.feature import Word2Vec
from pyspark.mllib.feature import Word2VecModel

sc = SparkContext('local[4]', appName='Word2Vec')

model = Word2VecModel.load(sc, "word2vecModel")

x = model.transform("Hello")
print(x[0])  # works fine and returns [0.234, 0.800, ...]

y = sc.parallelize([['Hello'], ['test']])
y.map(lambda w: model.transform(w[0])).collect()  # throws the error above



This is expected behavior. Like other MLlib models, the Python Word2VecModel is just a thin wrapper around a Scala model living in the JVM, accessed through the Py4J gateway. Java/Scala objects obtained this way cannot be serialized and shipped to the workers, which is why the SparkContext reference held by the wrapper triggers the error above.
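You can reproduce the failure without any RDD: a closure shipped to the workers is pickled together with everything it captures, and the model wrapper holds a reference to the SparkContext, which refuses to be pickled. A minimal sketch of that mechanism (an illustration only, assuming the model from the question is already loaded):

import pickle

# Closures sent to workers are serialized with pickle. The model wrapper
# captures the SparkContext, and SparkContext raises the SPARK-5063 error
# as soon as anything attempts to pickle it.
try:
    pickle.dumps(model)
except Exception as e:
    print(e)  # the same SPARK-5063 message as in the question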

For the same reason, MLlib models cannot be applied inside an action or a transformation on an RDD. As long as the vocabulary fits into memory, you can extract the word vectors with Word2VecModel.getVectors. Unfortunately, it returns a JavaMap, so the values have to be converted to Python objects manually. A simple solution:

from pyspark.mllib.linalg import DenseVector

vectors_ = model.getVectors() # py4j.java_collections.JavaMap
vectors = {k: DenseVector([x for x in vectors_.get(k)])
           for k in vectors_.keys()}
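
With the vectors extracted into an ordinary Python dict, the map from the question works once you broadcast the dict instead of the model. A minimal sketch (vectors is the dict built above; a word missing from the vocabulary yields None):

vectors_bd = sc.broadcast(vectors)

y = sc.parallelize([['Hello'], ['test']])
y.map(lambda w: vectors_bd.value.get(w[0])).collect()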

This gives you a plain Python dictionary, but building it is relatively slow because every value is fetched through Py4J one entry at a time. A faster alternative is to read the model's serialized data directly as a DataFrame:

lookup = sqlContext.read.parquet("path_to_word2vec_model/data").alias("lookup")

It has the following schema:

lookup.printSchema()
## root
## |-- word: string (nullable = true)
## |-- vector: array (nullable = true)
## |    |-- element: float (containsNull = true)

which means that, among other things, you can simply use a join:

from pyspark.sql.functions import col

words = sc.parallelize([('hello', ), ('test', )]).toDF(["word"]).alias("words")

words.join(lookup, col("words.word") == col("lookup.word")).show()

## +-----+-----+--------------------+
## | word| word|              vector|
## +-----+-----+--------------------+
## |hello|hello|[-0.030862354, -0...|
## | test| test|[-0.13154022, 0.2...|
## +-----+-----+--------------------+
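
The join keeps both word columns; in practice you would select just one of them together with the vector. A small follow-up sketch using the column names from the schema above:

pairs = words.join(lookup, col("words.word") == col("lookup.word")) \
             .select(col("words.word"), col("lookup.vector"))
pairs.show()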

Finally, if the data fits into driver/executor memory, you can collect it and broadcast it as a dictionary:

lookup_bd = sc.broadcast(lookup.rdd.collectAsMap())

rdd = sc.parallelize([['Hello'], ['test']])
rdd.map(lambda ws: [lookup_bd.value.get(w) for w in ws])
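
To materialize the result, add a collect. Note that the lookup is case-sensitive: 'Hello' resolves only if the model was trained on capitalized tokens; otherwise .get returns None. Each vector comes back as a plain Python list, since the parquet column is an array of floats:

rdd.map(lambda ws: [lookup_bd.value.get(w) for w in ws]).collect()
## [[vector for 'Hello' or None], [vector for 'test' or None]]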

Source: https://habr.com/ru/post/1621576/

