How to implement auto increment in Spark SQL (PySpark)


Question

I need to implement an auto-increment column in my Spark SQL table. How could I do that? Kindly guide me. I am using PySpark 2.0.

Thank you, Kalyan

Answer

I would write/reuse a stateful Hive UDF and register it with PySpark, since Spark SQL has good support for Hive.

Check the line @UDFType(deterministic = false, stateful = true) in the code below; it marks the UDF as stateful, so the same instance keeps its counter across calls.

package org.apache.hadoop.hive.contrib.udf;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.io.LongWritable;

/**
 * UDFRowSequence.
 */
@Description(name = "row_sequence",
    value = "_FUNC_() - Returns a generated row sequence number starting from 1")
@UDFType(deterministic = false, stateful = true)
public class UDFRowSequence extends UDF
{
  private LongWritable result = new LongWritable();

  public UDFRowSequence() {
    result.set(0);
  }

  public LongWritable evaluate() {
    result.set(result.get() + 1);
    return result;
  }
}

// End UDFRowSequence.java
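The stateful annotation is what allows a single UDF instance to keep incrementing across rows. As a rough plain-Python analogue of the evaluate() logic above (an illustration only, not part of the PySpark API):

```python
class RowSequence:
    """Plain-Python analogue of the stateful Hive UDF above (illustration only)."""

    def __init__(self):
        # Mirrors result.set(0) in the Java constructor.
        self.value = 0

    def evaluate(self):
        # Each call mutates the same counter, so the sequence
        # continues across rows instead of restarting at 1.
        self.value += 1
        return self.value


seq = RowSequence()
print([seq.evaluate() for _ in range(3)])  # [1, 2, 3]
```

If the UDF were not marked stateful, the engine would be free to treat it as a pure function and the counter would not advance reliably from row to row.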

Now build the jar and add its location when PySpark starts:

$ pyspark --jars your_jar_name.jar

Then register the function with sqlContext:

sqlContext.sql("CREATE TEMPORARY FUNCTION row_seq AS 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence'")

Now use row_seq() in your SELECT query:

sqlContext.sql("SELECT row_seq(), col1, col2 FROM table_name")

See also: an example project using Hive UDFs in pySpark.

