Description of the problem
I am trying to combine two datasets that have different column names in Apache Spark. After applying the isin() function, the order of the rows in the resulting dataset changes.
I also tried sort() and orderBy(), but that did not work.
Input 1:
RowFactory.create("405-048011-62815", "CRC Industries"),
RowFactory.create("630-0746","Dixon value"),
RowFactory.create("4444-444","3M INdustries"),
RowFactory.create("555-55","Dixon coupling valve")
Input 2:
RowFactory.create("222-2222-5555", "Tata"),
RowFactory.create("7777-88886","WestSide"),
RowFactory.create("22222-22224","Reliance"),
RowFactory.create("33333-3333","V industries")
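import static org.apache.spark.sql.functions.col;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Assumes `spark` is an existing SparkSession.
// First input: (label1, sentence1) rows.
List<Row> data = Arrays.asList(
    RowFactory.create("405-048011-62815", "CRC Industries"),
    RowFactory.create("630-0746", "Dixon value"),
    RowFactory.create("4444-444", "3M INdustries"),
    RowFactory.create("555-55", "Dixon coupling valve"));
StructType schema = new StructType(new StructField[] {
    new StructField("label1", DataTypes.StringType, false, Metadata.empty()),
    new StructField("sentence1", DataTypes.StringType, false, Metadata.empty()) });
Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);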
// Keep only the rows of the first dataset whose label1 is in the list.
List<String> listStrings = new ArrayList<String>();
listStrings.add("405-048011-62815");
listStrings.add("630-0746");
listStrings.add("4444-444");
listStrings.add("555-55");
Dataset<Row> matchFound1 = sentenceDataFrame.filter(
    col("label1").isin(listStrings.stream().toArray(String[]::new)));
matchFound1.show();

// Reuse the same list for the labels of the second dataset.
listStrings.clear();
listStrings.add("222-2222-5555");
listStrings.add("7777-88886");
listStrings.add("22222-22224");
listStrings.add("33333-3333");
// Index label1 with StringIndexer; label1Index1 is used as the join key below.
StringIndexer indexer = new StringIndexer()
    .setInputCol("label1")
    .setOutputCol("label1Index1");
Dataset<Row> Dataset1 = indexer.fit(matchFound1).transform(matchFound1);
Dataset1.show();
// Second input: (label2, sentence2) rows, filtered the same way.
List<Row> data2 = Arrays.asList(
    RowFactory.create("222-2222-5555", "Tata"),
    RowFactory.create("7777-88886", "WestSide"),
    RowFactory.create("22222-22224", "Reliance"),
    RowFactory.create("33333-3333", "V industries"));
StructType schema2 = new StructType(new StructField[] {
    new StructField("label2", DataTypes.StringType, false, Metadata.empty()),
    new StructField("sentence2", DataTypes.StringType, false, Metadata.empty()) });
Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2);
Dataset<Row> matchFound2 = sentenceDataFrame2.filter(
    col("label2").isin(listStrings.stream().toArray(String[]::new)));
matchFound2.show();
// Index label2 with StringIndexer; label2Index1 is used as the join key below.
StringIndexer indexer1 = new StringIndexer()
    .setInputCol("label2")
    .setOutputCol("label2Index1");
Dataset<Row> Dataset2 = indexer1.fit(matchFound2).transform(matchFound2);
Dataset2.show();
Dataset<Row> Finalresult = Dataset1
    .join(Dataset2, Dataset1.col("label1Index1").equalTo(Dataset2.col("label2Index1")))
    .drop(Dataset1.col("label1Index1"))
    .drop(Dataset2.col("label2Index1"));
Finalresult.show();
Actual output:
+----------------+--------------------+-------------+------------+
|          label1|           sentence1|       label2|   sentence2|
+----------------+--------------------+-------------+------------+
|405-048011-62815|      CRC Industries|   33333-3333|V industries|
|        630-0746|         Dixon value|222-2222-5555|        Tata|
|        4444-444|       3M INdustries|   7777-88886|    WestSide|
|          555-55|Dixon coupling valve|  22222-22224|    Reliance|
+----------------+--------------------+-------------+------------+
Expected Result:
+----------------+--------------------+-------------+------------+
|          label1|           sentence1|       label2|   sentence2|
+----------------+--------------------+-------------+------------+
|405-048011-62815|      CRC Industries|222-2222-5555|        Tata|
|        630-0746|         Dixon value|   7777-88886|    WestSide|
|        4444-444|       3M INdustries|  22222-22224|    Reliance|
|          555-55|Dixon coupling valve|   33333-3333|V industries|
+----------------+--------------------+-------------+------------+
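
To make the intended pairing concrete: the expected result puts row i of Input 1 next to row i of Input 2, i.e. it is a join on row position. StringIndexer derives its indices from the label values (by frequency, by default), not from row position, so joining on its output is not guaranteed to give that pairing. The plain-Java sketch below (no Spark; the class and method names are hypothetical and only for illustration) shows a join on an explicit position index. In Spark, an index of this kind could perhaps be obtained with something like JavaRDD.zipWithIndex(), but that is an assumption, not something verified against this code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Spark). All names here are hypothetical.
class PositionJoinSketch {

    // Attach an explicit position index to every row: index -> row.
    static Map<Integer, String[]> indexByPosition(List<String[]> rows) {
        Map<Integer, String[]> indexed = new HashMap<>();
        for (int i = 0; i < rows.size(); i++) {
            indexed.put(i, rows.get(i));
        }
        return indexed;
    }

    // Inner-join two row lists on the position index, emitting rows in index order.
    static List<String> joinByPosition(List<String[]> left, List<String[]> right) {
        Map<Integer, String[]> l = indexByPosition(left);
        Map<Integer, String[]> r = indexByPosition(right);
        List<String> joined = new ArrayList<>();
        for (int i = 0; i < left.size(); i++) {
            if (r.containsKey(i)) { // keep only positions present on both sides
                joined.add(String.join(" | ", l.get(i)) + " | " + String.join(" | ", r.get(i)));
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<String[]> input1 = Arrays.asList(
            new String[] {"405-048011-62815", "CRC Industries"},
            new String[] {"630-0746", "Dixon value"},
            new String[] {"4444-444", "3M INdustries"},
            new String[] {"555-55", "Dixon coupling valve"});
        List<String[]> input2 = Arrays.asList(
            new String[] {"222-2222-5555", "Tata"},
            new String[] {"7777-88886", "WestSide"},
            new String[] {"22222-22224", "Reliance"},
            new String[] {"33333-3333", "V industries"});
        // Prints one joined row per position, in input order.
        joinByPosition(input1, input2).forEach(System.out::println);
    }
}
```

The key point of the sketch is that the join key is derived from where a row sits in its input, and the output is emitted in key order, so the original row order survives the join.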