Concurrent job Execution in Spark


Problem Description

I have used input data in the following format:

0
1
2
3
4
5
…
14

Input Location: hdfs://localhost:9000/Input/datasource

I have used the following code snippet to save the RDD as text files using multiple threads:

package org.apache.spark.examples;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.avro.ipc.specific.Person;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

class RunnableDemo implements Runnable
{

    private Thread t;
    private String threadName;
    private String path;
    private JavaRDD<String> javaRDD;
//  private JavaSparkContext javaSparkContext;

    RunnableDemo(String threadName,JavaRDD<String> javaRDD,String path)
    {
        this.threadName=threadName;
        this.javaRDD=javaRDD;
        this.path=path;
//      this.javaSparkContext=javaSparkContext;
    }


    @Override
    public void run() {
        System.out.println("Running " +  threadName );      
        try {
            this.javaRDD.saveAsTextFile(path);
//          System.out.println(this.javaRDD.count());
            Thread.sleep(50);
            } catch (InterruptedException e) {
                System.out.println("Thread " +  threadName + " interrupted.");
                }
         System.out.println("Thread " +  threadName + " exiting.");
//       this.javaSparkContext.stop();
    }

    public void start ()
       {
          System.out.println("Starting " +  threadName );
          if (t == null)
          {
             t = new Thread (this, threadName);
             t.start ();
          }
       }

}

public class SparkJavaTest {



    public static void main(String[] args) {

        //Spark Configurations:

        SparkConf sparkConf=new SparkConf().setAppName("SparkJavaTest");

        JavaSparkContext ctx=new JavaSparkContext(sparkConf);

        SQLContext sqlContext = new SQLContext(ctx);        

        JavaRDD<String> dataCollection=ctx.textFile("hdfs://yarncluster/Input/datasource");




        List<StructField> fields= new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("Id", DataTypes.IntegerType,true));

        JavaRDD<Row> rowRDD =dataCollection.map(
                new Function<String, Row>() {
                    @Override
                    public Row call(String record) throws Exception {
                        String[] fields = record.split("\u0001");                       
                        return RowFactory.create(Integer.parseInt(fields[0].trim()));
                    }                   
                });     

        StructType schema = DataTypes.createStructType(fields);

        DataFrame dataFrame =sqlContext.createDataFrame(rowRDD, schema);        
        dataFrame.registerTempTable("data");

        long recordsCount=dataFrame.count();        
        long splitRecordsCount=5;
        long splitCount =recordsCount/splitRecordsCount;
        List<JavaRDD<Row>> list1=new ArrayList<JavaRDD<Row>>();

        for(int i=0;i<splitCount;i++)
        {
            long start = i*splitRecordsCount;
            long end = (i+1)*splitRecordsCount;         
            DataFrame temp=sqlContext.sql("SELECT * FROM data WHERE Id >="+ start +" AND Id < " + end);         
            list1.add(temp.toJavaRDD());
        }       

        long length =list1.size();

        int split=0;

        for (int i = 0; i < length; i++) {

            JavaRDD rdd1 =list1.get(i);

            JavaPairRDD rdd3=rdd1.cartesian(rdd1);

            JavaPairRDD<Row,Row> rdd4=rdd3.filter(
                    new Function<Tuple2<Row,Row>,Boolean>()
                    {
                        public Boolean call(Tuple2<Row,Row> s)
                        {
                            Row line1=s._1;
                            Row line2=s._2;

                            long app1 = Integer.parseInt(line1.get(0).toString());

                            long app2 = Integer.parseInt(line2.get(0).toString());


                            if(app1<app2)
                            {
                                return true;
                            }
                            return false;
                        }
                    });

            JavaRDD<String> test=rdd4.map(new Function<Tuple2<Row,Row>, String>() {
                @Override
                public String call(Tuple2<Row, Row> s)
                        throws Exception {

                    Row data1=s._1;
                    Row data2=s._2;

                    int x =Integer.parseInt(data1.get(0).toString());
                    int y =Integer.parseInt(data2.get(0).toString());

                    String result =x +","+ y+","+(x+y);
                    return result;
                }
            });

            RunnableDemo R =new RunnableDemo("Thread-"+split,test,"hdfs://yarncluster/GettingStarted/Output/"+split);

            R.start();
            split++;            
            R.start();

            int index =i;

            while(index<length)
            {
                JavaRDD rdd2 =list1.get(index);
                 rdd3=rdd1.cartesian(rdd2);

                 rdd4=rdd3.filter(
                        new Function<Tuple2<Row,Row>,Boolean>()
                        {
                            public Boolean call(Tuple2<Row,Row> s)
                            {
                                Row line1=s._1;
                                Row line2=s._2;

                                long app1 = Integer.parseInt(line1.get(0).toString());

                                long app2 = Integer.parseInt(line2.get(0).toString());


                                if(app1<app2)
                                {
                                    return true;
                                }
                                return false;
                            }
                        });         

                test=rdd4.map(new Function<Tuple2<Row,Row>, String>() {
                    @Override
                    public String call(Tuple2<Row, Row> s)
                            throws Exception {

                        Row data1=s._1;
                        Row data2=s._2;

                        int x =Integer.parseInt(data1.get(0).toString());
                        int y =Integer.parseInt(data2.get(0).toString());

                        String result =x +","+ y+","+(x+y);
                        return result;
                    }
                });         

                R =new RunnableDemo("Thread-"+split,test,"hdfs://yarncluster/GettingStarted/Output/"+split);

                R.start();
                split++;            
                index++;                
            }
        }
    }

}

In this case, I have faced the following exception.

I have tried the solution provided in the following link:

How to run concurrent jobs (actions) in Apache Spark using a single Spark context

But still, I can't resolve this issue.

Could you please guide me on how to resolve this?

Solution

First of all, you're trying to execute all the work on the driver node using several threads. This is not really in the spirit of Spark, since each unit of work in your case is independent of the others and can be executed on different machines. You have a toy example here, but this will become really important with large volumes of data.

A better approach would be to use something like mapPartitions to send the range of keys to each worker and let them execute the corresponding SQL queries, then save the results, with one thread per worker. This will make the code cleaner and easier to reason about (once you get used to the way RDDs work). You would obviously need to set the level of parallelism and number of partitions (discussed here) appropriately for your input data.
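As a rough, hedged sketch of that idea (not code from the question or the answer): the split boundaries themselves can be distributed as an RDD with one partition per range, so each task builds its pairs on an executor. It assumes the Spark 1.x Java API used above plus imports of org.apache.spark.api.java.function.FlatMapFunction and java.util.Iterator; since SQLContext and DataFrame cannot be used inside tasks, the per-range work is expressed directly on the integer IDs, and everything is written with a single saveAsTextFile to an illustrative path.

// Distribute the (start, end) ranges as data; one partition per range,
// so each range is processed by exactly one task on an executor.
List<long[]> splits = Arrays.asList(new long[]{0, 5}, new long[]{5, 10}, new long[]{10, 15});
JavaRDD<long[]> ranges = ctx.parallelize(splits, splits.size());

JavaRDD<String> pairs = ranges.mapPartitions(
        new FlatMapFunction<Iterator<long[]>, String>() {
            @Override
            public Iterable<String> call(Iterator<long[]> it) {
                List<String> out = new ArrayList<String>();
                while (it.hasNext()) {
                    long[] range = it.next();
                    for (long x = range[0]; x < range[1]; x++) {
                        for (long y = x + 1; y < range[1]; y++) {  // keep only x < y, as in the filter above
                            out.add(x + "," + y + "," + (x + y));
                        }
                    }
                }
                return out;
            }
        });

pairs.saveAsTextFile("hdfs://yarncluster/GettingStarted/Output/all");  // illustrative output path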

The immediate issue with your code is that the main thread starts other threads, but doesn't wait for them to finish. Normally this causes the spawned threads to terminate along with the parent (see the javadoc). Notice how, in the answer to the linked question, the main function does a get() on the spawned futures before returning.
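For illustration, here is a minimal sketch of that pattern adapted to the structure above; it is an assumption about how the pieces could be wired together, not the linked answer's exact code. The name outputs is hypothetical and stands for a List<JavaRDD<String>> collecting the per-split "test" RDDs built in the loops, and it assumes imports of java.util.concurrent.Future and java.util.concurrent.ExecutionException alongside the ExecutorService/Executors imports already present.

ExecutorService pool = Executors.newFixedThreadPool(4);       // bounded pool instead of hand-rolled threads
List<Future<?>> futures = new ArrayList<Future<?>>();

for (int s = 0; s < outputs.size(); s++) {                    // outputs: hypothetical List<JavaRDD<String>> of per-split results
    final JavaRDD<String> rdd = outputs.get(s);
    final String path = "hdfs://yarncluster/GettingStarted/Output/" + s;
    futures.add(pool.submit(new Runnable() {
        @Override
        public void run() {
            rdd.saveAsTextFile(path);                         // each save is submitted to Spark as its own job
        }
    }));
}

try {
    for (Future<?> f : futures) {
        f.get();                                              // block until every spawned job has finished
    }
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
} finally {
    pool.shutdown();
}

ctx.stop();                                                   // stop the SparkContext only after all jobs complete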
