Python Spark: how to map fields from one RDD to another RDD

I am very new to Python and Spark. I want to map the fields of one RDD to the fields of another RDD. Here is an example.

rdd1:

c_id    name 
121210  abc
121211  pqr

rdd2:

c_id   cn_id cn_value
121211  0     0
121210  0     1

In other words, the rows should be matched on c_id, with c_id replaced by the corresponding name and cn_id and cn_value carried over. Thus, the output will look like this:

abc 0 0
pqr 0 1

from pyspark import SparkContext

sc = SparkContext("local", "spark-App")

# Parse both CSV sources into lists of fields.
file1 = sc.textFile('/home/hduser/sample.csv').map(lambda line: line.split(',')).filter(lambda line: len(line) > 1)
file2 = sc.textFile('hdfs://localhost:9000/sample2/part-00000').map(lambda line: line.split(','))

# Pair up the fields: (c_id, name) and (c_id, cn_id, cn_value).
file1_fields = file1.map(lambda x: (x[0], x[1]))
file2_fields = file2.map(lambda x: (x[0], x[1], float(x[2])))

How can I achieve my goal by adding code here?

Any help would be much appreciated. Thank you!

1 answer

As far as I can tell, what you are looking for is a join. Since your data already has a tabular structure, the easiest way is to use DataFrames with the spark-csv package (assuming you can add it as a dependency; otherwise you can build the DataFrames by hand). Let's create the example files first:

file1 = ... # path to the first file
file2 = ... # path to the second file

with open(file1, "w") as fw:
    fw.write("c_id,name\n121210,abc\n121211,pqr")

with open(file2, "w") as fw:
    fw.write("121211,0,0\n121210,0,1")

Now read the first file; it has a header, so spark-csv can infer the schema:

df1 = (sqlContext.read 
    .format('com.databricks.spark.csv')
    .options(header='true', inferSchema='true')
    .load(file1))

The second file has no header, so define the schema explicitly:

from pyspark.sql.types import StructType, StructField, LongType

schema = StructType(
    [StructField(x, LongType(), False) for x in ("c_id", "cn_id", "cn_value")])

df2 = (sqlContext.read 
    .format('com.databricks.spark.csv')
    .schema(schema)
    .options(header='false')
    .load(file2))
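
If spark-csv is not available, one alternative is to build the DataFrames directly from the parsed RDDs with sqlContext.createDataFrame. This is only a sketch, assuming the same file1/file2 paths as above:

rdd1 = (sc.textFile(file1)
    .map(lambda line: line.split(','))
    .filter(lambda row: row[0] != 'c_id')   # drop the header row
    .map(lambda row: (int(row[0]), row[1])))
df1 = sqlContext.createDataFrame(rdd1, ['c_id', 'name'])

rdd2 = (sc.textFile(file2)
    .map(lambda line: line.split(','))
    .map(lambda row: (int(row[0]), int(row[1]), int(row[2]))))
df2 = sqlContext.createDataFrame(rdd2, ['c_id', 'cn_id', 'cn_value'])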

Finally, join the two DataFrames on c_id:

combined = df1.join(df2, df1["c_id"] == df2["c_id"])
combined.show()

## +------+----+------+-----+--------+
## |  c_id|name|  c_id|cn_id|cn_value|
## +------+----+------+-----+--------+
## |121210| abc|121210|    0|       1|
## |121211| pqr|121211|    0|       0|
## +------+----+------+-----+--------+
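
The join keeps both c_id columns; to get only the fields asked for in the question (name, cn_id, cn_value), a select on the joined DataFrame is enough. A minimal sketch:

combined.select(df1["name"], df2["cn_id"], df2["cn_value"]).show()

## +----+-----+--------+
## |name|cn_id|cn_value|
## +----+-----+--------+
## | abc|    0|       1|
## | pqr|    0|       0|
## +----+-----+--------+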

Edit

If you prefer to stay with plain RDDs, a join on pair RDDs does the same thing:

file1_fields.join(file2_fields.map(lambda x: (x[0], x[1:])))
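
The joined RDD has elements of the form (c_id, (name, (cn_id, cn_value))), so to reach the shape described in the question you can flatten each record with one more map. A sketch, reusing the variables from the question:

joined = file1_fields.join(file2_fields.map(lambda x: (x[0], x[1:])))
# Drop the c_id key and splice name together with (cn_id, cn_value).
result = joined.map(lambda kv: (kv[1][0],) + kv[1][1])
result.collect()
## [('abc', '0', 1.0), ('pqr', '0', 0.0)]  # order may vary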

Source: https://habr.com/ru/post/1620103/

