如何动态将Source添加到现有图? [英] How do I dynamically add Source to existing Graph?

查看:136
本文介绍了如何动态将Source添加到现有图?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

动态更改运行图可以替代什么?这是我的情况。我有将文章吸收到数据库中的图。文章来自3个格式不同的插件。因此,我有几个流程

What can be alternative to dynamically changing running graph ? Here is my situation. I have graph that ingests articles into DB. Articles come from 3 plugins in different format. Thus I have several flows

val converterFlow1: Flow[ImpArticle, Article, NotUsed]
val converterFlow2: Flow[NewsArticle, Article, NotUsed]
val sinkDB: Sink[Article, Future[Done]]

// These are being created every time I poll plugins    
val sourceContentProvider : Source[ImpArticle, NotUsed]
val sourceNews : Source[NewsArticle, NotUsed]
val sourceCit : Source[Article, NotUsed]

val merged = Source.combine(
    sourceContentProvider.via(converterFlow1),
    sourceNews.via(converterFlow2),
    sourceCit)(Merge(_))

val res = merged
  .buffer(10, OverflowStrategy.backpressure)
  .toMat(sinkDB)(Keep.both)
  .run()

问题是我每24小时从内容提供商那里获取一次数据,每2小时从新闻提供商那里获取一次数据,并且由于来自人类的消息可能随时随地出现。

Problem is that I get data from content provider once per 24 hrs, from news once per 2 hrs and last source may come at any time because it's coming from humans.

我意识到图是不可变的,但是我该如何周期化lly将新的 Source 实例附加到我的图形上,这样我就可以对摄取过程进行单点调节?

I realize that graphs are immutable but how I can periodically attach new instances of Source to my graph so that I have single point of throttling of the process of ingesting ?

更新:您可以说我的数据是 Source -s的流,在我的情况下是三个源。但是我无法更改它,因为我从外部类(即所谓的插件)中获得了 Source 的实例。这些插件与我的摄取类无关。我不能将它们组合成一个庞大的类,以拥有单个 Source

UPDATE: You can say my data is stream of Source-s, three sources in my case. But I cannot change that because I get instances of Source from external classes (so called plugins). These plugins work independently from my ingestion class. I can't combine them into one gigantic class to have single Source.

推荐答案

好吧,通常,正确的方法是将一连串的源合并为一个源,即从 Source [Source [T,_],Whatever] Source [T,随便] 。可以使用 flatMapConcat 或使用 flatMapMerge 。因此,如果可以获得 Source [Source [Article,NotUsed],NotUsed] ,则可以使用 flatMap * 变体,并获得最终的 Source [Article,NotUsed] 。对您的每个来源都进行此操作(不要双关语),然后您的原始方法应该可以使用。

Okay, in general the correct way would be to join a stream of sources into a single source, i.e. go from Source[Source[T, _], Whatever] to Source[T, Whatever]. This can be done with flatMapConcat or with flatMapMerge. Therefore, if you can get a Source[Source[Article, NotUsed], NotUsed], you can use one of flatMap* variants and obtain a final Source[Article, NotUsed]. Do it for each of your sources (no pun intended), and then your original approach should work.

这篇关于如何动态将Source添加到现有图?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆