Hive join optimization


Problem description


I have two sets of data, both stored in an S3 bucket, which I need to process in Hive and store the output back to S3. Sample rows from each dataset are as follows:

DataSet 1: {"requestId":"TADS6152JHGJH5435", "customerId":"ASJHAGSJH","sessionId":"172356126"}

DataSet2: {"requestId":"TADS6152JHGJH5435","userAgent":"Mozilla"}

I need to join these two data sets based on the requestId and output a combined row as:

Output:  {"requestId":"TADS6152JHGJH5435", "customerId":"ASJHAGSJH","sessionId":"172356126","userAgent":"Mozilla"}

The requestIds in dataset 1 are a proper subset of the requestIds in dataset 2. I am using a LEFT OUTER JOIN to get my output. Here is a simplified version of my Hive script:

CREATE EXTERNAL TABLE dataset1 (
     requestId string,
     customerId string,
     sessionId string
 )
LOCATION 's3://path_to_dataset1/';

CREATE EXTERNAL TABLE dataset2 (
     requestId string,
     userAgent string
 )
LOCATION 's3://path_to_dataset2/';

CREATE EXTERNAL TABLE output (
     requestId string,
     customerId string,
     sessionId string,
     userAgent string
 )
LOCATION 's3://path_to_output/';

INSERT OVERWRITE TABLE output
  SELECT d1.requestId, d1.customerId, d1.sessionId, d2.userAgent
  FROM dataset1 d1 LEFT OUTER JOIN dataset2 d2
  ON (d1.requestId=d2.requestId);

My question is:

Are there opportunities to optimize this join? Can I use partitioning/bucketing of the tables to run the join faster? I have set hive.auto.convert.join to true in my script. What other hive properties should I set to gain better performance for the above queries?

Solution

1. Optimize Joins

We can improve the performance of joins by enabling Auto Convert Map Joins and enabling optimization of skew joins.

Auto Map Joins

Auto Map-Join is a very useful feature when joining a big table with a small table. If we enable this feature, the small table will be saved in the local cache on each node and then joined with the big table in the Map phase. Enabling Auto Map Join provides two advantages. First, loading a small table into cache saves read time on each data node. Second, it avoids skew joins in the Hive query, since the join operation has already been done in the Map phase for each block of data.
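The question's script already sets hive.auto.convert.join; the related size threshold can be tuned alongside it. A sketch of both settings (the 25 MB value is the usual default, shown here for illustration; tune it to your cluster):

```sql
-- Convert common joins to map joins automatically when one side is small.
SET hive.auto.convert.join = true;
-- Size threshold (bytes) below which a table is considered small enough
-- to be cached on each node for the map-side join.
SET hive.mapjoin.smalltable.filesize = 25000000;
```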

Skew Joins

We can enable optimization of skew joins, i.e. imbalanced joins, by setting the hive.optimize.skewjoin property to true, either via the SET command in the hive shell or in the hive-site.xml file.

  <property>
    <name>hive.optimize.skewjoin</name>
    <value>true</value>
    <description>
      Whether to enable skew join optimization. 
      The algorithm is as follows: At runtime, detect the keys with a large skew. Instead of
      processing those keys, store them temporarily in an HDFS directory. In a follow-up map-reduce
      job, process those skewed keys. The same key need not be skewed for all the tables, and so,
      the follow-up map-reduce job (for the skewed keys) would be much faster, since it would be a
      map-join.
    </description>
  </property>
  <property>
    <name>hive.skewjoin.key</name>
    <value>100000</value>
    <description>
      Determine if we get a skew key in join. If we see more than the specified number of rows with the same key in join operator,
      we think the key as a skew join key. 
    </description>
  </property>
  <property>
    <name>hive.skewjoin.mapjoin.map.tasks</name>
    <value>10000</value>
    <description>
      Determine the number of map task used in the follow up map join job for a skew join.
      It should be used together with hive.skewjoin.mapjoin.min.split to perform a fine grained control.
    </description>
  </property>
  <property>
    <name>hive.skewjoin.mapjoin.min.split</name>
    <value>33554432</value>
    <description>
      Determine the number of map task at most used in the follow up map join job for a skew join by specifying 
      the minimum split size. It should be used together with hive.skewjoin.mapjoin.map.tasks to perform a fine grained control.
    </description>
  </property>
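As noted above, the same properties can also be applied per-session from the hive shell instead of hive-site.xml, e.g.:

```sql
-- Per-session equivalents of the hive-site.xml entries above.
SET hive.optimize.skewjoin = true;
SET hive.skewjoin.key = 100000;
```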

2. Enable Bucketed Map Joins

If the tables are bucketed by a particular column and those tables are being used in joins, then we can enable bucketed map joins to improve performance.

  <property>
    <name>hive.optimize.bucketmapjoin</name>
    <value>true</value>
    <description>Whether to try bucket mapjoin</description>
  </property>
  <property>
    <name>hive.optimize.bucketmapjoin.sortedmerge</name>
    <value>true</value>
    <description>Whether to try sorted bucket merge map join</description>
  </property>
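For these properties to help, both sides of the join need to be bucketed (and, for the sort-merge variant, sorted) on the join key. A sketch for the question's tables; the `_bucketed` table names and the bucket count of 32 are illustrative, and both tables should use the same column with a compatible number of buckets:

```sql
-- Bucketed (and sorted) copies of the question's tables, keyed on requestId.
CREATE TABLE dataset1_bucketed (
    requestId  string,
    customerId string,
    sessionId  string
)
CLUSTERED BY (requestId) SORTED BY (requestId) INTO 32 BUCKETS;

CREATE TABLE dataset2_bucketed (
    requestId string,
    userAgent string
)
CLUSTERED BY (requestId) SORTED BY (requestId) INTO 32 BUCKETS;

-- Populate from the external tables; enforce bucketing on insert.
SET hive.enforce.bucketing = true;
INSERT OVERWRITE TABLE dataset1_bucketed SELECT * FROM dataset1;
INSERT OVERWRITE TABLE dataset2_bucketed SELECT * FROM dataset2;
```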


3. Enable Tez Execution Engine

Instead of running Hive queries on the venerable MapReduce engine, we can improve the performance of Hive queries by at least 100% to 300% by running them on the Tez execution engine. We can enable the Tez engine with the property below from the hive shell.

hive> set hive.execution.engine=tez;


4. Enable Parallel Execution

Hive converts a query into one or more stages. Stages could be a MapReduce stage, a sampling stage, a merge stage, or a limit stage. By default, Hive executes these stages one at a time. A particular job may consist of some stages that are not dependent on each other and could be executed in parallel, possibly allowing the overall job to complete more quickly. Parallel execution can be enabled by setting the properties below.

  <property>
    <name>hive.exec.parallel</name>
    <value>true</value>
    <description>Whether to execute jobs in parallel</description>
  </property>
  <property>
    <name>hive.exec.parallel.thread.number</name>
    <value>8</value>
    <description>How many jobs at most can be executed in parallel</description>
  </property>


5. Enable Vectorization

The vectorization feature was first introduced in Hive in the 0.13.1 release. With vectorized query execution, we can improve the performance of operations like scans, aggregations, filters, and joins by performing them in batches of 1024 rows at once instead of a single row at a time.

We can enable vectorized query execution by setting the three properties below, in either the hive shell or the hive-site.xml file.

hive> set hive.vectorized.execution.enabled = true;
hive> set hive.vectorized.execution.reduce.enabled = true;
hive> set hive.vectorized.execution.reduce.groupby.enabled = true;
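Note that in this era of Hive, vectorized execution only applies to tables stored in the ORC format, so the data would first need to land in an ORC-backed table. A sketch for one of the question's tables; the `_orc` table name is illustrative:

```sql
-- ORC-backed copy of dataset1 so vectorized execution can be used.
CREATE TABLE dataset1_orc (
    requestId  string,
    customerId string,
    sessionId  string
)
STORED AS ORC;

INSERT OVERWRITE TABLE dataset1_orc SELECT * FROM dataset1;
```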


6. Enable Cost Based Optimization

Recent Hive releases provide cost-based optimization, which can achieve further optimizations based on query cost, resulting in potentially different decisions: how to order joins, which type of join to perform, the degree of parallelism, and others.

Cost-based optimization can be enabled by setting the properties below in the hive-site.xml file.

  <property>
    <name>hive.cbo.enable</name>
    <value>true</value>
    <description>Flag to control enabling Cost Based Optimizations using Calcite framework.</description>
  </property>
  <property>
    <name>hive.compute.query.using.stats</name>
    <value>true</value>
    <description>
      When set to true Hive will answer a few queries like count(1) purely using stats
      stored in metastore. For basic stats collection turn on the config hive.stats.autogather to true.
      For more advanced stats collection need to run analyze table queries.
    </description>
  </property>
  <property>
    <name>hive.stats.fetch.partition.stats</name>
    <value>true</value>
    <description>
      Annotation of operator tree with statistics information requires partition level basic
      statistics like number of rows, data size and file size. Partition statistics are fetched from
      metastore. Fetching partition statistics for each needed partition can be expensive when the
      number of partitions is high. This flag can be used to disable fetching of partition statistics
      from metastore. When this flag is disabled, Hive will make calls to filesystem to get file sizes
      and will estimate the number of rows from row schema.
    </description>
  </property>
  <property>
    <name>hive.stats.fetch.column.stats</name>
    <value>true</value>
    <description>
      Annotation of operator tree with statistics information requires column statistics.
      Column statistics are fetched from metastore. Fetching column statistics for each needed column
      can be expensive when the number of columns is high. This flag can be used to disable fetching
      of column statistics from metastore.
    </description>
  </property>
  <property>
    <name>hive.stats.autogather</name>
    <value>true</value>
    <description>A flag to gather statistics automatically during the INSERT OVERWRITE command.</description>
  </property>
  <property>
    <name>hive.stats.dbclass</name>
    <value>fs</value>
    <description>
      Expects one of the pattern in [jdbc(:.*), hbase, counter, custom, fs].
      The storage that stores temporary Hive statistics. In filesystem based statistics collection ('fs'), 
      each task writes statistics it has collected in a file on the filesystem, which will be aggregated 
      after the job has finished. Supported values are fs (filesystem), jdbc:database (where database 
      can be derby, mysql, etc.), hbase, counter, and custom as defined in StatsSetupConst.java.
    </description>
  </property>
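As the hive.compute.query.using.stats description above notes, the more advanced statistics require running ANALYZE TABLE queries. A sketch for the question's tables:

```sql
-- Gather basic table statistics (row count, data size) for the optimizer.
ANALYZE TABLE dataset1 COMPUTE STATISTICS;
ANALYZE TABLE dataset2 COMPUTE STATISTICS;

-- Gather column-level statistics, used when hive.stats.fetch.column.stats
-- is enabled.
ANALYZE TABLE dataset1 COMPUTE STATISTICS FOR COLUMNS requestId, customerId, sessionId;
ANALYZE TABLE dataset2 COMPUTE STATISTICS FOR COLUMNS requestId, userAgent;
```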
