分块写入XArray MultiIndex数据 [英] Writing xarray multiindex data in chunks

查看:91
本文介绍了分块写入XArray MultiIndex数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试有效地重组大型多维数据集.假设随着时间的推移,我有许多遥感图像,其中有多个带,坐标用于像素位置为x y,图像获取时间为时间,而采集的不同数据为带.

I am trying to efficiently restructure a large multidimentional dataset. Let assume I have a number of remotely sensed images over time with a number of bands with coordinates x y for pixel location, time for time of image acquisition, and band for different data collected.

在我的用例中,假设xarray的坐标长度大致为x(3000),y(3000),时间(10)和浮点数据的带(40).所以100gb +的数据.

In my use case lets assume the xarray coord lengths are roughly x (3000), y (3000), time (10), with bands (40) of floating point data. So 100gb+ of data.

我一直在尝试使用此示例,但是我很难将其翻译成这种情况.

I have been trying to work from this example but I am having trouble translating it to this case.

注意:实际数据比此示例大得多.

NOTE: the actual data is much larger than this example.

import numpy as np
import dask.array as da
import xarray as xr

nrows = 100
ncols = 200
row_chunks = 50
col_chunks = 50

data = da.random.random(size=(1, nrows, ncols), chunks=(1, row_chunks, col_chunks))

def create_band(data, x, y, band_name):

    return xr.DataArray(data,
                        dims=('band', 'y', 'x'),
                        coords={'band': [band_name],
                                'y': y,
                                'x': x})

def create_coords(data, left, top, celly, cellx):
    nrows = data.shape[-2]
    ncols = data.shape[-1]
    right = left + cellx*ncols
    bottom = top - celly*nrows
    x = np.linspace(left, right, ncols) + cellx/2.0
    y = np.linspace(top, bottom, nrows) - celly/2.0
    
    return x, y

x, y = create_coords(data, 1000, 2000, 30, 30)

src = []

for time in ['t1', 't2', 't3']:

    src_t = xr.concat([create_band(data, x, y, band) for band in ['blue', 'green', 'red', 'nir']], dim='band')\
                    .expand_dims(dim='time')\
                    .assign_coords({'time': [time]})
    
    src.append(src_t)

src = xr.concat(src, dim='time')

print(src)


<xarray.DataArray 'random_sample-5840d8564d778d573dd403f27c3f47a5' (time: 3, band: 4, y: 100, x: 200)>
dask.array<concatenate, shape=(3, 4, 100, 200), dtype=float64, chunksize=(1, 1, 50, 50), chunktype=numpy.ndarray>
Coordinates:
  * x        (x) float64 1.015e+03 1.045e+03 1.075e+03 ... 6.985e+03 7.015e+03
  * band     (band) object 'blue' 'green' 'red' 'nir'
  * y        (y) float64 1.985e+03 1.955e+03 1.924e+03 ... -984.7 -1.015e+03
  * time     (time) object 't1' 't2' 't3'

重组-堆叠并转置

我需要存储以下内容:

Restructured - stacked and transposed

I need to store the output of the following:

print(src.stack(sample=('y','x','time')).T)

<xarray.DataArray 'random_sample-5840d8564d778d573dd403f27c3f47a5' (sample: 60000, band: 4)>
dask.array<transpose, shape=(60000, 4), dtype=float64, chunksize=(3600, 1), chunktype=numpy.ndarray>
Coordinates:
  * band     (band) object 'blue' 'green' 'red' 'nir'
  * sample   (sample) MultiIndex
  - y        (sample) float64 1.985e+03 1.985e+03 ... -1.015e+03 -1.015e+03
  - x        (sample) float64 1.015e+03 1.015e+03 ... 7.015e+03 7.015e+03
  - time     (sample) object 't1' 't2' 't3' 't1' 't2' ... 't3' 't1' 't2' 't3'

我希望使用dask和xarray将结果分块写入磁盘,以供

I am hoping to use dask and xarray to write the result to disk in chunks, accessible for open_mfdataset. parquet seems like a good option, but I can't figure out how to write it in chunks (src is too big to store in memory).

@dask.delayed
def stacker(data):
   return data.stack(sample=('y','x','time')).T.to_pandas() 

stacker(src).to_parquet('out_*.parquet')

def stack_write(data):
   data.stack(sample=('y','x','time')).T.to_pandas().to_parquet('out_*.parquet')
   return None

stack_write(src)

在这一点上,我只是希望一些好主意.谢谢!

At this point I am just hoping for some good ideas. Thanks!

推荐答案

我在这里有一个解决方案(

I have a solution here (https://github.com/pydata/xarray/issues/1077#issuecomment-644803374) for writing multiindexed datasets to file.

您必须手动编码"将数据集转换为可以写为netCDF的形式.然后解码"当你读回来的时候.

You'll have to manually "encode" the dataset into a form that can be written as netCDF. And then "decode" when you read it back.

import numpy as np
import pandas as pd
import xarray as xr


def encode_multiindex(ds, idxname):
    encoded = ds.reset_index(idxname)
    coords = dict(zip(ds.indexes[idxname].names, ds.indexes[idxname].levels))
    for coord in coords:
        encoded[coord] = coords[coord].values
    shape = [encoded.sizes[coord] for coord in coords]
    encoded[idxname] = np.ravel_multi_index(ds.indexes[idxname].codes, shape)
    encoded[idxname].attrs["compress"] = " ".join(ds.indexes[idxname].names)
    return encoded


def decode_to_multiindex(encoded, idxname):
    names = encoded[idxname].attrs["compress"].split(" ")
    shape = [encoded.sizes[dim] for dim in names]
    indices = np.unravel_index(encoded.landpoint.values, shape)
    arrays = [encoded[dim].values[index] for dim, index in zip(names, indices)]
    mindex = pd.MultiIndex.from_arrays(arrays)

    decoded = xr.Dataset({}, {idxname: mindex})
    for varname in encoded.data_vars:
        if idxname in encoded[varname].dims:
            decoded[varname] = (idxname, encoded[varname].values)
    return decoded

这篇关于分块写入XArray MultiIndex数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆