Optimization: Dumping JSON from a Streaming API to Mongo


Question

Background: I have a Python module set up to grab JSON objects from a streaming API and store them (bulk insert of 25 at a time) in MongoDB using pymongo. For comparison, I also have a bash command that curls the same streaming API and pipes it to mongoimport. Both of these approaches store data in separate collections.

Periodically, I monitor the count() of the collections to check how they fare.
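
The check itself is just a comparison of the two collection counts; a minimal sketch (raw_tweets_mongoimport is only a placeholder name for the collection the curl | mongoimport pipeline writes to, and DB_HOST/DB_NAME are the same constants used in the module below):

import pymongo

# compare how far each ingestion path has gotten
db = pymongo.Connection(DB_HOST)[DB_NAME]
print "python module :", db["raw_tweets_gnip"].count()
print "mongoimport   :", db["raw_tweets_mongoimport"].count()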

So far, I see the Python module lagging about 1000 JSON objects behind the curl | mongoimport approach.

Problem: How can I optimize my Python module so that it stays roughly in sync with the curl | mongoimport approach?

I cannot use tweetstream since I am not using the Twitter API but a 3rd party streaming service.

Could someone please help me out here?

Python module:


import json
import cStringIO

import pycurl
import pymongo

# DB_HOST, DB_NAME, STREAM_URL and AUTH are assumed to be defined elsewhere in the module.


class StreamReader:
    def __init__(self):
        try:
            self.buff = ""
            self.tweet = ""
            self.chunk_count = 0
            self.tweet_list = []
            self.string_buffer = cStringIO.StringIO()
            self.mongo = pymongo.Connection(DB_HOST)
            self.db = self.mongo[DB_NAME]
            self.raw_tweets = self.db["raw_tweets_gnip"]
            self.conn = pycurl.Curl()
            self.conn.setopt(pycurl.ENCODING, 'gzip')
            self.conn.setopt(pycurl.URL, STREAM_URL)
            self.conn.setopt(pycurl.USERPWD, AUTH)
            # handle_data is invoked by pycurl for every chunk of response data
            self.conn.setopt(pycurl.WRITEFUNCTION, self.handle_data)
            self.conn.perform()
        except Exception as ex:
            print "error occurred: %s" % str(ex)

    def handle_data(self, data):
        try:
            self.string_buffer = cStringIO.StringIO(data)
            for line in self.string_buffer:
                try:
                    self.tweet = json.loads(line)
                except Exception as json_ex:
                    print "JSON Exception occurred: %s" % str(json_ex)
                    continue

                if self.tweet:
                    try:
                        self.tweet_list.append(self.tweet)
                        self.chunk_count += 1
                        # flush the buffered tweets to MongoDB as a bulk insert
                        if self.chunk_count % 1000 == 0:
                            self.raw_tweets.insert(self.tweet_list)
                            self.chunk_count = 0
                            self.tweet_list = []

                    except Exception as insert_ex:
                        print "Error inserting tweet: %s" % str(insert_ex)
                        continue
        except Exception as ex:
            print "Exception occurred: %s" % str(ex)
            print repr(self.buff)

    def __del__(self):
        self.string_buffer.close()
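
For reference, since conn.perform() is called in __init__, the reader is started simply by instantiating the class; a minimal usage sketch (the __main__ guard is just an assumption about how the module is run):

if __name__ == "__main__":
    # constructing the reader opens the connection and blocks in conn.perform(),
    # with handle_data called for every chunk the stream delivers
    StreamReader()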

Thanks for reading.

Answer

Got rid of the StringIO library. Since, in this case, the WRITEFUNCTION callback handle_data gets invoked for every line, just load the JSON directly. Sometimes, however, a single data chunk can contain two JSON objects. I am sorry, I can't post the curl command that I use, as it contains our credentials. But, as I said, this is a general issue applicable to any streaming API.


def handle_data(self, buf):
    try:
        # common case: the chunk is a single JSON object
        self.tweet = json.loads(buf)
    except Exception as json_ex:
        # occasionally one chunk carries more than one JSON object, separated by \r\n
        self.data_list = buf.split('\r\n')
        for data in self.data_list:
            if data:
                self.tweet_list.append(json.loads(data))
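
The snippet above only covers the parsing; the batched bulk insert from the original module still does the flushing. As a rough sketch (not the exact code I run), the two pieces could be combined like this:

def handle_data(self, buf):
    # parse directly, then flush in batches as in the original module
    try:
        self.tweet_list.append(json.loads(buf))
    except Exception:
        # more than one JSON object in this chunk, separated by \r\n
        for data in buf.split('\r\n'):
            if data:
                self.tweet_list.append(json.loads(data))
    if len(self.tweet_list) >= 1000:
        self.raw_tweets.insert(self.tweet_list)
        self.tweet_list = []

Parsing each chunk directly avoids constructing a new cStringIO buffer on every callback.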
