SparkContext setLocalProperties


Question

As a continuation of this question, could you please tell me which properties I can change via SparkContext.setLocalProperty? Could I change cores, RAM, etc.?

Answer

I did some testing with the property spark.executor.memory (the available properties are listed here). On a very simple local Spark setup, starting two threads, each with a different setting, the values do seem to be confined to their threads. Using the code at the end of this post (probably not code you would deploy to production), which interleaves the two threads to make sure it is not down to sheer scheduling luck, I obtained the following output (with Spark's own console output cleaned away):

Thread 1 Before sleeping mem: 512
Thread 2 Before sleeping mem: 1024
Thread 1 After sleeping mem: 512
Thread 2 After sleeping mem: 1024

It is pretty neat to observe that a property declared in one thread stays inside that thread, although I am fairly sure this can easily lead to nonsensical situations, so I would still recommend caution before applying such techniques.

import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

import scala.Tuple2;

public class App {
    private static JavaSparkContext sc;

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local")
                .setAppName("Testing App");
        sc = new JavaSparkContext(conf);
        SparkThread thread1 = new SparkThread(1);
        SparkThread thread2 = new SparkThread(2);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Future<?> threadCompletion1 = executor.submit(thread1);
        try {
            // Stagger the second thread so the two runs interleave.
            Thread.sleep(5000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        Future<?> threadCompletion2 = executor.submit(thread2);
        try {
            threadCompletion1.get();
            threadCompletion2.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    private static class SparkThread implements Runnable {
        private final int i;

        public SparkThread(int i) {
            this.i = i;
        }

        @Override
        public void run() {
            int mem = 512;
            // Set the property locally, for this thread only.
            sc.setLocalProperty("spark.executor.memory", Integer.toString(mem * i));
            JavaRDD<String> input = sc.textFile("test" + i);

            FlatMapFunction<String, String> tt = s -> Arrays.asList(s.split(" "))
                    .iterator();
            JavaRDD<String> words = input.flatMap(tt);
            System.out.println("Thread " + i + " Before sleeping mem: "
                    + sc.getLocalProperty("spark.executor.memory"));

            try {
                // Sleep past the other thread's property assignment.
                Thread.sleep(7000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // Do some work.
            JavaPairRDD<String, Integer> counts = words
                    .mapToPair(t -> new Tuple2<>(t, 1))
                    .reduceByKey((x, y) -> x + y);

            counts.saveAsTextFile("output" + i);
            System.out.println("Thread " + i + " After sleeping mem: "
                    + sc.getLocalProperty("spark.executor.memory"));
        }
    }
}
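The thread confinement observed above is not magic: as far as I can tell from the Spark source, SparkContext stores its local properties in an InheritableThreadLocal holding a Properties object, so each thread reads and writes its own snapshot (and child threads inherit a copy at creation time). Here is a minimal plain-JDK sketch of that mechanism, not Spark itself; the class and method names are only illustrative:

```java
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative stand-in for SparkContext's per-thread local properties.
public class LocalProps {
    private static final InheritableThreadLocal<Properties> localProperties =
            new InheritableThreadLocal<Properties>() {
                @Override
                protected Properties initialValue() {
                    // Each thread lazily starts with its own empty Properties.
                    return new Properties();
                }
                @Override
                protected Properties childValue(Properties parent) {
                    // A child thread inherits a copy, not a shared reference.
                    return (Properties) parent.clone();
                }
            };

    static void setLocalProperty(String key, String value) {
        localProperties.get().setProperty(key, value);
    }

    static String getLocalProperty(String key) {
        return localProperties.get().getProperty(key);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Callable<String> task1 = () -> {
            setLocalProperty("spark.executor.memory", "512");
            Thread.sleep(100); // give the other thread time to set its own value
            return getLocalProperty("spark.executor.memory");
        };
        Callable<String> task2 = () -> {
            setLocalProperty("spark.executor.memory", "1024");
            Thread.sleep(100);
            return getLocalProperty("spark.executor.memory");
        };
        Future<String> f1 = pool.submit(task1);
        Future<String> f2 = pool.submit(task2);
        // Each thread sees only the value it set itself.
        System.out.println("Thread 1 mem: " + f1.get()); // 512
        System.out.println("Thread 2 mem: " + f2.get()); // 1024
        pool.shutdown();
    }
}
```

This also suggests why setting a property like spark.executor.memory per thread should be treated with care: the thread-local value is visible to getLocalProperty, but whether a given property actually takes effect per job is up to how Spark consumes it.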
