Broadcast Variable Publish in Spark Program
Problem description
In my Spark Java program I need to read a config file and populate a HashMap, which I then need to publish as a broadcast variable so that it is available on all the data nodes.
I need to get the value of this broadcast variable in my CustomInputFormat class, which runs on the data nodes. How can I tell my CustomInputFormat class to read the value of a specific broadcast variable, given that the broadcast variable is declared in my driver program?
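A minimal sketch of the first step, loading the config file into a HashMap on the driver, assuming the file uses Java properties syntax (key=value lines). The question does not say which format is used, and the class name ConfigLoader is hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Properties;

// Hypothetical helper: reads a properties-format config file into a plain HashMap.
final class ConfigLoader {
    private ConfigLoader() {}

    /** Reads the whole file (UTF-8) from the driver's local filesystem and parses it. */
    public static HashMap<String, String> load(Path file) throws IOException {
        return parse(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
    }

    /** Parses key=value lines; '#' and '!' comment lines are ignored by Properties. */
    public static HashMap<String, String> parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // a StringReader does not actually fail
        }
        HashMap<String, String> map = new HashMap<>();
        for (String key : props.stringPropertyNames()) {
            map.put(key, props.getProperty(key));
        }
        return map;
    }
}
```

The resulting map could then be passed to sc.broadcast(...) exactly like the String[] in scenario 1.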
I am adding some code to explain it in more detail:
In scenario 1, I use the variable in the driver program itself, i.e. it is used in the same class. Here I can use the Broadcast.value() method:
> final Broadcast<String[]> signPrefixes = sc.broadcast(loadCallSignTable());
> JavaPairRDD<String, Integer> countryContactCounts = contactCounts.mapToPair(
>     new PairFunction<Tuple2<String, Integer>, String, Integer>() {
>       public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) {
>         String sign = callSignCount._1();
>         // Runs on the executors: read the broadcast value inside the function
>         String country = lookupCountry(sign, signPrefixes.value());
>         return new Tuple2<String, Integer>(country, callSignCount._2());
>       }
>     }).reduceByKey(new SumInts());
In scenario 2, I am going to use the broadcast variable inside my custom InputFormat class:
Driver program:
> final JavaSparkContext sc = new JavaSparkContext(
>     sConf.setAppName("ParserSpark").setMaster("yarn-cluster"));
> Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
>
> JavaPairRDD<NullWritable, ArrayList<Record>> baseRDD =
>     sc.newAPIHadoopFile(args[2], InputFormat.class, NullWritable.class,
>         ArrayList.class, conf);
InputFormat class:
> public class InputFormat extends FileInputFormat<NullWritable, ArrayList<Record>> {
>
>   @Override
>   public RecordReader<NullWritable, ArrayList<Record>> createRecordReader(
>       InputSplit split, TaskAttemptContext context)
>       throws IOException, InterruptedException {
>     // I want to get the broadcast variable here. How do I do it?
>
>     RecordReader reader = new RecordReader();
>     reader.initialize(split, context);
>     return reader;
>   }
>
>   @Override
>   protected boolean isSplitable(JobContext context, Path file) {
>     return false;
>   }
> }
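You would create the broadcast variable on the driver with val bcVariable = sc.broadcast(myVariableToBroadcast)
and access it later with bcVariable.value. That is Scala syntax; in Java the accessor is a method call, bcVariable.value(), as in scenario 1.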
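The value call works inside functions that Spark serializes and ships to the executors, such as the mapToPair function in scenario 1. As far as I can tell it does not carry over directly to scenario 2: Spark creates the InputFormat itself from InputFormat.class by reflection, so there is no constructor or field through which to hand it the Broadcast handle. One alternative, not part of the answer above, is to put a small map into the Hadoop Configuration (the conf argument of newAPIHadoopFile), which Spark ships to the tasks, and read it back with context.getConfiguration() in createRecordReader. Configuration values are strings, so the map has to be encoded first. The sketch below does that with java.util.Properties plus Base64; the key name and the ConfCodec class are hypothetical, and it assumes a small map with no null keys or values.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper for shipping a small Map<String, String> through a Hadoop Configuration.
//
// Driver, before calling sc.newAPIHadoopFile(..., conf):
//   conf.set(ConfCodec.LOOKUP_KEY, ConfCodec.encode(lookupMap));
// Inside InputFormat.createRecordReader(split, context):
//   Map<String, String> lookup =
//       ConfCodec.decode(context.getConfiguration().get(ConfCodec.LOOKUP_KEY));
final class ConfCodec {
    // Hypothetical configuration key; any unique name works.
    public static final String LOOKUP_KEY = "myapp.lookup.table";

    private ConfCodec() {}

    /** Encodes the map as a single-line string (Properties text, then Base64). */
    public static String encode(Map<String, String> map) {
        Properties props = new Properties();
        props.putAll(map); // throws NullPointerException on null keys or values
        StringWriter out = new StringWriter();
        try {
            props.store(out, null); // escapes '=', ':' and newlines in keys and values
        } catch (IOException e) {
            throw new UncheckedIOException(e); // a StringWriter does not actually fail
        }
        byte[] bytes = out.toString().getBytes(StandardCharsets.UTF_8);
        return Base64.getEncoder().encodeToString(bytes); // no line breaks in the result
    }

    /** Rebuilds the map; a missing key (null) yields an empty map. */
    public static Map<String, String> decode(String encoded) {
        Map<String, String> map = new HashMap<>();
        if (encoded == null) {
            return map;
        }
        String text = new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
        Properties props = new Properties();
        try {
            props.load(new StringReader(text)); // the timestamp comment line is skipped
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        for (String name : props.stringPropertyNames()) {
            map.put(name, props.getProperty(name));
        }
        return map;
    }
}
```

Another option is to leave the InputFormat alone and apply broadcastVar.value() in a transformation on baseRDD after the records have been read, which keeps the broadcast pattern from scenario 1.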