Publishing a Broadcast Variable in a Spark Program

Problem description

In a Spark Java program I need to read a config file and populate a HashMap, which I then need to publish as a broadcast variable so that it is available on all the data nodes.

I need to get the value of this broadcast variable in my CustomInputFormat class, which runs on the data nodes. How can I get the value of a specific broadcast variable inside CustomInputFormat, given that the broadcast variable is declared in my driver program?

I am adding some code to explain this further.

In scenario 1 I use the variable in the driver program itself, i.e. in the same class where it is declared. Here I can call the Broadcast.value() method:

> final Broadcast<String[]> signPrefixes = sc.broadcast(loadCallSignTable());
> JavaPairRDD<String, Integer> countryContactCounts = contactCounts.mapToPair(
>     new PairFunction<Tuple2<String, Integer>, String, Integer>() {
>       public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) {
>         String sign = callSignCount._1();
>         // Read the broadcast value inside the closure
>         String country = lookupCountry(sign, signPrefixes.value());
>         return new Tuple2<String, Integer>(country, callSignCount._2());
>       }
>     }).reduceByKey(new SumInts());

In scenario 2 I want to use the broadcast variable inside my custom input format class.

Driver program:

> final JavaSparkContext sc = new JavaSparkContext(
>     sConf.setAppName("ParserSpark").setMaster("yarn-cluster"));
> Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
>
> JavaPairRDD<NullWritable, ArrayList<Record>> baseRDD =
>     sc.newAPIHadoopFile(args[2], InputFormat.class, NullWritable.class,
>         ArrayList.class, conf);

InputFormat class:

> public class InputFormat extends FileInputFormat {
>
>   @Override
>   public RecordReader<NullWritable, ArrayList<Record>> createRecordReader(
>       InputSplit split, TaskAttemptContext context)
>       throws IOException, InterruptedException {
>     // I want to get the broadcast variable here. How do I do that?
>     RecordReader reader = new RecordReader();
>     reader.initialize(split, context);
>     return reader;
>   }
>
>   @Override
>   protected boolean isSplitable(JobContext context, Path file) {
>     return false;
>   }
> }

Solution

You would create the broadcast variable on the driver with val bcVariable = sc.broadcast(myVariableToBroadcast) and access it later with bcVariable.value
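
The question also mentions reading a config file into a HashMap before broadcasting it. A minimal sketch of that driver-side step follows, assuming the config file uses the java.util.Properties key=value format (the question does not say which format it uses). The class name ConfigMapLoader, the method loadConfig, and the sample keys are hypothetical and only serve the illustration.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper, not part of Spark or of the original question.
public class ConfigMapLoader {

    // Reads key=value pairs (java.util.Properties format, an assumption)
    // from the given reader and copies them into a HashMap.
    public static HashMap<String, String> loadConfig(Reader source) throws IOException {
        Properties props = new Properties();
        props.load(source);
        HashMap<String, String> config = new HashMap<>();
        for (String key : props.stringPropertyNames()) {
            config.put(key, props.getProperty(key));
        }
        return config;
    }

    public static void main(String[] args) throws IOException {
        // Inline text stands in for the real config file; on the driver a
        // FileReader for the actual path could be passed instead.
        String sample = "delimiter=|\nrecordLength=128\n";
        Map<String, String> config = loadConfig(new StringReader(sample));
        System.out.println(config.get("delimiter"));
        System.out.println(config.get("recordLength"));
    }
}
```

HashMap is serializable, so the returned map could be handed to sc.broadcast(...) in the driver in the same way as the arrays in the question's two scenarios.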
