Spark Mongo连接器,MongoShardedPartitioner不起作用 [英] Spark Mongo connector, MongoShardedPartitioner does not work

查看:300
本文介绍了Spark Mongo连接器,MongoShardedPartitioner不起作用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

出于测试目的,我配置了一个4节点集群,每个集群都有一个Spark Worker和一个MongoDB Shard.这些是详细信息:

For testing purposes, I have configured a 4-node cluster, each of them has a Spark Worker and a MongoDB Shard. These are the details:

  • 四个Debian 9服务器(名为Visa0,Visa1,Visa2,Visa3)
  • 4个节点上的Spark(v2.4.0)集群(签证1:主节点,签证0..3:从节点)
  • MongoDB(v3.2.11)分片群集包含4个节点(在Visa1..3上设置了配置服务器副本,在Visa1上设置了mongos,在分片服务器上设置了Visa0..3)
  • 我正在使用安装有"spark-shell --packages"的MongoDB Spark连接器 org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"
  • Four Debian 9 servers (named visa0, visa1, visa2, visa3)
  • Spark(v2.4.0) cluster on 4 nodes (visa1: master, visa0..3: slaves)
  • MongoDB (v3.2.11) sharded cluster con 4 nodes ( config server replica set on visa1..3, mongos on visa1, shard servers: visa0..3 )
  • I'm using MongoDB Spark connector installed with "spark-shell --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"

使用MongoShardedPartitioner配置SparkSession时,尽管正确提取了数据框架构,但从数据库加载的每个数据框都是空的.

When configuring SparkSession with MongoShardedPartitioner, every dataframe loaded from the database is empty, though the dataframe schema is fetched correctly.

可以在spark-defaults.conf文件中完成配置,也可以在SparkSession构建器中使用.config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner")来完成配置.

This is reproduced either the configuration is done in the spark-defaults.conf file or with .config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner") in the SparkSession builder.

使用MongoShardedPartitioner时,df.count()== 0:

With MongoShardedPartitioner, df.count() == 0:

./pyspark --master "spark://visa1:7077" --packages "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"

...

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 3.5.3 (default, Sep 27 2018 17:27:03)
SparkSession available as 'spark'.
>>> spark2 = SparkSession \
...   .builder \
...   .appName("myApp") \
...   .config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner") \
...   .getOrCreate()
>>> 
>>> df2 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()
>>>                                                                             
>>> df2.count()
0  

但是在不指定分区程序的情况下可以正常工作:

But works correctly without specifying partitioner:

./pyspark --master "spark://visa1:7077" --packages "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"

...

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 3.5.3 (default, Sep 27 2018 17:27:03)
SparkSession available as 'spark'.
>>> spark2 = SparkSession \
...   .builder \
...   .appName("myApp") \
...   .getOrCreate()
>>> 
>>> df2 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()
2019-01-07 22:7:33 WARN  Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
>>> 
>>> df2.count()
1162  

问题:

  • 如何知道默认情况下配置了哪个分区?
  • 在这种情况下如何使用MongoShardedPartitioner?

预先感谢

2019年1月13日:推荐的解决方法

如以下回答所示,似乎MongoShardedPartitioner不支持将散列索引用作分片索引.但是,我需要一个哈希索引来将块均匀地分布在我的节点上,而不依赖于时间(我想使用_id会按时间顺序分布).

As answered below, it seems that MongoShardedPartitioner does not support hashed indexes as shard index. However, I need a hash index to distribute the chunks evenly on my nodes, independently of time (using _id would distribute chronologically, I guess).

我的解决方法是在数据库中创建一个新字段,并使用日期存储区的md5哈希值对它进行索引(作为普通索引)并将其用作分片索引.

My workaround has been to create a new field in the database with the computed md5 hash of a date bucket, indexing it (as a normal index), and using it as shard index.

现在,代码可以正常工作了:

Now, the code works fine:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 3.5.3 (default, Sep 27 2018 17:25:39)
SparkSession available as 'spark'.
>>> 
>>> 
>>> spark2 = SparkSession \
...   .builder \
...   .appName("myApp") \
...   .config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner") \
...   .config("spark.mongodb.input.partitionerOptions.shardkey", "datebuckethash") \
...   .getOrCreate()
>>> 
>>> 
>>> df2 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()

2019-01-13 11:19:31 WARN  Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
>>> 
>>> df2.count()
1162   

推荐答案

很抱歉,您听到连接器有问题.

Sorry jose to hear you are having an issue with the connector.

如何知道默认情况下配置了哪个分区?

How can I know which partitioner is configured by default?

有关分区程序的信息可以在 Spark连接器文档站点上找到.如果您觉得有任何遗漏或不清楚的地方,请在 Docs jira项目中提交票证可以帮助将来的用户!

Information regarding partitioners can be found on the Spark connector documentation site. Please file a ticket in the Docs jira project if you feel anything is missing or unclear, it really could help future users!

默认分区程序是围绕 MongoSamplePartitioner的薄包装器. .它基于对集合的统计抽样,将集合分为多个大小的分区.

The default partitioner is a thin wrapper around the MongoSamplePartitioner. It splits up a collection into sized partitions based on statistical sampling of the collection.

在这种情况下如何使用MongoShardedPartitioner?

MongoShardedPartitioner 使用生成分区.默认情况下,它将使用_id作为密钥.您可能需要配置该值.

The MongoShardedPartitioner uses the shardKey to generate the partitions. By default it will use _id as the key. You may need to configure that value.

注意:MongoShardedPartitioner不支持散列分片,,因为当前无法根据散列值查询集合-因此,在检索分区时,它将无法返回结果.我添加了 DOCS-12345 来更新文档.

Note: Hashed shardkeys are not supported by the MongoShardedPartitioner as currently there is no way to query a collection against the hashed value - so when retrieving partitions it will fail to return results. I've added DOCS-12345 to update the documentation.

您的设置中似乎存在一个问题,其中MongoShardedPartitioner无法按预期方式对集合进行分区,并返回0个结果.由于模式查询将查询集合,因此模式推断仍将起作用.如果不是配置/哈希分片问题,请发出 Spark jira项目中的错误.并且我可以帮助您找出原因并为您发布修复程序.

It looks like there is an issue in your setup where the MongoShardedPartitioner is failing to partition the collection as expected and returning 0 results. Schema inference will still work because of how it queries the collection. If its not a config / hashed shardkey issue then issue please file a bug in the Spark jira project and I can help identify the cause and release a fix for you.

这篇关于Spark Mongo连接器,MongoShardedPartitioner不起作用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆