Compute differences between successive records in Hadoop with Hive Queries


Problem Description

I have a Hive table that holds data of customer calls. For simplicity, consider it has 2 columns: the first holds the customer ID and the second holds the timestamp of the call (a Unix timestamp).

I can query this table to find all the calls for each customer:

SELECT * FROM mytable SORT BY customer_id, call_time;

The result is:

Customer1    timestamp11
Customer1    timestamp12
Customer1    timestamp13
Customer2    timestamp21
Customer3    timestamp31
Customer3    timestamp32
...

Is it possible to create a Hive query that returns, for each customer, starting from the second call, the time interval between two successive calls? For the above example that query should return:

Customer1    timestamp12-timestamp11
Customer1    timestamp13-timestamp12
Customer3    timestamp32-timestamp31
...

I have tried to adapt solutions from SQL, but I'm stuck on Hive's limitations: it accepts subqueries only in FROM, and joins must contain only equality conditions.

Thanks.

I have tried to use a Hive UDF:

public class DeltaComputerUDF extends UDF {
    private String previousCustomerId;
    private long previousCallTime;

    public String evaluate(String customerId, LongWritable callTime) {
        long callTimeValue = callTime.get();
        String timeDifference = null;

        // Emit a delta only when this row has the same customer as the
        // previous row; the first row of each customer yields null.
        if (customerId.equals(previousCustomerId)) {
            timeDifference = Long.toString(callTimeValue - previousCallTime);
        }

        previousCustomerId = customerId;
        previousCallTime = callTimeValue;

        return timeDifference;
    }
}

and register it under the name "delta".
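For completeness, registering such a UDF in a Hive session typically looks like this (the JAR path and class package below are assumptions, not from the question):

```sql
-- hypothetical JAR path and package; adjust to your build
ADD JAR /path/to/delta-udf.jar;
CREATE TEMPORARY FUNCTION delta AS 'com.example.DeltaComputerUDF';
```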

But it seems (from the logs and the results) that it is being executed at MAP time. Two problems arise from this:

First: the table data must be sorted by customer ID and timestamp BEFORE this function is applied. The query:

 SELECT customer_id, call_time, delta(customer_id, call_time) FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time;

does not work, because the sorting is performed at REDUCE time, long after my function has already run.

I could sort the table data before applying the function, but I'm not happy with this because it is overhead I hope to avoid.

Second: in a distributed Hadoop configuration, the data is split among the available map tasks. So there will be multiple instances of this function, one per mapper, and the same customer's data may be split between 2 mappers. In that case I would lose customer calls, which is not acceptable.

I don't know how to solve this issue. I know that DISTRIBUTE BY ensures that all rows with a given value are sent to the same reducer (thus ensuring that SORT BY works as expected); does anybody know if there is something similar for the mapper?

Next I plan to follow libjack's suggestion to use a reduce script. This "computation" is needed between some other Hive queries, so I want to try everything Hive offers before moving to another tool, as suggested by Balaswamy vaddeman.
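For reference, the reduce-script route mentioned here uses Hive's TRANSFORM clause; a sketch might look like the following (the script name delta.py is a hypothetical reducer, not something from the question):

```sql
-- 'delta.py' is a hypothetical reducer script shipped with the job;
-- it would read tab-separated sorted rows on stdin and emit deltas
ADD FILE delta.py;
SELECT TRANSFORM (customer_id, call_time)
       USING 'python delta.py'
       AS customer_id, delta
FROM (SELECT customer_id, call_time FROM mytable
      DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;
```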

I started to investigate the custom-scripts solution. But on the first page of chapter 14 of the Programming Hive book (the chapter that presents custom scripts), I found the following paragraph:

Streaming is usually less efficient than coding the comparable UDFs or InputFormat objects. Serializing and deserializing data to pass it in and out of the pipe is relatively inefficient. It is also harder to debug the whole program in a unified manner. However, it is useful for fast prototyping and for leveraging existing code that is not written in Java. For Hive users who don't want to write Java code, it can be a very effective approach.

So it was clear that custom scripts are not the best solution in terms of efficiency.

But how could I keep my UDF while making sure it works as expected in a distributed Hadoop configuration? I found the answer in the UDF Internals section of the Language Manual UDF wiki page. If I write my query as:

 SELECT customer_id, call_time, delta(customer_id, call_time) FROM (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;

it is executed at REDUCE time, and the DISTRIBUTE BY and SORT BY constructs guarantee that all the records from the same customer are processed by the same reducer, in call order.
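Outside Hive, the stateful pass the UDF performs within each reducer can be sketched as plain Java over rows already grouped by customer and sorted by call time (the class name and sample data below are illustrative, not part of the original question):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DeltaSketch {
    // Simulates the UDF's stateful pass: rows arrive grouped by customer
    // and sorted by call time; each customer's first row yields no delta.
    static List<String> deltas(List<String[]> rows) {
        List<String> out = new ArrayList<>();
        String previousCustomerId = null;
        long previousCallTime = 0;
        for (String[] row : rows) {
            String customerId = row[0];
            long callTime = Long.parseLong(row[1]);
            if (customerId.equals(previousCustomerId)) {
                out.add(customerId + "\t" + (callTime - previousCallTime));
            }
            previousCustomerId = customerId;
            previousCallTime = callTime;
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
            new String[]{"Customer1", "100"},
            new String[]{"Customer1", "130"},
            new String[]{"Customer1", "145"},
            new String[]{"Customer2", "200"},
            new String[]{"Customer3", "300"},
            new String[]{"Customer3", "360"});
        for (String line : deltas(rows)) {
            System.out.println(line);
        }
    }
}
```

Running it prints one delta line per call after each customer's first, mirroring the desired output above.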

So the UDF above, combined with this query construct, solves my problem.

(Sorry for not adding the links, but I don't have enough reputation points to do so.)

Recommended Answer

It's an old question, but for future reference, here is another proposition:

Hive windowing functions allow you to use previous/next row values in your query.

A similar query could be:

SELECT customer_id, call_time - LAG(call_time, 1, 0) OVER (PARTITION BY customer_id ORDER BY call_time) FROM mytable;
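Note that the query above defaults LAG to 0 for each customer's first call, so that row shows the raw timestamp rather than a real interval. To match the asked-for output exactly (starting from the second call), a variant might filter those rows out; this sketch reuses the question's column names:

```sql
SELECT customer_id, delta
FROM (
  SELECT customer_id,
         call_time - LAG(call_time) OVER (PARTITION BY customer_id
                                          ORDER BY call_time) AS delta
  FROM mytable
) t
WHERE delta IS NOT NULL;
```

With no default, LAG returns NULL on each customer's first call, and the outer WHERE drops those rows.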

