A way to execute pipeline periodically from bounded source in Apache Beam


Problem description


I have a pipeline taking data from a MySQL server and inserting it into Datastore using the Dataflow runner. It works fine as a batch job that executes once. The thing is that I want to get new data from the MySQL server into Datastore in near real time, but JdbcIO provides bounded data as a source (since it is the result of a query), so my pipeline only executes once.

Do I have to execute the pipeline and resubmit a Dataflow job every 30 seconds? Or is there a way to make the pipeline redo this automatically without submitting another job?

It is similar to the topic Running periodic Dataflow job, but I cannot find the CountingInput class. I thought that maybe it was replaced by the GenerateSequence class, but I don't really understand how to use it.

Any help would be welcome!

Solution

This is possible, and there are a couple of ways you can go about it. It depends on the structure of your database and whether it allows you to efficiently find new elements that appeared since the last sync. For example, do your elements have an insertion timestamp? Can you afford to have another table in MySQL containing the last timestamp that has been saved to Datastore?

  • You can, indeed, use GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)), which will give you a PCollection<Long> into which one element per second is emitted. You can piggyback on that PCollection with a ParDo (or a more complex chain of transforms) that does the necessary periodic synchronization. You may find JdbcIO.readAll() handy because it takes a PCollection of query parameters, so it can be triggered every time a new element appears in that PCollection (see the first sketch after this list).

  • If the amount of data in MySQL is not that large (at most, something like hundreds of thousands of records), you can use the Watch.growthOf() transform to continually poll the entire database (using regular JDBC APIs) and emit the new elements (see the second sketch after this list).
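
The first option can be sketched roughly as below. This is a hedged, minimal example rather than a drop-in pipeline: the events table, its created_at and payload columns, the connection settings and the 30-second period are placeholder assumptions, and the naive "rows newer than 60 seconds ago" cutoff only marks the spot where a real last-synced timestamp (as discussed above) would go.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.GenerateSequence;
    import org.apache.beam.sdk.io.jdbc.JdbcIO;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    public class PeriodicJdbcSync {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        // One Long every 30 seconds; because this source is unbounded, the job
        // keeps running instead of finishing after a single read.
        PCollection<Long> ticks =
            p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(30)));

        // Turn each tick into a query parameter. This naive cutoff can re-read
        // rows; in practice you would track the last synced timestamp instead.
        PCollection<Long> cutoffs =
            ticks.apply(ParDo.of(new DoFn<Long, Long>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                c.output(System.currentTimeMillis() - 60_000L);
              }
            }));

        // readAll() runs the query once per incoming element, so every tick
        // triggers a fresh read of the recently inserted rows.
        PCollection<String> rows = cutoffs.apply(
            JdbcIO.<Long, String>readAll()
                .withDataSourceConfiguration(
                    JdbcIO.DataSourceConfiguration.create(
                            "com.mysql.cj.jdbc.Driver", "jdbc:mysql://host:3306/mydb")
                        .withUsername("user")
                        .withPassword("password"))
                .withQuery("SELECT payload FROM events WHERE created_at > ?")
                .withParameterSetter((cutoffMillis, statement) ->
                    statement.setTimestamp(1, new java.sql.Timestamp(cutoffMillis)))
                .withRowMapper(resultSet -> resultSet.getString("payload"))
                .withCoder(StringUtf8Coder.of()));

        // ... continue with the existing transforms that write to Datastore ...
        p.run();
      }
    }

Because GenerateSequence with withRate() is an unbounded source, the Dataflow runner keeps this pipeline running as a streaming job rather than a one-shot batch job.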
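
The second option, polling the whole table with Watch.growthOf(), could look roughly like the sketch below. Again, this is an assumption-laden illustration: the connection settings, the events table and the queryAllRows() helper are made up, and the table must stay small enough to re-scan on every poll.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.Watch;
    import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
    import org.apache.beam.sdk.transforms.Watch.Growth.PollResult;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    public class WatchMySqlTable {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        // A single element carrying the connection URL; Watch polls it repeatedly.
        PCollection<KV<String, String>> newRows = p
            .apply(Create.of("jdbc:mysql://host:3306/mydb"))
            .apply(Watch.growthOf(
                    new PollFn<String, String>() {
                      @Override
                      public PollResult<String> apply(String jdbcUrl, Context c)
                          throws Exception {
                        // Re-scan the whole table; Watch only emits rows it has
                        // not seen before for this input.
                        return PollResult.incomplete(Instant.now(), queryAllRows(jdbcUrl));
                      }
                    })
                .withPollInterval(Duration.standardSeconds(30))
                // Keep polling indefinitely (this is also the default).
                .withTerminationPerInput(Watch.Growth.never()));

        // newRows holds KV(connection URL, newly seen row); continue with the
        // Datastore write from here.
        p.run();
      }

      // Plain JDBC scan of the (hypothetical) events table.
      static List<String> queryAllRows(String jdbcUrl) throws Exception {
        List<String> rows = new ArrayList<>();
        try (Connection conn = DriverManager.getConnection(jdbcUrl, "user", "password");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT payload FROM events")) {
          while (rs.next()) {
            rows.add(rs.getString("payload"));
          }
        }
        return rows;
      }
    }

Watch remembers what it has already output for each input, so on each poll only rows that have not been seen before are emitted downstream.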

That said, what Andrew suggested (additionally publishing the records to Pubsub) is also a very valid approach.
