getting the new row id from pySpark SQL write to remote mysql db (JDBC)
Problem description
I am using pyspark-sql to create rows in a remote MySQL database, using JDBC.
I have two tables, parent_table(id, value) and child_table(id, value, parent_id), so each row of parent_table may have as many rows in child_table associated with it as needed.
Now I want to create some new data and insert it into the database. I'm using the code guidelines here for the write operation, but I would like to be able to do something like:
parentDf = sc.parallelize([5, 6, 7]).toDF(('value',))
parentWithIdDf = parentDf.write.mode('append') \
    .format("jdbc") \
    .option("url", "jdbc:mysql://" + host_name + "/" + db_name) \
    .option("dbtable", table_name) \
    .option("user", user_name) \
    .option("password", password_str) \
    .save()
# The assignment at the previous line is wrong, as
# pyspark.sql.DataFrameWriter#save doesn't return anything.
I would like a way for the last line of code above to return a DataFrame with the new row id for each row, so I could then do:
childDf = parentWithIdDf.flatMap(lambda x: [[8, x[0]], [9, x[0]]])
childDf.write.mode('append')...
meaning that at the end I would have in my remote database:
parent_table
______________
| id | value |
______________
|  1 |     5 |
|  2 |     6 |
|  3 |     7 |
______________
child_table
__________________________
| id | value | parent_id |
__________________________
|  1 |     8 |         1 |
|  2 |     9 |         1 |
|  3 |     8 |         2 |
|  4 |     9 |         2 |
|  5 |     8 |         3 |
|  6 |     9 |         3 |
__________________________
As I've written in the first code snippet above, pyspark.sql.DataFrameWriter#save doesn't return anything (judging by its documentation), so how can I achieve this?
Am I doing something completely wrong? It looks like there is no way to get data back from a Spark action (which save is), while I would like to use this action as a transformation, which leads me to think I may be thinking about all this in the wrong way.
Recommended answer
A simple answer is to use a timestamp plus an auto-incrementing number to create a unique ID. This only works if only one server is running at any given instant. :)
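A minimal sketch of that scheme in plain Python (not Spark; the `id_generator` helper and the 20-bit counter width are assumptions made for illustration, not part of any library API):

```python
import itertools
import time

def id_generator():
    """Yield unique integer IDs: millisecond timestamp in the high bits,
    combined with a per-process counter in the low 20 bits. Unique only
    while a single process/server generates IDs, as the answer notes."""
    counter = itertools.count()
    while True:
        ts = int(time.time() * 1000)           # milliseconds since epoch
        yield (ts << 20) | (next(counter) & 0xFFFFF)

gen = id_generator()
ids = [next(gen) for _ in range(3)]
# Within one process the IDs come out strictly increasing
assert ids[0] < ids[1] < ids[2]
```

In pyspark itself, a similar effect could be had by assigning the parent IDs yourself before the JDBC write, e.g. with pyspark.sql.functions.monotonically_increasing_id(), so the child rows can reference their parents without needing MySQL's AUTO_INCREMENT values back from the server.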