PySpark: using repartitionAndSortWithinPartitions with multiple sort criteria

Assuming I have the following RDD:

rdd = sc.parallelize([('a', (5,1)), ('d', (8,2)), ('c', (6,3)), ('a', (8,2)), ('d', (9,6)), ('b', (3,4)), ('c', (8,3))])

How can I use repartitionAndSortWithinPartitions to sort by x[0] and then by x[1][0]? With the following, I only sort by the key (x[0]):

Npartitions = sc.defaultParallelism
rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x: hash(x) % Npartitions, 2)

The following works, but I think there should be something simpler:

Npartitions = sc.defaultParallelism
partitioned_data = (rdd
  .partitionBy(2)
  .map(lambda x: (x[0], x[1][0], x[1][1]))
  .toDF(['letter', 'number2', 'number3'])
  .sortWithinPartitions(['letter', 'number2'], ascending=False)
  .rdd  # DataFrame.map was removed in Spark 2.0; go through the underlying RDD
  .map(lambda x: (x.letter, (x.number2, x.number3))))

>>> partitioned_data.glom().collect()

[[],
[(u'd', (9, 6)), (u'd', (8, 2))],
[(u'c', (8, 3)), (u'c', (6, 3))],
[(u'b', (3, 4))],
[(u'a', (8, 2)), (u'a', (5, 1))]]

As you can see, I have to convert the RDD to a DataFrame in order to use sortWithinPartitions. Is there another way, for example using repartitionAndSortWithinPartitions?

(It doesn't matter that the data is not sorted globally. I only need it sorted within each partition.)

1 answer

It is possible, but you have to include all the required information in a composite key:

from pyspark.rdd import portable_hash

n = 2

def partitioner(n):
    """Partition by the first item in the key tuple"""
    def partitioner_(x):
        return portable_hash(x[0]) % n
    return partitioner_


(rdd
  .keyBy(lambda kv: (kv[0], kv[1][0]))  # Create temporary composite key
  .repartitionAndSortWithinPartitions(
      numPartitions=n, partitionFunc=partitioner(n), ascending=False)
  .map(lambda x: x[1]))  # Drop key (note: there is no partitioner set anymore)

Explanation:

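  • keyBy(lambda kv: (kv[0], kv[1][0])) creates a temporary key consisting of the original key and the first element of the value. In other words, it transforms:

    (0, (5,1))

    into

    ((0, 5), (0, (5, 1)))

    In practice, it can be slightly more efficient to simply reshape the data to

    ((0, 5), 1)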
    
  • partitioner defines a partitioning function based on the hash of the first element of the key, so:

    partitioner(7)((0, 5))
    ## 0
    
    partitioner(7)((0, 6))
    ## 0
    
    partitioner(7)((0, 99))
    ## 0
    
    partitioner(7)((3, 99))
    ## 3
    

    As you can see, the result is consistent and ignores the second element of the key.

  • We use the default keyfunc, which is the identity (lambda x: x), and rely on the lexicographic ordering defined on Python tuple:

    (0, 5) < (1, 5)
    ## True
    
    (0, 5) < (0, 4)
    ## False
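
To see how the three pieces above fit together without a cluster, here is a minimal pure-Python sketch. It is only an illustration, not what Spark runs internally: stable_hash and simulate are hypothetical helpers, crc32 stands in for portable_hash (so the bucket each letter lands in will differ from Spark's), and the records are modelled on the question's data.

```python
from collections import defaultdict
from zlib import crc32

records = [('a', (5, 1)), ('d', (8, 2)), ('c', (6, 3)), ('a', (8, 2)),
           ('d', (9, 6)), ('b', (3, 4)), ('c', (8, 3))]


def stable_hash(value):
    # Assumption: a deterministic stand-in for pyspark.rdd.portable_hash.
    return crc32(str(value).encode("utf-8"))


def simulate(records, num_partitions, ascending=False):
    """Bucket by the first element of the composite key, then sort each bucket."""
    buckets = defaultdict(list)
    for k, v in records:
        composite = (k, v[0])  # same temporary key that keyBy builds
        index = stable_hash(composite[0]) % num_partitions
        buckets[index].append((composite, (k, v)))
    result = []
    for i in range(num_partitions):
        # Tuples compare lexicographically: letter first, then the number.
        part = sorted(buckets[i], key=lambda kv: kv[0], reverse=not ascending)
        result.append([value for _, value in part])  # drop the temporary key
    return result


partitions = simulate(records, num_partitions=2)
for part in partitions:
    print(part)
```

Because only the letter goes into the hash, all records with the same letter end up in the same bucket, and the sort inside each bucket then orders them by letter and by the first number.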
    

Alternatively, you can reshape the data instead:

rdd.map(lambda kv: ((kv[0], kv[1][0]), kv[1][1]))

With this reshaped form, the final map can be dropped to improve performance.
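
As a rough illustration of the difference between the two record shapes, the sketch below builds both in plain Python on a few sample records. The restore helper is hypothetical and only shows that the reshaped form loses no information, in case the original layout is needed again later.

```python
records = [('a', (5, 1)), ('d', (8, 2)), ('a', (8, 2))]

# Shape produced by keyBy: the original record is carried along as the value.
keyed = [((k, v[0]), (k, v)) for k, v in records]

# Reshaped form: each field is stored exactly once.
reshaped = [((k, v[0]), v[1]) for k, v in records]


def restore(record):
    """Rebuild (letter, (n1, n2)) from a reshaped record (hypothetical helper)."""
    (k, first), second = record
    return (k, (first, second))


print(keyed[0])     # (('a', 5), ('a', (5, 1)))
print(reshaped[0])  # (('a', 5), 1)
print([restore(r) for r in reshaped] == records)  # True
```

The reshaped records are smaller, which is where the saving comes from when they are shuffled between partitions.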


Source: https://habr.com/ru/post/1651074/

