Primary keys with Apache Spark

Problem description

I have a JDBC connection between Apache Spark and PostgreSQL, and I want to insert some data into my database. When I use append mode I need to specify an id for each DataFrame.Row. Is there any way for Spark to create primary keys?
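
For reference, here is a minimal sketch (not from the original question) of the kind of append-mode JDBC write being described, assuming an existing DataFrame df and a PostgreSQL table people; the URL, table name, and credentials are placeholders:

import java.util.Properties
import org.apache.spark.sql.SaveMode

// Placeholder connection details -- adjust for your environment.
val url = "jdbc:postgresql://localhost:5432/mydb"
val props = new Properties()
props.setProperty("user", "spark")
props.setProperty("password", "secret")
props.setProperty("driver", "org.postgresql.Driver")

// Append rows to an existing table; every row must already carry its id value.
df.write.mode(SaveMode.Append).jdbc(url, "people", props)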

Recommended answer

Scala

If all you need is unique numbers, you can use zipWithUniqueId and recreate the DataFrame. First, some imports and dummy data:

import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}

val df = sc.parallelize(Seq(
    ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")

Extract schema for further usage:

val schema = df.schema

Add an id field:

val rows = df.rdd.zipWithUniqueId.map{
   case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}

Create the DataFrame:

val dfWithPK = sqlContext.createDataFrame(
  rows, StructType(StructField("id", LongType, false) +: schema.fields))

The same thing in Python:

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType

row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)

df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()

def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
    return _make_row

f = make_row(df.columns)

df_with_pk = (df.rdd
    .zipWithUniqueId()
    .map(lambda x: f(*x))
    .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))

If you prefer consecutive numbers you can replace zipWithUniqueId with zipWithIndex, but it is a little bit more expensive.
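
A minimal sketch of that variant, reusing df and schema from the Scala section above (the value names here are just for illustration); zipWithIndex triggers an extra Spark job to compute per-partition counts, which is where the extra cost comes from:

val consecutiveRows = df.rdd.zipWithIndex.map {
  case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)
}

val dfWithConsecutivePK = sqlContext.createDataFrame(
  consecutiveRows, StructType(StructField("id", LongType, false) +: schema.fields))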

Directly with DataFrame API:

(Universal: Scala, Python, and Java, with R using pretty much the same syntax)

Previously I missed the monotonicallyIncreasingId function, which should work just fine as long as you don't require consecutive numbers:

import org.apache.spark.sql.functions.monotonicallyIncreasingId

df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar|         id|
// +---+----+-----------+
// |  a|-1.0|17179869184|
// |  b|-2.0|42949672960|
// |  c|-3.0|60129542144|
// +---+----+-----------+

While useful, monotonicallyIncreasingId is non-deterministic. Not only can the ids differ from execution to execution, but without additional tricks they cannot be used to identify rows when subsequent operations contain filters.
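
One common additional trick, offered here as an assumption rather than part of the original answer, is to persist the DataFrame right after adding the ids, so that later filters and joins operate on the materialized values instead of re-evaluating the non-deterministic expression:

import org.apache.spark.sql.functions.monotonicallyIncreasingId

// Materialize the ids once; later operations then reuse the cached values.
// This only holds as long as the cached data is not evicted and recomputed.
val withId = df.withColumn("id", monotonicallyIncreasingId).cache()
withId.count()  // force evaluation so the ids are actually computed and cached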

Note

It is also possible to use the rowNumber window function:

from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber

w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()

Unfortunately:

WARN Window: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

So unless you have a natural way to partition your data and ensure uniqueness, it is not particularly useful at the moment.
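
For completeness, a minimal sketch of the partitioned variant in Scala, assuming a hypothetical category column that splits the data naturally; note that rowNumber restarts at 1 within every partition, so only the combination of the partition key and the number is unique:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rowNumber

// "category" and partitionedDf are hypothetical; substitute your own natural key.
val w = Window.partitionBy("category").orderBy("foo")

// ("category", "id") is unique; "id" on its own is not.
val numbered = partitionedDf.withColumn("id", rowNumber().over(w))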
