python multiprocessing + peewee + postgresql fails with SSL error

Problem description

I am trying to write a Python program which is capable of doing some processing in a PostgreSQL database using the multiprocessing module and peewee.

In single-core mode the code works; however, when I try to run the code on multiple cores I run into an SSL error.

I would like to post the structure of my program in the hope that somebody can advise how to set up my model in a proper way. Currently I have chosen an object-oriented approach in which I make one connection which is shared via a pool. To clarify what I have done, I will now show the source code I have so far.

I have three files: main.py, models.py and parser.py. The contents are as follows.

models.py defines the peewee PostgreSQL table and makes a connection to the postgres server:

import peewee as pw
from playhouse.pool import PooledPostgresqlExtDatabase

KVK_KEY = "id_number"
NAME_KEY = "name"
N_VOWELS_KEY = "n_vowels"

# initialise the data base
database = PooledPostgresqlExtDatabase(
    "testdb", user="postgres", host="localhost", port=5432, password="xxxx",
    max_connections=8, stale_timeout=300 )


class BaseModel(pw.Model):
    class Meta:
        database = database
        only_save_dirty = True


# this class describes the format of the sql data base
class Company(BaseModel):
    id_number = pw.IntegerField(primary_key=True)
    name = pw.CharField(null=True)
    n_vowels = pw.IntegerField(default=-1)
    processor = pw.IntegerField(default=-1)


def connect_database(database_name, reset_database=False):
    """ connect the database """
    database.connect()
    if reset_database:
        database.drop_tables([Company])
    database.create_tables([Company])

parser.py contains the CompanyParser class, which is used as the engine of the code to do all the processing. It generates some artificial data which is stored in the PostgreSQL database, and then the run method is used to do some processing on the data already stored in the database:

import pandas as pd
import numpy as np
import random
import string
import peewee as pw
from models import (Company, database, KVK_KEY, NAME_KEY)
import multiprocessing as mp

MAX_SQL_CHUNK = 1000

np.random.seed(0)


def random_name(size=8, chars=string.ascii_lowercase):
    """ Create a random character string of 'size' characters """
    return "".join(random.choice(chars) for _ in range(size))


def vowel_count(characters):
    """
    Count the number of vowels in the string 'characters' and return as an integer
    """
    count = 0
    for char in characters:
        if char in list("aeiou"):
            count += 1
    return count


class CompanyParser(mp.Process):
    def __init__(self, number_of_companies=100, i_proc=None,
                 number_of_procs=1,
                 first_id=None, last_id=None):
        if i_proc is not None and number_of_procs > 1:
            mp.Process.__init__(self)

        self.i_proc = i_proc
        self.number_of_procs = number_of_procs
        self.n_companies = number_of_companies
        self.data_df: pd.DataFrame = None

        self.first_id = first_id
        self.last_id = last_id

    def generate_data(self):
        """ Create a dataframe with fake company data and id's """
        id_list = np.random.randint(1000000, 9999999, self.n_companies)
        company_list = np.array([random_name() for _ in range(self.n_companies)])
        self.data_df = pd.DataFrame(data=np.vstack([id_list, company_list]).T,
                                    columns=[KVK_KEY, NAME_KEY])
        self.data_df.sort_values([KVK_KEY], inplace=True)

    def store_to_database(self):
        """
        Store the company data to a sql database
        """
        record_list = list(self.data_df.to_dict(orient="index").values())

        n_batch = int(len(record_list) / MAX_SQL_CHUNK) + 1

        with database.atomic():
            for cnt, batch in enumerate(pw.chunked(record_list, MAX_SQL_CHUNK)):
                print(f"writing {cnt}/{n_batch}")
                Company.insert_many(batch).execute()

    def run(self):
        print("Making query at {}".format(self.i_proc))
        query = (Company.
                 select().
                 where(Company.id_number.between(self.first_id, self.last_id)))
        print("Found {} companies".format(query.count()))

        for cnt, company in enumerate(query):
            print("Processing @ {} - {}:  company {}/{}".format(self.i_proc, cnt,
                                                                company.id_number,
                                                                company.name))
            number_of_vowels = vowel_count(company.name)
            company.n_vowels = number_of_vowels
            company.processor = self.i_proc
            print(f"storing number of vowels: {number_of_vowels}")
            company.save()

Finally, my main script loads the classes defined in models.py and parser.py and launches the code:

from models import (Company, connect_database)
from parser import CompanyParser

number_of_processors = 2
connect_database(None, reset_database=True)

# init an object of the CompanyParser and use the create database 
parser = CompanyParser()

company_ids = Company.select(Company.id_number)
parser.generate_data()
parser.store_to_database()

n_companies = company_ids.count()
n_comp_per_proc = int(n_companies / number_of_processors)
print("Found {} companies: {} per proc".format(n_companies, n_comp_per_proc))

for i_proc in range(number_of_processors):
    i_start = i_proc * n_comp_per_proc
    first_id = company_ids[i_start]
    last_id = company_ids[i_start + n_comp_per_proc - 1]

    print(f"Running proc {i_proc} for id {first_id} until id {last_id}")
    sub_parser = CompanyParser(first_id=first_id, last_id=last_id,
                               i_proc=i_proc,
                               number_of_procs=number_of_processors)

    if number_of_processors > 1:
        sub_parser.start()
    else:
        sub_parser.run()

In case number_of_processors = 1 this script works perfectly fine. It generates artificial data, stores it in the PostgreSQL database and does some processing on the data (it counts the number of vowels in the name and stores it in the n_vowels column).

However, when I try to run this on 2 cores with number_of_processors = 2, I run into the following error:

/opt/miniconda3/bin/python /home/eelco/PycharmProjects/multiproc_peewee/main.py
writing 0/1
Found 100 companies: 50 per proc
Running proc 0 for id 1020737 until id 5295565
Running proc 1 for id 5302405 until id 9891087
Making query at 0
Found 50 companies
Processing @ 0 - 0:  company 1020737/wqrbgxiu
storing number of vowels: 2
Making query at 1
Process CompanyParser-1:
Processing @ 0 - 1:  company 1086107/lkbagrbc
storing number of vowels: 1
Processing @ 0 - 2:  company 1298367/nsdjsqio
storing number of vowels: 2
Traceback (most recent call last):
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2714, in execute_sql
    cursor.execute(sql, params or ())
psycopg2.OperationalError: SSL error: sslv3 alert bad record mac


During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/miniconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/eelco/PycharmProjects/multiproc_peewee/parser.py", line 82, in run
    company.save()
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 5748, in save
    rows = self.update(**field_dict).where(self._pk_expr()).execute()
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1625, in inner
    return method(self, database, *args, **kwargs)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1696, in execute
    return self._execute(database)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2121, in _execute
    cursor = database.execute(self)
  File "/opt/miniconda3/lib/python3.7/site-packages/playhouse/postgres_ext.py", line 468, in execute
    cursor = self.execute_sql(sql, params, commit=commit)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2721, in execute_sql
    self.commit()
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2512, in __exit__
    reraise(new_type, new_type(*exc_args), traceback)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 186, in reraise
    raise value.with_traceback(tb)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2714, in execute_sql
    cursor.execute(sql, params or ())
peewee.OperationalError: SSL error: sslv3 alert bad record mac

Process CompanyParser-2:
Traceback (most recent call last):
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2714, in execute_sql
    cursor.execute(sql, params or ())
psycopg2.OperationalError: SSL error: decryption failed or bad record mac


During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/miniconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/eelco/PycharmProjects/multiproc_peewee/parser.py", line 72, in run
    print("Found {} companies".format(query.count()))
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1625, in inner
    return method(self, database, *args, **kwargs)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1881, in count
    return Select([clone], [fn.COUNT(SQL('1'))]).scalar(database)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1625, in inner
    return method(self, database, *args, **kwargs)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1866, in scalar
    row = self.tuples().peek(database)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1625, in inner
    return method(self, database, *args, **kwargs)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1853, in peek
    rows = self.execute(database)[:n]
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1625, in inner
    return method(self, database, *args, **kwargs)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1696, in execute
    return self._execute(database)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 1847, in _execute
    cursor = database.execute(self)
  File "/opt/miniconda3/lib/python3.7/site-packages/playhouse/postgres_ext.py", line 468, in execute
    cursor = self.execute_sql(sql, params, commit=commit)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2721, in execute_sql
    self.commit()
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2512, in __exit__
    reraise(new_type, new_type(*exc_args), traceback)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 186, in reraise
    raise value.with_traceback(tb)
  File "/opt/miniconda3/lib/python3.7/site-packages/peewee.py", line 2714, in execute_sql
    cursor.execute(sql, params or ())
peewee.OperationalError: SSL error: decryption failed or bad record mac


Process finished with exit code 0

Somehow something goes wrong as soon as the second process starts to do something with the database. Does somebody have advice to get this code working? I have already tried the following:


  • Using the PooledPostgresqlDatabase and the normal PostgresqlDatabase to connect to the database. This leads to the same error.
  • Using sqlite instead of postgres. This works for 2 cores, but only if the two processes are not interfering too much; otherwise I get some locking problems. I was under the impression that postgres would be better suited for multiprocessing than sqlite (is that true?).
  • Putting a pause after launching the first process (so effectively using only one core): then the code works, showing that the start method is called correctly.

Hopefully somebody can help.

Regards,
Eelco

Answer

After some searching on the internet today I found the solution to my problem here: github.com/coleifer. As coleifer mentions: you apparently first have to set up all the forks before you start connecting to the database. Based on this idea I have modified my code and it is working now.
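To make that rule concrete, here is a minimal sketch of the "fork first, connect later" pattern. This is my own illustration, not the code from the answer below, and it assumes the same local testdb database and credentials used elsewhere in this post: each child process opens its own pooled connection after the fork instead of inheriting the parent's socket.

import multiprocessing as mp
from playhouse.pool import PooledPostgresqlDatabase


def worker(i_proc):
    # the connection is created inside the child, i.e. after the fork,
    # so it is never shared with the parent or with sibling processes
    db = PooledPostgresqlDatabase(
        "testdb", user="postgres", host="localhost", port=5432,
        password="xxxxx", max_connections=8, stale_timeout=300)
    db.connect()
    db.execute_sql("SELECT 1")  # any query now runs on a private connection
    db.close()
    print(f"worker {i_proc} done")


if __name__ == "__main__":
    procs = [mp.Process(target=worker, args=(i,)) for i in range(2)]
    for p in procs:
        p.start()  # the fork happens here, before any connection exists
    for p in procs:
        p.join()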

For those interested I will post my python scripts again so you can see how I did it. Since there are not so many explicit examples out there, perhaps it may help others.

First of all, all the database and peewee model definitions are now moved into initialization functions which are only called inside the constructor of the CompanyParser class. So models.py looks like:

import peewee as pw
from playhouse.pool import PooledPostgresqlDatabase

KVK_KEY = "id_number"
NAME_KEY = "name"
N_VOWELS_KEY = "n_vowels"


def init_database():
    db = PooledPostgresqlDatabase(
        "testdb", user="postgres", host="localhost", port=5432, password="xxxxx",
        max_connections=8, stale_timeout=300)
    return db


def init_models(db, reset_tables=False):

    class BaseModel(pw.Model):
        class Meta:
            database = db

    # this class describes the format of the sql data base
    class Company(BaseModel):
        id_number = pw.IntegerField(primary_key=True)
        name = pw.CharField(null=True)
        n_vowels = pw.IntegerField(default=-1)
        processor = pw.IntegerField(default=-1)

    if db.is_closed():
        db.connect()
    if reset_tables and Company.table_exists():
        db.drop_tables([Company])
    db.create_tables([Company])

    return Company
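As a quick sanity check (my addition, not part of the original answer; it assumes the same local testdb and credentials), the two initialization functions can be exercised on their own:

from models import init_database, init_models

# open a fresh connection and bind the model to it
db = init_database()
Company = init_models(db, reset_tables=False)
print(Company.select().count())  # number of companies currently stored
db.close()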

Then, the worker class CompanyParser is defined in the parser.py script and looks like this:

import multiprocessing as mp
import random
import string

import numpy as np
import pandas as pd
import peewee as pw

from models import (KVK_KEY, NAME_KEY, init_database, init_models)

MAX_SQL_CHUNK = 1000

np.random.seed(0)


def random_name(size=32, chars=string.ascii_lowercase):
    """ Create a random character string of 'size' characters """
    return "".join(random.choice(chars) for _ in range(size))


def vowel_count(characters):
    """
    Count the number of vowels in the string 'characters' and return as an integer
    """
    count = 0
    for char in characters:
        if char in list("aeiou"):
            count += 1
    return count


class CompanyParser(mp.Process):
    def __init__(self, reset_tables=False,
                 number_of_companies=100, i_proc=None,
                 number_of_procs=1, first_id=None, last_id=None):
        if i_proc is not None and number_of_procs > 1:
            mp.Process.__init__(self)

        self.i_proc = i_proc
        self.reset_tables = reset_tables

        self.number_of_procs = number_of_procs
        self.n_companies = number_of_companies
        self.data_df: pd.DataFrame = None

        self.first_id = first_id
        self.last_id = last_id

        # initialise the database and models
        self.database = init_database()
        self.Company = init_models(self.database, reset_tables=self.reset_tables)

    def generate_data(self):
        """ Create a dataframe with fake company data and id's and return the array of id's"""
        id_list = np.random.randint(1000000, 9999999, self.n_companies)
        company_list = np.array([random_name() for _ in range(self.n_companies)])
        self.data_df = pd.DataFrame(data=np.vstack([id_list, company_list]).T,
                                    columns=[KVK_KEY, NAME_KEY])
        self.data_df.drop_duplicates([KVK_KEY], inplace=True)
        self.data_df.sort_values([KVK_KEY], inplace=True)
        return self.data_df[KVK_KEY].values

    def store_to_database(self):
        """
        Store the company data to a sql database
        """
        record_list = list(self.data_df.to_dict(orient="index").values())

        n_batch = int(len(record_list) / MAX_SQL_CHUNK) + 1

        with self.database.atomic():
            for cnt, batch in enumerate(pw.chunked(record_list, MAX_SQL_CHUNK)):
                print(f"writing {cnt}/{n_batch}")
                self.Company.insert_many(batch).execute()

    def run(self):
        query = (self.Company.
                 select().
                 where(self.Company.id_number.between(self.first_id, self.last_id)))

        for cnt, company in enumerate(query):
            print("Processing @ {} - {}:  company {}/{}".format(self.i_proc, cnt, company.id_number,
                                                                company.name))
            number_of_vowels = vowel_count(company.name)
            company.n_vowels = number_of_vowels
            company.processor = self.i_proc
            try:
                company.save()
            except (pw.OperationalError, pw.InterfaceError) as err:
                print("failed save for {} {}: {}".format(self.i_proc, cnt, err))

Finally, the main.py script which launches the processes:

from parser import CompanyParser
import time


def main():
    number_of_processors = 2
    number_of_companies = 10000

    parser = CompanyParser(number_of_companies=number_of_companies, reset_tables=True)
    company_ids = parser.generate_data()
    parser.store_to_database()

    n_companies = company_ids.size
    n_comp_per_proc = int(n_companies / number_of_processors)
    print("Found {} companies: {} per proc".format(n_companies, n_comp_per_proc))
    if not parser.database.is_closed():
        parser.database.close()

    processes = list()
    for i_proc in range(number_of_processors):
        i_start = i_proc * n_comp_per_proc
        first_id = company_ids[i_start]
        last_id = company_ids[i_start + n_comp_per_proc - 1]

        print(f"Running proc {i_proc} for id {first_id} until id {last_id}")

        sub_parser = CompanyParser(first_id=first_id, last_id=last_id, i_proc=i_proc,
                                   number_of_procs=number_of_processors)

        if number_of_processors > 1:
            sub_parser.start()
        else:
            sub_parser.run()

        processes.append(sub_parser)

    # this blocks the script until all processes are done
    for job in processes:
        job.join()

    # make sure all the connections are closed
    for i_proc in range(number_of_processors):
        db = processes[i_proc].database
        if not db.is_closed():
            db.close()
    print("Goodbye!")


if __name__ == "__main__":

    start = time.time()
    main()
    duration = time.time() - start
    print(f"Done in {duration} s")

As you can see, the database connection is made per process, inside the class. This example works and is a complete example of multiprocessing + peewee + PostgreSQL. Hopefully it may help others. In case you have any comments or suggestions for improvement, please let me know.
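As an optional follow-up check (again my own addition, assuming the same testdb settings), you can verify that both processes actually did work by grouping the stored companies by the processor column:

import peewee as pw
from models import init_database, init_models

db = init_database()
Company = init_models(db, reset_tables=False)
# count how many companies each process handled
query = (Company
         .select(Company.processor, pw.fn.COUNT(Company.id_number).alias("n"))
         .group_by(Company.processor)
         .tuples())
for processor, n in query:
    print("processor {}: {} companies".format(processor, n))
db.close()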
