Spark RDD中的多个分区 [英] Multiple Partitions in Spark RDD

查看:326
本文介绍了Spark RDD中的多个分区的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

因此,我试图在Play/Scala项目中使用Spark从MySQL数据库获取数据.由于我要接收的行数很大,因此我的目标是从spark rdd获取一个Iterator.这是Spark上下文和配置...

So I am trying to get data from a MySQL database using Spark within a Play/Scala project. Since the amount of rows I am trying to receive is huge, my aim is to get an Iterator from the spark rdd. Here is the Spark context and configuration...

  private val configuration = new SparkConf()
    .setAppName("Reporting")
    .setMaster("local[*]")
    .set("spark.executor.memory", "2g")
    .set("spark.akka.timeout", "5")
    .set("spark.driver.allowMultipleContexts", "true")

  val sparkContext = new SparkContext(configuration)

JDBCRDD以及sql查询如下

The JDBCRDD is as follows along with the sql query

val query =
  """
    |SELECT id, date
    |FROM itembid
    |WHERE date BETWEEN ? AND ?
  """.stripMargin


val rdd = new JdbcRDD[ItemLeadReportOutput](SparkProcessor.sparkContext,
      driverFactory,
      query,
      rangeMinValue.get,
      rangeMaxValue.get,
      partitionCount,
      rowMapper)
      .persist(StorageLevel.MEMORY_AND_DISK)

数据太多,无法立即获取.最初,使用较小的数据集可以从rdd.toLocalIterator获得一个迭代器.但是,在这种特定情况下,它无法计算迭代器.因此,我的目标是部分地具有多个分区和接收数据.我不断收到错误消息.正确的方法是什么?

The data is too much to get it at once. At the beginning with smaller data sets it was possible the get an iterator from rdd.toLocalIterator. However in this specific case it can not compute an iterator. So my aim is to have multiple partitions and recevie data part by part. I keep getting errors. What is the correct way of doing this ?

推荐答案

我相信您在阅读MySQL表时遇到堆问题.

I believe that you are facing a heap problem read your MySQL table.

在您的情况下,我要做的是将MySQL的数据提取到存储系统(HDFS,本地)文件中,然后使用spark的上下文textFile进行提取!

What I'll do in your case is to fetch the data from MySQL into the storage system (HDFS, local) files and then I'll use spark's context textFile to fetch it!

示例:

object JDBCExample {

  def main(args: Array[String]) {
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost/database"
    val username = "user"
    val password = "pass"

    var connection: Connection = null

    try {
      Class.forName(driver)
      connection = DriverManager.getConnection(url, username, password)

      // This is the tricky part of reading a huge MySQL table you'll need to set your sql statement as following :
      val statement = connection.createStatement(java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY)
      statement.setMaxRows(0)
      statement.setFetchSize(Integer.MIN_VALUE)

      val resultSet = statement.executeQuery("select * from ex_table")

      val fileWriter = new FileWriter("output.csv")
      val writer = new CSVWriter(fileWriter, '\t');

      while (resultSet.next()) {
        val entries = List(... // process result here //...)
        writer.writeNext(entries.toArray)
      }
      writer.close();

    } catch {
      case e: Throwable => e.printStackTrace
    }
    connection.close()
  }
}

一旦存储了数据,您就可以读取它:

Once your data is stored you can read it:

val data = sc.textFile("output.csv")

PS:我在代码中使用了一些快捷方式(每个示例均使用CSVWriter),但是您可以将其用作要执行的操作的框架!

PS: I've used some shortcuts (CSVWriter per example) in the code but you can use it as a skeleton to what you are intending to do!

这篇关于Spark RDD中的多个分区的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆