Spark Streaming: Micro batches Parallel Execution

Problem Description

We are receiving data in Spark Streaming from Kafka. Once execution starts, Spark Streaming executes only one batch and the remaining batches start queuing up in Kafka.

Our data is independent and can be processed in parallel.

We tried multiple configurations with multiple executors, cores, back pressure, and other settings, but nothing has worked so far. A lot of messages are queued, only one micro batch is processed at a time, and the rest remain in the queue.

We want to achieve maximum parallelism so that no micro batch is queued, as we have enough resources available. How can we reduce processing time by making maximum use of those resources?
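For context, here is a sketch of the kind of rate-limiting and back-pressure settings we experimented with (the property names are standard Spark Streaming configuration keys; the values shown are illustrative, not our production values):

SparkConf conf = new SparkConf()
        .setAppName("KafkaStreamingJob")
        // Let Spark adapt the ingestion rate to the observed processing rate
        .set("spark.streaming.backpressure.enabled", "true")
        // Upper bound on records read per Kafka partition per second
        // (applies to the direct stream API used below)
        .set("spark.streaming.kafka.maxRatePerPartition", "1000");

The relevant part of our streaming job: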

// Start reading messages from Kafka and get a DStream
// (Subscribe expects a collection of topic names, e.g. via java.util.Collections)
final JavaInputDStream<ConsumerRecord<String, byte[]>> consumerStream = KafkaUtils.createDirectStream(
        getJavaStreamingContext(), LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, byte[]>Subscribe(Collections.singletonList("TOPIC_NAME"),
                sparkServiceConf.getKafkaConsumeParams()));

ThreadContext.put(Constants.CommonLiterals.LOGGER_UID_VAR, CommonUtils.loggerUniqueId());

JavaDStream<byte[]> messagesStream = consumerStream.map(new Function<ConsumerRecord<String, byte[]>, byte[]>() {
    private static final long serialVersionUID = 1L;
    @Override
    public byte[] call(ConsumerRecord<String, byte[]> kafkaRecord) throws Exception {
        return kafkaRecord.value();
    }
});

// Decode each gzip-compressed binary message and generate a JSON string
JavaDStream<String> decodedStream = messagesStream.map(new Function<byte[], String>() {
    private static final long serialVersionUID = 1L;

    @Override
    public String call(byte[] asn1Data) throws Exception {
        if (asn1Data.length > 0) {
            // try-with-resources closes both streams even if decompression fails
            try (GZIPInputStream gzipInputStream =
                         new GZIPInputStream(new ByteArrayInputStream(asn1Data));
                 ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {

                byte[] buffer = new byte[1024];
                int len;
                while ((len = gzipInputStream.read(buffer)) != -1) {
                    byteArrayOutputStream.write(buffer, 0, len);
                }
                // note: relies on the platform default charset, as in the original
                return new String(byteArrayOutputStream.toByteArray());
            } catch (Exception e) {
                // 'producer' is a Kafka producer defined elsewhere in our class
                producer.flush();
                throw e;
            }
        }
        return null;
    }
});

// publish generated json gzip to kafka
// ('cache' is the cached decoded stream, e.g. decodedStream.cache())
cache.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    private static final long serialVersionUID = 1L;

    @Override
    public void call(JavaRDD<String> jsonRdd4DF) throws Exception {
        if (!jsonRdd4DF.isEmpty()) {
            Dataset<Row> json = sparkSession.read().json(jsonRdd4DF);

            SparkAIRMainJsonProcessor airMainJsonProcessor = new SparkAIRMainJsonProcessor();
            airMainJsonProcessor.processAIRData(json, sparkSession);
        }
    }
});

getJavaStreamingContext().start();
getJavaStreamingContext().awaitTermination();
getJavaStreamingContext().stop();

Technologies we are using:

HDFS  2.7.1.2.5 
YARN + MapReduce2  2.7.1.2.5 
ZooKeeper  3.4.6.2.5 
Ambari Infra  0.1.0 
Ambari Metrics  0.1.0 
Kafka  0.10.0.2.5 
Knox  0.9.0.2.5 
Ranger  0.6.0.2.5 
Ranger KMS  0.6.0.2.5 
SmartSense  1.3.0.0-1
Spark2  2.0.x.2.5 

Statistics we got from different experiments:

Experiment 1

num_executors=6
executor_memory=8g
executor_cores=12

100 Files processing time 48 Minutes

Experiment 2

spark.default.parallelism=12
num_executors=6
executor_memory=8g
executor_cores=12

100 Files processing time 8 Minutes

Experiment 3

spark.default.parallelism=12
num_executors=6
executor_memory=8g
executor_cores=12

100 Files processing time 7 Minutes

Experiment 4

spark.default.parallelism=16
num_executors=6
executor_memory=8g
executor_cores=12

100 Files processing time 10 Minutes

Please advise how we can maximize processing so that no batches are queued.

Solution

I was facing the same issue. I tried a few things to resolve it and came to the following findings:

First of all, intuition says that one batch should be processed per executor, but on the contrary only one batch is processed at a time; the jobs and tasks within that batch, however, are processed in parallel.

Processing multiple batches concurrently can be achieved by using spark.streaming.concurrentJobs, but it is undocumented and still needs a few fixes. One of the problems is saving Kafka offsets: suppose we set this parameter to 4 and 4 batches are processed in parallel; if the 3rd batch finishes before the 4th, which Kafka offsets should be committed? This parameter is quite useful if batches are independent.
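A minimal sketch of how it could be set (assuming a standard SparkConf-driven setup; since the property is undocumented, treat it as experimental):

SparkConf conf = new SparkConf()
        .setAppName("ConcurrentBatchesJob")
        // Undocumented: allow up to 4 streaming batch jobs to run concurrently.
        // Offset commit order is not guaranteed across concurrent batches.
        .set("spark.streaming.concurrentJobs", "4");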

spark.default.parallelism, because of its name, is sometimes assumed to make everything parallel, but its real benefit lies in distributed shuffle operations. Try different numbers and find an optimum value for your job; you will see a considerable difference in processing time. It depends on the shuffle operations in your jobs, and setting it too high can actually decrease performance, which is apparent from your experiment results too.
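For example (a sketch; 12 is just the value from the experiments above, the optimum depends on the shuffle stages of your job):

SparkConf conf = new SparkConf()
        // Default number of partitions for shuffle operations
        // (reduceByKey, join, ...) when none is specified explicitly
        .set("spark.default.parallelism", "12");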

Another option is to use foreachPartitionAsync in place of foreach on the RDD. But I think foreachPartition is better, because foreachPartitionAsync queues up the jobs: the batches appear to be processed, while their jobs are still queued or in flight. Maybe I didn't get its usage right, but it behaved the same way in 3 of my services.
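For illustration, the synchronous variant applied to a stream like the one in the question (decodedStream comes from the question's code; sendToSink is a hypothetical stand-in for the per-record output logic):

decodedStream.foreachRDD(rdd ->
    rdd.foreachPartition(records -> {
        // Process each partition synchronously, e.g. one sink connection per partition
        while (records.hasNext()) {
            String json = records.next();
            sendToSink(json); // hypothetical output call
        }
    })
);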

FAIR spark.scheduler.mode should be used for jobs with lots of tasks, since its round-robin assignment of tasks to jobs gives smaller tasks a chance to start receiving resources while bigger tasks are still processing.
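For example:

SparkConf conf = new SparkConf()
        // Round-robin job scheduling instead of FIFO, so small jobs
        // get resources while a big job is still running
        .set("spark.scheduler.mode", "FAIR");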

Try to tune your batch duration and input size so that the processing time always stays below the batch duration; otherwise you are going to see a long backlog of batches.
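For example, with a 10-second batch interval (illustrative; check the Streaming tab of the Spark UI to confirm that each batch's processing time stays below the interval):

JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));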

These are my findings and suggestions. However, there are so many configurations and approaches to streaming that often one set of settings doesn't work for another workload. Spark Streaming is all about learning: putting your experience and anticipation together to arrive at an optimal set of configurations.

Hope it helps. It would be a great relief if someone could explain specifically how we can legitimately process batches in parallel.
