How to improve performance for slow Spark jobs using DataFrame and JDBC connection?

Question

I am trying to access a mid-size Teradata table (~100 million rows) via JDBC in standalone mode on a single node (local[*]).

I am using Spark 1.4.1, set up on a very powerful machine (2 CPUs, 24 cores, 126 GB RAM).

I have tried several memory settings and tuning options to make it run faster, but none of them made a significant impact.

I am sure there is something I am missing. Below is my final try, which took about 11 minutes to get this simple count, whereas getting the count through a JDBC connection from R took only 40 seconds.

bin/pyspark --driver-memory 40g --executor-memory 40g

df = sqlContext.read.jdbc("jdbc:teradata://......)
df.count()

When I tried with a BIG table (5 billion records), no results were returned upon completion of the query.

Answer

All of the aggregation operations are performed after the whole dataset has been retrieved into memory as a DataFrame. So doing the count in Spark will never be as efficient as doing it directly in Teradata. Sometimes it is worth pushing some computation into the database by creating views and then mapping those views using the JDBC API.
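For example, a minimal sketch of pushing the count itself into the database: wrap an aggregate query as the JDBC "table" (the derived-table alias t is an illustration, and <URL>/<TABLE> are placeholders just like in the snippets below), so Teradata computes the count and Spark only fetches a single row.

val countDF = sqlctx.read.jdbc(
  "<URL>",
  "(SELECT COUNT(*) AS cnt FROM <TABLE>) AS t",  // the count runs inside Teradata
  new java.util.Properties()
)
countDF.show()  // Spark retrieves just the one-row result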

Every time you use the JDBC driver to access a large table you should specify a partitioning strategy, otherwise you will create a DataFrame/RDD with a single partition and overload the single JDBC connection.

Instead, you want to try the following API (available since Spark 1.4.0):

sqlctx.read.jdbc(
  url = "<URL>",
  table = "<TABLE>",
  columnName = "<INTEGRAL_COLUMN_TO_PARTITION>", 
  lowerBound = minValue,
  upperBound = maxValue,
  numPartitions = 20,
  connectionProperties = new java.util.Properties()
)
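Note that lowerBound and upperBound only control how the key range is split into partition strides (rows outside the range still end up in the first or last partition), so sensible values are the actual MIN/MAX of the partition column. A minimal sketch of fetching them with a small pushed-down query (the column name ID and the derived-table alias b are assumptions):

val bounds = sqlctx.read.jdbc(
  "<URL>",
  "(SELECT MIN(ID) AS lo, MAX(ID) AS hi FROM <TABLE>) AS b",
  new java.util.Properties()
).collect()(0)

// Teradata may report these as DECIMAL, so go through Number to be safe.
val minValue = bounds.getAs[Number](0).longValue()
val maxValue = bounds.getAs[Number](1).longValue()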

There is also an option to push down some filtering.
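For instance, simple column comparisons on a JDBC DataFrame can be pushed down as a WHERE clause and evaluated inside the database, so fewer rows travel over the connection. A minimal sketch (the STATUS column is hypothetical):

val tableDF = sqlctx.read.jdbc("<URL>", "<TABLE>", new java.util.Properties())
val active = tableDF.filter("STATUS = 'ACTIVE'")  // pushed to Teradata as WHERE STATUS = 'ACTIVE'
active.count()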

If you don't have a uniformly distributed integral column, you will want to create some custom partitions by specifying custom predicates (WHERE clauses). For example, let's suppose you have a timestamp column and want to partition by date ranges:

val predicates =
  Array(
    "2015-06-20" -> "2015-06-30",
    "2015-07-01" -> "2015-07-10",
    "2015-07-11" -> "2015-07-20",
    "2015-07-21" -> "2015-07-31"
  )
  .map {
    case (start, end) =>
      s"cast(DAT_TME as date) >= date '$start'  AND cast(DAT_TME as date) <= date '$end'"
  }

predicates.foreach(println)

// Below is the result of how the predicates were formed
// cast(DAT_TME as date) >= date '2015-06-20'  AND cast(DAT_TME as date) <= date '2015-06-30'
// cast(DAT_TME as date) >= date '2015-07-01'  AND cast(DAT_TME as date) <= date '2015-07-10'
// cast(DAT_TME as date) >= date '2015-07-11'  AND cast(DAT_TME as date) <= date '2015-07-20'
// cast(DAT_TME as date) >= date '2015-07-21'  AND cast(DAT_TME as date) <= date '2015-07-31'


val df = sqlctx.read.jdbc(
  url = "<URL>",
  table = "<TABLE>",
  predicates = predicates,
  connectionProperties = new java.util.Properties()
)

This will generate a DataFrame where each partition contains the records of the subquery associated with its predicate.
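As a quick sanity check (a sketch; df refers to the DataFrame returned by the call above), the number of partitions should match the number of predicates, and the count now runs one JDBC query per partition in parallel:

println(df.rdd.partitions.size)  // 4, one partition per predicate
df.count()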

For the full API, see DataFrameReader.scala in the Spark source.
