DataflowAssert doesn't pass TableRow test


Problem description

We don't know why DataflowAssert fails when we run this simple test:

  @Test
  @Category(RunnableOnService.class)
  public void testTableRow() throws Exception {
      Pipeline p = TestPipeline.create();
      PCollection<TableRow> pCollectionTable1 = p.apply("a",Create.of(TABLEROWS_ARRAY_1));
      PCollection<TableRow> pCollectionTable2 = p.apply("b",Create.of(TABLEROWS_ARRAY_2));
      PCollection<TableRow> joinedTables = Table.join(pCollectionTable1, pCollectionTable2);
      DataflowAssert.that(joinedTables).containsInAnyOrder(TABLEROW_TEST);
      p.run();
  }

We get the following exception:

    Sep 25, 2015 10:42:50 AM com.google.cloud.dataflow.sdk.testing.DataflowAssert$TwoSideInputAssert$CheckerDoFn processElement
    SEVERE: DataflowAssert failed expectations.
    java.lang.AssertionError:
    Expected: iterable over [<{id=x}>] in any order
         but: Not matched: <{id=x}>
        at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
        at org.junit.Assert.assertThat(Assert.java:865)
        at org.junit.Assert.assertThat(Assert.java:832)
        at ...

To simplify the DataflowAssert test, we hardcoded the output of Table.join to match the expected value, with:

private static final TableRow TABLEROW_TEST = new TableRow()
        .set("id", "x");


static PCollection<TableRow> join(PCollection<TableRow> pCollectionTable1,
        PCollection<TableRow> pCollectionTable2) throws Exception {

    final TupleTag<String> pCollectionTable1Tag = new TupleTag<String>();
    final TupleTag<String> pCollectionTable2Tag = new TupleTag<String>();

    PCollection<KV<String, String>> table1Data = pCollectionTable1
            .apply(ParDo.of(new ExtractTable1DataFn()));
    PCollection<KV<String, String>> table2Data = pCollectionTable2
            .apply(ParDo.of(new ExtractTable2DataFn()));

    PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
            .of(pCollectionTable1Tag, table1Data).and(pCollectionTable2Tag, table2Data)
            .apply(CoGroupByKey.<String> create());

    PCollection<KV<String, String>> resultCollection = kvpCollection
            .apply(ParDo.named("Process join")
                    .of(new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
                        private static final long serialVersionUID = 0;

                        @Override
                        public void processElement(ProcessContext c) {
                            // System.out.println(c);
                            KV<String, CoGbkResult> e = c.element();
                            String key = e.getKey();
                            String value = null;
                            for (String table1Value : c.element().getValue().getAll(pCollectionTable2Tag)) {

                                for (String table2Value : c.element().getValue().getAll(pCollectionTable2Tag)) {
                                    value = table1Value + "," + table2Value;
                                }
                            }
                            c.output(KV.of(key, value));
                        }
                    }));

    PCollection<TableRow> formattedResults = resultCollection.apply(
            ParDo.named("Format join").of(new DoFn<KV<String, String>, TableRow>() {
                private static final long serialVersionUID = 0;

                public void processElement(ProcessContext c) {
                    TableRow row = new TableRow().set("id", "x");
                    c.output(row);                      
                }
            }));

    return formattedResults;
}

Does anyone know what we are doing wrong?

Recommended answer

I think the error message is telling you that the actual collection contains more copies of that element than expected.

Expected: iterable over [<{id=x}>] in any order
 but: Not matched: <{id=x}>

This is Hamcrest indicating that you expected an iterable over a single element, but the actual collection had an item that wasn't matched. Since every item coming out of "Format join" has the same value, the message is harder to read than it should be.
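To make the matching rule concrete, here is a simplified plain-Java model of an "in any order" check. It is an illustration under our own assumptions, not Hamcrest's actual implementation: InAnyOrderSketch and checkInAnyOrder are hypothetical names, and a Map stands in for TableRow. Each actual element must consume one equal expected element, so a second copy of {id=x} has nothing left to match.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified model (NOT Hamcrest's implementation) of an "in any order" match:
// every actual item must consume exactly one equal expected item, and no
// expected item may be left over.
public class InAnyOrderSketch {

    // Returns null on a match, otherwise a short description of the mismatch.
    static <T> String checkInAnyOrder(List<T> actual, List<T> expected) {
        List<T> remaining = new ArrayList<>(expected);
        for (T item : actual) {
            // remove(Object) drops one equal element and reports whether it found one.
            if (!remaining.remove(item)) {
                return "Not matched: <" + item + ">";
            }
        }
        if (!remaining.isEmpty()) {
            return "Missing: " + remaining; // wording is this model's own
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, Object> row = Map.of("id", "x");
        // Two copies of the same row versus an expectation of exactly one copy.
        System.out.println(checkInAnyOrder(List.of(row, row), List.of(row)));
        // prints: Not matched: <{id=x}>
    }
}
```

Because the comparison counts occurrences, equal values alone are not enough: the number of copies on each side must also agree.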

Specifically, the message above is what I get when I run the following test, which checks whether a collection containing two copies of row contains exactly one copy of row:

@Category(RunnableOnService.class)
@Test
public void testTableRow() throws Exception {
  Pipeline p = TestPipeline.create();

  TableRow row = new TableRow().set("id", "x");

  PCollection<TableRow> rows = p.apply(Create.<TableRow>of(row, row));
  DataflowAssert.that(rows).containsInAnyOrder(row);

  p.run();
}


To reproduce that result with your code, I relied on the fact that you only iterate over entries in table2. Specifically:

// Use these as the input tables.
table1 = [("keyA", "A1a"), ("keyA", "A1b")]
table2 = [("keyA", "A2a"), ("keyA", "A2b"), ("keyB", "B2")]

// The CoGroupByKey returns
[("keyA", (["A1a", "A1b"], ["A2a", "A2b"])),
 ("keyB", ([], ["B2"]))]

// When run through "Process join", this produces (values shown):
// For details on why see the next section.
["A2b,A2b",
 "B2,B2"]

// When run through "Format join" this becomes the following.
[{id=x}, {id=x}]
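The walkthrough above can be reproduced without the Dataflow SDK. The following is a plain-Java model, assuming that the CoGroupByKey result can be represented as a map from each key to a pair of value lists (table1 first, table2 second); BuggyJoinModel and its methods are hypothetical names. processJoinAsWritten mirrors the loops in "Process join" exactly as written in the question, including both loops reading table2's values.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the question's pipeline (no Dataflow SDK involved).
// Assumption: CoGroupByKey output is modeled as key -> [table1 values, table2 values].
public class BuggyJoinModel {

    // Hypothetical helper: builds a (key, value) pair as a two-element array.
    static String[] kv(String key, String value) {
        return new String[] {key, value};
    }

    // Models CoGroupByKey: index 0 holds table1's values, index 1 holds table2's.
    static Map<String, List<List<String>>> coGroupByKey(List<String[]> table1, List<String[]> table2) {
        Map<String, List<List<String>>> grouped = new LinkedHashMap<>();
        List<List<String[]>> tables = List.of(table1, table2);
        for (int t = 0; t < tables.size(); t++) {
            for (String[] pair : tables.get(t)) {
                grouped.computeIfAbsent(pair[0],
                        k -> List.of(new ArrayList<String>(), new ArrayList<String>()))
                       .get(t).add(pair[1]);
            }
        }
        return grouped;
    }

    // Mirrors "Process join" as written in the question, bug included.
    static Map<String, String> processJoinAsWritten(Map<String, List<List<String>>> grouped) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, List<List<String>>> e : grouped.entrySet()) {
            List<String> table2Values = e.getValue().get(1);
            String value = null;
            for (String table1Value : table2Values) {         // reads table2, not table1
                for (String table2Value : table2Values) {
                    value = table1Value + "," + table2Value;  // overwritten on every pass
                }
            }
            out.put(e.getKey(), value); // one output per key: only the last pair
        }
        return out;
    }

    // Mirrors "Format join": every input element becomes the same {id=x} row.
    static List<Map<String, String>> formatJoin(Map<String, String> joined) {
        List<Map<String, String>> rows = new ArrayList<>();
        for (int i = 0; i < joined.size(); i++) {
            rows.add(Map.of("id", "x"));
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<String, List<List<String>>> grouped = coGroupByKey(
                List.of(kv("keyA", "A1a"), kv("keyA", "A1b")),
                List.of(kv("keyA", "A2a"), kv("keyA", "A2b"), kv("keyB", "B2")));
        Map<String, String> joined = processJoinAsWritten(grouped);
        System.out.println(joined);             // {keyA=A2b,A2b, keyB=B2,B2}
        System.out.println(formatJoin(joined)); // [{id=x}, {id=x}]
    }
}
```

Two distinct keys reach "Format join", so two identical rows come out, which is exactly the two-copies-versus-one situation that the test above fails on.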


Note that the DoFn for "Process join" may not produce the results you expect, as noted in the comments below:

String key = e.getKey();
String value = null;
// NOTE: Both table1Value and table2Value iterate over pCollectionTable2Tag
for (String table1Value : c.element().getValue().getAll(pCollectionTable2Tag)) {
    for (String table2Value : c.element().getValue().getAll(pCollectionTable2Tag)) {
        // NOTE: this updates value, and doesn't output it. So for each
        // key there will be a single output with the *last* value
        // rather than one for each pair.
        value = table1Value + "," + table2Value;
    }
}
c.output(KV.of(key, value));
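One way to fix this, sketched below as a plain-Java model rather than SDK code, is to read table1's values in the outer loop and table2's values in the inner loop, and to emit inside the inner loop instead of after it. In the real DoFn, that would mean iterating getAll(pCollectionTable1Tag) in the outer loop and calling c.output(...) once per pair. This gives inner-join semantics, which is an assumption about what you want: a key such as keyB that appears in only one table produces no output (and no null value). FixedJoinModel and joinOneKey are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a corrected "Process join" step for ONE grouped key.
// Assumption: inner-join semantics (keys missing from either table emit nothing).
public class FixedJoinModel {

    static List<String> joinOneKey(String key, List<String> table1Values, List<String> table2Values) {
        List<String> out = new ArrayList<>();
        for (String table1Value : table1Values) {       // outer loop: table1's values
            for (String table2Value : table2Values) {   // inner loop: table2's values
                // One output per (table1, table2) pair; the real DoFn would call c.output here.
                out.add(key + "=" + table1Value + "," + table2Value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Grouped values for the sample input in the walkthrough.
        System.out.println(joinOneKey("keyA", List.of("A1a", "A1b"), List.of("A2a", "A2b")));
        // prints: [keyA=A1a,A2a, keyA=A1a,A2b, keyA=A1b,A2a, keyA=A1b,A2b]
        System.out.println(joinOneKey("keyB", List.of(), List.of("B2")));
        // prints: []
    }
}
```

With a fix along these lines, the expectation passed to containsInAnyOrder would need to list every joined row, since the check compares the number of copies as well as the values.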
