Using Python to insert multiple rows into a Hive table

Problem Description

Hive is a data warehouse designed for querying and aggregating large datasets that reside on HDFS.

The standard INSERT INTO syntax performs poorly because:

  1. Each statement requires a Map/Reduce process to be executed.
  2. Each statement results in a new file being added to HDFS - over time this leads to very poor performance when reading from the table.

With that said, there is now a Streaming API for Hive / HCatalog, as detailed here.

I need to insert data into Hive at high speed, using Python. I am aware of the pyhive and pyhs2 libraries, but neither of them appears to make use of the Streaming API.
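
For reference, the per-row path those libraries do offer looks roughly like the sketch below (the host, table, and column values are placeholders, not from any real setup); every execute() call is a separate Hive statement, so it incurs exactly the per-statement Map/Reduce and small-file overhead described above.

from pyhive import hive

# Illustrative only: slow per-row inserts via pyhive. Host, table and column
# values are placeholders; each execute() runs one full Hive statement.
conn = hive.Connection(host='hive-server.example.com', port=10000, username='etl')
cursor = conn.cursor()

rows = [('2017-01-01', 'device-a', 42.0),
        ('2017-01-01', 'device-b', 17.5)]
for day, device, reading in rows:
    cursor.execute(
        "INSERT INTO TABLE sensor_readings VALUES ('%s', '%s', %f)"
        % (day, device, reading)
    )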

Has anyone successfully managed to get Python to insert many rows into Hive using the Streaming API, and how was this done?

I look forward to your insights!

Solution

Hive users can stream a table through a script to transform the data:

ADD FILE replace-nan-with-zeros.py;

SELECT
  TRANSFORM (...)
  USING 'python replace-nan-with-zeros.py'
  AS (...)
FROM some_table;

Here is a simple Python script:

#!/usr/bin/env python
import sys

kFirstColumns = 7

def main(argv):
    for line in sys.stdin:
        line = line.strip()
        inputs = line.split('\t')

        # replace NaNs with zeros
        outputs = []
        columnIndex = 1
        for value in inputs:
            newValue = value
            if columnIndex > kFirstColumns:
                newValue = value.replace('NaN', '0.0')
            outputs.append(newValue)
            columnIndex = columnIndex + 1

        print '\t'.join(outputs)

if __name__ == "__main__":
    main(sys.argv[1:])
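
The transform can be sanity-checked outside Hive by piping a tab-separated row through the script. Below is a minimal sketch, assuming the file is in the working directory and a Python 2 interpreter is available as python2 (the script uses the Python 2 print statement):

import subprocess

# Hypothetical local test: feed one 9-column tab-separated row on STDIN and
# print what replace-nan-with-zeros.py writes to STDOUT. Only columns after
# the 7th (kFirstColumns) should have 'NaN' replaced by '0.0'.
sample = "a\tb\tc\td\te\tf\tg\tNaN\t1.5\n"
result = subprocess.run(
    ["python2", "replace-nan-with-zeros.py"],
    input=sample,
    capture_output=True,
    text=True,
    check=True,
)
print(result.stdout)   # expected: a  b  c  d  e  f  g  0.0  1.5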

Hive and Python

Python can be used as a UDF from Hive through the HiveQL TRANSFORM statement. For example, the following HiveQL invokes a Python script stored in the streaming.py file.

Linux-based HDInsight

add file wasb:///streaming.py;

SELECT TRANSFORM (clientid, devicemake, devicemodel)
  USING 'streaming.py' AS
  (clientid string, phoneLable string, phoneHash string)
FROM hivesampletable
ORDER BY clientid LIMIT 50;

Windows-based HDInsight

add file wasb:///streaming.py;

SELECT TRANSFORM (clientid, devicemake, devicemodel)
  USING 'D:\Python27\python.exe streaming.py' AS
  (clientid string, phoneLable string, phoneHash string)
FROM hivesampletable
ORDER BY clientid LIMIT 50;

Here's what this example does:

1. The add file statement at the beginning of the file adds the streaming.py file to the distributed cache, so it's accessible by all nodes in the cluster.

2. The SELECT TRANSFORM ... USING statement selects data from the hivesampletable, and passes clientid, devicemake, and devicemodel to the streaming.py script.

3. The AS clause describes the fields returned from streaming.py.

Here's the streaming.py file used by the HiveQL example.

#!/usr/bin/env python

import sys
import string
import hashlib

while True:
  line = sys.stdin.readline()
  if not line:
    break

  line = string.strip(line, "\n ")
  clientid, devicemake, devicemodel = string.split(line, "\t")
  phone_label = devicemake + ' ' + devicemodel
  print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()])

Since we are using streaming, this script has to do the following:

1. Read data from STDIN. This is accomplished by using sys.stdin.readline() in this example.

2. The trailing newline character is removed using string.strip(line, "\n "), since we just want the text data and not the end of line indicator.

3. When doing stream processing, a single line contains all the values with a tab character between each value. So string.split(line, "\t") can be used to split the input at each tab, returning just the fields.

4. When processing is complete, the output must be written to STDOUT as a single line, with a tab between each field. This is accomplished by using print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()]).

5. This all occurs within a while loop that repeats until no line is read, at which point break exits the loop and the script terminates.

Beyond that, the script just concatenates the input values for devicemake and devicemodel, and calculates a hash of the concatenated value. Pretty simple, but it describes the basics of how any Python script invoked from Hive should function: Loop, read input until there is no more, break each line of input apart at the tabs, process, write a single line of tab delimited output.
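
One portability note: the module-level string.strip and string.split functions used above exist only in Python 2. If the cluster nodes run Python 3 instead, a roughly equivalent version of streaming.py might look like this sketch (same column names, offered only as an assumption-laden illustration):

#!/usr/bin/env python3
# Python 3 sketch of the same transform: read tab-separated rows from STDIN and
# emit clientid, a combined make/model label, and the MD5 hash of that label.
import sys
import hashlib

for line in sys.stdin:
    line = line.rstrip("\n")
    if not line:
        continue
    clientid, devicemake, devicemodel = line.split("\t")
    phone_label = devicemake + ' ' + devicemodel
    phone_hash = hashlib.md5(phone_label.encode("utf-8")).hexdigest()
    print("\t".join([clientid, phone_label, phone_hash]))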
