How to filter a Spark RDD based on a specific field value in Java?

I am creating a Spark job in Java. Here is my code.

I am trying to filter entries from a CSV file. The header contains the fields OID, COUNTRY_NAME, ...

Instead of just filtering with s.contains("CANADA"), I would like to be more specific; for example, I want to filter on COUNTRY_NAME.equals("CANADA"). Any thoughts on how I can do this?

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public static void main(String[] args) {
    String gaimFile = "hdfs://xx.yy.zz.com/sandbox/data/acc/mydata"; 

    SparkConf conf = new SparkConf().setAppName("Filter App");
    JavaSparkContext sc = new JavaSparkContext(conf);
    try{
        JavaRDD<String> gaimData = sc.textFile(gaimFile);

        JavaRDD<String> canadaOnly = gaimData.filter(new Function<String, Boolean>() {

            private static final long serialVersionUID = -4438640257249553509L;

            public Boolean call(String s) { 
               // My file is a CSV with header OID, COUNTRY_NAME, ...
               // Instead of just calling s.contains here,
               // I would like to be more specific and check
               // COUNTRY_NAME.equals("CANADA")
               return s.contains("CANADA"); 
            }
        }); 

    }
    catch(Exception e){
        System.out.println("ERROR: G9 MatchUp Failed");
    }
    finally{
        sc.close();
    }
}
1 answer

First you need to map your values to a custom class:

rdd.map(line => convertToCountry(line))
   .filter(country => country.countryName == "CANADA")

class Country {
  ...ctor that takes an array of fields and fills the properties...
  ...a property for each field from the csv, including countryName...
}

def convertToCountry(line: String): Country = {
  new Country(line.split(','))
}

The above is a combination of Scala and pseudocode, but you should get the point.
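
For the Java API used in the question, here is a minimal sketch of the same idea (the CountryRecord class and the column indexes are assumptions based on your header; adjust them to the actual column order):

import java.io.Serializable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

// Simple serializable holder for one CSV row; only the fields needed here.
public static class CountryRecord implements Serializable {
    private final String oid;
    private final String countryName;

    public CountryRecord(String[] fields) {
        // Assumes OID is column 0 and COUNTRY_NAME is column 1.
        this.oid = fields[0].trim();
        this.countryName = fields[1].trim();
    }

    public String getCountryName() {
        return countryName;
    }
}

// Parse each line into a CountryRecord, then filter on the field.
JavaRDD<CountryRecord> records = gaimData.map(new Function<String, CountryRecord>() {
    public CountryRecord call(String line) {
        return new CountryRecord(line.split(","));
    }
});

JavaRDD<CountryRecord> canadaOnly = records.filter(new Function<CountryRecord, Boolean>() {
    public Boolean call(CountryRecord r) {
        // "CANADA".equals(...) also guards against a null country name.
        return "CANADA".equals(r.getCountryName());
    }
});

If the file still includes the header row, filter it out first (for example, drop the line that starts with "OID") before mapping; otherwise the header will be parsed as a record.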

