Locking in dask.multiprocessing.get and adding metadata to HDF

Problem description

Performing an ETL-task in pure Python, I would like to collect error metrics as well as metadata for each of the raw input files considered (error metrics are computed from error codes provided in the data section of the files while metadata is stored in headers). Here's pseudo-code for the whole procedure:

import os

import pandas as pd
import dask
import dask.multiprocessing
from dask import delayed
from dask import dataframe as dd
from dask import set_options

META_DATA = {}  # shared resource
ERRORS = {}  # shared resource

def read_file(file_name):
    global META_DATA, ERRORS

    # step 1: process headers
    headers = read_header(file_name)
    errors = {}
    data_bfr = []

    # step 2: process data section
    for line in data_section:
        content_id, data = parse_line(line)
        if contains_errors(data):
            errors[content_id] = get_error_code(data)
        else:
            data_bfr.append((content_id, data))

    # ---- Part relevant for question 1 ----
    # step 3: acquire lock for shared resource and write metadata
    with lock.acquire():
        write_metadata(file_name, headers)  # stores metadata in META_DATA[file_name]
        write_errors(file_name, errors)  # stores error metrics in ERRORS[file_name]

    return pd.DataFrame(data=data_bfr,...)

with set_options(get=dask.multiprocessing.get):
    df = dd.from_delayed([delayed(read_file)(file_name) \
                          for file_name in os.listdir(wd)])

    # ---- Part relevant for question 2 ----
    df.to_hdf('data.hdf', '/data', 'w', complevel=9, \
        complib='blosc',..., metadata=(META_DATA, ERRORS))

For each input file, read_file returns a pd.DataFrame and additionally writes the relevant metadata and error metrics to the shared resources. I am using dask's multiprocessing scheduler to compute a dask.dataframe from a list of delayed read_file operations.

  • Question 1: Each of the read_file operations writes to the shared resources META_DATA and ERRORS. What do I have to do to implement a proper locking strategy that works with dask.multiprocessing.get? Would it be sufficient to write the metadata and error information to the collections from within a with locket.lock_file('.lock'): context (see the sketch below this list)? Does multiprocessing.RLock work? What do I have to do to initialize the lock to work with dask? More fundamentally, how can I declare META_DATA and ERRORS as shared resources in dask?
  • Question 2: If possible, I would like to annotate the HDF data with the metadata and error metrics. From a question on "Collecting attributes from dask dataframe providers", I learned that dask currently does not support adding metadata to dataframes, but is it possible for the information to be written to HDF? If so, how should access to the shared resources be handled in this case?
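
For concreteness, a minimal sketch of the lock_file variant from Question 1, assuming the locket package and the same placeholder helpers and intermediate variables as in the pseudo-code above (the '.lock' path is arbitrary):

import locket

def read_file(file_name):
    ...
    # step 3, file-lock variant: serialize concurrent writers with a lock file.
    # The lock itself works across processes, but META_DATA and ERRORS remain
    # ordinary module-level dicts, so each worker process spawned by
    # dask.multiprocessing.get mutates only its own private copy.
    with locket.lock_file('.lock'):
        write_metadata(file_name, headers)
        write_errors(file_name, errors)
    return pd.DataFrame(data=data_bfr)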

Answer

Don't depend on Globals

Dask works best with pure functions.
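
As a tiny illustration (with hypothetical names): a task that mutates a module-level dict only changes the copy living inside its own worker process, whereas a pure task returns everything its caller needs:

RESULTS = {}  # hypothetical module-level dict

def impure_task(x):
    # under dask.multiprocessing.get this mutation stays inside the worker
    # process; the parent process never sees it
    RESULTS[x] = x ** 2

def pure_task(x):
    # everything is handed back in the return value instead
    return x, x ** 2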

In particular, your case runs into a limitation of Python, in that it (correctly) doesn't share global data between processes. Instead, I recommend that you return data explicitly from functions:

def read_file(file_name):
    ...
    return df, metadata, errors

# Indexing a Delayed object is itself lazy, so each of the three results can
# be selected without triggering any computation.
values = [delayed(read_file)(fn) for fn in filenames]
dfs      = [v[0] for v in values]
metadata = [v[1] for v in values]
errors   = [v[2] for v in values]

df = dd.from_delayed(dfs)

# Merge the per-file dicts into single dicts, still as lazy tasks.
import toolz
metadata = delayed(toolz.merge)(metadata)
errors = delayed(toolz.merge)(errors)
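
One possible way to then finish the job for Question 2, sketched under the assumption that attaching the dictionaries as pandas HDFStore node attributes is acceptable (the attribute names file_metadata and error_metrics are arbitrary):

import dask
import pandas as pd

# Write the dataframe, then materialize the merged dictionaries.
# (To avoid evaluating the graph twice, to_hdf(..., compute=False) can be
# combined with a single dask.compute call.)
df.to_hdf('data.hdf', '/data', mode='w', complevel=9, complib='blosc')
metadata, errors = dask.compute(metadata, errors)

# Attach the dictionaries as attributes of the stored node.
with pd.HDFStore('data.hdf') as store:
    store.get_storer('data').attrs.file_metadata = metadata
    store.get_storer('data').attrs.error_metrics = errors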
