Using a custom DataFlow unbounded source on DirectPipelineRunner


Problem Description

I'm writing a custom DataFlow unbounded data source that reads from Kafka 0.8. I'd like to run it locally using the DirectPipelineRunner. However, I'm getting the following stack trace:

Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(KafkaDataflowSource)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:700)
        at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
        at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
        at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
        at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:252)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:662)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:374)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:87)
        at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:174)

That makes some sense, as I haven't registered an evaluator for my custom source anywhere.

Reading https://github.com/GoogleCloudPlatform/DataflowJavaSDK, it seems that only evaluators for bounded sources are registered. What's the recommended way to define and register an evaluator for a custom unbounded source?

Solution

DirectPipelineRunner currently runs over bounded input only. We are actively working on removing this restriction and expect to release that support shortly.

In the meantime, you can trivially turn any UnboundedSource into a BoundedSource for testing purposes by using withMaxNumRecords, as in the following example:

UnboundedSource<String, ?> unboundedSource = ...; // make a Kafka source
PCollection<String> boundedKafkaCollection =
    p.apply(Read.from(unboundedSource).withMaxNumRecords(10));
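
For a bit more context, below is a minimal sketch of how this capped read might be wired into a locally-run pipeline, assuming the Dataflow Java SDK 1.x. The class and variable names are illustrative, the runner/options setup is an assumption rather than part of the original answer, and constructing the custom Kafka source itself is left elided just as in the snippet above.

// A minimal sketch (assuming Dataflow Java SDK 1.x) of running the capped read
// on the local runner. Names here are illustrative; building the custom Kafka
// source is elided, as in the snippet above.
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
import com.google.cloud.dataflow.sdk.values.PCollection;

public class LocalKafkaSourceTest {
  public static void main(String[] args) {
    // Force the in-process runner instead of the Dataflow service.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    options.setRunner(DirectPipelineRunner.class);
    Pipeline p = Pipeline.create(options);

    // The custom unbounded Kafka source under test (construction elided).
    UnboundedSource<String, ?> unboundedSource = ...; // e.g. the KafkaDataflowSource from the question

    // withMaxNumRecords(10) wraps the unbounded read in a bounded one, which the
    // DirectPipelineRunner's bounded-only evaluators can execute.
    PCollection<String> boundedKafkaCollection =
        p.apply(Read.from(unboundedSource).withMaxNumRecords(10));

    // ... apply further transforms or assertions to boundedKafkaCollection ...

    p.run();
  }
}

Once DirectPipelineRunner gains support for unbounded input, the withMaxNumRecords cap can simply be dropped and Read.from(unboundedSource) applied directly.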

See this issue on GitHub for more details.


Separately, there are several ongoing efforts to contribute a Kafka connector. You may want to engage with us and other contributors about that via our GitHub repository.
