如何使用数据流在嵌套数组中加载数据 [英] How to load data in nested array using dataflow
问题描述
我正在尝试将数据加载到下表中.我能够将数据加载到"array_data"中. 但是如何将数据加载到嵌套数组"inside_array"中.我已经尝试了注释部分将数据加载到inside_array数组中,但是没有用. 在此处输入图片描述
I am trying to load the data into below table. I am able to load the data in "array_data". But how to load the data in nested array "inside_array".I have tried the commented part to load the data in inside_array array but it did not work. enter image description here
这是我的代码.- 管道p = Pipeline.create(options);
Here is my code.- Pipeline p = Pipeline.create(options);
org.apache.beam.sdk.values.PCollection<TableRow> output = p.apply(org.apache.beam.sdk.transforms.Create.of("temp"))
.apply("O/P",ParDo.of(new DoFn<String, TableRow>() {
/**
*
*/
private static final long serialVersionUID = 307542945272055650L;
@ProcessElement
public void processElemet(ProcessContext c) {
TableRow row = new TableRow();
row.set("name","Jack");
row.set("phone","9874563210");
TableRow ip = new TableRow().set("address", "M G Road").set("email","abc@gmail.com");
TableRow ip1 = new TableRow().set("address","F C Road").set("email","xyz@gmail.com");
java.util.List<TableRow> metadata = new ArrayList<TableRow>();
metadata.add(ip);
metadata.add(ip1);
row.set("array_data",metadata);
LOG.info("O/P:"+row);
c.output(row);
}}));
output.apply("Write to table",BigQueryIO.writeTableRows().withoutValidation().to("AA.nested_array")
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
p.run();
任何人都有任何线索或建议.谢谢.
Anyone has any clue or suggestion.Thanks in advance.
推荐答案
要使用数据流处理嵌套数组,请创建一个单独的List并将其添加到tablerow的主数组中.
To Handle the nested array using dataflow create a seprate List and add it into your main array of tablerow.
我在这里尝试过这种方式,并且得到了预期的输出.
Here I tried this way and I got the expected output.
管道p = Pipeline.create(options); org.apache.beam.sdk.values.PCollection输出= p.apply(org.apache.beam.sdk.transforms.Create.of("temp")) .apply("O/P",ParDo.of(new DoFn< String,TableRow>(){
Pipeline p = Pipeline.create(options); org.apache.beam.sdk.values.PCollection output = p.apply(org.apache.beam.sdk.transforms.Create.of("temp")) .apply("O/P",ParDo.of(new DoFn<String, TableRow>() {
@ProcessElement
public void processElemet(ProcessContext c) {
TableRow row = new TableRow();
row.set("name","Jack");
row.set("phone","9874563210");
List<TableRow> listDest = new ArrayList<>();
TableRow t=new TableRow().set("detail1","one" ).set("detail2", "two");
TableRow t1=new TableRow().set("detail1","three" ).set("detail2", "four");
listDest.add(t);
listDest.add(t1);
TableRow ip = new TableRow().set("address", "M G Road").set("email","abc@gmail.com").set("inside_array", listDest);
TableRow ip1 = new TableRow().set("address","F C Road").set("email","xyz@gmail.com").set("inside_array", listDest);
java.util.List<TableRow> metadata = new ArrayList<TableRow>();
metadata.add(ip);
metadata.add(ip1);
row.set("array_data",metadata);
LOG.info("O/P:"+row);
c.output(row);
}}));
还添加了带有数据的表格图像.
Adding the image of table with data as well.
希望如果有人在使用相同类型的桌子,这将很有帮助.
hope It will helpful if anyone is working on the same kind of table.
这篇关于如何使用数据流在嵌套数组中加载数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!