How can I join two Spark SQL data frames using a SQL-esque "LIKE" criterion?

I am using the PySpark libraries with Spark 1.3.1.

I have two data frames, documents_df := {document_id, document_text} and keywords_df := {keyword}. I would like to JOIN the two data frames and return a resulting frame with {document_id, keyword} pairs, using the criterion that the keyword keywords_df.keyword appears in the string documents_df.document_text.

In PostgreSQL, for example, I could achieve this using an ON clause of the form:

    documents_df.document_text ilike '%' || keywords_df.keyword || '%'
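For reference, the complete PostgreSQL statement I have in mind looks roughly like this (table and alias names are illustrative):

    SELECT d.document_id, k.keyword
    FROM documents_df d
    JOIN keywords_df k
      ON d.document_text ILIKE '%' || k.keyword || '%';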

However, in PySpark I cannot get any syntax of this kind to work. Has anyone achieved something like this before?

Yours faithfully,

Will

1 answer

This is possible in two different ways, although, generally speaking, neither is recommended. First, let's create some dummy data:

    from pyspark.sql import Row

    document_row = Row("document_id", "document_text")
    keyword_row = Row("keyword")

    documents_df = sc.parallelize([
        document_row(1L, "apache spark is the best"),
        document_row(2L, "erlang rocks"),
        document_row(3L, "but haskell is better")
    ]).toDF()

    keywords_df = sc.parallelize([
        keyword_row("erlang"),
        keyword_row("haskell"),
        keyword_row("spark")
    ]).toDF()
  • Hive UDF

      documents_df.registerTempTable("documents")
      keywords_df.registerTempTable("keywords")

      query = """SELECT document_id, keyword
      FROM documents JOIN keywords
      ON document_text LIKE CONCAT('%', keyword, '%')"""

      like_with_hive_udf = sqlContext.sql(query)
      like_with_hive_udf.show()

      ## +-----------+-------+
      ## |document_id|keyword|
      ## +-----------+-------+
      ## |          1|  spark|
      ## |          2| erlang|
      ## |          3|haskell|
      ## +-----------+-------+
  • Python UDF

      from pyspark.sql.functions import udf, col
      from pyspark.sql.types import BooleanType

      # Or you could replace `in` with a regular expression
      contains = udf(lambda s, q: q in s, BooleanType())

      like_with_python_udf = (documents_df.join(keywords_df)
          .where(contains(col("document_text"), col("keyword")))
          .select(col("document_id"), col("keyword")))
      like_with_python_udf.show()

      ## +-----------+-------+
      ## |document_id|keyword|
      ## +-----------+-------+
      ## |          1|  spark|
      ## |          2| erlang|
      ## |          3|haskell|
      ## +-----------+-------+
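As the comment above hints, the `in` check can be swapped for a regular expression. A minimal sketch of a case-insensitive, whole-word variant (the name contains_word is illustrative, not part of the original answer):

    import re
    from pyspark.sql.functions import udf
    from pyspark.sql.types import BooleanType

    # Match the keyword as a whole word, ignoring case, so "spark"
    # matches "Spark is great" but not "sparkle". Illustrative only.
    contains_word = udf(
        lambda s, q: re.search(
            r"\b" + re.escape(q) + r"\b", s, re.IGNORECASE) is not None,
        BooleanType())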

Why is this not recommended? Because in each case it requires a Cartesian product:

    like_with_hive_udf.explain()
    ## TungstenProject [document_id#2L,keyword#4]
    ##  Filter document_text#3 LIKE concat(%,keyword#4,%)
    ##   CartesianProduct
    ##    Scan PhysicalRDD[document_id#2L,document_text#3]
    ##    Scan PhysicalRDD[keyword#4]

    like_with_python_udf.explain()
    ## TungstenProject [document_id#2L,keyword#4]
    ##  Filter pythonUDF#13
    ##   !BatchPythonEvaluation PythonUDF#<lambda>(document_text#3,keyword#4), ...
    ##    CartesianProduct
    ##     Scan PhysicalRDD[document_id#2L,document_text#3]
    ##     Scan PhysicalRDD[keyword#4]
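To make the cost concrete: the Cartesian product materializes every (document, keyword) pair before the filter runs, so the work grows with the product of the two row counts. A quick sanity check on the dummy data above:

    # Every pair is generated and then filtered, so the cost scales as
    # |documents| * |keywords| (3 * 3 = 9 pairs even for this toy data).
    n_pairs = documents_df.count() * keywords_df.count()
    print(n_pairs)  ## 9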

There are other ways to achieve a similar effect without a full Cartesian product.

  • Join on a tokenized document - useful if the keyword list is too large to be processed in the memory of a single machine (a caveat about how Tokenizer splits text is sketched after this list)

      from pyspark.ml.feature import Tokenizer
      from pyspark.sql.functions import explode

      tokenizer = Tokenizer(inputCol="document_text", outputCol="words")

      tokenized = (tokenizer.transform(documents_df)
          .select(col("document_id"), explode(col("words")).alias("token")))

      like_with_tokenizer = (tokenized
          .join(keywords_df, col("token") == col("keyword"))
          .drop("token"))

      like_with_tokenizer.show()
      ## +-----------+-------+
      ## |document_id|keyword|
      ## +-----------+-------+
      ## |          3|haskell|
      ## |          1|  spark|
      ## |          2| erlang|
      ## +-----------+-------+

    This requires a shuffle, but not a Cartesian product:

      like_with_tokenizer.explain()
      ## TungstenProject [document_id#2L,keyword#4]
      ##  SortMergeJoin [token#29], [keyword#4]
      ##   TungstenSort [token#29 ASC], false, 0
      ##    TungstenExchange hashpartitioning(token#29)
      ##     TungstenProject [document_id#2L,token#29]
      ##      !Generate explode(words#27), true, false, [document_id#2L, ...
      ##       ConvertToSafe
      ##        TungstenProject [document_id#2L,UDF(document_text#3) AS words#27]
      ##         Scan PhysicalRDD[document_id#2L,document_text#3]
      ##   TungstenSort [keyword#4 ASC], false, 0
      ##    TungstenExchange hashpartitioning(keyword#4)
      ##     ConvertToUnsafe
      ##      Scan PhysicalRDD[keyword#4]
  • Python UDF and broadcast variable - useful if the keyword list is relatively small

      from pyspark.sql.types import ArrayType, StringType

      keywords = sc.broadcast(set(
          keywords_df.map(lambda row: row[0]).collect()))

      bd_contains = udf(
          lambda s: list(set(s.split()) & keywords.value),
          ArrayType(StringType()))

      like_with_bd = (documents_df.select(
          col("document_id"),
          explode(bd_contains(col("document_text"))).alias("keyword")))

      like_with_bd.show()
      ## +-----------+-------+
      ## |document_id|keyword|
      ## +-----------+-------+
      ## |          1|  spark|
      ## |          2| erlang|
      ## |          3|haskell|
      ## +-----------+-------+

    It requires neither a shuffle nor a Cartesian product, but you still have to transfer the broadcast variable to each worker node.

      like_with_bd.explain()
      ## TungstenProject [document_id#2L,keyword#46]
      ##  !Generate explode(pythonUDF#47), true, false, ...
      ##   ConvertToSafe
      ##    TungstenProject [document_id#2L,pythonUDF#47]
      ##     !BatchPythonEvaluation PythonUDF#<lambda>(document_text#3), ...
      ##      Scan PhysicalRDD[document_id#2L,document_text#3]
  • Starting with Spark 1.6.0, you can mark a small data frame with sql.functions.broadcast to get a similar effect as above without using UDFs or explicit broadcast variables, reusing the tokenized data (a variant that broadcasts the keywords side instead is sketched after this list):

      from pyspark.sql.functions import broadcast

      like_with_tokenizer_and_bd = (broadcast(tokenized)
          .join(keywords_df, col("token") == col("keyword"))
          .drop("token"))

      like_with_tokenizer_and_bd.explain()
      ## TungstenProject [document_id#3L,keyword#5]
      ##  BroadcastHashJoin [token#10], [keyword#5], BuildLeft
      ##   TungstenProject [document_id#3L,token#10]
      ##    !Generate explode(words#8), true, false, ...
      ##     ConvertToSafe
      ##      TungstenProject [document_id#3L,UDF(document_text#4) AS words#8]
      ##       Scan PhysicalRDD[document_id#3L,document_text#4]
      ##   ConvertToUnsafe
      ##    Scan PhysicalRDD[keyword#5]
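One caveat with the tokenized join above: Tokenizer lowercases the text and splits it on whitespace, so punctuation stays attached to the tokens, and a document containing "spark!" would not match the keyword "spark". A minimal sketch of a workaround, assuming Spark >= 1.4, where RegexTokenizer is available:

    from pyspark.ml.feature import RegexTokenizer

    # Split on runs of non-word characters instead of whitespace,
    # so "spark!" yields the token "spark".
    regex_tokenizer = RegexTokenizer(
        inputCol="document_text", outputCol="words",
        gaps=True, pattern="\\W+")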
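Also note that the snippet above broadcasts the tokenized documents (hence BuildLeft in the plan). Since keywords_df is typically the far smaller side, an equally valid variant, sketched here under that assumption, is to broadcast the keywords instead:

    # Variant, not from the original answer: broadcast the (small)
    # keywords frame so every worker joins against a local copy.
    like_with_bd_keywords = (tokenized
        .join(broadcast(keywords_df), col("token") == col("keyword"))
        .drop("token"))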
