SparkContext setLocalProperties

As a continuation of this question, could you tell me which properties I can change with SparkContext.setLocalProperties? Can I change the cores, RAM, etc.?

+6
3 answers

According to the documentation, localProperties is a protected[spark] property of SparkContext that holds properties used to create logical groups of jobs. They are also InheritableThreadLocal variables, which means they are used instead of ordinary thread-local variables whenever the per-thread value stored in the variable should be automatically passed on to any child threads that are created. Propagation of local properties to the workers starts when SparkContext is asked to run or submit a Spark job, which in turn passes them on to the DAGScheduler.

In general, local properties are used to group jobs into pools in the FAIR scheduler via the spark.scheduler.pool per-thread property, and in the SQLExecution.withNewExecutionId method to set spark.sql.execution.id.
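For example, here is a minimal sketch (assuming the FAIR scheduler mode is enabled; the pool name "pool1" is made up for illustration) of routing the jobs submitted from one thread to a named pool through this local property:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class PoolExample {
    public static void main(String[] args) {
        // Assumption: fair scheduling is enabled for this context.
        SparkConf conf = new SparkConf()
                .setMaster("local")
                .setAppName("Pool Example")
                .set("spark.scheduler.mode", "FAIR");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Jobs submitted from this thread (and from child threads, because
        // local properties are InheritableThreadLocal) go to pool "pool1".
        sc.setLocalProperty("spark.scheduler.pool", "pool1");
        long count = sc.parallelize(Arrays.asList(1, 2, 3)).count();
        System.out.println(count);

        // Setting the property to null removes it, so later jobs from this
        // thread fall back to the default pool.
        sc.setLocalProperty("spark.scheduler.pool", null);
        sc.stop();
    }
}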

In other words, they are not intended for changing resources such as cores or RAM.

+3

I tried changing spark.executor.memory (one of the available Spark properties) from two threads sharing the same SparkContext: each thread sets its own value with setLocalProperty, reads it back before and after doing some work and sleeping, and each thread only ever sees the value it set itself (this is the output):

Thread 1 Before sleeping mem: 512
Thread 2 Before sleeping mem: 1024
Thread 1 After sleeping mem: 512
Thread 2 After sleeping mem: 1024

So the local property is kept per thread. Here is the code I used for this test:

import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

import scala.Tuple2;

public class App {
    private static JavaSparkContext sc;

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        SparkConf conf = new SparkConf().setMaster("local")
                .setAppName("Testing App");
        sc = new JavaSparkContext(conf);

        SparkThread thread1 = new SparkThread(1);
        SparkThread thread2 = new SparkThread(2);
        ExecutorService executor = Executors.newFixedThreadPool(2);

        Future<?> threadCompletion1 = executor.submit(thread1);
        // Give the first thread a head start so the prints interleave.
        Thread.sleep(5000);
        Future<?> threadCompletion2 = executor.submit(thread2);

        threadCompletion1.get();
        threadCompletion2.get();
        executor.shutdown();
    }

    private static class SparkThread implements Runnable {
        private final int i;

        public SparkThread(int i) {
            this.i = i;
        }

        @Override
        public void run() {
            // Each thread sets its own value of the local property.
            int mem = 512;
            sc.setLocalProperty("spark.executor.memory", Integer.toString(mem * i));
            JavaRDD<String> input = sc.textFile("test" + i);

            FlatMapFunction<String, String> tt = s -> Arrays.asList(s.split(" ")).iterator();
            JavaRDD<String> words = input.flatMap(tt);
            System.out.println("Thread " + i + " Before sleeping mem: "
                    + sc.getLocalProperty("spark.executor.memory"));

            try {
                // Sleep so the other thread has time to set its own value.
                Thread.sleep(7000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }

            // Do some work so a job is actually submitted with these local properties.
            JavaPairRDD<String, Integer> counts = words.mapToPair(t -> new Tuple2<>(t, 1))
                    .reduceByKey((x, y) -> x + y);
            counts.saveAsTextFile("output" + i);

            System.out.println("Thread " + i + " After sleeping mem: "
                    + sc.getLocalProperty("spark.executor.memory"));
        }
    }
}
+3

LocalProperties provide an easy mechanism to pass (user-defined) information from the driver to the executors. You can read them on the executor side through TaskContext. An example of this is the SQL Execution ID.
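A minimal sketch of that mechanism, assuming Spark 2.x and a made-up property name "my.custom.property": the driver sets a local property and a task reads it back through TaskContext on the executor side.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaSparkContext;

public class LocalPropertyToExecutor {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("LocalProperty Demo");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Set on the driver thread; Spark ships local properties with each task.
        sc.setLocalProperty("my.custom.property", "hello-from-driver");

        sc.parallelize(Arrays.asList(1, 2, 3)).foreach(x -> {
            // Read back inside the running task on the executor.
            String value = TaskContext.get().getLocalProperty("my.custom.property");
            System.out.println("Task sees: " + value);
        });

        sc.stop();
    }
}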

0

Source: https://habr.com/ru/post/1015832/

