如何将 Source 动态添加到现有 Graph? [英] How do I dynamically add Source to existing Graph?
问题描述
有什么可以替代动态改变运行图?这是我的情况.我有将文章摄取到数据库中的图表.文章来自 3 个不同格式的插件.因此我有几个流程
What can be alternative to dynamically changing running graph ? Here is my situation. I have graph that ingests articles into DB. Articles come from 3 plugins in different format. Thus I have several flows
val converterFlow1: Flow[ImpArticle, Article, NotUsed]
val converterFlow2: Flow[NewsArticle, Article, NotUsed]
val sinkDB: Sink[Article, Future[Done]]
// These are being created every time I poll plugins
val sourceContentProvider : Source[ImpArticle, NotUsed]
val sourceNews : Source[NewsArticle, NotUsed]
val sourceCit : Source[Article, NotUsed]
val merged = Source.combine(
sourceContentProvider.via(converterFlow1),
sourceNews.via(converterFlow2),
sourceCit)(Merge(_))
val res = merged
.buffer(10, OverflowStrategy.backpressure)
.toMat(sinkDB)(Keep.both)
.run()
问题是我每 24 小时从内容提供商那里获取一次数据,每 2 小时从新闻获取一次数据,最后一个来源可能随时出现,因为它来自人类.
Problem is that I get data from content provider once per 24 hrs, from news once per 2 hrs and last source may come at any time because it's coming from humans.
我意识到图是不可变的,但是我如何定期将 Source
的新实例附加到我的图中,以便我对摄取过程进行单点节流?
I realize that graphs are immutable but how I can periodically attach new instances of Source
to my graph so that I have single point of throttling of the process of ingesting ?
更新:你可以说我的数据是 Source
-s 的流,在我的例子中是三个来源.但我无法改变它,因为我从外部类(所谓的插件)获得了 Source
的实例.这些插件独立于我的摄取类工作.我无法将它们组合成一个巨大的类来拥有单个 Source
.
UPDATE: You can say my data is stream of Source
-s, three sources in my case. But I cannot change that because I get instances of Source
from external classes (so called plugins). These plugins work independently from my ingestion class. I can't combine them into one gigantic class to have single Source
.
推荐答案
好的,一般来说,正确的方法是将源流合并为一个源,即从 Source[Source[T, _], 随便]
到 Source[T, 随便]
.这可以通过 flatMapConcat
或flatMapMerge
.因此,如果你能得到一个 Source[Source[Article, NotUsed], NotUsed]
,你可以使用 flatMap*
变体之一并获得最终的
Okay, in general the correct way would be to join a stream of sources into a single source, i.e. go from Source[Source[T, _], Whatever]
to Source[T, Whatever]
. This can be done with flatMapConcat
or with flatMapMerge
. Therefore, if you can get a Source[Source[Article, NotUsed], NotUsed]
, you can use one of flatMap*
variants and obtain a final Source[Article, NotUsed]
. Do it for each of your sources (no pun intended), and then your original approach should work.
这篇关于如何将 Source 动态添加到现有 Graph?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!