Getting Spark, Python, and MongoDB to work together

Problem description

I'm having difficulty getting these components to knit together properly. I have Spark installed and working successfully; I can run jobs locally, standalone, and via YARN. I have followed the steps advised (to the best of my knowledge) here and here.

I'm working on Ubuntu, and the component versions I have are Spark 1.5.1 (spark-1.5.1-bin-hadoop2.6), Hadoop 2.6.1, the mongo-hadoop connector 1.5.0-SNAPSHOT, and the MongoDB Java driver 3.0.4.

I had some difficulty following the various steps, such as which jars to add to which path, so what I have added is the following (a quick sanity check of this setup is sketched just after the list):

  • in /usr/local/share/hadoop-2.6.1/share/hadoop/mapreduce I have added mongo-hadoop-core-1.5.0-SNAPSHOT.jar
  • the following environment variables
    • export HADOOP_HOME="/usr/local/share/hadoop-2.6.1"
    • export PATH=$PATH:$HADOOP_HOME/bin
    • export SPARK_HOME="/usr/local/share/spark-1.5.1-bin-hadoop2.6"
    • export PYTHONPATH="/usr/local/share/mongo-hadoop/spark/src/main/python"
    • export PATH=$PATH:$SPARK_HOME/bin
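
A quick, hypothetical way to sanity-check the setup above before involving Spark at all is to confirm that the jar is actually in place and that pymongo_spark can be imported. The paths below are simply the ones listed in the environment variables and are not part of the original steps:

# Hypothetical sanity check (not part of the original setup); the paths are the
# ones listed in the environment variables above and may differ on your machine.
import os

jar = ("/usr/local/share/hadoop-2.6.1/share/hadoop/mapreduce/"
       "mongo-hadoop-core-1.5.0-SNAPSHOT.jar")
print("mongo-hadoop core jar present:", os.path.exists(jar))

for var in ("HADOOP_HOME", "SPARK_HOME", "PYTHONPATH"):
    print(var, "=", os.environ.get(var))

# pymongo_spark lives under mongo-hadoop/spark/src/main/python, which PYTHONPATH points at.
import pymongo_spark
print("pymongo_spark loaded from:", pymongo_spark.__file__)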

My Python program is basic

from pyspark import SparkContext, SparkConf
import pymongo_spark
pymongo_spark.activate()

def main():
    conf = SparkConf().setAppName("pyspark test")
    sc = SparkContext(conf=conf)
    rdd = sc.mongoRDD(
        'mongodb://username:password@localhost:27017/mydb.mycollection')

if __name__ == '__main__':
    main()

I am running it using the command

$SPARK_HOME/bin/spark-submit --driver-class-path /usr/local/share/mongo-hadoop/spark/build/libs/ --master local[4] ~/sparkPythonExample/SparkPythonExample.py

and I am getting the following output as a result

Traceback (most recent call last):
  File "/home/me/sparkPythonExample/SparkPythonExample.py", line 24, in <module>
    main()
  File "/home/me/sparkPythonExample/SparkPythonExample.py", line 17, in main
    rdd = sc.mongoRDD('mongodb://username:password@localhost:27017/mydb.mycollection')
  File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 161, in mongoRDD
    return self.mongoPairRDD(connection_string, config).values()
  File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 143, in mongoPairRDD
    _ensure_pickles(self)
  File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 80, in _ensure_pickles
    orig_tb)
py4j.protocol.Py4JError

According to here

This exception is raised when an exception occurs in the Java client code. For example, if you try to pop an element from an empty stack. The instance of the Java exception thrown is stored in the java_exception member.

Looking at the source code for pymongo_spark.py and the line throwing the error, it says

"Error while communicating with the JVM. Is the MongoDB Spark jar on Spark's CLASSPATH? : "

So in response, I have tried to make sure the right jars are being passed, but I might be doing this all wrong; see below:

$SPARK_HOME/bin/spark-submit --jars /usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar,/usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-java-driver-3.0.4.jar --driver-class-path /usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-java-driver-3.0.4.jar,/usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar --master local[4] ~/sparkPythonExample/SparkPythonExample.py

I have imported pymongo into the same Python program to verify that I can at least access MongoDB with it, and I can.

I know there are quite a few moving parts here so if I can provide any more useful information please let me know.

Solution

Updates:

2016-07-04

Since the last update, the MongoDB Spark Connector has matured quite a lot. It provides up-to-date binaries and a data-source-based API, but it uses SparkConf-based configuration, so it is subjectively less flexible than Stratio/Spark-MongoDB.
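
For reference, reading through the official connector typically looks like the sketch below. This is not from the answer above; the package coordinates and the connection URI are assumptions you would adjust to your own Spark, Scala, and MongoDB versions.

# Sketch of official MongoDB Spark Connector usage (an assumption, not from the answer).
# Launch with the connector package, adjusting the coordinates to your versions, e.g.:
#   bin/pyspark --packages org.mongodb.spark:mongo-spark-connector_2.10:1.1.0
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = (SparkConf()
        .setAppName("mongo-spark-connector sketch")
        # Connection settings live in SparkConf, which is the point made above.
        .set("spark.mongodb.input.uri", "mongodb://mongo:27017/foo.bar")
        .set("spark.mongodb.output.uri", "mongodb://mongo:27017/foo.bar"))
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# Read the foo.bar collection into a DataFrame via the connector's data source.
df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.printSchema()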

2016-03-30

Since the original answer, I have found two different ways to connect to MongoDB from Spark:

  • the official MongoDB Spark Connector (mongodb/mongo-spark)
  • Stratio/Spark-MongoDB

While the former seems relatively immature, the latter looks like a much better choice than the Mongo-Hadoop connector and provides a Spark SQL API.

# Adjust Scala and package version according to your setup
# although officially 0.11 supports only Spark 1.5
# I haven't encountered any issues on 1.6.1
bin/pyspark --packages com.stratio.datasource:spark-mongodb_2.11:0.11.0

df = (sqlContext.read
  .format("com.stratio.datasource.mongodb")
  .options(host="mongo:27017", database="foo", collection="bar")
  .load())

df.show()

## +---+----+--------------------+
## |  x|   y|                 _id|
## +---+----+--------------------+
## |1.0|-1.0|56fbe6f6e4120712c...|
## |0.0| 4.0|56fbe701e4120712c...|
## +---+----+--------------------+

It seems to be much more stable than mongo-hadoop-spark, supports predicate pushdown without static configuration and simply works.
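
For example, a simple column filter like the one below should be translated into a MongoDB query rather than applied after loading every document. This is a minimal sketch reusing the df created above, not an example from the answer itself:

# Minimal sketch, reusing the df loaded above with the Stratio data source.
# The comparison on x should be pushed down to MongoDB as a query filter,
# so only matching documents are returned; with the data above that is the (1.0, -1.0) row.
df.filter(df["x"] > 0.0).select("x", "y").show()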

The original answer:

Indeed, there are quite a few moving parts here. I tried to make it a little bit more manageable by building a simple Docker image which roughly matches the described configuration (though I've omitted the Hadoop libraries for brevity). You can find the complete source on GitHub (DOI 10.5281/zenodo.47882) and build it from scratch:

git clone https://github.com/zero323/docker-mongo-spark.git
cd docker-mongo-spark
docker build -t zero323/mongo-spark .

or download an image I've pushed to Docker Hub (so you can simply docker pull zero323/mongo-spark).

Start the images:

docker run -d --name mongo mongo:2.6
docker run -i -t --link mongo:mongo zero323/mongo-spark /bin/bash

Start a PySpark shell, passing --jars and --driver-class-path:

pyspark --jars ${JARS} --driver-class-path ${SPARK_DRIVER_EXTRA_CLASSPATH}

And finally see how it works:

import pymongo
import pymongo_spark

mongo_url = 'mongodb://mongo:27017/'

client = pymongo.MongoClient(mongo_url)
client.foo.bar.insert_many([
    {"x": 1.0, "y": -1.0}, {"x": 0.0, "y": 4.0}])
client.close()

pymongo_spark.activate()
rdd = (sc.mongoRDD('{0}foo.bar'.format(mongo_url))
    .map(lambda doc: (doc.get('x'), doc.get('y'))))
rdd.collect()

## [(1.0, -1.0), (0.0, 4.0)]

Please note that mongo-hadoop seems to close the connection after the first action. So calling for example rdd.count() after the collect will throw an exception.
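
If you need several actions on the same data, one possible mitigation, an assumption on my part rather than something verified above, is to cache the RDD so that later actions are served from memory instead of reopening the MongoDB connection:

# Assumption: caching keeps the fetched documents in memory, so actions after
# the first one no longer depend on the (already closed) MongoDB connection.
rdd = (sc.mongoRDD('{0}foo.bar'.format(mongo_url))
       .map(lambda doc: (doc.get('x'), doc.get('y'))))
rdd.cache()
rdd.collect()   # the first action populates the cache
rdd.count()     # served from the cache rather than from MongoDB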

Based on the different problems I've encountered while creating this image, I tend to believe that passing mongo-hadoop-1.5.0-SNAPSHOT.jar and mongo-hadoop-spark-1.5.0-SNAPSHOT.jar to both --jars and --driver-class-path is the only hard requirement.

Notes:
