Primary keys with Apache Spark
Question
I have a JDBC connection between Apache Spark and PostgreSQL, and I want to insert some data into my database. When I use append mode I need to specify an id for each DataFrame.Row. Is there any way for Spark to create primary keys?
Answer
Scala:
If all you need is unique numbers you can use zipWithUniqueId and recreate the DataFrame. First some imports and dummy data:
import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}
val df = sc.parallelize(Seq(
  ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")
Extract the schema for further usage:
val schema = df.schema
Add an id field:
val rows = df.rdd.zipWithUniqueId.map{
  case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}
Create the DataFrame:
val dfWithPK = sqlContext.createDataFrame(
  rows, StructType(StructField("id", LongType, false) +: schema.fields))
The same thing in Python:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType

row = Row("foo", "bar")
df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()

row_with_index = Row(*["id"] + df.columns)

def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
    return _make_row

f = make_row(df.columns)

df_with_pk = (df.rdd
    .zipWithUniqueId()
    .map(lambda x: f(*x))
    .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))
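With the id column in place, the DataFrame can be appended to PostgreSQL over JDBC. A minimal sketch, where the URL, table name and connection properties are placeholders rather than part of the original answer:

# Placeholder connection details -- adjust to your own database
url = "jdbc:postgresql://localhost:5432/mydb"
properties = {"user": "spark", "password": "secret", "driver": "org.postgresql.Driver"}

# Append the rows; the generated id acts as the primary key column
df_with_pk.write.jdbc(url, "my_table", mode="append", properties=properties)

Note that Spark only writes the id values; the PRIMARY KEY constraint itself still has to be declared on the PostgreSQL side.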
If you prefer consecutive numbers you can replace zipWithUniqueId with zipWithIndex, but it is a little bit more expensive.
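A sketch of that variant, reusing f, StructType and LongType from the Python snippet above (zipWithIndex has to run an extra job to count the records in each partition, which is where the additional cost comes from):

df_with_consecutive_pk = (df.rdd
    .zipWithIndex()
    .map(lambda x: f(*x))
    .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))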
Directly with the DataFrame API:
(universal to Scala, Python, Java and R with pretty much the same syntax)
Previously I've missed the monotonicallyIncreasingId function, which should work just fine as long as you don't require consecutive numbers:
import org.apache.spark.sql.functions.monotonicallyIncreasingId
df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar| id|
// +---+----+-----------+
// | a|-1.0|17179869184|
// | b|-2.0|42949672960|
// | c|-3.0|60129542144|
// +---+----+-----------+
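A rough Python equivalent, as a sketch; depending on the Spark version the function is exposed as monotonicallyIncreasingId or, in more recent releases, monotonically_increasing_id:

from pyspark.sql.functions import monotonically_increasing_id

# Ids are unique, but neither consecutive nor stable across executions
df.withColumn("id", monotonically_increasing_id()).show()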
While useful, monotonicallyIncreasingId is non-deterministic. Not only may the ids differ from execution to execution, but without additional tricks they cannot be used to identify rows when subsequent operations contain filters.
Note:
It is also possible to use the rowNumber window function:
from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber
w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()
Unfortunately:
WARN Window: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
So unless you have a natural way to partition your data and ensure uniqueness, it is not particularly useful at this moment.
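If such a partitioning column does exist, a hedged sketch could look like the following, where the group column is purely hypothetical; the generated numbers are only unique within each group, so they would still have to be combined with the group key to serve as a primary key:

# "group" is a hypothetical column assumed to split the data into
# reasonably sized, non-overlapping partitions
w = Window.partitionBy("group").orderBy("foo")
df.withColumn("id", rowNumber().over(w)).show()  # row_number() in newer versions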