Update collection in MongoDB via Apache Spark using Mongo-Hadoop connector


Question


I would like to update a specific collection in MongoDb via Spark in Java. I am using the MongoDB Connector for Hadoop to retrieve and save information from Apache Spark to MongoDb in Java.


After following Sampo Niskanen's excellent post regarding retrieving and saving collections to MongoDb via Spark, I got stuck with updating collections.

MongoOutputFormat.java (https://github.com/mongodb/mongo-hadoop/blob/master/core/src/main/java/com/mongodb/hadoop/MongoOutputFormat.java) includes a constructor taking String[] updateKeys, which I am guessing refers to a possible list of keys to compare against existing collections and perform an update on. However, using Spark's saveAsNewAPIHadoopFile() method with the parameter MongoOutputFormat.class, I am wondering how to use that update constructor.

save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config);

Prior to this, MongoUpdateWritable.java (https://github.com/mongodb/mongo-hadoop/blob/master/core/src/main/java/com/mongodb/hadoop/io/MongoUpdateWritable.java) was being used to perform collection updates. From the examples I've seen on Hadoop, this is normally set on mongo.job.output.value, perhaps like this in Spark:

save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, MongoUpdateWritable.class, MongoOutputFormat.class, config);
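
For context, a minimal sketch (in Java, to match the question) of the Hadoop Configuration such a call assumes. The output URI is a placeholder, and wiring MongoUpdateWritable through mongo.job.output.value is the unconfirmed guess described above, not a verified recipe:

import org.apache.hadoop.conf.Configuration;
import com.mongodb.hadoop.io.MongoUpdateWritable;

// Placeholder output URI; point this at your own deployment.
Configuration config = new Configuration();
config.set("mongo.output.uri", "mongodb://localhost:27017/db.collection");

// The speculative part from the paragraph above: registering
// MongoUpdateWritable as the job's output value class.
config.set("mongo.job.output.value", MongoUpdateWritable.class.getName());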

However, I still cannot figure out how to specify the update keys for MongoUpdateWritable.java.


Admittedly, as a hacky workaround, I've set the "_id" of the object to my document's KeyValue so that when a save is performed, the collection will overwrite the documents having the same KeyValue as _id.

import org.apache.spark.api.java.JavaPairRDD;
import org.bson.BSONObject;
import scala.Tuple2;

JavaPairRDD<BSONObject, ?> analyticsResult; // JavaPairRDD of (mongoObject, result)
JavaPairRDD<Object, BSONObject> save = analyticsResult.mapToPair(s -> {
    BSONObject o = (BSONObject) s._1;

    // For all keys, build an _id of the form "key:value_key:value_..."
    String id = "";
    for (String key : o.keySet()) {
        id += key + ":" + (String) o.get(key) + "_";
    }
    o.put("_id", id);

    o.put("result", s._2);
    return new Tuple2<>(null, o);
});

save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config);


I would like to perform the MongoDB collection update via Spark using MongoOutputFormat, MongoUpdateWritable, or Configuration, ideally using the saveAsNewAPIHadoopFile() method. Is this possible? If not, is there any other way that does not involve specifically setting the _id to the key values I want to update on?

Answer

I tried several combinations of config.set("mongo.job.output.value", ...) and several combinations of

.saveAsNewAPIHadoopFile(
        "file:///bogus",
        classOf[Any],
        classOf[Any],
        classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]],
        mongo_config
      )

and they did not work.


I made it work by using the MongoUpdateWritable class as the output of my map method:

import org.bson.BasicBSONObject
import org.bson.types.ObjectId
import com.mongodb.hadoop.io.MongoUpdateWritable

items.map(row => {
    // Match the existing document by its _id.
    val mongo_id = new ObjectId(row("id").toString)
    val query = new BasicBSONObject()
    query.append("_id", mongo_id)

    // $set the new value on the matched document.
    val update = new BasicBSONObject()
    update.append("$set", new BasicBSONObject().append("field_name", row("new_value")))

    // (query, update, upsert = false, multiUpdate = true)
    val muw = new MongoUpdateWritable(query, update, false, true)
    (null, muw)
  })
  .saveAsNewAPIHadoopFile(
    "file:///bogus",
    classOf[Any],
    classOf[Any],
    classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]],
    mongo_config
  )
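
Since the original question asked for Java, a rough Java translation of the same approach might look like the sketch below. The names are assumptions carried over from the snippets above: analyticsResult and config come from the question, the "id" field mirrors row("id") in the Scala answer, and the MongoUpdateWritable arguments follow the (query, update, upsert, multiUpdate) order shown there:

import org.apache.spark.api.java.JavaPairRDD;
import org.bson.BSONObject;
import org.bson.BasicBSONObject;
import org.bson.types.ObjectId;
import scala.Tuple2;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.MongoUpdateWritable;

JavaPairRDD<Object, MongoUpdateWritable> updates = analyticsResult.mapToPair(s -> {
    BSONObject doc = (BSONObject) s._1;

    // Match the existing document by its _id.
    BasicBSONObject query = new BasicBSONObject();
    query.append("_id", new ObjectId(doc.get("id").toString()));

    // $set the computed result on the matched document.
    BasicBSONObject update = new BasicBSONObject();
    update.append("$set", new BasicBSONObject().append("result", s._2));

    // (query, update, upsert = false, multiUpdate = true), as in the Scala answer.
    return new Tuple2<>(null, new MongoUpdateWritable(query, update, false, true));
});

updates.saveAsNewAPIHadoopFile(
    "file:///bogus",
    Object.class,
    MongoUpdateWritable.class,
    MongoOutputFormat.class,
    config);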


The raw query executed in mongo is then something like this:

2014-11-09T13:32:11.609-0800 [conn438] update db.users query: { _id: ObjectId('5436edd3e4b051de6a505af9') } update: { $set: { value: 10 } } nMatched:1 nModified:0 keyUpdates:0 numYields:0 locks(micros) w:24 3ms

