Iterate over different columns using withColumn in Java Spark


Problem description


I have to modify a Dataset<Row> according to some rules that are in a List<Row>. I want to iterate over the Dataset<Row> columns using Dataset.withColumn(...), as seen in the next example:

(import necessary libraries...)

SparkSession spark = SparkSession
                .builder()
                .appName("appname")
                .config("spark.some.config.option", "some-value")
                .getOrCreate();

Dataset<Row> dfToModify = spark.read().table("TableToModify");

// list of rules: (field, input, output, conditionAux)
List<Row> ListWithInfo = new ArrayList<>();

ListWithInfo.add(0, RowFactory.create("field1", "input1", "output1", "conditionAux1"));
ListWithInfo.add(1, RowFactory.create("field1", "input1", "output1", "conditionAux2"));
ListWithInfo.add(2, RowFactory.create("field1", "input2", "output3", "conditionAux3"));
ListWithInfo.add(3, RowFactory.create("field2", "input3", "output4", "conditionAux4"));
.
.
.

for (Row row : ListWithInfo) {

            String field = row.getString(0);
            String input = row.getString(1);
            String output = row.getString(2);
            String conditionAux = row.getString(3);

            // replace the value of "field" when both the input value and the
            // auxiliary condition match; otherwise keep the original value
            dfToModify = dfToModify.withColumn(field,
                                    when(dfToModify.col(field).equalTo(input)
                                    .and(dfToModify.col("conditionAuxField").equalTo(conditionAux)),
                                    output)
                                    .otherwise(dfToModify.col(field)));

        }


The code does work as it should, but when there are more than 50 "rules" in the List, the program doesn't finish and this output is shown on the screen:

20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1653
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1650
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1635
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1641
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1645
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1646
20/01/27 17:48:18 INFO storage.BlockManagerInfo: Removed broadcast_113_piece0 on **************** in memory (size: 14.5 KB, free: 3.0 GB)
20/01/27 17:48:18 INFO storage.BlockManagerInfo: Removed broadcast_113_piece0 on ***************** in memory (size: 14.5 KB, free: 3.0 GB)
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1639
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1649
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1651
20/01/27 17:49:18 INFO spark.ExecutorAllocationManager: Request to remove executorIds: 6
20/01/27 17:49:18 INFO cluster.YarnClientSchedulerBackend: Requesting to kill executor(s) 6
20/01/27 17:49:18 INFO cluster.YarnClientSchedulerBackend: Actual list of executor(s) to be killed is 6
20/01/27 17:49:18 INFO spark.ExecutorAllocationManager: Removing executor 6 because it has been idle for 60 seconds (new desired total will be 0)
20/01/27 17:49:19 INFO yarn.SparkRackResolver: Got an error when resolving hostNames. Falling back to /default-rack for all
20/01/27 17:49:19 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 6.
20/01/27 17:49:19 INFO scheduler.DAGScheduler: Executor lost: 6 (epoch 0)
20/01/27 17:49:19 INFO yarn.SparkRackResolver: Got an error when resolving hostNames. Falling back to /default-rack for all
20/01/27 17:49:19 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 6 from BlockManagerMaster.
20/01/27 17:49:19 INFO storage.BlockManagerMasterEndpoint: Removing block manager BlockManagerId(6, *********************, 43387, None)
20/01/27 17:49:19 INFO storage.BlockManagerMaster: Removed 6 successfully in removeExecutor
20/01/27 17:49:19 INFO cluster.YarnScheduler: Executor 6 on **************** killed by driver.
20/01/27 17:49:19 INFO spark.ExecutorAllocationManager: Existing executor 6 has been removed (new total is 0)
20/01/27 17:49:20 INFO yarn.SparkRackResolver: Got an error when resolving hostNames. Falling back to /default-rack for all
20/01/27 17:49:21 INFO yarn.SparkRackResolver: Got an error when resolving hostNames. Falling back to /default-rack for all
20/01/27 17:49:22 INFO yarn.SparkRackResolver: Got an error when resolving hostNames. Falling back to /default-rack for all
.
.
.
.


Is there any way to make it more efficient using Java Spark? (without using for loop or something similar)

Recommended answer


Finally I used the withColumns method of the Dataset<Row> object. This method needs two arguments:

.withColumns(Seq<String> columnNames, Seq<Column> columnValues);

and the column names in the Seq<String> must not be repeated.
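For reference, this is roughly how the call is wired up from Java (a minimal sketch, assuming Spark 2.x with scala.collection.JavaConversions on the classpath; dfToModify and the column names are just the placeholders used in this question):

import static org.apache.spark.sql.functions.lit;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import scala.collection.JavaConversions;
import scala.collection.Seq;

// one column name and one Column expression per field to be replaced
List<String> names = Arrays.asList("field1", "field2");
List<Column> values = Arrays.asList(lit("output1"), lit("output4"));

// convert the Java lists to Scala Seqs and replace both columns in a single call
Seq<String> nameSeq = JavaConversions.asScalaBuffer(names).seq();
Seq<Column> valueSeq = JavaConversions.asScalaBuffer(values).seq();

Dataset<Row> result = dfToModify.withColumns(nameSeq, valueSeq);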

The full code is as follows:


SparkSession spark = SparkSession
                .builder()
                .appName("appname")
                .config("spark.some.config.option", "some-value")
                .getOrCreate();

Dataset<Row> dfToModify = spark.read().table("TableToModify");

List<Row> ListWithInfo = new ArrayList<>();

ListWithInfo.add(0, RowFactory.create("field1", "input1", "output1", "conditionAux1"));
ListWithInfo.add(1, RowFactory.create("field1", "input1", "output1", "conditionAux2"));
ListWithInfo.add(2, RowFactory.create("field1", "input2", "output3", "conditionAux3"));
ListWithInfo.add(3, RowFactory.create("field2", "input3", "output4", "conditionAux4"));
.
.
.
// initialize values for fields and conditions
String field_ant = ListWithInfo.get(0).getString(0).toLowerCase();
String first_input = ListWithInfo.get(0).getString(1);
String first_output = ListWithInfo.get(0).getString(2);
String first_conditionAux = ListWithInfo.get(0).getString(3);
Column whenColumn = when(dfToModify.col(field_ant).equalTo(first_input)
                .and(dfToModify.col("conditionAuxField").equalTo(lit(first_conditionAux))),
                first_output);

// lists with the names of the fields and the conditions        
List<Column> whenColumnList = new ArrayList<>();
List<String> fieldsNameList = new ArrayList<>();

for (Row row : ListWithInfo.subList(1,ListWithInfo.size())) {

            String field = row.getString(0);
            String input = row.getString(1);
            String output = row.getString(2);
            String conditionAux = row.getString(3);

            if (field.equals(field_ant)) {
                // if field equals field_ant, the new condition is added to the previous one
                whenColumn = whenColumn.when(dfToModify.col(field).equalTo(input)
                        .and(dfToModify.col("conditionAuxField").equalTo(lit(conditionAux))),
                        output);
            } else {
                // if field is different from the previous one:
                // close the conditions for this field
                whenColumn = whenColumn.otherwise(dfToModify.col(field_ant));

                // add to the lists the field(String) and the conditions (columns)
                whenColumnList.add(whenColumn);
                fieldsNameList.add(field_ant);

                // and initialize the conditions for the new field
                whenColumn = when(dfToModify.col(field).equalTo(input)
                                .and(dfToModify.col("conditionAuxField").equalTo(lit(conditionAux))),
                        output);
            }

            field_ant = field;

        }

// add last values
whenColumnList.add(whenColumn);
fieldsNameList.add(field_ant);

// transform list to Seq
Seq<Column> whenColumnSeq = JavaConversions.asScalaBuffer(whenColumnList).seq();
Seq<String> fieldsNameSeq = JavaConversions.asScalaBuffer(fieldsNameList).seq();

Dataset<Row>  dfModified = dfToModify.withColumns(fieldsNameSeq, whenColumnSeq);
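With this approach each modified field ends up as a single chained when(...).when(...).otherwise(...) expression, so the plan gets one new projection per field instead of one withColumn call per rule. A quick way to trigger execution and check the result (a minimal check, assuming field1, field2 and conditionAuxField exist in TableToModify):

// show a few rows of the rewritten columns to verify the rules were applied
dfModified.select("field1", "field2", "conditionAuxField").show(20, false);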
