Compute differences between successive records in Hadoop with Hive Queries


Question

I have a Hive table that holds data about customer calls. For simplicity, consider that it has two columns: the first holds the customer ID and the second holds the timestamp of the call (a Unix timestamp).

I can query this table to find all the calls for each customer:

SELECT * FROM mytable SORT BY customer_id, call_time;

The result is:

Customer1    timestamp11
Customer1    timestamp12
Customer1    timestamp13
Customer2    timestamp21
Customer3    timestamp31
Customer3    timestamp32
...

Is it possible to create a Hive query that returns, for each customer and starting from the second call, the time interval between two successive calls? For the above example, that query should return:

Customer1    timestamp12-timestamp11
Customer1    timestamp13-timestamp12
Customer3    timestamp32-timestamp31
...
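To pin down the expected result independently of Hive, here is a plain-Java sketch that computes the same thing in memory. The `SuccessiveDeltas` and `Call` classes and the numeric timestamps are hypothetical illustrations, not part of the original table or code:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory reference for the desired query result:
// for each customer, the gap between each call and the previous one.
final class SuccessiveDeltas {

    // One row of "mytable": (customer_id, call_time).
    static final class Call {
        final String customerId;
        final long callTime;

        Call(String customerId, long callTime) {
            this.customerId = customerId;
            this.callTime = callTime;
        }
    }

    // Returns "customerId<TAB>delta" lines, ordered by customer, then call time.
    static List<String> compute(List<Call> calls) {
        // Group call times per customer; TreeMap keeps customers sorted.
        Map<String, List<Long>> byCustomer = new TreeMap<>();
        for (Call c : calls) {
            byCustomer.computeIfAbsent(c.customerId, k -> new ArrayList<>()).add(c.callTime);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<Long>> e : byCustomer.entrySet()) {
            List<Long> times = e.getValue();
            times.sort(Comparator.naturalOrder());
            // Start from the second call: a customer with a single call yields nothing.
            for (int i = 1; i < times.size(); i++) {
                out.add(e.getKey() + "\t" + (times.get(i) - times.get(i - 1)));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Call> calls = List.of(
            new Call("Customer1", 100), new Call("Customer3", 500),
            new Call("Customer1", 130), new Call("Customer2", 200),
            new Call("Customer1", 190), new Call("Customer3", 540));
        compute(calls).forEach(System.out::println);
    }
}
```

Running `main` prints `Customer1 30`, `Customer1 60` and `Customer3 40` (tab-separated); Customer2 has a single call and therefore produces no row.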

I have tried to adapt an existing SQL solution, but I'm stuck on Hive's limitations: it accepts subqueries only in the FROM clause, and joins must contain only equalities.

Thank you.

EDIT1:

I have tried to use a Hive UDF function:

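import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.io.LongWritable;

// Stateful UDF: only correct if rows arrive grouped by customer and ordered by time.
@UDFType(deterministic = false, stateful = true)
public class DeltaComputerUDF extends UDF {
    private String previousCustomerId;
    private long previousCallTime;

    // Returns the gap to the previous call of the same customer, or NULL for
    // a customer's first call (and for NULL input).
    public String evaluate(String customerId, LongWritable callTime) {
        if (customerId == null || callTime == null) {
            return null;
        }
        long callTimeValue = callTime.get();
        String timeDifference = null;

        if (customerId.equals(previousCustomerId)) {
            timeDifference = Long.toString(callTimeValue - previousCallTime);
        }

        // Remember this row for the next call to evaluate().
        previousCustomerId = customerId;
        previousCallTime = callTimeValue;

        return timeDifference;
    }
}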

and registered it under the name "delta".

But it seems (from the logs and the results) that it is being evaluated at MAP time. Two problems arise from this:

First: The table data must be sorted by Customer ID and timestamp BEFORE using this function. The query:

 SELECT customer_id, call_time, delta(customer_id, call_time) FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time;

does not work, because the sorting is performed at REDUCE time, long after my function has been applied.

I can sort the table data before using the function, but I'm not happy with this because it is an overhead I hope to avoid.

Second: In a distributed Hadoop configuration, the input data is split among several map tasks. So I believe there will be multiple instances of this function, one for each mapper, and the same customer's data can be split between two mappers. In this case I will lose the intervals between calls that end up in different mappers, which is not acceptable.
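Both problems can be reproduced without Hive. The sketch below uses a hypothetical plain-Java stand-in for DeltaComputerUDF (same state and logic, no Hadoop types) and made-up timestamps, and it assumes one function instance per mapper, as described above:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java stand-in for DeltaComputerUDF: same state, same logic.
final class StatefulDelta {
    private String previousCustomerId;
    private long previousCallTime;

    // Gap to the previous row if it belonged to the same customer, else null.
    Long evaluate(String customerId, long callTime) {
        Long delta = null;
        if (customerId.equals(previousCustomerId)) {
            delta = callTime - previousCallTime;
        }
        previousCustomerId = customerId;
        previousCallTime = callTime;
        return delta;
    }

    // Feeds rows, in the given order, through ONE instance (one "mapper")
    // and collects the non-null deltas.
    static List<Long> run(String[] customers, long[] times) {
        StatefulDelta udf = new StatefulDelta();
        List<Long> deltas = new ArrayList<>();
        for (int i = 0; i < customers.length; i++) {
            Long d = udf.evaluate(customers[i], times[i]);
            if (d != null) {
                deltas.add(d);
            }
        }
        return deltas;
    }

    public static void main(String[] args) {
        // Sorted input, single instance: all three gaps are found.
        System.out.println(run(new String[] {"Customer1", "Customer1", "Customer1", "Customer3", "Customer3"},
                               new long[] {100, 130, 190, 500, 540}));
        // Same rows in unsorted table order: interleaving resets the state.
        System.out.println(run(new String[] {"Customer1", "Customer3", "Customer1", "Customer1", "Customer3"},
                               new long[] {100, 500, 130, 190, 540}));
        // Sorted input split between two mappers: the 130 -> 190 gap is lost.
        System.out.println(run(new String[] {"Customer1", "Customer1"}, new long[] {100, 130}));
        System.out.println(run(new String[] {"Customer1", "Customer3", "Customer3"}, new long[] {190, 500, 540}));
    }
}
```

This prints `[30, 60, 40]`, then `[60]`, then `[30]` and `[40]`: the unsorted order loses two of the three gaps, and the split loses Customer1's 130 to 190 gap.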

I don't know how to solve this issue. I know that DISTRIBUTE BY ensures that all rows with a given value are sent to the same reducer (thus ensuring that SORT BY works as expected). Does anybody know whether there is something similar for the mapper?

Next I plan to follow libjack's suggestion to use a reduce script. This "computation" is needed between some other Hive queries, so I want to try everything Hive offers before moving to another tool, as suggested by Balaswamy vaddeman.

EDIT2:

I started to investigate the custom script solution. But on the first page of chapter 14 of the Programming Hive book (the chapter that presents custom scripts), I found the following paragraph:

Streaming is usually less efficient than coding the comparable UDFs or InputFormat objects. Serializing and deserializing data to pass it in and out of the pipe is relatively inefficient. It is also harder to debug the whole program in a unified manner. However, it is useful for fast prototyping and for leveraging existing code that is not written in Java. For Hive users who don’t want to write Java code, it can be a very effective approach.

So it was clear that custom scripts are not the best solution in terms of efficiency.

But how could I keep my UDF and still make sure it works as expected in a distributed Hadoop configuration? I found the answer in the UDF Internals section of the Language Manual UDF wiki page. If I write my query as:

 SELECT customer_id, call_time, delta(customer_id, call_time) FROM (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;

it is executed at REDUCE time, and the DISTRIBUTE BY and SORT BY constructs guarantee that all the records of the same customer are processed by the same reducer, in call order.

So the above UDF and this query construct solve my problem.
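The reduce-side behaviour can also be simulated in plain Java. This is a hypothetical sketch, not Hive's actual shuffle: rows are routed with a `String.hashCode`-based partitioner (Hive hashes the key with its own function), each simulated reducer sorts only its own rows by (customer, time), and each reducer starts with fresh UDF state. Whatever the input order and the number of reducers, every gap is found:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical simulation of
//   SELECT ..., delta(...) FROM (SELECT ... DISTRIBUTE BY customer_id
//                                SORT BY customer_id, call_time) t
final class ReduceSideDelta {
    static final class Call {
        final String customerId;
        final long callTime;

        Call(String customerId, long callTime) {
            this.customerId = customerId;
            this.callTime = callTime;
        }
    }

    static List<String> run(List<Call> calls, int numReducers) {
        // DISTRIBUTE BY customer_id: all rows of one customer go to one reducer.
        List<List<Call>> reducers = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            reducers.add(new ArrayList<>());
        }
        for (Call c : calls) {
            reducers.get(Math.floorMod(c.customerId.hashCode(), numReducers)).add(c);
        }

        List<String> out = new ArrayList<>();
        for (List<Call> rows : reducers) {
            // SORT BY customer_id, call_time: ordering within each reducer only.
            rows.sort(Comparator.comparing((Call c) -> c.customerId)
                                .thenComparingLong(c -> c.callTime));
            // Each reducer has its own UDF instance, i.e. its own state.
            String previousCustomerId = null;
            long previousCallTime = 0;
            for (Call c : rows) {
                if (c.customerId.equals(previousCustomerId)) {
                    out.add(c.customerId + "\t" + (c.callTime - previousCallTime));
                }
                previousCustomerId = c.customerId;
                previousCallTime = c.callTime;
            }
        }
        // Merge the reducers' outputs in a fixed order so runs can be compared.
        out.sort(Comparator.naturalOrder());
        return out;
    }

    public static void main(String[] args) {
        List<Call> calls = List.of(
            new Call("Customer1", 100), new Call("Customer3", 500),
            new Call("Customer1", 130), new Call("Customer2", 200),
            new Call("Customer1", 190), new Call("Customer3", 540));
        for (int n = 1; n <= 3; n++) {
            System.out.println(n + " reducer(s): " + run(calls, n));
        }
    }
}
```

For 1, 2 and 3 reducers, the simulation yields the same three intervals (30 and 60 for Customer1, 40 for Customer3). The final sort only makes the merged output comparable; the correctness comes from routing each customer to one reducer and sorting inside it.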

(Sorry for not adding the links, but I'm not allowed to do it because I don't have enough reputation points)

Solution

It's an old question, but for future reference, here is another approach:

Hive windowing functions let you use previous/next row values in a query.

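A query along these lines should work. LAG(call_time, 1) returns the previous call_time within the same customer partition, or NULL for the customer's first call, so the outer WHERE clause keeps only the intervals starting from the second call:

SELECT customer_id, delta FROM (SELECT customer_id, call_time - LAG(call_time, 1) OVER (PARTITION BY customer_id ORDER BY call_time) AS delta FROM mytable) t WHERE delta IS NOT NULL;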
