A way to execute pipeline periodically from bounded source in Apache Beam
Problem description
I have a pipeline taking data from a MySQL server and inserting it into Datastore using the Dataflow Runner. It works fine as a batch job executing once. The thing is that I want to get the new data from the MySQL server into Datastore in near real-time, but JdbcIO gives bounded data as a source (since it is the result of a query), so my pipeline executes only once.
Do I have to execute the pipeline and resubmit a Dataflow job every 30 seconds? Or is there a way to make the pipeline redo this automatically without having to submit another job?
It is similar to the topic Running periodic Dataflow job, but I cannot find the CountingInput class. I thought that maybe it was replaced by the GenerateSequence class, but I don't really understand how to use it.
Any help would be welcome!
This is possible and there are a couple of ways you can go about it. It depends on the structure of your database and whether it admits efficiently finding new elements that appeared since the last sync. E.g., do your elements have an insertion timestamp? Can you afford to have another table in MySQL containing the last timestamp that has been saved to Datastore?
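To illustrate that last point: one common pattern is a small bookkeeping table that records the last timestamp already copied to Datastore, so each sync only reads rows newer than the bookmark. A minimal sketch with plain JDBC — the sync_state table, items table, insert_ts column, and payload column are hypothetical names for illustration, and a live MySQL connection is assumed:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;

public class IncrementalFetch {
  // Reads rows newer than the last synced timestamp, then advances the bookmark.
  public static void syncOnce(Connection conn) throws Exception {
    // 1. Read the current bookmark from the bookkeeping table.
    Timestamp last;
    try (PreparedStatement ps = conn.prepareStatement(
            "SELECT last_synced FROM sync_state WHERE id = 1");
         ResultSet rs = ps.executeQuery()) {
      rs.next();
      last = rs.getTimestamp(1);
    }

    // 2. Fetch only the rows inserted since the bookmark.
    Timestamp newest = last;
    try (PreparedStatement ps = conn.prepareStatement(
            "SELECT payload, insert_ts FROM items WHERE insert_ts > ? ORDER BY insert_ts")) {
      ps.setTimestamp(1, last);
      try (ResultSet rs = ps.executeQuery()) {
        while (rs.next()) {
          // ...write rs.getString("payload") to Datastore here...
          newest = rs.getTimestamp("insert_ts");
        }
      }
    }

    // 3. Advance the bookmark so the next sync skips what was just copied.
    try (PreparedStatement ps = conn.prepareStatement(
            "UPDATE sync_state SET last_synced = ? WHERE id = 1")) {
      ps.setTimestamp(1, newest);
      ps.executeUpdate();
    }
  }
}
```

This only works reliably if insert_ts is monotonically non-decreasing for committed rows; otherwise late-arriving rows can be skipped.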
You can, indeed, use GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)), which will give you a PCollection<Long> into which 1 element per second is emitted. You can piggyback on that PCollection with a ParDo (or a more complex chain of transforms) that does the necessary periodic synchronization. You may find JdbcIO.readAll() handy because it can take a PCollection of query parameters and so can be triggered every time a new element appears in that PCollection.

If the amount of data in MySQL is not that large (at most, something like hundreds of thousands of records), you can use the Watch.growthOf() transform to continually poll the entire database (using regular JDBC APIs) and emit new elements.
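Putting the first approach together, here is a rough sketch — it assumes the Beam Java SDK, the beam-sdks-java-io-jdbc module, and a MySQL JDBC driver on the classpath; the items table, insert_ts column, and connection details are illustrative placeholders, and the downstream Datastore write is omitted:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class PeriodicJdbcSync {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Emit one tick every 30 seconds, forever — this makes the pipeline unbounded.
    p.apply("Tick", GenerateSequence.from(0).withRate(1, Duration.standardSeconds(30)))
        // Turn each tick into a query parameter: the lower bound of the window to fetch.
        .apply("ToLowerBound", MapElements.into(TypeDescriptors.longs())
            .via(tick -> Instant.now().minus(Duration.standardSeconds(30)).getMillis()))
        // readAll() runs the query once per incoming element, so every tick
        // triggers a fresh read of the rows inserted since the lower bound.
        .apply("ReadNewRows", JdbcIO.<Long, String>readAll()
            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                "com.mysql.jdbc.Driver", "jdbc:mysql://host:3306/db"))
            .withQuery("SELECT payload FROM items WHERE insert_ts > ?")
            .withParameterSetter((lowerBoundMillis, stmt) ->
                stmt.setTimestamp(1, new java.sql.Timestamp(lowerBoundMillis)))
            .withRowMapper(rs -> rs.getString(1))
            .withCoder(StringUtf8Coder.of()));
    // ...then apply a Datastore write (e.g. DatastoreIO) to the resulting
    // PCollection<String> to complete the sync.

    p.run();
  }
}
```

Note that a wall-clock lower bound like this can miss or duplicate rows around window edges; the bookkeeping-table approach mentioned above is more robust if exactly-once delivery matters.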
That said, what Andrew suggested (emitting records additionally to Pubsub) is also a very valid approach.