How to build Spark data frame with filtered records from MongoDB?


Problem Description

My application has been built using MongoDB as its data platform. One collection in the database holds a massive volume of data, and I have opted for Apache Spark to retrieve and generate analytical data through calculation. I have configured the Spark Connector for MongoDB to communicate with MongoDB. I need to query the MongoDB collection using pyspark and build a dataframe consisting of the result set of the MongoDB query. Please suggest an appropriate solution.

Recommended Answer

You can load the data directly into a dataframe like so:

# Create the dataframe from the MongoDB collection
df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", "mongodb://127.0.0.1/mydb.mycoll").load()

# Filter the data via the DataFrame API
filtered = df.filter(df["age"] > 30)

# Filter via SQL
df.registerTempTable("people")
over_thirty = sqlContext.sql("SELECT name, age FROM people WHERE age > 30")
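
The snippet above assumes a SQLContext that was already created with the MongoDB Spark connector on the classpath. A minimal sketch of that setup, assuming a local MongoDB instance and the same mydb.mycoll namespace (the connector package itself must be supplied separately, for example via --packages when launching Spark):

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# Illustrative local URI and namespace; replace with your own connection string
conf = (SparkConf()
        .setAppName("mongo-filter-example")
        .set("spark.mongodb.input.uri", "mongodb://127.0.0.1/mydb.mycoll"))

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)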

For more information, see the Python API section of the Mongo Spark connector documentation or introduction.py. The SQL queries are translated and passed back to the connector so that the data can be queried in MongoDB before being sent to the Spark cluster.
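
As a quick way to check that the filter is being pushed down, you can inspect the query plan (a minimal sketch; the exact plan output depends on the Spark and connector versions):

# Filters pushed down to the MongoDB scan appear in the physical plan
filtered.explain(True)
over_thirty.explain(True)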

You can also provide your own aggregation pipeline to apply to the collection before returning results into Spark:

dfr = sqlContext.read.option("pipeline", "[{ $match: { name: { $exists: true } } }]")
df = dfr.option("uri", ...).format("com.mongodb.spark.sql.DefaultSource").load()
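
Putting it together, a minimal sketch that loads only filtered records into Spark via the pipeline option (the URI, field names, and the $gt threshold are illustrative assumptions):

# Only documents with age > 30, projected to name and age, leave MongoDB
pipeline = "[{ $match: { age: { $gt: 30 } } }, { $project: { name: 1, age: 1, _id: 0 } }]"

df = (sqlContext.read
      .format("com.mongodb.spark.sql.DefaultSource")
      .option("uri", "mongodb://127.0.0.1/mydb.mycoll")
      .option("pipeline", pipeline)
      .load())

df.show()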

