Return the value of the second field of an existing RDD based on the value of the first field of another RDD

I have data in three files in HDFS, as shown below.

EmployeeManagers.txt (EmpID, ManagerID)

1,5
2,4
3,4
4,6
5,6

EmployeeNames.txt (EmpID, Name)

1,Ronald Rays
2,Jimmy Kent
3,Shannon Witt
4,Krinton Kale
5,Harry Donal
6,Christina Fernandez

EmployeeSalary.txt (EmpID, salary)

1,1000
2,2000
3,3000
4,4000
5,5000
6,6000

I want to create RDDs from these files and print the data in the format ID, Employee Name, Salary, Manager Name.

I joined the three RDDs on their key, i.e. the first column of each text file, and was able to print the manager ID, but not the manager name.

Here is the code I wrote for this:

// Each file is parsed into (EmpID, value) pairs keyed by the first column
val manager = sc.textFile("EmployeeManagers")
val managerPairRDD = manager.map(x => (x.split(",")(0), x.split(",")(1)))
val name = sc.textFile("EmployeeNames")
val namePairRDD = name.map(x => (x.split(",")(0), x.split(",")(1)))
val salary = sc.textFile("EmployeeSalary")
val salaryPairRDD = salary.map(x => (x.split(",")(0), x.split(",")(1)))
val data = namePairRDD.join(salaryPairRDD).join(managerPairRDD)

The current output looks like this:

scala> data.collect();
res4: Array[(String, ((String, String), String))] = Array((4,((Krinton Kale,4000),6)), (5,((Harry Donal,5000),6)), (2,((Jimmy Kent,2000),4)), (3,((Shannon Witt,3000),4)), (1,((Ronald Rays,1000),5)))
1 answer

You can re-key the joined result by manager ID and join it with namePairRDD again:

val result = namePairRDD
  .join(salaryPairRDD)
  .join(managerPairRDD)
  .map { case (id, ((name, salary), mngrId)) => (mngrId, (id, name, salary)) }
  .join(namePairRDD) // join again, this time on managerId
  .map { case (_, ((id, name, salary), mngrName)) => (id, name, salary, mngrName) }

result.foreach(println)
// (2,Jimmy Kent,2000,Krinton Kale)
// (3,Shannon Witt,3000,Krinton Kale)
// (1,Ronald Rays,1000,Harry Donal)
// (4,Krinton Kale,4000,Christina Fernandez)
// (5,Harry Donal,5000,Christina Fernandez)
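
To check the re-key-and-join logic without a cluster, the same steps can be sketched on plain Scala collections. This is only an illustration, not Spark code: the object name ManagerLookup and the helper joinByKey are hypothetical, and joinByKey is a simple stand-in that mimics an inner join on the first element of each pair. The sample data is hard-coded from the three files in the question.

```scala
// Local-collections sketch of the same logic; no Spark required.
// ManagerLookup and joinByKey are hypothetical names, not part of the original post.
object ManagerLookup {
  // Sample data from the three files, already split into (EmpID, value) pairs.
  val managers = Seq("1" -> "5", "2" -> "4", "3" -> "4", "4" -> "6", "5" -> "6")
  val names = Seq(
    "1" -> "Ronald Rays", "2" -> "Jimmy Kent", "3" -> "Shannon Witt",
    "4" -> "Krinton Kale", "5" -> "Harry Donal", "6" -> "Christina Fernandez")
  val salaries = Seq(
    "1" -> "1000", "2" -> "2000", "3" -> "3000",
    "4" -> "4000", "5" -> "5000", "6" -> "6000")

  // Inner join on the key: emits one row per pair of entries with equal keys.
  def joinByKey[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] =
    for {
      (k, a)  <- left
      (k2, b) <- right
      if k == k2
    } yield (k, (a, b))

  val result: Seq[(String, String, String, String)] =
    joinByKey(
      joinByKey(joinByKey(names, salaries), managers)
        // Re-key each row by manager ID so the next join looks up the manager's name.
        .map { case (id, ((name, salary), mngrId)) => (mngrId, (id, name, salary)) },
      names)
      // Drop the manager ID key and flatten to the requested output format.
      .map { case (_, ((id, name, salary), mngrName)) => (id, name, salary, mngrName) }
      .sortBy(_._1) // sorted only to make the output order predictable
}
```

Calling ManagerLookup.result.foreach(println) prints one row per employee that has a manager. Note that employee 6 (Christina Fernandez) has no row in EmployeeManagers.txt, so an inner join drops her from the result; the same happens with join on the RDDs.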

Source: https://habr.com/ru/post/1667761/

