How to populate a PostgreSQL database with mrjob and Hadoop

Question

I would like to populate a PostgreSQL database using a mapper with mrjob and Hadoop 2.7.1. I am currently using the following code:

# -*- coding: utf-8 -*-
#Script for storing the sparse data into a database by using Hadoop
import psycopg2
import re
from mrjob.job import MRJob

args_d = False
args_c = True
args_s = True
args_n = 'es_word_space'


def unicodize(segment):
    if re.match(r'\\u[0-9a-f]{4}', segment):
        return segment.decode('unicode-escape')
    return segment.decode('utf-8')

def create_tables(cr):
    cr.execute("create table word_list(id serial primary key, word character varying not null)")
    cr.execute("""create table word_sparse(
        id serial primary key, 
        word_id integer references word_list(id) not null,
        pos integer not null,
        val float not null)""")

def delete_tables(cr):
    cr.execute("drop table word_sparse")
    cr.execute("drop table word_list")

class MRwordStore(MRJob):
    def mapper(self, _, line):
        global cr

        item = line.strip().split('\t')
        replaced = u"".join((unicodize(seg) for seg in re.split(r'(\\u[0-9a-f]{4})', item[0])))
        key = u''.join((c for c in replaced if c != '"'))

        cr.execute("insert into word_list(word) values(%s) returning id", (key,))
        word_id = cr.fetchone()[0]

        # Parse the list; literal_eval is avoided because of memory issues
        inside = False
        number = ""
        pos = 0
        val = 0
        for c in item[1]:
            if c == '[':
                inside = True
            elif c.isdigit():
                number += c
            elif c == ',':
                if inside:
                    pos = int(number)
                    number = ""
            elif c == ']':
                if inside:
                    val = int(number)
                    number = ""
                    cr.execute("insert into word_sparse(word_id, pos, val) values (%s, %s, %s)", (word_id, pos, val))
                inside = False

if __name__ == "__main__":
    """
    Stores words in the database.

    The first time, run with the arguments -cs.
    If the database has to be recreated, run again with the d argument (-dcs)

    It also assumes the owner of the database is a user named semeval with password semeval
    """
    global cr

    conn = psycopg2.connect("dbname=%s user=semeval password=semeval" % args_n)
    cr = conn.cursor()
    if args_d:
        delete_tables(cr)
    if args_c:
        create_tables(cr)
    if args_s:
        MRwordStore().run()

    conn.commit()
    conn.close()

I tried not to use a reducer. Calling my script produces this output:

$ python db_store_hadoop.py -r hadoop /almac/ignacio/data/wdSp_sparse.txt
no configs found; falling back on auto-configuration
no configs found; falling back on auto-configuration
creating tmp directory /tmp/db_store_hadoop.hduser.20160113.012419.718376
writing wrapper script to /tmp/db_store_hadoop.hduser.20160113.012419.718376/setup-wrapper.sh
Using Hadoop version 2.7.1
Copying local files into hdfs:///user/hduser/tmp/mrjob/db_store_hadoop.hduser.20160113.012419.718376/files/

PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols

There is no further output; the job seems to hang. Here is a sample of my input file:

"\u00e1gil" [[1572, 1], [1590, 1], [4, 1], [774, 1]]
"\u00e1guila"   [[10, 5], [1116, 2], [15, 1], [1590, 1], [1641, 2], [1704, 1], [1740, 3], [183, 1], [3, 1], [428, 2], [900, 3]]
"\u00e1guilas"  [[1043, 1], [248, 1], [618, 1], [701, 2], [862, 2], [864, 2]]
"\u00e1lava"    [[1572, 1], [1576, 2], [1590, 1], [726, 2]]

The file is 1.5 GB. I have already created the database, and it is empty. Thank you very much for your help; I suspect I have many misconceptions.

Recommended answer

Each mapper needs its own database connection. Create the database connection in mapper_init() and close it in mapper_final(). You need to create the database separately from the mrjob script. You should try some very simple mrjob scripts first. You haven't started the job the correct way. Work through the examples in the documentation.
