How to query when connecting mongodb with apache-spark


Problem Description

I am now experimenting with Spark and MongoDB, using the mongodb-hadoop connector to bridge communication between Spark and MongoDB. Here is an example: https://github.com/plaa/mongo-spark. This example works well for me.

Then, based on this example, I used a bigger dataset from https://github.com/10gen-interns/big-data-exploration, which has 6 million records of flight data. What I want to do is query the mongodb dataset and then do some further processing.

The schema for the flights data is in https://gist.github.com/sweetieSong/6016700

Here is a data example:

{ "_id" : ObjectId( "51bf19c4ca69141e42ddd1f7" ),
  "age" : 27,
  "airTime" : 316,
  "airlineId" : 19805,
  "arrDelay" : -37,
  "arrTime" : Date( 1336304580000 ),
  "carrier" : "AA",
  "carrierId" : "AA",
  "crsArrTime" : Date( 1336306800000 ),
  "crsDepTime" : Date( 1336294800000 ),
  "crsElapsedTime" : 380,
  "date" : Date( 1336262400000 ),
  "dayOfMonth" : 6,
  "dayOfWeek" : 7,
  "depDelay" : -5,
  "depTime" : Date( 1336294500000 ),
  "destAirport" : "LAX",
  "destAirportId" : 12892,
  "destCity" : "Los Angeles, CA",
  "destCityId" : 32575,
  "destState" : "California",
  "destStateId" : "CA",
  "destWAC" : 91,
  "distance" : 2475,
  "diverted" : true,
  "elapsedTime" : 348,
  "flightNum" : 1,
  "month" : 5,
  "numDivAirportLandings" : 0,
  "numFlights" : 1,
  "origAirport" : "JFK",
  "origAirportId" : 12478,
  "origCity" : "New York, NY",
  "origCityId" : 31703,
  "origState" : "New York",
  "origStateId" : "NY",
  "origWAC" : 22,
  "quarter" : 2,
  "tailNum" : "N323AA",
  "taxiIn" : 19,
  "taxiOut" : 13,
  "wheelsOff" : Date( 1336295280000 ),
  "wheelsOn" : Date( 1336303440000 ),
  "year" : 2012 }

My Scala code is:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.bson.BSONObject

val sc = new SparkContext("local", "Scala Word Count")

// mongo-hadoop connector configuration: input collection and (optional) query
val config = new Configuration()
config.set("mongo.input.uri", "mongodb://xx.xx.xx.xx:27017/flying.flights")
config.set("mongo.input.query", "{destAirport: 'LAX'}")
//config.set("mongo.input.query", "{_id.destAirport: 'LAX'}")

// Load the collection as an RDD of (ObjectId, BSONObject) pairs
val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])

println("We're running scala..count ", mongoRDD.count())

For testing purposes, I just want to first get all the records where destAirport is 'LAX'. I don't know what the query should look like, so I tried two different query formats, "{destAirport: 'LAX'}" and "{_id.destAirport: 'LAX'}".
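
One way to sanity-check which query format actually matches documents is to run the equivalent count against MongoDB directly, outside of Spark. Below is a rough sketch using the legacy 2.x Java driver from Scala; the host/port and the driver version are assumptions, not something from the setup above.

import com.mongodb.{BasicDBObject, MongoClient}

// Hypothetical standalone check (2.x Java driver assumed on the classpath):
// count documents matching each candidate query directly against MongoDB.
val client = new MongoClient("xx.xx.xx.xx", 27017)
val flights = client.getDB("flying").getCollection("flights")

// destAirport is a top-level field in the sample document, so this form should match:
val topLevel = flights.count(new BasicDBObject("destAirport", "LAX"))

// _id is an ObjectId in the sample document, so a nested "_id.destAirport" lookup
// would be expected to match nothing:
val nested = flights.count(new BasicDBObject("_id.destAirport", "LAX"))

println(s"destAirport: $topLevel, _id.destAirport: $nested")
client.close()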

When running the application, the console outputs information like this:

INFO MongoCollectionSplitter: Created split: min={ "_id" : { "$oid" : "51bf29d8ca69141e42097d7f"}}, max= { "_id" : { "$oid" : "51bf29dfca69141e420991ad"}}

14/08/05 10:30:51 INFO Executor: Running task ID 751
14/08/05 10:30:51 INFO TaskSetManager: Finished TID 750 in 109 ms on localhost (progress: 751/1192)
14/08/05 10:30:51 INFO DAGScheduler: Completed ResultTask(0, 750)
14/08/05 10:30:51 INFO BlockManager: Found block broadcast_0 locally
14/08/05 10:30:51 INFO NewHadoopRDD: Input split: MongoInputSplit{URI=mongodb://178.62.35.36:27017/flying.flights, authURI=null, min={ "_id" : { "$oid" : "51bf2f95ca69141e421904e5"}}, max={ "_id" : { "$oid" : "51bf2f9dca69141e42191913"}}, query={ "_id.destAirport" : "LAX "}, sort={ }, fields={ }, notimeout=false}
14/08/05 10:30:51 INFO MongoRecordReader: Read 0.0 documents from:
14/08/05 10:30:51 INFO MongoRecordReader: MongoInputSplit{URI=mongodb://178.62.35.36:27017/flying.flights, authURI=null, min={ "_id" : { "$oid" : "51bf2f95ca69141e421904e5"}}, max={ "_id" : { "$oid" : "51bf2f9dca69141e42191913"}}, query={ "_id.destAirport" : "LAX "}, sort={ }, fields={ }, notimeout=false}
14/08/05 10:30:51 INFO Executor: Serialized size of result for 751 is 597
14/08/05 10:30:51 INFO Executor: Sending result for 751 directly to driver
14/08/05 10:30:51 INFO Executor: Finished task ID 751

No matter what the query is (even if I don't set a query at all), Spark always executes 1191 tasks. Each task outputs similar lines, and mongoRDD.count() always outputs 0.

My first question is what is the right query?

Moreover, I previously thought that what mongodb-hadoop does is this: MongoDB first queries the whole collection and then sends the results back to Spark for processing. But now it seems to me that MongoDB splits the collection into many parts, queries each small part of the collection, and then sends the results of that part to Spark. Is that right?

Solution

My first question is what is the right query?

I don't think there's a "right" query - you need to query based on the data you would like to process.

Moreover, I previously thought that what mongodb-hadoop does is this: MongoDB first queries the whole collection and then sends the results back to Spark for processing. But now it seems to me that MongoDB splits the collection into many parts, queries each small part of the collection, and then sends the results of that part to Spark. Is that right?

I encountered the same issue.

I believe that newAPIHadoopRDD, given the MongoInputSplit.class, does not account for the query when calculating the splits. It is only applied after the splits are calculated. This means that no matter how lean your query may be, the number of splits will remain the same, and will be proportional to the size of the collection.
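
One rough way to observe this (my own check, not something the connector documents): compare the RDD's partition count with its record count. With the splitter behaving as described, the partition count should stay roughly the same whether or not mongo.input.query is set, while only count() reflects the query.

// Using the mongoRDD from the question: partitions mirror the input splits,
// which the splitter derives from the collection size alone.
println(s"partitions = ${mongoRDD.partitions.length}") // ~constant regardless of the query
println(s"matching docs = ${mongoRDD.count()}")        // only this reflects the query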

newAPIHadoopRDD is using the StandaloneMongoSplitter. Note that this class is not using the query to calculate the split boundaries. It is just using mongo's internal "splitVector" command; from the documentation here - http://api.mongodb.org/internal/current/commands.html, it also looks like it does not account for the query.
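
If you want to look at what splitVector itself returns, you can call it directly; here is a sketch with the 2.x Java driver. The option names shown (keyPattern, maxChunkSize in MB) reflect my understanding of this internal command and may vary by server version; note that there is no parameter for a query.

import com.mongodb.{BasicDBObject, MongoClient}

// Hypothetical direct call to the internal splitVector command, just to inspect the
// split points the splitter works from. Host/port and option names are assumptions.
val client = new MongoClient("xx.xx.xx.xx", 27017)
val cmd = new BasicDBObject("splitVector", "flying.flights")
  .append("keyPattern", new BasicDBObject("_id", 1))
  .append("maxChunkSize", 8) // target chunk size in MB
val result = client.getDB("flying").command(cmd)
println(result) // expected to contain a "splitKeys" array of _id boundaries
client.close()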

I don't have a good solution though. A better approach would be to split the mongo collection only after taking the query into account, but this requires another implementation of the splitter. Here's a good read about the issue: http://www.ikanow.com/how-well-does-mongodb-integrate-with-hadoop/
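
As a practical aside (not a fix for the splitting itself): since every split gets read anyway, you can also load the collection without mongo.input.query and narrow it down on the Spark side. A minimal sketch, reusing the mongoRDD from the question:

// Filter on the Spark side instead of (or in addition to) mongo.input.query.
// Every split is still scanned, but only matching documents survive into laxFlights.
val laxFlights = mongoRDD.filter { case (_, doc) => doc.get("destAirport") == "LAX" }
println(s"LAX flights: ${laxFlights.count()}")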
