Apache Beam update current row values based on the values from previous row


Problem description

I have grouped the values from a CSV file. In the grouped rows there are a few missing values which need to be updated based on the value from the previous row; if the first row of a group is missing its amount, it needs to be set to 0.

I am able to group the records, but I am unable to figure out the logic to update the values. How do I achieve this?

Records

Customer Id | Date     | Amount
BS:89481    | 1/1/2012 | 100
BS:89482    | 1/1/2012 |
BS:89483    | 1/1/2012 | 300
BS:89481    | 1/2/2012 | 900
BS:89482    | 1/2/2012 | 200
BS:89483    | 1/2/2012 |

Grouped records

Customer Id | Date     | Amount
BS:89481    | 1/1/2012 | 100
BS:89481    | 1/2/2012 | 900
BS:89482    | 1/1/2012 |
BS:89482    | 1/2/2012 | 200
BS:89483    | 1/1/2012 | 300
BS:89483    | 1/2/2012 |

After updating the missing values

Customer Id | Date     | Amount
BS:89481    | 1/1/2012 | 100
BS:89481    | 1/2/2012 | 900
BS:89482    | 1/1/2012 | 000
BS:89482    | 1/2/2012 | 200
BS:89483    | 1/1/2012 | 300
BS:89483    | 1/2/2012 | 300

Code so far:

public class GroupByTest {
    public static void main(String[] args) throws IOException {
        System.out.println("We are about to start!!");

        final File schemaFile = new File(
                "C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\schema_transform2.avsc");

        File csvFile = new File(
                "C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\CustomerRequest-case2.csv");
        Schema schema = new Schema.Parser().parse(schemaFile);

        Pipeline pipeline = Pipeline.create();

        // Reading schema
        org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);

        final PCollectionTuple tuples = pipeline

                // Reading csv input
                .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))

                // Reading files that matches conditions 
                .apply("2", FileIO.readMatches())

                // Reading schema and validating with schema and converts to row and returns
                // valid and invalid list
                .apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
                        TupleTagList.of(invalidTag())));

        // Fetching only valid rows
        final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));

        // Transformation
        //Convert row to KV
        final Group.CombineFieldsByFields<Row> combine = Group.<Row>byFieldNames("customerId", "date")
            .aggregateField("balance", Sum.ofDoubles(), "balances");

        final PCollection<Row> aggregate = rows.apply(combine);

        PCollection<String> pOutput = aggregate.apply(Select.flattenedSchema()).apply(ParDo.of(new RowToString()));
        pipeline.run().waitUntilFinish();
        System.out.println("The end");

    }

    private static String getColumnValue(String columnName, Row row, Schema sourceSchema) {
        String type = sourceSchema.getField(columnName).schema().getType().toString().toLowerCase();
        LogicalType logicalType = sourceSchema.getField(columnName).schema().getLogicalType();
        if (logicalType != null) {
            type = logicalType.getName();
        }

        switch (type) {
        case "string":
            return row.getString(columnName);
        case "int":
            return Objects.requireNonNull(row.getInt32(columnName)).toString();
        case "bigint":
            return Objects.requireNonNull(row.getInt64(columnName)).toString();
        case "double":
            return Objects.requireNonNull(row.getDouble(columnName)).toString();
        case "timestamp-millis":
            // Use the requested column rather than the hard-coded "eventTime"
            return Instant.ofEpochMilli(Objects.requireNonNull(row.getDateTime(columnName)).getMillis()).toString();

        default:
            return row.getString(columnName);

        }
    }
}

Modified code (the original aggregation, now using the amount field):

final Group.CombineFieldsByFields<Row> combine = Group.<Row>byFieldNames("customerId", "date")
        .aggregateField("amount", Sum.ofDoubles(), "balances");

Grouping by customer Id

// Keys each Row by the value of the configured column so the
// collection can be grouped with GroupByKey
class ToKV extends DoFn<Row, KV<String, Row>> {

    private static final long serialVersionUID = -8093837716944809689L;
    String columnName1 = null;

    @ProcessElement
    public void processElement(ProcessContext context) {
        Row row = context.element();
        context.output(KV.of(row.getValue(columnName1).toString(), row));
    }

    public void setColumnName1(String columnName1) {
        this.columnName1 = columnName1;
    }
}

Grouping by customer Id:

ToKV toKV = new ToKV();
toKV.setColumnName1("ID");
PCollection<KV<String, Row>> kvRows = rows.apply(ParDo.of(toKV))
        .setCoder(KvCoder.of(StringUtf8Coder.of(), rows.getCoder()));

PCollection<KV<String, Iterable<Row>>> groupedKVRows = kvRows.apply(GroupByKey.<String, Row>create());

// Trying to group by date

PCollection<Row> outputRow =
        groupedKVRows
                .apply(ParDo.of(new GroupByDate()))
                .setCoder(RowCoder.of(AvroUtils.toBeamSchema(schema)));

How do I write the logic to convert the Iterable<Row> to a PCollection so that the dates can be sorted? (A sketch of one approach follows the stub below.)

class GroupByDate extends DoFn<KV<String, Iterable<Row>>, Row> {

    private static final long serialVersionUID = -1345126662309830332L;

    @ProcessElement
    public void processElement(ProcessContext context) {
        String strKey = context.element().getKey();
        Iterable<Row> rows = context.element().getValue();

        // TODO: sort the rows by date and fill in the missing amounts
    }
}
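
For reference, here is a minimal sketch of the sorting step (my own code, not from the original post): copy the Iterable<Row> into a List inside the DoFn and sort it there, which works because each per-customer group is small enough to hold in memory. The d/M/yyyy date pattern is an assumption based on the sample data above.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.Row;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

class SortRowsByDate extends DoFn<KV<String, Iterable<Row>>, Row> {

    private static final long serialVersionUID = 1L;

    // Assumed pattern, based on sample dates such as "1/1/2012"
    private static final DateTimeFormatter FORMATTER = DateTimeFormat.forPattern("d/M/yyyy");

    @ProcessElement
    public void processElement(ProcessContext context) {
        // Copy the grouped rows into a sortable in-memory list
        List<Row> rowList = new ArrayList<>();
        context.element().getValue().forEach(rowList::add);

        // Sort the group chronologically by the parsed date column
        rowList.sort(Comparator.comparingLong(
                row -> FORMATTER.parseDateTime(row.getString("date")).getMillis()));

        // Emit the rows in date order
        rowList.forEach(context::output);
    }
}

Applied as groupedKVRows.apply(ParDo.of(new SortRowsByDate())), this yields a PCollection<Row> that is emitted in date order within each group, though Beam itself still makes no ordering promises downstream.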

Avro schema:

{
  "type" : "record",
  "name" : "Entry",
  "namespace" : "transform",
  "fields" : [  {
    "name" : "customerId",
    "type" : [ "string", "null" ]
  }, {
    "name" : "date",
    "type" : [ "string", "null" ],
    "logicalType": "date"
    
  }, {
    "name" : "amount",
    "type" : [ "double", "null" ]
  } ]
}

Update: converting the PCollection to Row[]

class KVToRow extends DoFn<KV<String, Iterable<Row>>, Row[]> {

    private static final long serialVersionUID = -1345126662309830332L;

    @ProcessElement
    public void processElement(ProcessContext context) {
        // Copy the grouped rows into a list
        List<Row> rowList = new ArrayList<>();
        context.element().getValue().forEach(rowList::add);

        // Convert the list to an array and emit it
        // (the original sized the array as rowList.size() - 1; toArray
        // only worked because it allocates a correctly sized array itself)
        Row[] rowArray = rowList.toArray(new Row[0]);
        context.output(rowArray);
    }
}

Suggested code:

Row[] rowArray = Iterables.toArray(rows, Row.class);

Error:

The method toArray(Iterable<? extends T>, Class) in the type Iterables is not applicable for the arguments (PCollection, Class)

Converting the iterable to an array:

Row[] rowArray =  groupedKVRows.apply(ParDo.of(new KVToRow()));

Error:

Multiple markers at this line - Type mismatch: cannot convert from PCollection<Row[]> to Row[] - 1 changed line, 2 deleted
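
Both errors have the same root cause: a PCollection is a deferred, distributed dataset, not an in-memory collection, so it can never be converted to a local Row[] outside a transform. The conversion has to happen per element inside @ProcessElement, where the value really is a plain Iterable<Row>. A minimal sketch (my code, assuming plain Guava's Iterables, which the suggested snippet also uses):

import com.google.common.collect.Iterables;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.Row;

class GroupToArray extends DoFn<KV<String, Iterable<Row>>, Row> {

    private static final long serialVersionUID = 1L;

    @ProcessElement
    public void processElement(ProcessContext context) {
        // Inside the DoFn the value is an ordinary Iterable<Row>,
        // so Iterables.toArray is applicable here
        Row[] rowArray = Iterables.toArray(context.element().getValue(), Row.class);

        // TODO: sort the array and fill in missing values before emitting
        for (Row row : rowArray) {
            context.output(row);
        }
    }
}

Then groupedKVRows.apply(ParDo.of(new GroupToArray())) produces a PCollection<Row> again, instead of trying to materialize an array on the driver.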

Answer

Beam does not provide any order guarantees, so you will have to group them as you did.

But as far as I can understand from your case, you need to group by customerId. After that, you can apply a PTransform like ParDo to sort the grouped rows by date and fill in the missing values however you wish.

Example of sorting by converting to an array:

// Create a formatter for parsing dates
// (assuming "d/M/yyyy": the sample dates such as "1/1/2012" have no time part,
// so the original "dd/MM/yyyy HH:mm:ss" pattern would fail to parse them)
DateTimeFormatter formatter = DateTimeFormat.forPattern("d/M/yyyy");

// Convert iterable to array
Row[] rowArray = Iterables.toArray(rows, Row.class);

// Sort array using dates
Arrays.sort(
        rowArray,
        Comparator.comparingLong(row -> formatter.parseDateTime(row.getString("date")).getMillis()));

// Store the last amount
Double lastAmount = 0.0;

// Iterate over the array and fill in the missing parts
for (Row row : rowArray) {

    // Get current amount
    Double currentAmount = row.getDouble("amount");

    // If null, fill the previous value
    ...
}
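
Putting it all together, here is one possible way to finish that loop (a sketch under my own assumptions, not the answerer's exact code): Beam Rows are immutable, so the filled-in row is built as a copy via Row.fromRow(...).withFieldValue(...). The field names customerId, date, and amount come from the Avro schema above, and starting lastAmount at 0.0 implements the "first row of a group gets 0" rule from the expected output table.

import java.util.Arrays;
import java.util.Comparator;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.Row;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

import com.google.common.collect.Iterables;

class FillMissingAmounts extends DoFn<KV<String, Iterable<Row>>, Row> {

    private static final long serialVersionUID = 1L;

    // Assumed pattern for dates like "1/1/2012"
    private static final DateTimeFormatter FORMATTER = DateTimeFormat.forPattern("d/M/yyyy");

    @ProcessElement
    public void processElement(ProcessContext context) {
        Row[] rowArray = Iterables.toArray(context.element().getValue(), Row.class);

        // Sort this customer's rows chronologically
        Arrays.sort(rowArray, Comparator.comparingLong(
                row -> FORMATTER.parseDateTime(row.getString("date")).getMillis()));

        // Carry the last seen amount forward; 0.0 covers a missing first row
        Double lastAmount = 0.0;
        for (Row row : rowArray) {
            Double currentAmount = row.getDouble("amount");
            if (currentAmount == null) {
                // Rows are immutable, so emit a copy with the amount filled in
                context.output(Row.fromRow(row).withFieldValue("amount", lastAmount).build());
            } else {
                lastAmount = currentAmount;
                context.output(row);
            }
        }
    }
}

Wiring it into the pipeline would then look like groupedKVRows.apply(ParDo.of(new FillMissingAmounts())).setCoder(RowCoder.of(beamSchema)), replacing the GroupByDate stub above.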
