How to iterate Big Query TableResult correctly?


Question

I have a complex join query in BigQuery that I need to run from a Spark job. This is the current code:

import scala.collection.JavaConverters._

val bigquery = BigQueryOptions.newBuilder()
  .setProjectId(bigQueryConfig.bigQueryProjectId)
  .setCredentials(credentials)
  .build()
  .getService

val query = //some complex query

val queryConfig: QueryJobConfiguration =
  QueryJobConfiguration.newBuilder(query)
    .setUseLegacySql(false)
    .setPriority(QueryJobConfiguration.Priority.BATCH) // tried with and without
    .build()

val jobId: JobId = JobId.newBuilder().setRandomJob().build()

val queryJob: Job = bigquery
  .create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build())
  .waitFor()

val result = queryJob.getQueryResults()

val output = result.iterateAll().iterator().asScala.to[Seq].map { row: FieldValueList =>
  // create a case class from the row
}

It keeps hitting this error:

Exceeded rate limits: Your project: XXX exceeded quota for tabledata.list bytes per second per project.

Is there a way to iterate through the results more efficiently? I have tried setting setPriority(QueryJobConfiguration.Priority.BATCH) on the query job configuration, but it does not improve the results. I also tried reducing the number of Spark executors to 1, to no avail.

Answer

Instead of iterating over the query results directly, write them to a destination table and use the spark-bigquery-connector to read that table into a DataFrame:

val queryConfig: QueryJobConfiguration =
  QueryJobConfiguration.newBuilder(query)
    .setUseLegacySql(false)
    .setPriority(QueryJobConfiguration.Priority.BATCH)
    // write the results to a destination table the connector can read
    .setDestinationTable(TableId.of(destinationDataset, destinationTable))
    .build()

val jobId: JobId = JobId.newBuilder().setRandomJob().build()

// waitFor() blocks until the job completes; there is no need to call
// getQueryResults() and page through rows via tabledata.list
val queryJob: Job = bigquery
  .create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build())
  .waitFor()

// read the destination table into a DataFrame
val data = spark.read.format("bigquery")
  .option("dataset", destinationDataset)
  .option("table", destinationTable)
  .load()
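On recent versions of the spark-bigquery-connector there is also a more direct variant: let the connector run the query and materialize the results itself, so you can skip the manual job management entirely. This is a sketch under the assumption that your connector version supports the `query` read option; `materializationDataset` must name a dataset you own, where the connector stores the temporary result table.

```scala
// Sketch (assumes spark-bigquery-connector support for the "query" option).
// The connector runs the SQL, materializes the result into a temporary table
// in materializationDataset, and reads that table via the Storage Read API,
// avoiding tabledata.list altogether.
spark.conf.set("viewsEnabled", "true")

val data = spark.read.format("bigquery")
  .option("materializationDataset", destinationDataset) // a dataset you own
  .option("query", query)                               // the same complex SQL
  .load()
```

Because the read goes through the BigQuery Storage Read API rather than tabledata.list, it is not subject to the "tabledata.list bytes per second" quota that caused the original error.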
