Spark RDD 中的多个分区 [英] Multiple Partitions in Spark RDD

查看:34
本文介绍了Spark RDD 中的多个分区的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

所以我试图在 Play/Scala 项目中使用 Spark 从 MySQL 数据库中获取数据.由于我尝试接收的行数很大,我的目标是从 spark rdd 中获取迭代器.这是 Spark 上下文和配置...

So I am trying to get data from a MySQL database using Spark within a Play/Scala project. Since the amount of rows I am trying to receive is huge, my aim is to get an Iterator from the spark rdd. Here is the Spark context and configuration...

  private val configuration = new SparkConf()
    .setAppName("Reporting")
    .setMaster("local[*]")
    .set("spark.executor.memory", "2g")
    .set("spark.akka.timeout", "5")
    .set("spark.driver.allowMultipleContexts", "true")

  val sparkContext = new SparkContext(configuration)

JDBCRDD和sql查询如下

The JDBCRDD is as follows along with the sql query

val query =
  """
    |SELECT id, date
    |FROM itembid
    |WHERE date BETWEEN ? AND ?
  """.stripMargin


val rdd = new JdbcRDD[ItemLeadReportOutput](SparkProcessor.sparkContext,
      driverFactory,
      query,
      rangeMinValue.get,
      rangeMaxValue.get,
      partitionCount,
      rowMapper)
      .persist(StorageLevel.MEMORY_AND_DISK)

数据太多,一时搞不定.在开始使用较小的数据集时,可以从 rdd.toLocalIterator 获取迭代器.但是,在这种特定情况下,它无法计算迭代器.所以我的目标是有多个分区并逐个接收数据.我不断收到错误.这样做的正确方法是什么?

The data is too much to get it at once. At the beginning with smaller data sets it was possible the get an iterator from rdd.toLocalIterator. However in this specific case it can not compute an iterator. So my aim is to have multiple partitions and recevie data part by part. I keep getting errors. What is the correct way of doing this ?

推荐答案

我相信您在读取 ​​MySQL 表时遇到了堆问题.

I believe that you are facing a heap problem read your MySQL table.

在你的情况下,我要做的是将数据从 MySQL 获取到存储系统(HDFS、本地)文件中,然后我将使用 spark 的上下文 textFile 来获取它!

What I'll do in your case is to fetch the data from MySQL into the storage system (HDFS, local) files and then I'll use spark's context textFile to fetch it!

示例:

object JDBCExample {

  def main(args: Array[String]) {
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost/database"
    val username = "user"
    val password = "pass"

    var connection: Connection = null

    try {
      Class.forName(driver)
      connection = DriverManager.getConnection(url, username, password)

      // This is the tricky part of reading a huge MySQL table you'll need to set your sql statement as following :
      val statement = connection.createStatement(java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY)
      statement.setMaxRows(0)
      statement.setFetchSize(Integer.MIN_VALUE)

      val resultSet = statement.executeQuery("select * from ex_table")

      val fileWriter = new FileWriter("output.csv")
      val writer = new CSVWriter(fileWriter, '\t');

      while (resultSet.next()) {
        val entries = List(... // process result here //...)
        writer.writeNext(entries.toArray)
      }
      writer.close();

    } catch {
      case e: Throwable => e.printStackTrace
    }
    connection.close()
  }
}

一旦您的数据被存储,您就可以读取它:

Once your data is stored you can read it:

val data = sc.textFile("output.csv")

PS:我在代码中使用了一些快捷方式(每个示例为 CSVWriter),但您可以将其用作您打算执行的操作的骨架!

PS: I've used some shortcuts (CSVWriter per example) in the code but you can use it as a skeleton to what you are intending to do!

这篇关于Spark RDD 中的多个分区的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆