如何动态将Source添加到现有图? [英] How do I dynamically add Source to existing Graph?
问题描述
动态更改运行图可以替代什么?这是我的情况。我有将文章吸收到数据库中的图。文章来自3个格式不同的插件。因此,我有几个流程
What can be alternative to dynamically changing running graph ? Here is my situation. I have graph that ingests articles into DB. Articles come from 3 plugins in different format. Thus I have several flows
val converterFlow1: Flow[ImpArticle, Article, NotUsed]
val converterFlow2: Flow[NewsArticle, Article, NotUsed]
val sinkDB: Sink[Article, Future[Done]]
// These are being created every time I poll plugins
val sourceContentProvider : Source[ImpArticle, NotUsed]
val sourceNews : Source[NewsArticle, NotUsed]
val sourceCit : Source[Article, NotUsed]
val merged = Source.combine(
sourceContentProvider.via(converterFlow1),
sourceNews.via(converterFlow2),
sourceCit)(Merge(_))
val res = merged
.buffer(10, OverflowStrategy.backpressure)
.toMat(sinkDB)(Keep.both)
.run()
问题是我每24小时从内容提供商那里获取一次数据,每2小时从新闻提供商那里获取一次数据,并且由于来自人类的消息可能随时随地出现。
Problem is that I get data from content provider once per 24 hrs, from news once per 2 hrs and last source may come at any time because it's coming from humans.
我意识到图是不可变的,但是我该如何周期化lly将新的 Source
实例附加到我的图形上,这样我就可以对摄取过程进行单点调节?
I realize that graphs are immutable but how I can periodically attach new instances of Source
to my graph so that I have single point of throttling of the process of ingesting ?
更新:您可以说我的数据是 Source
-s的流,在我的情况下是三个源。但是我无法更改它,因为我从外部类(即所谓的插件)中获得了 Source
的实例。这些插件与我的摄取类无关。我不能将它们组合成一个庞大的类,以拥有单个 Source
。
UPDATE: You can say my data is stream of Source
-s, three sources in my case. But I cannot change that because I get instances of Source
from external classes (so called plugins). These plugins work independently from my ingestion class. I can't combine them into one gigantic class to have single Source
.
推荐答案
好吧,通常,正确的方法是将一连串的源合并为一个源,即从 Source [Source [T,_],Whatever]
到 Source [T,随便]
。可以使用 flatMapConcat
或使用 flatMapMerge
。因此,如果可以获得 Source [Source [Article,NotUsed],NotUsed]
,则可以使用 flatMap * $ c $中的一个c>变体,并获得最终的
Source [Article,NotUsed]
。对您的每个来源都进行此操作(不要双关语),然后您的原始方法应该可以使用。
Okay, in general the correct way would be to join a stream of sources into a single source, i.e. go from Source[Source[T, _], Whatever]
to Source[T, Whatever]
. This can be done with flatMapConcat
or with flatMapMerge
. Therefore, if you can get a Source[Source[Article, NotUsed], NotUsed]
, you can use one of flatMap*
variants and obtain a final Source[Article, NotUsed]
. Do it for each of your sources (no pun intended), and then your original approach should work.
这篇关于如何动态将Source添加到现有图?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!