Is it possible to let Spark Structured Streaming (update mode) write to a DB?

Problem description

I use Spark (3.0.0) Structured Streaming to read a topic from Kafka.

I've used joins and then mapGroupsWithState to get my stream data, so I have to use update mode, based on my understanding of the Spark official guide: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes
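
A simplified sketch of the kind of pipeline I mean (not my actual code; `Event`, the Kafka source and the keep-latest logic are made-up placeholders based on the posts linked at the end):

```scala
// Sketch only: a stateful "keep the latest record per key" step with
// mapGroupsWithState; this is the operator that forces update output mode.
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

case class Event(id: String, value: Long, ts: java.sql.Timestamp)

val spark = SparkSession.builder.appName("sketch").getOrCreate()
import spark.implicits._

// `events` stands for the Dataset[Event] parsed from the Kafka topic (after the joins).
def latestPerKey(events: Dataset[Event]): Dataset[Event] =
  events
    .groupByKey(_.id)
    .mapGroupsWithState[Event, Event](GroupStateTimeout.NoTimeout) {
      (id: String, rows: Iterator[Event], state: GroupState[Event]) =>
        // compare the new rows with the previously stored state and keep the newest one
        val newest = (rows ++ state.getOption.iterator).maxBy(_.ts.getTime)
        state.update(newest)
        newest
    }
```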

The following section of the Spark official guide says nothing about a DB sink, and it does not support writing to files in update mode either: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks

Currently I output it to the console, and I would like to store the data in files or a DB.
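
For reference, the current console output is just this (a sketch; `aggregatedDF` is a placeholder for the result of the joins + mapGroupsWithState pipeline):

```scala
// Current situation (sketch): update-mode query going to the console sink only.
val consoleQuery = aggregatedDF.writeStream
  .outputMode("update")          // required because of mapGroupsWithState
  .format("console")
  .option("truncate", "false")
  .start()

consoleQuery.awaitTermination()
```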

So my question is: how can I write the stream data to a DB or a file in my situation? Do I have to write the data to Kafka and then use Kafka Connect to read it back into files/DB?

P.S. I followed these articles to get the aggregated streaming query:

- https://stackoverflow.com/questions/62738727/how-to-deduplicate-and-keep-latest-based-on-timestamp-field-in-spark-structured
- https://databricks.com/blog/2017/10/17/arbitrary-stateful-processing-in-apache-sparks-structured-streaming.html
- will also try the one below one more time using the Java API: (https://stackoverflow.com/questions/50933606/spark-streaming-select-record-with-max-timestamp-for-each-id-in-dataframe-pysp)

Answer

I got confused between OUTPUT and WRITE. I was also wrongly assuming that DB and file sinks are parallel terms in the output sinks section of the doc, which is why one cannot find a DB sink in that section of the guide: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks

I just realized that the OUTPUT mode (append/update/complete) only constrains the streaming query (what kind of result rows it can emit); it has nothing to do with how the results are WRITTEN to the SINK. I also realized that writing to a DB can be done with the FOREACH SINK (initially I thought it was only for extra transformations).
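
A minimal sketch of that foreach-sink approach (the JDBC URL, credentials, table/column names and `aggregatedDF` are made-up placeholders, not from my actual job):

```scala
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row}

// Row-level foreach sink: for every updated row in a micro-batch, upsert it into Postgres.
class JdbcUpsertWriter extends ForeachWriter[Row] {
  var connection: Connection = _
  var statement: PreparedStatement = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    connection = DriverManager.getConnection(
      "jdbc:postgresql://localhost:5432/mydb", "user", "password")   // placeholder URL/credentials
    statement = connection.prepareStatement(
      """INSERT INTO events (id, value) VALUES (?, ?)
        |ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value""".stripMargin)
    true   // true = process this partition of this epoch
  }

  override def process(record: Row): Unit = {
    statement.setString(1, record.getAs[String]("id"))
    statement.setLong(2, record.getAs[Long]("value"))
    statement.executeUpdate()
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (statement != null) statement.close()
    if (connection != null) connection.close()
  }
}

// Attach the writer to the update-mode query instead of the console sink.
aggregatedDF.writeStream
  .outputMode("update")
  .foreach(new JdbcUpsertWriter)
  .start()
```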

I found these articles/discussions useful:

  • https://www.waitingforcode.com/apache-spark-structured-streaming/output-modes-structured-streaming/read#what_is_the_difference_with_SaveMode
  • How to write streaming dataframe to PostgreSQL?
  • https://linuxize.com/post/how-to-list-databases-tables-in-postgreqsl/

So later on, after reading the official guide again, I confirmed that foreachBatch can also run custom logic, etc., when WRITING to STORAGE.
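
The foreachBatch variant is often simpler, because each micro-batch arrives as a plain DataFrame and the normal batch JDBC writer can be reused (again only a sketch with placeholder URL/table names). One thing to keep in mind: in update mode a batch contains only the changed rows, so a plain append will accumulate old versions of a key; an upsert like the foreach sketch above is usually what you really want in the DB.

```scala
import java.util.Properties
import org.apache.spark.sql.DataFrame

val jdbcProps = new Properties()
jdbcProps.setProperty("user", "user")           // placeholder credentials
jdbcProps.setProperty("password", "password")   // the Postgres JDBC driver jar must be on the classpath

// Using a named function avoids the Scala/Java overload ambiguity of foreachBatch.
def writeBatchToDb(batchDF: DataFrame, batchId: Long): Unit = {
  batchDF.write
    .mode("append")   // simplest option; see the note above about upserts
    .jdbc("jdbc:postgresql://localhost:5432/mydb", "events", jdbcProps)
}

aggregatedDF.writeStream
  .outputMode("update")
  .foreachBatch(writeBatchToDb _)
  .start()
```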
