Sharing contiguous numpy arrays between processes in python


Problem description


While I have found numerous answers to questions similar to mine, I don't believe it has been directly addressed here--and I have several additional questions. The motivation for sharing contiguous numpy arrays is as follows:

  • I'm using a convolutional neural network run on Caffe to perform a regression on images to a series of continuous-value labels.
  • The images require specific preprocessing and data augmentation.
  • The constraints of (1) the continuous nature of the labels (they're floats) and (2) the data augmentation mean that I'm preprocessing the data in python and then serving it up as contiguous numpy arrays using the in-memory data layer in Caffe.
  • Loading the training data into memory is comparatively slow. I'd like to parallelize it such that:

(1) The python I'm writing creates a "data handler" class which instantiates two contiguous numpy arrays. (2) A worker process alternates between those numpy arrays, loading the data from the disk, performing preprocessing, and inserting the data into the numpy array. (3) Meanwhile, the python Caffe wrappers send data from the other array to the GPU to be run through the net.

I have a few questions:

  1. Is it possible to allocate memory in a contiguous numpy array and then wrap it in a shared memory object (I'm not sure if 'object' is the correct term here) using something like the Array class from python's multiprocessing?

  2. Numpy arrays have a .ctypes attribute; I presume this is useful for instantiating shared memory arrays from Array(), but I can't seem to determine precisely how to use it.

  3. If the shared memory is instantiated without the numpy array, does it remain contiguous? If not, is there a way to ensure it does remain contiguous?

Is it possible to do something like:

import numpy as np
from multiprocessing import Array
contArr = np.ascontiguousarray(np.zeros((n_images, n_channels, img_height, img_width)), dtype=np.float32)
sm_contArr = Array(contArr.ctypes.?, contArr?)

Then instantiate the worker with

p.append(Process(target=some_worker_function, args=(data_to_load, sm_contArr)))
p.start()

Thanks!

Edit: I'm aware there are a number of libraries that have similar functions in varying states of maintenance. I would prefer to restrict this to pure python and numpy, but if that's not possible I would of course be willing to use one.

Solution

Wrap numpy's ndarray around multiprocessing's RawArray()

There are multiple ways to share numpy arrays in memory across processes. Let's have a look at how you can do it using the multiprocessing module.

The first important observation is that numpy provides the np.frombuffer() function to wrap an ndarray interface around a preexisting object that supports the buffer protocol (such as bytes(), bytearray(), array() and so on). This creates read-only arrays from read-only objects and writable arrays from writable objects.
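As a quick, standalone illustration of that behavior (a minimal sketch, not part of the original answer):

import numpy as np

buf = bytearray(16)                          # writable buffer
arr = np.frombuffer(buf, dtype=np.float32)   # writable view of 4 floats
arr[0] = 1.5                                 # changes buf in place

ro = np.frombuffer(bytes(16), dtype=np.float32)   # read-only buffer
print(ro.flags.writeable)                    # False - writing raises ValueError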

We can combine that with the shared memory RawArray() that multiprocessing provides. Note that Array() doesn't work for that purpose, as it is a proxy object with a lock and doesn't directly expose the buffer interface. Of course that means that we need to provide for proper synchronization of our numpified RawArrays ourselves.
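The difference in a nutshell (a hypothetical snippet for illustration):

import numpy as np
from multiprocessing import Array
from multiprocessing.sharedctypes import RawArray

raw = RawArray('f', 8)                        # plain shared memory, no lock
view = np.frombuffer(raw, dtype=np.float32)   # writable ndarray view
view[:] = 1.0                                 # writes land in shared memory

locked = Array('f', 8)    # synchronized proxy wrapping a RawArray plus a lock
# np.frombuffer(locked) raises TypeError - the proxy does not expose the
# buffer protocol itself; you would have to unwrap it via locked.get_obj().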

There is one complicating issue regarding ndarray-wrapped RawArrays: When multiprocessing sends such an array between processes - and indeed it will need to send our arrays, once created, to both workers - it pickles and then unpickles them. Unfortunately, that results in it creating copies of the ndarrays instead of sharing them in memory.

The solution, while a bit ugly, is to keep the RawArrays as is until they are transferred to the workers and only wrap them in ndarrays once each worker process has started.
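Boiled down to its essentials, the pattern looks like this (a minimal sketch with made-up names; it works the same way with the 'spawn' start method used in the full example below):

import numpy as np
from multiprocessing import Process
from multiprocessing.sharedctypes import RawArray


def worker(raw):
    arr = np.frombuffer(raw, dtype=np.float32)   # wrap only after transfer
    arr[0] = 42.0                                # write into shared memory


if __name__ == '__main__':
    raw = RawArray('f', 4)
    p = Process(target=worker, args=(raw,))      # the RawArray stays shared
    p.start()
    p.join()
    print(np.frombuffer(raw, dtype=np.float32)[0])   # 42.0, not a copy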

Furthermore, it would have been preferable to communicate arrays, be it a plain RawArray or an ndarray-wrapped one, directly via a multiprocessing.Queue, but that doesn't work, either. A RawArray cannot be put inside such a Queue and an ndarray-wrapped one would have been pickled and unpickled, so in effect copied.
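For reference, this is the kind of thing that fails (a hypothetical snippet; the exact error and where it surfaces - often in the Queue's feeder thread - depend on the platform and Python version):

from multiprocessing import Queue
from multiprocessing.sharedctypes import RawArray

q = Queue()
raw = RawArray('f', 8)
q.put(raw)   # fails - shared ctypes objects may only be shared between
             # processes through inheritance, not pickled onto a Queue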

The workaround is to send a list of all pre-allocated arrays to the worker processes and communicate indices into that list over the Queues. It's very much like passing around tokens (the indices) and whoever holds the token is allowed to operate on the associated array.

The structure of the main program could look like this:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import numpy as np
import queue

from multiprocessing import freeze_support, set_start_method
from multiprocessing import Event, Process, Queue
from multiprocessing.sharedctypes import RawArray


def create_shared_arrays(size, dtype=np.int32, num=2):
    dtype = np.dtype(dtype)
    if dtype.isbuiltin and dtype.char in 'bBhHiIlLfd':
        typecode = dtype.char
    else:
        typecode, size = 'B', size * dtype.itemsize

    return [RawArray(typecode, size) for _ in range(num)]


def main():
    my_dtype = np.float32

    # 125000000 (size) * 4 (dtype) * 2 (num) ~= 1 GB memory usage
    arrays = create_shared_arrays(125000000, dtype=my_dtype)
    q_free = Queue()
    q_used = Queue()
    bail = Event()

    for arr_id in range(len(arrays)):
        q_free.put(arr_id)  # pre-fill free queue with allocated array indices

    pr1 = MyDataLoader(arrays, q_free, q_used, bail,
                       dtype=my_dtype, step=1024)
    pr2 = MyDataProcessor(arrays, q_free, q_used, bail,
                          dtype=my_dtype, step=1024)

    pr1.start()
    pr2.start()

    pr2.join()
    print("\n{} joined.".format(pr2.name))

    pr1.join()
    print("{} joined.".format(pr1.name))


if __name__ == '__main__':
    freeze_support()

    # On Windows, only "spawn" is available.
    # Also, this tests proper sharing of the arrays without "cheating".
    set_start_method('spawn')
    main()

This prepares a list of two arrays and two Queues: a "free" queue, where MyDataProcessor puts array indices it is done with and MyDataLoader fetches them from, and a "used" queue, where MyDataLoader puts indices of freshly filled arrays and MyDataProcessor fetches them from. It also creates a multiprocessing.Event to start a concerted bail-out of all workers. We could do away with the latter for now, as we have only one producer and one consumer of arrays, but it doesn't hurt to be prepared for more workers.

Then we pre-fill the "free" Queue with all indices of our RawArrays in the list and instantiate one worker of each type, passing them the necessary communication objects. We start both of them and just wait for them to join().

Here's what MyDataProcessor could look like. It consumes array indices from the "used" Queue and sends the data off to some external black box (debugio.output in the example):

class MyDataProcessor(Process):
    def __init__(self, arrays, q_free, q_used, bail, dtype=np.int32, step=1):
        super().__init__()
        self.arrays = arrays
        self.q_free = q_free
        self.q_used = q_used
        self.bail = bail
        self.dtype = dtype
        self.step = step

    def run(self):
        # wrap RawArrays inside ndarrays
        arrays = [np.frombuffer(arr, dtype=self.dtype) for arr in self.arrays]

        from debugio import output as writer

        while True:
            arr_id = self.q_used.get()
            if arr_id is None:
                break

            arr = arrays[arr_id]

            print('(', end='', flush=True)          # just visualizing activity
            for j in range(0, len(arr), self.step):
                writer.write(str(arr[j]) + '\n')
            print(')', end='', flush=True)          # just visualizing activity

            self.q_free.put(arr_id)

            writer.flush()

        self.bail.set()                     # tell loaders to bail out ASAP
        self.q_free.put(None, timeout=1)    # wake up loader blocking on get()

        try:
            while True:
                self.q_used.get_nowait()    # wake up loader blocking on put()
        except queue.Empty:
            pass

The first thing it does is wrap the received RawArrays in ndarrays using np.frombuffer() and keep the new list, so the arrays are usable as numpy arrays for the process' entire runtime and don't have to be wrapped over and over again.

Note also that MyDataProcessor only ever writes to the self.bail Event; it never checks it. Instead, if it needs to be told to quit, it will find a None mark on the queue instead of an array index. This way, when a MyDataLoader has no more data available and starts the teardown procedure, MyDataProcessor can still process all valid arrays that are in the queue without exiting prematurely.

This is what MyDataLoader could look like:

class MyDataLoader(Process):
    def __init__(self, arrays, q_free, q_used, bail, dtype=np.int32, step=1):
        super().__init__()
        self.arrays = arrays
        self.q_free = q_free
        self.q_used = q_used
        self.bail = bail
        self.dtype = dtype
        self.step = step

    def run(self):
        # wrap RawArrays inside ndarrays
        arrays = [np.frombuffer(arr, dtype=self.dtype) for arr in self.arrays]

        from debugio import input as reader

        for _ in range(10):  # for testing we end after a set number of passes
            if self.bail.is_set():
                # we were asked to bail out while waiting on put()
                return

            arr_id = self.q_free.get()
            if arr_id is None:
                # we were asked to bail out while waiting on get()
                self.q_free.put(None, timeout=1)  # put it back for next loader
                return

            if self.bail.is_set():
                # we were asked to bail out while we got a normal array
                return

            arr = arrays[arr_id]

            eof = False
            print('<', end='', flush=True)          # just visualizing activity
            for j in range(0, len(arr), self.step):
                line = reader.readline()
                if not line:
                    eof = True
                    break

                arr[j] = np.fromstring(line, dtype=self.dtype, sep='\n')

            if eof:
                print('EOF>', end='', flush=True)   # just visualizing activity
                break

            print('>', end='', flush=True)          # just visualizing activity

            if self.bail.is_set():
                # we were asked to bail out while we filled the array
                return

            self.q_used.put(arr_id)     # tell processor an array is filled

        if not self.bail.is_set():
            self.bail.set()             # tell other loaders to bail out ASAP
            # mark end of data for processor as we are the first to bail out
            self.q_used.put(None)

It is very similar in structure to the other worker. The reason it is bloated up a bit is that it checks the self.bail Event at many points, so as to reduce the likelihood of getting stuck. (It's not completely foolproof, as there is a tiny chance that the Event could get set between checking and accessing the Queue. If that's a problem, one needs to use some synchronization primitive arbitrating access to the Event and the Queue combined.)
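Should that ever matter, one way to close the gap is to guard both objects with a single shared Lock (a rough sketch with hypothetical names, not part of the original answer; every worker that sets the Event must hold the same Lock while doing so):

import queue
import time


def guarded_get(q, bail, lock, poll=0.1):
    # Check the Event and poll the Queue under one multiprocessing.Lock,
    # so the Event can no longer be set between the check and the get().
    while True:
        with lock:
            if bail.is_set():
                return None
            try:
                return q.get_nowait()
            except queue.Empty:
                pass
        time.sleep(poll)    # sleep outside the Lock so setters can get in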

It also wraps the received RawArrays in ndarrays at the very beginning and reads data from an external black box (debugio.input in the example).

Note that by playing around with the step= arguments to both workers in the main() function, we can change the ratio of how much reading and writing is done (strictly for testing purposes - in a production environment step= would be 1, reading and writing all numpy array members).

Increasing both values makes the workers only access a few of the values in the numpy arrays, thereby significantly speeding everything up, which goes to show that the performance is not limited by the communication between the worker processes. Had we put numpy arrays directly onto the Queues, copying them back and forth between the processes in whole, increasing the step size would not have significantly improved the performance - it would have remained slow.
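To get a feeling for the copying cost, here is a rough benchmark sketch (hypothetical, not part of the original answer) that round-trips a large ndarray through Queues:

import time

import numpy as np
from multiprocessing import Process, Queue


def echo(q_in, q_out):
    q_out.put(q_in.get())    # each leg pickles and unpickles the whole array


if __name__ == '__main__':
    a = np.zeros(25000000, dtype=np.float32)    # ~100 MB
    q_in, q_out = Queue(), Queue()
    p = Process(target=echo, args=(q_in, q_out))
    p.start()
    t0 = time.time()
    q_in.put(a)
    b = q_out.get()                             # b is a copy, not a view
    print('Queue round trip took {:.2f}s'.format(time.time() - t0))
    p.join()

With the RawArray scheme above, only small integer indices ever cross the Queues, no matter how large the arrays get.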

For reference, here is the debugio module I used for testing:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from ast import literal_eval
from io import RawIOBase, BufferedReader, BufferedWriter, TextIOWrapper


class DebugInput(RawIOBase):
    def __init__(self, end=None):
        if end is not None and end < 0:
            raise ValueError("end must be non-negative")

        super().__init__()
        self.pos = 0
        self.end = end

    def readable(self):
        return True

    def read(self, size=-1):
        if self.end is None:
            if size < 0:
                raise NotImplementedError("size must be non-negative")
            end = self.pos + size
        elif size < 0:
            end = self.end
        else:
            end = min(self.pos + size, self.end)

        lines = []
        while self.pos < end:
            offset = self.pos % 400
            pos = self.pos - offset
            if offset < 18:
                i = (offset + 2) // 2
                pos += i * 2 - 2
            elif offset < 288:
                i = (offset + 12) // 3
                pos += i * 3 - 12
            else:
                i = (offset + 112) // 4
                pos += i * 4 - 112

            line = str(i).encode('ascii') + b'\n'
            line = line[self.pos - pos:end - pos]
            self.pos += len(line)
            size -= len(line)
            lines.append(line)

        return b''.join(lines)

    def readinto(self, b):
        data = self.read(len(b))
        b[:len(data)] = data
        return len(data)

    def seekable(self):
        return True

    def seek(self, offset, whence=0):
        if whence == 0:
            pos = offset
        elif whence == 1:
            pos = self.pos + offset
        elif whence == 2:
            if self.end is None:
                raise ValueError("cannot seek to end of infinite stream")
            pos = self.end + offset
        else:
            raise NotImplementedError("unknown whence value")

        self.pos = max((pos if self.end is None else min(pos, self.end)), 0)
        return self.pos


class DebugOutput(RawIOBase):
    def __init__(self):
        super().__init__()
        self.buf = b''
        self.num = 1

    def writable(self):
        return True

    def write(self, b):
        *lines, self.buf = (self.buf + b).split(b'\n')

        for line in lines:
            value = literal_eval(line.decode('ascii'))
            if value != int(value) or int(value) & 255 != self.num:
                raise ValueError("expected {}, got {}".format(self.num, value))

            self.num = self.num % 127 + 1

        return len(b)


input = TextIOWrapper(BufferedReader(DebugInput()), encoding='ascii')
output = TextIOWrapper(BufferedWriter(DebugOutput()), encoding='ascii')
