Load table from BigQuery to Spark cluster with a PySpark script


Problem Description


I have a table loaded in BigQuery, and I want to import it into my Spark cluster via a PySpark .py file.

I saw in Dataproc + BigQuery examples - any available? that there was a way to load a BigQuery table into the Spark cluster with Scala, but is there a way to do it in a PySpark script?

Solution

This comes from @MattJ in this question. Here's an example that connects to BigQuery from Spark and performs a word count.

import json
import pyspark

sc = pyspark.SparkContext()

# The GCS connector on a Dataproc cluster exposes the cluster's staging
# bucket under this key; it can be reused as the BigQuery export bucket.
hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.get("fs.gs.system.bucket")

# BigQuery connector settings: your project and a GCS bucket for the
# connector's temporary export files, plus the input table to read
# (here the public publicdata:samples.shakespeare table).
conf = {
    "mapred.bq.project.id": "<project_id>",
    "mapred.bq.gcs.bucket": "<bucket>",
    "mapred.bq.input.project.id": "publicdata",
    "mapred.bq.input.dataset.id": "samples",
    "mapred.bq.input.table.id": "shakespeare",
}

# Each record arrives as a (row id, JSON string) pair: parse the JSON,
# project out (word, word_count) tuples, and sum the counts per word.
tableData = sc.newAPIHadoopRDD(
    "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
    "org.apache.hadoop.io.LongWritable", "com.google.gson.JsonObject",
    conf=conf).map(lambda k: json.loads(k[1])).map(lambda x: (x["word"],
    int(x["word_count"]))).reduceByKey(lambda x, y: x + y)

print(tableData.take(10))

You will need to change <project_id> and <bucket> to match the settings for your project.
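If you want to query or sort the result with Spark SQL rather than RDD operations, the pairs produced above can be wrapped in a DataFrame. The following is a minimal sketch, not part of @MattJ's original answer; it assumes a Spark version with SQLContext available (1.3+), and the wordCounts name is purely illustrative.

from pyspark.sql import SQLContext

# Sketch (not from the original answer): turn the (word, count) pairs
# into a DataFrame so they can be sorted and queried with Spark SQL.
sqlContext = SQLContext(sc)
wordCounts = sqlContext.createDataFrame(tableData, ["word", "count"])
wordCounts.orderBy(wordCounts["count"].desc()).show(10)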
