python struct.error: 'i' format requires -2147483648 <= number <= 2147483647


Problem

I want to do feature engineering using the multiprocessing module (multiprocessing.Pool.starmap()). However, it gives the error message below. I guess the error is about the size of the inputs (2147483647 = 2^31 − 1?), since the same code worked smoothly for a fraction (frac=0.05) of the input dataframes (train_scala, test, ts). I converted the dtypes of the dataframes to be as small as possible, but it did not get better.

The Anaconda version is 4.3.30 and the Python version is 3.6 (64-bit). The system has over 128 GB of memory and more than 20 cores. Can you suggest any pointers or solutions to overcome this problem? If the problem is caused by passing large data to the multiprocessing module, how much smaller should the data be for multiprocessing to work on Python 3?

Code:

from multiprocessing import Pool, cpu_count
from itertools import repeat    
p = Pool(8)
is_train_seq = [True]*len(historyCutoffs)+[False]
config_zip = zip(historyCutoffs, repeat(train_scala), repeat(test), repeat(ts), ul_parts_path, repeat(members), is_train_seq)
p.starmap(multiprocess_FE, config_zip)

Error Message:

Traceback (most recent call last):
  File "main_1210_FE_scala_multiprocessing.py", line 705, in <module>
    print('----Pool starmap start----')
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 274, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 424, in _handle_tasks
    put(task)
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 393, in _send_bytes
    header = struct.pack("!i", n)
struct.error: 'i' format requires -2147483648 <= number <= 2147483647

Extra info

  • historyCutoffs is a list of integers
  • train_scala is a pandas DataFrame (377MB)
  • test is a pandas DataFrame (15MB)
  • ts is a pandas DataFrame (547MB)
  • ul_parts_path is a list of directory paths (strings)
  • is_train_seq is a list of booleans

Extra Code: Method multiprocess_FE

import gzip
import pandas as pd

def multiprocess_FE(historyCutoff, train_scala, test, ts, ul_part_path, members, is_train):
    train_dict = {}
    ts_dict = {}
    msno_dict = {}
    ul_dict = {}
    if is_train == True:
        train_dict[historyCutoff] = train_scala[train_scala.historyCutoff == historyCutoff]
    else:
        train_dict[historyCutoff] = test
    msno_dict[historyCutoff] = set(train_dict[historyCutoff].msno)
    print('length of msno is {:d} in cutoff {:d}'.format(len(msno_dict[historyCutoff]), historyCutoff))
    ts_dict[historyCutoff] = ts[(ts.transaction_date <= historyCutoff) & (ts.msno.isin(msno_dict[historyCutoff]))]
    print('length of transaction is {:d} in cutoff {:d}'.format(len(ts_dict[historyCutoff]), historyCutoff))    
    ul_part = pd.read_csv(gzip.open(ul_part_path, mode="rt"))  ##.sample(frac=0.01, replace=False)
    ul_dict[historyCutoff] = ul_part[ul_part.msno.isin(msno_dict[historyCutoff])]
    train_dict[historyCutoff] = enrich_by_features(historyCutoff, train_dict[historyCutoff], ts_dict[historyCutoff], ul_dict[historyCutoff], members, is_train)

Solution

The communication protocol between processes uses pickling, and the pickled data is prefixed with its size. For your method, all arguments together are pickled as a single object.

You produced an object that, when pickled, is larger than fits in an 'i' struct format (a four-byte signed integer), which breaks the assumption the code makes.
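
To see both halves of this concretely, here is a small check; it assumes the question's variables (historyCutoffs, train_scala, test, ts, ul_parts_path, members) are in scope:

import pickle
import struct

# The Python <= 3.7 wire format packs the payload length into a signed
# 4-byte integer, so anything past 2**31 - 1 bytes overflows it.
try:
    struct.pack("!i", 2**31)  # one past the maximum
except struct.error as exc:
    print(exc)  # 'i' format requires -2147483648 <= number <= 2147483647

# One starmap task carries the full argument tuple, dataframes included;
# the pool pickles tasks in chunks, so the real payload is at least this big.
task = (historyCutoffs[0], train_scala, test, ts,
        ul_parts_path[0], members, True)
print(len(pickle.dumps(task, protocol=pickle.HIGHEST_PROTOCOL)))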

You could delegate reading your dataframes to the child processes instead, sending across only the metadata needed to load each dataframe, as sketched below. Their combined size is nearing 1 GB, which is far too much data to share over a pipe between your processes.
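
For example, a minimal sketch of that refactor, assuming the frames are first written to disk once in the parent (the .pkl file names and the multiprocess_FE_from_paths wrapper are made up for illustration; members is assumed small enough to keep sending):

from itertools import repeat
from multiprocessing import Pool

import pandas as pd

# Write each large frame to disk once in the parent process.
train_scala.to_pickle('train_scala.pkl')
test.to_pickle('test.pkl')
ts.to_pickle('ts.pkl')

def multiprocess_FE_from_paths(historyCutoff, train_path, test_path,
                               ts_path, ul_part_path, members, is_train):
    # Only short path strings cross the pipe; each worker loads the
    # frames itself instead of unpickling ~1GB of arguments.
    return multiprocess_FE(historyCutoff, pd.read_pickle(train_path),
                           pd.read_pickle(test_path), pd.read_pickle(ts_path),
                           ul_part_path, members, is_train)

config_zip = zip(historyCutoffs, repeat('train_scala.pkl'),
                 repeat('test.pkl'), repeat('ts.pkl'),
                 ul_parts_path, repeat(members), is_train_seq)
p = Pool(8)
p.starmap(multiprocess_FE_from_paths, config_zip)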

Quoting from the Programming guidelines section:

Better to inherit than pickle/unpickle

When using the spawn or forkserver start methods many types from multiprocessing need to be picklable so that child processes can use them. However, one should generally avoid sending shared objects to other processes using pipes or queues. Instead you should arrange the program so that a process which needs access to a shared resource created elsewhere can inherit it from an ancestor process.

If you are not running on Windows and use the default fork start method (the spawn and forkserver methods pickle the arguments instead of sharing pages), you could load your dataframes as globals before starting your subprocesses, at which point the child processes will 'inherit' the data via the normal OS copy-on-write memory page sharing mechanism.
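
A sketch of that pattern under the fork start method; the load_* helpers are hypothetical stand-ins for however the frames are currently built, and the question's historyCutoffs, ul_parts_path and is_train_seq lists are assumed in scope:

from multiprocessing import Pool

# Load once at module level in the parent, BEFORE the pool forks its
# workers; forked children then see these via copy-on-write pages,
# with no pickling at all.
train_scala = load_train_scala()  # hypothetical loaders
test = load_test()
ts = load_ts()
members = load_members()

def multiprocess_FE_inherited(historyCutoff, ul_part_path, is_train):
    # Only the three small, varying arguments travel over the pipe;
    # the big frames come from the inherited globals.
    return multiprocess_FE(historyCutoff, train_scala, test, ts,
                           ul_part_path, members, is_train)

if __name__ == '__main__':
    p = Pool(8)
    p.starmap(multiprocess_FE_inherited,
              zip(historyCutoffs, ul_parts_path, is_train_seq))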

Note that this limit was raised for non-Windows systems in Python 3.8 to an unsigned long long (8 bytes), so you can now send and receive up to 2^64 − 1 bytes (about 16 EiB) of data. See this commit, and Python issues #35152 and #17560.

If you can't upgrade, can't make use of resource inheritance, and are not running on Windows, then use this patch:

import functools
import logging
import struct
import sys

logger = logging.getLogger()


def patch_mp_connection_bpo_17560():
    """Apply PR-10305 / bpo-17560 connection send/receive max size update

    See the original issue at https://bugs.python.org/issue17560 and 
    https://github.com/python/cpython/pull/10305 for the pull request.

    This only supports Python versions 3.3 - 3.7, this function
    does nothing for Python versions outside of that range.

    """
    patchname = "Multiprocessing connection patch for bpo-17560"
    if not (3, 3) < sys.version_info < (3, 8):
        logger.info(
            patchname + " not applied, not an applicable Python version: %s",
            sys.version
        )
        return

    from multiprocessing.connection import Connection

    orig_send_bytes = Connection._send_bytes
    orig_recv_bytes = Connection._recv_bytes
    if (
        orig_send_bytes.__code__.co_filename == __file__
        and orig_recv_bytes.__code__.co_filename == __file__
    ):
        logger.info(patchname + " already applied, skipping")
        return

    @functools.wraps(orig_send_bytes)
    def send_bytes(self, buf):
        n = len(buf)
        if n > 0x7fffffff:
            pre_header = struct.pack("!i", -1)
            header = struct.pack("!Q", n)
            self._send(pre_header)
            self._send(header)
            self._send(buf)
        else:
            orig_send_bytes(self, buf)

    @functools.wraps(orig_recv_bytes)
    def recv_bytes(self, maxsize=None):
        buf = self._recv(4)
        size, = struct.unpack("!i", buf.getvalue())
        if size == -1:
            buf = self._recv(8)
            size, = struct.unpack("!Q", buf.getvalue())
        if maxsize is not None and size > maxsize:
            return None
        return self._recv(size)

    Connection._send_bytes = send_bytes
    Connection._recv_bytes = recv_bytes

    logger.info(patchname + " applied")
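
Usage is a single call in the parent before the pool (and therefore its pipes) exists; a short sketch reusing the question's names:

from multiprocessing import Pool

if __name__ == '__main__':
    # Patch Connection before any pipe is created, so the parent and the
    # forked workers both use the widened length header.
    patch_mp_connection_bpo_17560()
    p = Pool(8)
    p.starmap(multiprocess_FE, config_zip)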
