python struct.error: 'i' format requires -2147483648 <= number <= 2147483647


Problem description

I'm trying to do feature engineering using the multiprocessing module (multiprocessing.Pool.starmap()). However, it gives the error message below. I suspect the error is about the size of the inputs (2147483647 = 2^31 − 1?), since the same code works smoothly on a fraction (frac=0.05) of the input dataframes (train_scala, test, ts). I converted the dataframe dtypes to the smallest possible types, but it did not get better.

The Anaconda version is 4.3.30 and the Python version is 3.6 (64-bit). The system has over 128GB of memory and more than 20 cores. Can you suggest any pointers or a solution to overcome this problem? If the problem is caused by passing large data to the multiprocessing module, how much smaller should the data be to use multiprocessing on Python 3?

Code:

from multiprocessing import Pool, cpu_count
from itertools import repeat    
p = Pool(8)
is_train_seq = [True]*len(historyCutoffs)+[False]
config_zip = zip(historyCutoffs, repeat(train_scala), repeat(test), repeat(ts), ul_parts_path, repeat(members), is_train_seq)
p.starmap(multiprocess_FE, config_zip)

Error message:

Traceback (most recent call last):
  File "main_1210_FE_scala_multiprocessing.py", line 705, in <module>
    print('----Pool starmap start----')
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 274, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 424, in _handle_tasks
    put(task)
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 393, in _send_bytes
    header = struct.pack("!i", n)
struct.error: 'i' format requires -2147483648 <= number <= 2147483647

Extra info

  • historyCutoffs is a list of integers
  • train_scala is a pandas DataFrame (377MB)
  • test is a pandas DataFrame (15MB)
  • ts is a pandas DataFrame (547MB)
  • ul_parts_path is a list of directory paths (strings)
  • is_train_seq is a list of booleans
Additional code: the method multiprocess_FE:

      # gzip and pandas are used below; enrich_by_features is defined
      # elsewhere in the asker's code.
      import gzip

      import pandas as pd

      def multiprocess_FE(historyCutoff, train_scala, test, ts, ul_part_path, members, is_train):
          train_dict = {}
          ts_dict = {}
          msno_dict = {}
          ul_dict = {}
          if is_train:
              train_dict[historyCutoff] = train_scala[train_scala.historyCutoff == historyCutoff]
          else:
              train_dict[historyCutoff] = test
          msno_dict[historyCutoff] = set(train_dict[historyCutoff].msno)
          print('length of msno is {:d} in cutoff {:d}'.format(len(msno_dict[historyCutoff]), historyCutoff))
          ts_dict[historyCutoff] = ts[(ts.transaction_date <= historyCutoff) & (ts.msno.isin(msno_dict[historyCutoff]))]
          print('length of transaction is {:d} in cutoff {:d}'.format(len(ts_dict[historyCutoff]), historyCutoff))
          ul_part = pd.read_csv(gzip.open(ul_part_path, mode="rt"))  ##.sample(frac=0.01, replace=False)
          ul_dict[historyCutoff] = ul_part[ul_part.msno.isin(msno_dict[historyCutoff])]
          train_dict[historyCutoff] = enrich_by_features(historyCutoff, train_dict[historyCutoff], ts_dict[historyCutoff], ul_dict[historyCutoff], members, is_train)
      

Answer

      The communication protocol between processes uses pickling, and the pickled data is prefixed with the size of the pickled data. For your method, all arguments together are pickled as one object.

      You produced an object that, when pickled, is larger than fits in an 'i' struct format (a four-byte signed integer), which breaks the assumption the code makes.
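
      The 4-byte limit is easy to reproduce with the struct module directly (a minimal illustration, separate from the multiprocessing code):

      ```python
      import struct

      # multiprocessing's Connection frames each message with its length packed
      # as a 4-byte big-endian signed int ("!i"), capping one payload at 2**31 - 1.
      header = struct.pack("!i", 2**31 - 1)   # the largest length that still fits
      assert header == b"\x7f\xff\xff\xff"

      try:
          struct.pack("!i", 2**31)            # one past the limit
      except struct.error as e:
          print(e)                            # the same error as in the traceback
      ```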

      You could delegate reading of your dataframes to the child process instead, only sending across the metadata needed to load the dataframe. Their combined size is nearing 1GB, way too much data to share over a pipe between your processes.
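
      The size difference is easy to check with pickle (a sketch using a plain list as a stand-in for a dataframe; the file path is hypothetical):

      ```python
      import pickle

      big = list(range(1_000_000))   # stand-in for a large dataframe
      # Sending the object itself: the pickled payload scales with the data.
      as_data = pickle.dumps((big, 20170228, True))
      # Sending only metadata (a hypothetical file path plus scalars): tiny.
      as_path = pickle.dumps(("/data/train_scala.pkl", 20170228, True))

      print(len(as_data), len(as_path))
      ```

      Each worker would then load its own copy from disk (e.g. with pandas.read_pickle) using the path it received.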

      Quoting from the Programming guidelines section:

      Better to inherit than pickle/unpickle

      When using the spawn or forkserver start methods many types from multiprocessing need to be picklable so that child processes can use them. However, one should generally avoid sending shared objects to other processes using pipes or queues. Instead you should arrange the program so that a process which needs access to a shared resource created elsewhere can inherit it from an ancestor process.

      If you are not running on Windows and are not using the spawn or forkserver start methods (i.e., the pool uses the default fork start method), you could load your dataframes as globals before starting your subprocesses, at which point the child processes will 'inherit' the data via the normal OS copy-on-write memory page sharing mechanisms.
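
      A minimal sketch of that pattern (POSIX only; it explicitly requests the fork start method, and a list stands in for the dataframes):

      ```python
      from multiprocessing import get_context

      SHARED = None  # populated by the parent before the pool starts

      def worker(cutoff):
          # Reads the inherited global; only the small `cutoff` crosses the pipe.
          return cutoff, len(SHARED)

      def main():
          global SHARED
          SHARED = list(range(1_000_000))   # stand-in for train_scala/test/ts
          ctx = get_context("fork")         # copy-on-write inheritance needs fork
          with ctx.Pool(2) as pool:
              return pool.map(worker, [10, 20])

      if __name__ == "__main__":
          print(main())
      ```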

      Note that this limit was raised for non-Windows systems in Python 3.8, to an unsigned long long (8 bytes), and so you can now send and receive 4 EiB of data. See this commit, and Python issues #35152 and #17560.

      If you can't upgrade and you can't make use of resource inheriting, and are not running on Windows, then use this patch:

      import functools
      import logging
      import struct
      import sys
      
      logger = logging.getLogger()
      
      
      def patch_mp_connection_bpo_17560():
          """Apply PR-10305 / bpo-17560 connection send/receive max size update
      
          See the original issue at https://bugs.python.org/issue17560 and 
          https://github.com/python/cpython/pull/10305 for the pull request.
      
          This only supports Python versions 3.3 - 3.7, this function
          does nothing for Python versions outside of that range.
      
          """
          patchname = "Multiprocessing connection patch for bpo-17560"
          if not (3, 3) < sys.version_info < (3, 8):
              logger.info(
                  patchname + " not applied, not an applicable Python version: %s",
                  sys.version
              )
              return
      
          from multiprocessing.connection import Connection
      
          orig_send_bytes = Connection._send_bytes
          orig_recv_bytes = Connection._recv_bytes
          if (
              orig_send_bytes.__code__.co_filename == __file__
              and orig_recv_bytes.__code__.co_filename == __file__
          ):
              logger.info(patchname + " already applied, skipping")
              return
      
          @functools.wraps(orig_send_bytes)
          def send_bytes(self, buf):
              n = len(buf)
              if n > 0x7fffffff:
                  pre_header = struct.pack("!i", -1)
                  header = struct.pack("!Q", n)
                  self._send(pre_header)
                  self._send(header)
                  self._send(buf)
              else:
                  orig_send_bytes(self, buf)
      
          @functools.wraps(orig_recv_bytes)
          def recv_bytes(self, maxsize=None):
              buf = self._recv(4)
              size, = struct.unpack("!i", buf.getvalue())
              if size == -1:
                  buf = self._recv(8)
                  size, = struct.unpack("!Q", buf.getvalue())
              if maxsize is not None and size > maxsize:
                  return None
              return self._recv(size)
      
          Connection._send_bytes = send_bytes
          Connection._recv_bytes = recv_bytes
      
          logger.info(patchname + " applied")
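
      The framing that patch implements can be demonstrated on its own (a standalone sketch of the wire format, with no actual pipe involved):

      ```python
      import struct

      n = 5_000_000_000                     # a payload length above 2**31 - 1

      # Sender side: a 4-byte -1 sentinel, then the real length as an
      # 8-byte unsigned integer ("!Q").
      pre_header = struct.pack("!i", -1)
      header = struct.pack("!Q", n)

      # Receiver side: read 4 bytes; -1 means "the 8-byte length follows".
      size, = struct.unpack("!i", pre_header)
      if size == -1:
          size, = struct.unpack("!Q", header)
      print(size)
      ```

      The patch itself would be applied once at program startup, before any Pool is created.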
      
