How to improve performance for slow Spark jobs using DataFrame and JDBC connection?


Problem description

I am trying to access a mid-size Teradata table (~100 million rows) via JDBC in standalone mode on a single node (local[*]).

I am using Spark 1.4.1, set up on a very powerful machine (2 CPUs, 24 cores, 126 GB RAM).

I have tried several memory settings and tuning options to make it run faster, but none of them made a significant impact.

I am sure I am missing something. Below is my final attempt: it took about 11 minutes to get this simple count, while a JDBC connection through R returned the count in only 40 seconds.

bin/pyspark --driver-memory 40g --executor-memory 40g

df = sqlContext.read.jdbc("jdbc:teradata://......)
df.count()

When I tried a BIG table (5B records), no results were returned upon completion of the query.

Answer

All of the aggregation operations are performed after the whole dataset is retrieved into memory into a DataFrame collection, so doing the count in Spark will never be as efficient as doing it directly in Teradata. Sometimes it's worth pushing some computation into the database by creating views and then mapping those views using the JDBC API.
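For example, here is a minimal sketch of mapping a database-side view through the JDBC reader so the heavy aggregation runs in Teradata and Spark only fetches the pre-aggregated result. The view name DAILY_COUNTS and the connection details are hypothetical:

// Hypothetical: DAILY_COUNTS is a view created in Teradata beforehand, e.g.
//   CREATE VIEW DAILY_COUNTS AS SELECT SNAPSHOT_DT, COUNT(*) AS CNT FROM BIG_TABLE GROUP BY SNAPSHOT_DT
val props = new java.util.Properties()
props.setProperty("user", "<USER>")
props.setProperty("password", "<PASSWORD>")

// Teradata executes the aggregation when the view is scanned;
// Spark only retrieves the much smaller result set.
val dailyCounts = sqlctx.read.jdbc("<URL>", "DAILY_COUNTS", props)
dailyCounts.show()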

Every time you use the JDBC driver to access a large table, you should specify a partitioning strategy; otherwise you will create a DataFrame/RDD with a single partition and you will overload the single JDBC connection.

Instead, you want to try the following API (available since Spark 1.4.0):

sqlctx.read.jdbc(
  url = "<URL>",
  table = "<TABLE>",
  columnName = "<INTEGRAL_COLUMN_TO_PARTITION>",  // numeric column used to split the table into ranges
  lowerBound = minValue,   // smallest value of that column (does not filter rows, only sets the stride)
  upperBound = maxValue,   // largest value of that column (does not filter rows, only sets the stride)
  numPartitions = 20,      // the table is split into 20 partitions, each read with its own JDBC query
  connectionProperties = new java.util.Properties()
)
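If you don't already know the bounds of the partition column, one rough way to obtain them (a sketch, not the only option) is to push a small MIN/MAX query down to Teradata by passing a parenthesised subquery as the table; the placeholders and alias names below are illustrative:

// Sketch: compute the bounds with a pushed-down query; Teradata evaluates MIN/MAX itself.
val bounds = sqlctx.read.jdbc(
  "<URL>",
  "(SELECT MIN(<INTEGRAL_COLUMN_TO_PARTITION>) AS lo, MAX(<INTEGRAL_COLUMN_TO_PARTITION>) AS hi FROM <TABLE>) b",
  new java.util.Properties()
).first()

// The exact numeric type depends on the column, so convert defensively.
val minValue = bounds.getAs[Number](0).longValue()
val maxValue = bounds.getAs[Number](1).longValue()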

There is also an option to push down some filtering.
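As a rough sketch (it assumes `df` was loaded through sqlctx.read.jdbc(...) as above, and ACCT_ID is a hypothetical numeric column):

val recent = df.filter(df("ACCT_ID") > 1000000L)

// Simple comparison filters like this are compiled into the WHERE clause of the query
// the JDBC source sends to Teradata, so the filtering happens database-side;
// more complex expressions are evaluated in Spark after the rows are fetched.
recent.count()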

If you don't have a uniformly distributed integral column, you can create custom partitions by specifying custom predicates (WHERE clauses). For example, let's suppose you have a timestamp column and want to partition by date ranges:

val predicates =
  Array(
    "2015-06-20" -> "2015-06-30",
    "2015-07-01" -> "2015-07-10",
    "2015-07-11" -> "2015-07-20",
    "2015-07-21" -> "2015-07-31"
  ).map { case (start, end) =>
    s"cast(DAT_TME as date) >= date '$start' AND cast(DAT_TME as date) <= date '$end'"
  }

predicates.foreach(println)

// Below is the result of how the predicates were formed:
// cast(DAT_TME as date) >= date '2015-06-20' AND cast(DAT_TME as date) <= date '2015-06-30'
// cast(DAT_TME as date) >= date '2015-07-01' AND cast(DAT_TME as date) <= date '2015-07-10'
// cast(DAT_TME as date) >= date '2015-07-11' AND cast(DAT_TME as date) <= date '2015-07-20'
// cast(DAT_TME as date) >= date '2015-07-21' AND cast(DAT_TME as date) <= date '2015-07-31'


sqlctx.read.jdbc(
  url = "<URL>",
  table = "<TABLE>",
  predicates = predicates,
  connectionProperties = new java.util.Properties()
)

This will generate a DataFrame where each partition contains the records of the subquery associated with its predicate.
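A quick sanity check, assuming the call above is assigned to a DataFrame named df (the name is just for illustration):

// One partition is created per predicate, so this count runs as 4 parallel JDBC queries.
println(df.rdd.partitions.size)   // 4
df.count()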
