从Python增量写入拼图数据集 [英] Incrementally writing Parquet dataset from Python

查看:24
本文介绍了从Python增量写入拼图数据集的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在从我的Python应用程序中写出一个比RAM更大的数据--基本上是将数据从SQLAlChemy转储到Parque。我的解决方案的灵感来自this question。尽管增加了the batch size as hinted here,但我面临的问题是:

  • 内存使用量急剧增加

  • 编写器在一段时间后开始减速(写入吞吐量速度下降5倍以上)

我的假设是,这是因为ParquetWriter元数据管理在行数增加时变得昂贵。我认为我应该切换到datasets,这将允许编写器在处理过程中关闭文件刷新出元数据。

我的问题是

  • 有没有用PYTHON和PARQUET编写增量数据集的例子

  • 我的假设是正确还是不正确,使用数据集将有助于保持编写器吞吐量?

我的摘要代码:


writer = pq.ParquetWriter(
                    fname,
                    Candle.to_pyarrow_schema(small_candles),
                    compression='snappy',
                    allow_truncated_timestamps=True,
                    version='2.0',  # Highest available schema
                    data_page_version='2.0',  # Highest available schema
            ) as writer:

    def writeout():
        nonlocal data
        duration = time.time() - stats["started"]
        throughout = stats["candles_processed"] / duration
        logger.info("Writing Parquet table for candle %s, throughput is %s", "{:,}".format(stats["candles_processed"]), throughout)
        writer.write_table(
            pa.Table.from_pydict(
                    data,
                    writer.schema
            )
        )
        data = dict.fromkeys(data.keys(), [])
        process = psutil.Process(os.getpid())
        logger.info("Flushed %s writer, the memory usage is %s", bucket, process.memory_info())

    # Use massive yield_per() or otherwise we are leaking memory
    for item in query.yield_per(100_000):
        frame = construct_frame(row_type, item)
        for key, value in frame.items():
            data[key].append(value)

        stats["candles_processed"] += 1

        # Do regular checkopoints to avoid out of memory
        # and to log the progress to the console
        # For fine tuning Parquet writer see
        # https://issues.apache.org/jira/browse/ARROW-10052
        if stats["candles_processed"] % 100_000 == 0:
            writeout()

推荐答案

本例中,如@0x26res所述,原因是错误地使用了PythonList和Dicts作为工作缓冲区。

确保正确清除列表词典后,内存消耗问题可以忽略不计。

这篇关于从Python增量写入拼图数据集的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆