How to convert multiple parquet files into TFrecord files using SPARK?

Problem description

I would like to produce stratified TFrecord files from a large DataFrame based on a certain condition, for which I use write.partitionBy(). I'm also using the tensorflow-connector in SPARK, but this apparently does not work together with a write.partitionBy() operation. Therefore, I have found no other way than to work in two steps:

  1. Repartition the dataframe according to my condition, using partitionBy(), and write the resulting partitions to parquet files (see the sketch after this list).
  2. Read those parquet files to convert them into TFrecord files with the tensorflow-connector plugin.
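
For step 1, a minimal sketch of the partitioned parquet write could look as follows. The DataFrame name df and the stratification column name 'label' are assumptions for illustration, not from the original post.

# Hypothetical sketch of step 1: one parquet directory per value of 'label'
df.write.partitionBy('label').mode('overwrite').parquet('/path/parquet_out')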

It is the second step that I'm unable to do efficiently. My idea was to read the individual parquet files on the executors and immediately write them into TFrecord files. But this requires access to the SQLContext, which is only available on the driver (discussed here), so it cannot be done in parallel. I would like to do something like this:

# List all parquet files to be converted
import glob
from pyspark.sql import SparkSession

files = glob.glob('/path/*.parquet')

spark = SparkSession.builder.getOrCreate()

# Distribute the file list so each executor converts its share of files
spark.sparkContext.parallelize(files, 2).foreach(lambda parquetFile: convert_parquet_to_tfrecord(parquetFile))

Could I construct the function convert_parquet_to_tfrecord that would be able to do this on the executors?
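
For illustration only, such a function might avoid the SQLContext entirely by reading each parquet file with pandas (via pyarrow) and writing it with TensorFlow's TFRecordWriter directly on the executor. This is a hedged sketch, not part of the original question; the column names 'number' and 'word' and the feature encoding are assumptions.

# Hypothetical sketch: convert one parquet file into one TFRecord file
# without using the SQLContext, so it can run inside foreach on an executor.
import pandas as pd
import tensorflow as tf

def convert_parquet_to_tfrecord(parquet_file):
    # Read this single parquet file locally (assumes pyarrow is available)
    pdf = pd.read_parquet(parquet_file)
    out_path = parquet_file.replace('.parquet', '.tfrecord')
    with tf.io.TFRecordWriter(out_path) as writer:
        for row in pdf.itertuples(index=False):
            # Assumed schema: an integer column 'number' and a string column 'word'
            example = tf.train.Example(features=tf.train.Features(feature={
                'number': tf.train.Feature(int64_list=tf.train.Int64List(value=[int(row.number)])),
                'word': tf.train.Feature(bytes_list=tf.train.BytesList(value=[row.word.encode('utf-8')])),
            }))
            writer.write(example.SerializeToString())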

I've also tried just using the wildcard when reading all the parquet files:

SQLContext(sc).read.parquet('/path/*.parquet')

This indeed reads all parquet files, but unfortunately not into individual partitions. It appears that the original structure gets lost, so it doesn't help me if I want the exact contents of the individual parquet files converted into TFrecord files.

Any other suggestions?

Recommended answer

Try spark-tfrecord.

Spark-TFRecord is a tool similar to spark-tensorflow-connector, but it supports partitionBy. The following example shows how to partition a dataset.

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

// create a dataframe
val df = Seq((8, "bat"), (8, "abc"), (1, "xyz"), (2, "aaa")).toDF("number", "word")
val tf_output_dir = "/tmp/tfrecord-test"

// dump the tfrecords to files
df.repartition(3, col("number"))
  .write.mode(SaveMode.Overwrite)
  .partitionBy("number")
  .format("tfrecord")
  .option("recordType", "Example")
  .save(tf_output_dir)
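
From PySpark, as used in the question's code, the same write should look roughly like the following. This is a hedged sketch: the format name "tfrecord" and the recordType option are taken from the Scala example above, and it assumes the spark-tfrecord package is on the classpath.

# Hypothetical PySpark equivalent of the Scala example above
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(8, "bat"), (8, "abc"), (1, "xyz"), (2, "aaa")], ["number", "word"])

(df.repartition(3, col("number"))
   .write.mode("overwrite")
   .partitionBy("number")
   .format("tfrecord")
   .option("recordType", "Example")
   .save("/tmp/tfrecord-test"))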

More information can be found in the GitHub repo: https://github.com/linkedin/spark-tfrecord
