Choosing a range of elements in a spark sql array

I use spark-shell to perform the following operations.

I recently loaded a table with an array column into spark-sql.

Here is the DDL for the same:

 create table test_emp_arr(
   dept_id string,
   dept_nm string,
   emp_details Array<string>
 )

The data looks something like this:

 +-------+-------+-------------------------------+
 |dept_id|dept_nm|                    emp_details|
 +-------+-------+-------------------------------+
 |     10|Finance|[Jon, Snow, Castle, Black, Ned]|
 |     20|     IT|            [Ned, is, no, more]|
 +-------+-------+-------------------------------+

I can query the emp_details column something like this:

 sqlContext.sql("select emp_details[0] from emp_details").show 

Problem

I want to query a range of elements in the collection:

Expected query

 sqlContext.sql("select emp_details[0-2] from emp_details").show 

or

 sqlContext.sql("select emp_details[0:2] from emp_details").show 

Expected Result

 +-------------------+
 |        emp_details|
 +-------------------+
 |[Jon, Snow, Castle]|
 |      [Ned, is, no]|
 +-------------------+

In pure Scala, if I have an array, for example:

 val emp_details = Array("Jon","Snow","Castle","Black") 

I can get the elements from index 0 to 2 using

 emp_details.slice(0,3) 

which returns

 Array(Jon, Snow, Castle)

I cannot apply the above array operation in spark-sql.

thanks

8 answers

Here is a solution using a user-defined function, which has the advantage of working with any slice size you want. It simply builds a UDF on top of Scala's built-in slice method:

 import sqlContext.implicits._
 import org.apache.spark.sql.functions._

 val slice = udf((array: Seq[String], from: Int, to: Int) => array.slice(from, to))

Example with a sample of your data:

 val df = sqlContext.sql("select array('Jon', 'Snow', 'Castle', 'Black', 'Ned') as emp_details")
 df.withColumn("slice", slice($"emp_details", lit(0), lit(3))).show

Produces the expected result

 +--------------------+-------------------+
 |         emp_details|              slice|
 +--------------------+-------------------+
 |[Jon, Snow, Castl...|[Jon, Snow, Castle]|
 +--------------------+-------------------+

You can also register the UDF in your sqlContext and use it as follows:

 sqlContext.udf.register("slice", (array: Seq[String], from: Int, to: Int) => array.slice(from, to))
 sqlContext.sql("select array('Jon','Snow','Castle','Black','Ned'), slice(array('Jon','Snow','Castle','Black','Ned'), 0, 3)")

You don't need lit anymore with this solution.
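As a hedged usage sketch, assuming the emp_details table from the question is visible to sqlContext, the registered UDF also applies directly to the real array column:

 // assumes the emp_details table from the question is queryable via sqlContext
 sqlContext.sql("select emp_details, slice(emp_details, 0, 3) as slice from emp_details").show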


Edit 2: for those who want to avoid a udf for the sake of readability ;-)

If you really want to do it in one step, you will have to use Scala to create a lambda function that returns a sequence of Column and wrap it in array. This is a bit involved, but it is one step:

 val df = List(List("Jon", "Snow", "Castle", "Black", "Ned")).toDF("emp_details")

 df.withColumn("slice", array((0 until 3).map(i => $"emp_details"(i)):_*)).show(false)

 +-------------------------------+-------------------+
 |emp_details                    |slice              |
 +-------------------------------+-------------------+
 |[Jon, Snow, Castle, Black, Ned]|[Jon, Snow, Castle]|
 +-------------------------------+-------------------+

:_* works a bit of magic to pass a list to a so-called variadic function (array in this case, which builds the sql array). But I would advise against using this solution as is; put the lambda function in a named function

 def slice(from: Int, to: Int) = array((from until to).map(i => $"emp_details"(i)):_*)

for code readability. Note that in general, sticking to Column expressions (instead of using udfs) gives better performance.
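For completeness, a minimal usage sketch of the named function above (assuming the same df as in the earlier snippet):

 // hedged sketch: same df as above, slicing elements 0 until 3
 df.withColumn("slice", slice(0, 3)).show(false)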

Edit: to do this in an SQL statement (as you ask in your question ...), following the same logic, you generate the SQL query using Scala logic (admittedly not the most readable):

 def sliceSql(emp_details: String, from: Int, to: Int): String =
   "Array(" + (from until to).map(i => "emp_details[" + i.toString + "]").mkString(",") + ")"

 val sqlQuery = "select emp_details, " + sliceSql("emp_details", 0, 3) + " as slice from emp_details"

 sqlContext.sql(sqlQuery).show

 +-------------------------------+-------------------+
 |emp_details                    |slice              |
 +-------------------------------+-------------------+
 |[Jon, Snow, Castle, Black, Ned]|[Jon, Snow, Castle]|
 +-------------------------------+-------------------+

Note that you can replace until with to in order to include the last element, rather than stop at the element where the iteration ends.
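A small sketch of that inclusive variant, reusing the named-function pattern from above (the name sliceInclusive is mine, just for illustration):

 // inclusive upper bound: elements from, from+1, ..., last
 def sliceInclusive(from: Int, last: Int) = array((from to last).map(i => $"emp_details"(i)):_*)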


Starting with Spark 2.4, you can use the slice function. In Python:

 pyspark.sql.functions.slice(x, start, length) 

Collection function: returns an array containing all the elements in x from index start (or starting from the end if start is negative) with the specified length.

...

New in version 2.4.

 from pyspark.sql.functions import slice

 df = spark.createDataFrame([
     (10, "Finance", ["Jon", "Snow", "Castle", "Black", "Ned"]),
     (20, "IT", ["Ned", "is", "no", "more"])
 ], ("dept_id", "dept_nm", "emp_details"))

 df.select(slice("emp_details", 1, 3).alias("empt_details")).show()
 +-------------------+
 |       empt_details|
 +-------------------+
 |[Jon, Snow, Castle]|
 |      [Ned, is, no]|
 +-------------------+

In Scala:

 def slice(x: Column, start: Int, length: Int): Column 

Returns an array containing all the elements in x from index start (or starting from the end if start is negative) with the specified length.

 import org.apache.spark.sql.functions.slice

 val df = Seq(
   (10, "Finance", Seq("Jon", "Snow", "Castle", "Black", "Ned")),
   (20, "IT", Seq("Ned", "is", "no", "more"))
 ).toDF("dept_id", "dept_nm", "emp_details")

 df.select(slice($"emp_details", 1, 3) as "empt_details").show
 +-------------------+
 |       empt_details|
 +-------------------+
 |[Jon, Snow, Castle]|
 |      [Ned, is, no]|
 +-------------------+

The same can, of course, be done in SQL

 SELECT slice(emp_details, 1, 3) AS emp_details FROM df 
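As a hedged sketch of running that SQL from spark-shell, the DataFrame first needs to be registered under the view name you query (the name df here is only an assumption for the example):

 // assumption: register the DataFrame above as a temp view before querying it by name
 df.createOrReplaceTempView("df")
 spark.sql("SELECT slice(emp_details, 1, 3) AS emp_details FROM df").show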

Important:

Note that, unlike Seq.slice, values are indexed from one and the second argument is a length, not an end position.
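If you prefer the zero-based, exclusive-end convention of Seq.slice, here is a minimal sketch of a hypothetical wrapper (the name sliceZeroBased is not part of Spark):

 import org.apache.spark.sql.Column
 import org.apache.spark.sql.functions.slice

 // zero-based from, exclusive until, translated to Spark's 1-based start plus a length
 def sliceZeroBased(col: Column, from: Int, until: Int): Column =
   slice(col, from + 1, until - from)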


You can use the array function to build a new array of three values:

 import org.apache.spark.sql.Column
 import org.apache.spark.sql.functions._

 val input = sqlContext.sql("select emp_details from emp_details")

 val arr: Column = col("emp_details")
 val result = input.select(array(arr(0), arr(1), arr(2)) as "emp_details")

 result.show()
 // +-------------------+
 // |        emp_details|
 // +-------------------+
 // |[Jon, Snow, Castle]|
 // |      [Ned, is, no]|
 // +-------------------+

Use the selectExpr() and split() functions in Apache Spark.

e.g.:

 fs.selectExpr("(split(emp_details, ','))[0] as e1",
               "(split(emp_details, ','))[1] as e2",
               "(split(emp_details, ','))[2] as e3")
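Note that split applies to strings, so this assumes emp_details is stored as a comma-separated string. If it is an array column as in the question, one hedged variant is to join it first with concat_ws (fs is the DataFrame from this answer):

 // assumption: emp_details is an array column, so join it into a string before split
 fs.selectExpr("split(concat_ws(',', emp_details), ',')[0] as e1",
               "split(concat_ws(',', emp_details), ',')[1] as e2",
               "split(concat_ws(',', emp_details), ',')[2] as e3")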

Here is my generic slice UDF, which supports arrays of any type. It is a bit ugly because you need to know the element type in advance.

 import org.apache.spark.sql.types._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.expressions.UserDefinedFunction

 def arraySlice(arr: Seq[AnyRef], from: Int, until: Int): Seq[AnyRef] =
   if (arr == null) null else arr.slice(from, until)

 def slice(elemType: DataType): UserDefinedFunction = udf(arraySlice _, ArrayType(elemType))

 fs.select(slice(StringType)($"emp_details", lit(1), lit(2)))
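A hedged usage sketch for a non-string array (the int_scores column is hypothetical, only to illustrate passing a different element type):

 // hypothetical integer-array column; only the element type passed to slice changes
 fs.select(slice(IntegerType)($"int_scores", lit(0), lit(2)))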

For those of you who are stuck on Spark < 2.4 and don't have the slice function, here is a solution in pySpark (very similar to Scala) that does not use udfs. Instead, it uses the SQL functions concat_ws, substring_index and split.

This will only work with string arrays. To make it work with arrays of other types, you will first have to cast them to strings, and then cast them back to the original type after you have "sliced" the array.

 from pyspark.sql import SparkSession
 from pyspark.sql import functions as F

 spark = (SparkSession.builder
          .master('yarn')
          .appName("array_slice")
          .getOrCreate()
          )

 emp_details = [
     ["Jon", "Snow", "Castle", "Black", "Ned"],
     ["Ned", "is", "no", "more"]
 ]

 df1 = spark.createDataFrame(
     [tuple([emp]) for emp in emp_details],
     ["emp_details"]
 )

 df1.show(truncate=False)
 +-------------------------------+
 |emp_details                    |
 +-------------------------------+
 |[Jon, Snow, Castle, Black, Ned]|
 |[Ned, is, no, more]            |
 +-------------------------------+
 last_string = 2

 df2 = (
     df1
     .withColumn('last_string', (F.lit(last_string)))
     .withColumn('concat', F.concat_ws(" ", F.col('emp_details')))
     .withColumn('slice', F.expr("substring_index(concat, ' ', last_string + 1)"))
     .withColumn('slice', F.split(F.col('slice'), ' '))
     .select('emp_details', 'slice')
 )

 df2.show(truncate=False)
 +-------------------------------+-------------------+
 |emp_details                    |slice              |
 +-------------------------------+-------------------+
 |[Jon, Snow, Castle, Black, Ned]|[Jon, Snow, Castle]|
 |[Ned, is, no, more]            |[Ned, is, no]      |
 +-------------------------------+-------------------+

Use nested split: concat_ws(',', emp_details) joins the array into a comma-separated string, concat(',', emp_details[3]) builds a delimiter from the fourth element, the inner split cuts the string at that delimiter and [0] keeps the prefix, and the outer split on ',' turns that prefix back into an array:

split(split(concat_ws(',',emp_details),concat(',',emp_details[3]))[0],',')

 scala> import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.SparkSession

 scala> val spark = SparkSession.builder().getOrCreate()
 spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@1d637673

 scala> val df = spark.read.json("file:///Users/gengmei/Desktop/test/test.json")
 18/12/11 10:09:32 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
 df: org.apache.spark.sql.DataFrame = [dept_id: bigint, dept_nm: string ... 1 more field]

 scala> df.createOrReplaceTempView("raw_data")

 scala> df.show()
 +-------+-------+--------------------+
 |dept_id|dept_nm|         emp_details|
 +-------+-------+--------------------+
 |     10|Finance|[Jon, Snow, Castl...|
 |     20|     IT| [Ned, is, no, more]|
 +-------+-------+--------------------+

 scala> val df2 = spark.sql(
      | s"""
      | |select dept_id,dept_nm,split(split(concat_ws(',',emp_details),concat(',',emp_details[3]))[0],',') as emp_details from raw_data
      | """)
 df2: org.apache.spark.sql.DataFrame = [dept_id: bigint, dept_nm: string ... 1 more field]

 scala> df2.show()
 +-------+-------+-------------------+
 |dept_id|dept_nm|        emp_details|
 +-------+-------+-------------------+
 |     10|Finance|[Jon, Snow, Castle]|
 |     20|     IT|      [Ned, is, no]|
 +-------+-------+-------------------+
