PySpark DataFrames - enumeration method without conversion to Pandas?

I have a very large pyspark.sql.dataframe.DataFrame named df. I need some way to enumerate records, thus accessing a record with a specific index. (or select a group of records with a range of indices)

In pandas I could only do

indexes=[2,3,6,7] df[indexes] 

Here I want something similar (and without converting data to pandas)

The closest I can get to is:

  • Listing all the objects in the original data frame:

     indexes=np.arange(df.count()) df_indexed=df.withColumn('index', indexes) 
    • Finding the values ​​I need with the where () function.

QUESTIONS:

  • Why is this not working and how to make it work? How to add a row to a dataframe?
  • Will work later to do something like:

      indexes=[2,3,6,7] df1.where("index in indexes").collect() 
  • What would be a quick and easy way to handle this?

+14
python bigdata apache-spark pyspark rdd
Sep 24 '15 at 12:06
source share
4 answers

This does not work because:

  • the second argument to withColumn should be a Column not a collection. np.array will not work here.
  • when you pass "index in indexes" because the SQL expression for where indexes is out of scope and not allowed as a valid identifier

PySpark> = 1.4.0

You can add line numbers using the appropriate window function and query using the Column.isin method or a well-formed query string:

 from pyspark.sql.functions import col, rowNumber from pyspark.sql.window import Window w = Window.orderBy() indexed = df.withColumn("index", rowNumber().over(w)) # Using DSL indexed.where(col("index").isin(set(indexes))) # Using SQL expression indexed.where("index in ({0})".format(",".join(str(x) for x in indexes))) 

It seems that window functions, called without a PARTITION BY , move all the data into one partition, so the above might not be the best solution in the end.

Any faster and easier way to handle this?

Not really. Spark DataFrames does not support random row access.

PairedRDD can be obtained using the lookup method, which is relatively fast if the data is partitioned using the HashPartitioner . There is also an indexed-rdd project that supports efficient search.

Edit

Regardless of the version of PySpark, you can try something like this:

 from pyspark.sql import Row from pyspark.sql.types import StructType, StructField, LongType row = Row("char") row_with_index = Row("char", "index") df = sc.parallelize(row(chr(x)) for x in range(97, 112)).toDF() df.show(5) ## +----+ ## |char| ## +----+ ## | a| ## | b| ## | c| ## | d| ## | e| ## +----+ ## only showing top 5 rows # This part is not tested but should work and save some work later schema = StructType( df.schema.fields[:] + [StructField("index", LongType(), False)]) indexed = (df.rdd # Extract rdd .zipWithIndex() # Add index .map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])) # Map to rows .toDF(schema)) # It will work without schema but will be more expensive # inSet in Spark < 1.3 indexed.where(col("index").isin(indexes)) 
+12
Sep 24 '15 at 12:18
source share

If you need a range of numbers that is guaranteed not to collide, but does not require .over(partitionBy()) , you can use monotonicallyIncreasingId() .

 from pyspark.sql.functions import monotonicallyIncreasingId df.select(monotonicallyIncreasingId().alias("rowId"),"*") 

Please note that the values ​​are not particularly "neat". Each section is assigned a range of values, and the output will not be adjacent. For example. 0, 1, 2, 8589934592, 8589934593, 8589934594 .

This was added to Spark on April 28, 2015 here: https://github.com/apache/spark/commit/d94cd1a733d5715792e6c4eac87f0d5c81aebbe2

+10
Apr 29 '16 at 20:29
source share

Of course, you can add an array for indexing, an array of your choice: In Scala, we first need to create an index array:

 val index_array=(1 to df.count.toInt).toArray index_array: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) 

Now you can add this column to your DF. First, for this you need to open our DF and get it as an array, then zip it with index_array and then convert the new array to RDD. The final step is to get it as DF:

 final_df = sc.parallelize((df.collect.map( x=>(x(0),x(1))) zip index_array).map( x=>(x._1._1.toString,x._1._2.toString,x._2))). toDF("column_name") 

After that, indexing will become more understandable.

+1
Jul 11 '17 at 20:47
source share

monotonicallyIncreasingId() - this will assign line numbers in incresing order, but not in sequence.

two-column selection:

|---------------------|------------------| | RowNo | Heading 2 | |---------------------|------------------| | 1 | xy | |---------------------|------------------| | 12 | xz | |---------------------|------------------|

If you want to assign line numbers, use the following trick.

Tested in spark versions 2.0.1 and later.

df.createOrReplaceTempView("df") dfRowId = spark.sql("select *, row_number() over (partition by 0) as rowNo from df")

two-column selection:

|---------------------|------------------| | RowNo | Heading 2 | |---------------------|------------------| | 1 | xy | |---------------------|------------------| | 2 | xz | |---------------------|------------------|

Hope this helps.

0
Mar 29 '18 at 9:48
source share



All Articles