The output rows are not in the order specified

Description of the problem

I am combining two datasets that have different column names in Apache Spark; after the isin() function, the row order in the dataset changes.

I even tried sort and orderBy, but that did not work.

Input 1:

RowFactory.create("405-048011-62815", "CRC Industries"),
RowFactory.create("630-0746","Dixon value"),
RowFactory.create("4444-444","3M INdustries"),
RowFactory.create("555-55","Dixon coupling valve")

Input 2:

RowFactory.create("222-2222-5555", "Tata"),
RowFactory.create("7777-88886","WestSide"),
RowFactory.create("22222-22224","Reliance"),
RowFactory.create("33333-3333","V industries")


// imports needed by the snippets below
import static org.apache.spark.sql.functions.col;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
RowFactory.create("405-048011-62815", "CRC Industries"),
RowFactory.create("630-0746","Dixon value"),
RowFactory.create("4444-444","3M INdustries"),
RowFactory.create("555-55","Dixon coupling valve"));

StructType schema = new StructType(new StructField[]  {
new StructField("label1", DataTypes.StringType, false,Metadata.empty()),
new StructField("sentence1", DataTypes.StringType, false,Metadata.empty()) });

Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);

List<String> listStrings = new ArrayList<String>();
listStrings.add("405-048011-62815");
listStrings.add("630-0746");
listStrings.add("4444-444");
listStrings.add("555-55");

Dataset<Row> matchFound1 = sentenceDataFrame.filter(col("label1").isin(listStrings.stream().toArray(String[]::new)));
matchFound1.show();


listStrings.clear();
listStrings.add("222-2222-5555");
listStrings.add("7777-88886");
listStrings.add("22222-22224");
listStrings.add("33333-3333");
StringIndexer indexer = new StringIndexer()
  .setInputCol("label1")
  .setOutputCol("label1Index1");
Dataset<Row> Dataset1 = indexer.fit(matchFound1).transform(matchFound1);
Dataset1.show();


List<Row> data2 = Arrays.asList(
    RowFactory.create("222-2222-5555", "Tata"),
    RowFactory.create("7777-88886","WestSide"),
    RowFactory.create("22222-22224","Reliance"),
    RowFactory.create("33333-3333","V industries"));
StructType schema2 = new StructType(new StructField[] {
new StructField("label2", DataTypes.StringType,  false,Metadata.empty()),
new StructField("sentence2", DataTypes.StringType, false,Metadata.empty()) });

Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2);

Dataset<Row> matchFound2 = sentenceDataFrame2.filter(col("label2").isin(listStrings.stream().toArray(String[]::new)));
matchFound2.show();

StringIndexer indexer1 = new StringIndexer()
  .setInputCol("label2")
  .setOutputCol("label2Index1");
Dataset<Row> Dataset2 = indexer1.fit(matchFound2).transform(matchFound2);
Dataset2.show();

Dataset<Row> Finalresult = Dataset1.join(Dataset2 , Dataset1.col("label1Index1").equalTo(Dataset2.col("label2Index1"))).drop(Dataset1.col("label1Index1")).drop(Dataset2.col("label2Index1"));
Finalresult.show();

Actual output:

    +----------------+--------------------+-------------+------------+
    |          label1|           sentence1|       label2|   sentence2|
    +----------------+--------------------+-------------+------------+
    |405-048011-62815|      CRC Industries|   33333-3333|V industries|
    |        630-0746|         Dixon value|222-2222-5555|        Tata|
    |        4444-444|       3M INdustries|   7777-88886|    WestSide|
    |          555-55|Dixon coupling valve|  22222-22224|    Reliance|
    +----------------+--------------------+-------------+------------+

Expected Result:

    +----------------+--------------------+-------------+------------+
    |          label1|           sentence1|       label2|   sentence2|
    +----------------+--------------------+-------------+------------+
    |405-048011-62815|      CRC Industries|222-2222-5555|        Tata|
    |        630-0746|         Dixon value|   7777-88886|    WestSide|
    |        4444-444|       3M INdustries|  22222-22224|    Reliance|
    |          555-55|Dixon coupling valve|   33333-3333|V industries|
    +----------------+--------------------+-------------+------------+
Answer:

Instead of a StringIndexer, you can add a column of unique, monotonically increasing row ids with monotonically_increasing_id() to each DataFrame, as shown below:

import static org.apache.spark.sql.functions.monotonically_increasing_id;

Dataset<Row> Test2 = Dataset2.withColumn("rowId2", monotonically_increasing_id());
Dataset<Row> Test1 = Dataset1.withColumn("rowId1", monotonically_increasing_id());

Then join the two datasets on the generated row ids:

Dataset<Row> Finalresult = Test1.join(Test2 , Test1.col("rowId1").equalTo(Test2.col("rowId2")));
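
For reference, here is a minimal, self-contained sketch of the row-id approach, assuming a local SparkSession and the same data and schemas as in the question; the class and variable names (PairByRowId, left, right, paired) are illustrative, not part of the original code:

import static org.apache.spark.sql.functions.monotonically_increasing_id;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class PairByRowId {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("PairByRowId")
        .master("local[*]")
        .getOrCreate();

    // First dataset: label1 / sentence1
    List<Row> data1 = Arrays.asList(
        RowFactory.create("405-048011-62815", "CRC Industries"),
        RowFactory.create("630-0746", "Dixon value"),
        RowFactory.create("4444-444", "3M INdustries"),
        RowFactory.create("555-55", "Dixon coupling valve"));
    StructType schema1 = new StructType(new StructField[] {
        new StructField("label1", DataTypes.StringType, false, Metadata.empty()),
        new StructField("sentence1", DataTypes.StringType, false, Metadata.empty()) });

    // Second dataset: label2 / sentence2
    List<Row> data2 = Arrays.asList(
        RowFactory.create("222-2222-5555", "Tata"),
        RowFactory.create("7777-88886", "WestSide"),
        RowFactory.create("22222-22224", "Reliance"),
        RowFactory.create("33333-3333", "V industries"));
    StructType schema2 = new StructType(new StructField[] {
        new StructField("label2", DataTypes.StringType, false, Metadata.empty()),
        new StructField("sentence2", DataTypes.StringType, false, Metadata.empty()) });

    // Tag every row of each DataFrame with a generated row id
    Dataset<Row> left = spark.createDataFrame(data1, schema1)
        .withColumn("rowId1", monotonically_increasing_id());
    Dataset<Row> right = spark.createDataFrame(data2, schema2)
        .withColumn("rowId2", monotonically_increasing_id());

    // Join on the generated ids so row N of the first dataset is paired
    // with row N of the second, then drop the helper columns
    Dataset<Row> paired = left
        .join(right, left.col("rowId1").equalTo(right.col("rowId2")))
        .drop("rowId1")
        .drop("rowId2");

    paired.show();
    spark.stop();
  }
}

One caveat: monotonically_increasing_id() only guarantees ids that are unique and increasing, not consecutive, and the values depend on which partition a row lands in. The two datasets therefore line up row by row only when they are built and partitioned the same way, as in this small local example; for a partition-independent pairing, numbering the rows with zipWithIndex on the underlying RDDs is an alternative.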

Source: https://habr.com/ru/post/1017308/

