如何将 Source 动态添加到现有 Graph? [英] How do I dynamically add Source to existing Graph?

查看:47
本文介绍了如何将 Source 动态添加到现有 Graph?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

有什么可以替代动态改变运行图?这是我的情况.我有将文章摄取到数据库中的图表.文章来自 3 个不同格式的插件.因此我有几个流程

What can be alternative to dynamically changing running graph ? Here is my situation. I have graph that ingests articles into DB. Articles come from 3 plugins in different format. Thus I have several flows

val converterFlow1: Flow[ImpArticle, Article, NotUsed]
val converterFlow2: Flow[NewsArticle, Article, NotUsed]
val sinkDB: Sink[Article, Future[Done]]

// These are being created every time I poll plugins    
val sourceContentProvider : Source[ImpArticle, NotUsed]
val sourceNews : Source[NewsArticle, NotUsed]
val sourceCit : Source[Article, NotUsed]

val merged = Source.combine(
    sourceContentProvider.via(converterFlow1),
    sourceNews.via(converterFlow2),
    sourceCit)(Merge(_))

val res = merged
  .buffer(10, OverflowStrategy.backpressure)
  .toMat(sinkDB)(Keep.both)
  .run()

问题是我每 24 小时从内容提供商那里获取一次数据,每 2 小时从新闻获取一次数据,最后一个来源可能随时出现,因为它来自人类.

Problem is that I get data from content provider once per 24 hrs, from news once per 2 hrs and last source may come at any time because it's coming from humans.

我意识到图是不可变的,但是我如何定期将 Source 的新实例附加到我的图中,以便我对摄取过程进行单点节流?

I realize that graphs are immutable but how I can periodically attach new instances of Source to my graph so that I have single point of throttling of the process of ingesting ?

更新:你可以说我的数据是 Source-s 的流,在我的例子中是三个来源.但我无法改变它,因为我从外部类(所谓的插件)获得了 Source 的实例.这些插件独立于我的摄取类工作.我无法将它们组合成一个巨大的类来拥有单个 Source.

UPDATE: You can say my data is stream of Source-s, three sources in my case. But I cannot change that because I get instances of Source from external classes (so called plugins). These plugins work independently from my ingestion class. I can't combine them into one gigantic class to have single Source.

推荐答案

好的,一般来说,正确的方法是将源流合并为一个源,即从 Source[Source[T, _], 随便]Source[T, 随便].这可以通过 flatMapConcatflatMapMerge.因此,如果你能得到一个 Source[Source[Article, NotUsed], NotUsed],你可以使用 flatMap* 变体之一并获得最终的 Source[文章,未使用].为您的每个来源执行此操作(无意双关语),然后您的原始方法应该会起作用.

Okay, in general the correct way would be to join a stream of sources into a single source, i.e. go from Source[Source[T, _], Whatever] to Source[T, Whatever]. This can be done with flatMapConcat or with flatMapMerge. Therefore, if you can get a Source[Source[Article, NotUsed], NotUsed], you can use one of flatMap* variants and obtain a final Source[Article, NotUsed]. Do it for each of your sources (no pun intended), and then your original approach should work.

这篇关于如何将 Source 动态添加到现有 Graph?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆