Using custom DataFlow unbounded source on DirectPipelineRunner
Problem description
I'm writing a custom DataFlow unbounded data source that reads from Kafka 0.8. I'd like to run it locally using the DirectPipelineRunner. However, I'm getting the following stack trace:
Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(KafkaDataflowSource)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:700)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:252)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:662)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:374)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:87)
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:174)
Which makes some sense, as I haven't registered an evaluator for my custom source at any time.
Reading https://github.com/GoogleCloudPlatform/DataflowJavaSDK, it seems like only evaluators for bounded sources are registered. What's the recommended way to define and register an evaluator for a custom unbounded source?
DirectPipelineRunner currently runs over bounded input only. We are actively working on removing this restriction, and expect to release it shortly.
In the meantime, you can trivially turn any UnboundedSource into a BoundedSource, for testing purposes, by using withMaxNumRecords, as in the following example:
UnboundedSource<String> unboundedSource = ...; // make a Kafka source
PCollection<String> boundedKafkaCollection =
p.apply(Read.from(unboundedSource).withMaxNumRecords(10));
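To make the idea behind the snippet above concrete, here is a plain-Java analogy of capping an endless stream at a fixed record count. It is only a sketch and does not use or reproduce the Dataflow SDK: the class BoundedReadSketch and the methods endlessCounter and readAtMost are hypothetical names invented for this illustration, and the SDK's actual implementation of withMaxNumRecords may work quite differently.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java analogy only; none of these names come from the Dataflow SDK.
class BoundedReadSketch {

    // Hypothetical stand-in for an unbounded source: yields 0, 1, 2, ... forever.
    static Iterator<Long> endlessCounter() {
        return new Iterator<Long>() {
            private long next = 0;

            @Override
            public boolean hasNext() {
                return true; // an unbounded source never runs out
            }

            @Override
            public Long next() {
                return next++;
            }
        };
    }

    // Pulls at most maxNumRecords elements from the source, then stops reading.
    static <T> List<T> readAtMost(Iterator<T> source, long maxNumRecords) {
        if (maxNumRecords < 0) {
            throw new IllegalArgumentException("maxNumRecords must be >= 0");
        }
        List<T> records = new ArrayList<>();
        while (records.size() < maxNumRecords && source.hasNext()) {
            records.add(source.next());
        }
        return records;
    }

    public static void main(String[] args) {
        // Prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        System.out.println(readAtMost(endlessCounter(), 10));
    }
}
```

Because the read stops after a known number of records, the result is a finite collection, which is the property a runner limited to bounded input needs.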
See this issue on GitHub for more details.
Separately, there are several efforts underway to contribute a Kafka connector. You may want to engage with us and other contributors about that via our GitHub repository.