如何将流数据帧写入PostgreSQL? [英] How to write streaming dataframe to PostgreSQL?

查看:109
本文介绍了如何将流数据帧写入PostgreSQL?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个流数据帧,我正在尝试将其写入数据库.有用于将rdd或df写入Postgres的文档.但是,我无法找到有关如何在结构化流中完成操作的示例或文档.

I have a streaming dataframe that I am trying to write into a database. There is documentation for writing an rdd or df into Postgres. But, I am unable to find examples or documentation on how it is done in Structured streaming.

我已阅读文档 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch ,但我不明白在哪里创建jdbc连接以及如何将其写入数据库./p>

I have read the documentation https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch , but I couldn't understand where I would create a jdbc connection and how I would write it to the database.

def foreach_batch_function(df, epoch_id):
    # what goes in here?
    pass

view_counts_query = windowed_view_counts.writeStream \
    .outputMode("append") \
    .foreachBatch(foreach_batch_function)
    .option("truncate", "false") \
    .trigger(processingTime="5 seconds") \
    .start() \
    .awaitTermination()

此函数接收常规数据帧并写入postgres表

This function takes in a regular dataframe and writes into a postgres table

def postgres_sink(config, data_frame):
    config.read('/src/config/config.ini')
    dbname = config.get('dbauth', 'dbname')
    dbuser = config.get('dbauth', 'user')
    dbpass = config.get('dbauth', 'password')
    dbhost = config.get('dbauth', 'host')
    dbport = config.get('dbauth', 'port')

    url = "jdbc:postgresql://"+dbhost+":"+dbport+"/"+dbname
    properties = {
        "driver": "org.postgresql.Driver",
        "user": dbuser,
        "password": dbpass
    }

    data_frame.write.jdbc(url=url, table="metrics", mode="append",
                          properties=properties)

推荐答案

在这里,除了您已经拥有的东西之外,几乎没有什么可做的. foreachBatch 带有功能(DataFrame, Int) => None ,因此您所需要的只是一个小型适配器,其他所有功能都可以正常工作:

There is really little be done here, beyond what you already have. foreachBatch takes a function (DataFrame, Int) => None, so all you need is a small adapter, and everything else should work just fine:

def foreach_batch_for_config(config)
    def _(df, epoch_id):
        postgres_sink(config, df)
   return _

view_counts_query = (windowed_view_counts
    .writeStream
    .outputMode("append") 
    .foreachBatch(foreach_batch_for_config(some_config))
    ...,
    .start()
    .awaitTermination())

尽管说实话,ConfigParser从一开始就是一个奇怪的想法.您可以调整签名并对其进行初始化

though to be honest passing ConfigParser around is a strange idea from the beginning. You could adjust the signature adn initialize it in place

def postgres_sink(data_frame, batch_id):
    config = configparser.ConfigParser()
    ...
    data_frame.write.jdbc(...)

,其余部分保持原样.这样,您可以直接使用函数:

and keep the rest as-is. This way you could use your function directly:

...
.foreachBatch(postgres_sink)
...

这篇关于如何将流数据帧写入PostgreSQL?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆