Processing multiline events from a text file in Dataflow

Question

I am trying to build a Dataflow pipeline to process a text file containing events that span multiple lines. The Dataflow SDK's TextIO class assumes that each line is a new event.

My plan is to create a new TextReader and register it with the DataPipelineRunner. This new reader will know how to aggregate the multiple lines into a single line.

I am fairly sure this approach will work, but I am wondering whether it is the right way to do it or whether there is a simpler solution.

The text I am trying to parse is:

==============> len:45 pktype:4 mtype:2
SYMBOL: USOCSTIA151632.00
OPEN_INT: 212
PR_OPEN_INTEREST: 212
TIME_STAMP: 04/10/2015 06:30:17:420  val:1428661817

The result should be the last 4 lines concatenated together, with the first line dropped.

Best regards, Peter

Recommended answer

Note that TextReader is an internal implementation detail, so subclassing it is strongly discouraged and would be hard to do correctly.

The recommended way to define a new file-based format like yours is to subclass FileBasedSource using the user-defined source API.

In your case, I would recommend basing your class on the LineIO example from the documentation and wrapping the LineReader defined there in your own class, which would use LineReader as a helper for reading individual lines, but:

  • In startReading() it would skip until the line starting with "====>"
  • In readNextRecord() it would read lines until the next "====>" and bundle them into a single record.
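
As a rough illustration of the two steps listed above, here is a plain-Java sketch of the grouping logic that does not use the Dataflow SDK at all. The class name MultilineEventGrouper, the header pattern (a run of '=' followed by '>', chosen to match the sample input rather than the literal "====>"), and the single-space separator are all assumptions made for illustration. In an actual FileBasedSource subclass the same logic would live in the reader and pull its lines from a LineReader-style helper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class MultilineEventGrouper {
    // Assumption: a header line is a run of '=' followed by '>', as in the sample input.
    private static final Pattern HEADER = Pattern.compile("^=+>.*");

    static boolean isHeader(String line) {
        return HEADER.matcher(line).matches();
    }

    // Groups the lines between consecutive headers into one record each.
    static List<String> groupEvents(List<String> lines) {
        List<String> records = new ArrayList<>();
        StringBuilder current = null; // stays null until the first header is seen
        for (String line : lines) {
            if (isHeader(line)) {
                // A new header closes the previous record; the header itself is dropped.
                if (current != null && current.length() > 0) {
                    records.add(current.toString());
                }
                current = new StringBuilder();
            } else if (current != null && !line.trim().isEmpty()) {
                if (current.length() > 0) {
                    current.append(' '); // assumed separator between joined lines
                }
                current.append(line.trim());
            }
            // Lines before the first header are skipped (current is still null).
        }
        if (current != null && current.length() > 0) {
            records.add(current.toString());
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
            "==============> len:45 pktype:4 mtype:2",
            "SYMBOL: USOCSTIA151632.00",
            "OPEN_INT: 212",
            "PR_OPEN_INTEREST: 212",
            "TIME_STAMP: 04/10/2015 06:30:17:420  val:1428661817");
        for (String record : groupEvents(sample)) {
            System.out.println(record);
        }
    }
}
```

In this sketch, lines that appear before the first header are ignored, which mirrors the "skip until the header" step, and a header with no body lines produces no record.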

Please read the documentation for FileBasedSource and FileBasedReader carefully: the parallelization mechanism relies on the consistency properties described there, and your format has to satisfy them so that records are neither duplicated nor omitted at the boundaries between adjacent processing shards. The XmlSource tests are a good example of how to unit-test these properties.
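
To make the boundary property concrete, the sketch below simulates shards as character ranges over an in-memory string. It is not the FileBasedSource API: the class ShardBoundarySketch, the method readRange, and the use of character offsets in place of byte offsets are assumptions for illustration. The rule it applies, that a record belongs to the range [start, end) containing the start of its header line, is one way to get the no-duplicates, no-omissions behaviour; check it against the SDK documentation before relying on it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class ShardBoundarySketch {
    // Assumption: a header line is a run of '=' followed by '>'.
    private static final Pattern HEADER = Pattern.compile("^=+>.*");

    // Returns the records whose header line starts at an offset in [start, end).
    static List<String> readRange(String content, int start, int end) {
        List<String> records = new ArrayList<>();
        int pos = 0;
        if (start > 0) {
            // Move to the first line that begins at or after 'start'.
            int nl = content.indexOf('\n', start - 1);
            pos = (nl < 0) ? content.length() : nl + 1;
        }
        StringBuilder current = null; // null while no owned record is open
        while (pos < content.length()) {
            int nl = content.indexOf('\n', pos);
            int lineEnd = (nl < 0) ? content.length() : nl;
            String line = content.substring(pos, lineEnd);
            if (HEADER.matcher(line).matches()) {
                if (current != null && current.length() > 0) {
                    records.add(current.toString());
                }
                current = null;
                if (pos >= end) {
                    break; // this header belongs to the next range
                }
                current = new StringBuilder();
            } else if (current != null && !line.trim().isEmpty()) {
                if (current.length() > 0) {
                    current.append(' ');
                }
                current.append(line.trim());
            }
            pos = lineEnd + 1;
        }
        // Reading may run past 'end' to finish a record that started inside the range.
        if (current != null && current.length() > 0) {
            records.add(current.toString());
        }
        return records;
    }

    public static void main(String[] args) {
        String content = "====> a\nX: 1\nY: 2\n====> b\nZ: 3\n====> c\nW: 4\n";
        int split = 20; // an arbitrary split point in the middle of a header line
        System.out.println(readRange(content, 0, split));
        System.out.println(readRange(content, split, content.length()));
    }
}
```

A unit test in the spirit of the XmlSource tests could split the input at every possible offset and check that the two halves together yield exactly the records of the unsplit input.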

Please tell us how it goes and report back with any problems or questions; we are very interested in feedback on this API.
