How to convert multiple parquet files into TFrecord files using SPARK?

Question

I would like to produce stratified TFrecord files from a large DataFrame based on a certain condition, for which I use write.partitionBy(). I'm also using the tensorflow-connector in SPARK, but this apparently does not work together with a write.partitionBy() operation. Therefore, I have not found a way other than to work in two steps:

  1. Repartition the dataframe according to my condition, using partitionBy(), and write the resulting partitions to parquet files (sketched below).
  2. Read those parquet files and convert them into TFrecord files with the tensorflow-connector plugin.
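
Step 1 by itself works with the standard DataFrame writer. A minimal sketch, assuming a hypothetical stratification column label and output path /path/parquet-out:

# step 1: one parquet sub-directory per value of the stratification column
df.write.mode('overwrite').partitionBy('label').parquet('/path/parquet-out')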

It is the second step that I'm unable to do efficiently. My idea was to read the individual parquet files on the executors and immediately write them into TFrecord files. But this requires access to the SQLContext, which is only available on the driver (discussed here), so it cannot run in parallel. I would like to do something like this:

# List all parquet files to be converted
import glob
from pyspark.sql import SparkSession

files = glob.glob('/path/*.parquet')

# distribute the file list and convert each file on an executor
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.parallelize(files, 2).foreach(
    lambda parquetFile: convert_parquet_to_tfrecord(parquetFile))

Could I construct the function convert_parquet_to_tfrecord so that it can do this on the executors?
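
In principle yes, as long as convert_parquet_to_tfrecord uses no Spark APIs at all. A minimal sketch of what such a function might look like, assuming pyarrow and tensorflow are installed on every executor, that the columns hold plain int/float/string scalars, and that the (hypothetical) output directory /path/tfrecords is on a filesystem visible to all executors:

import os

import pyarrow.parquet as pq
import tensorflow as tf

def convert_parquet_to_tfrecord(parquet_file, output_dir='/path/tfrecords'):
    # Read one parquet file with pyarrow -- no SQLContext/SparkSession needed,
    # so this can run inside foreach() on an executor.
    table = pq.read_table(parquet_file)
    rows = table.to_pylist()  # one dict per row (needs a reasonably recent pyarrow)

    out_name = os.path.basename(parquet_file).replace('.parquet', '.tfrecord')
    out_path = os.path.join(output_dir, out_name)

    with tf.io.TFRecordWriter(out_path) as writer:
        for row in rows:
            feature = {}
            for name, value in row.items():
                if isinstance(value, int):
                    feature[name] = tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
                elif isinstance(value, float):
                    feature[name] = tf.train.Feature(float_list=tf.train.FloatList(value=[value]))
                else:
                    feature[name] = tf.train.Feature(
                        bytes_list=tf.train.BytesList(value=[str(value).encode('utf-8')]))
            example = tf.train.Example(features=tf.train.Features(feature=feature))
            writer.write(example.SerializeToString())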

I've also tried just using the wildcard when reading all the parquet files:

SQLContext(sc).read.parquet('/path/*.parquet')

This indeed reads all parquet files, but unfortunately not into individual partitions. The original structure appears to get lost, so it doesn't help if I want the exact contents of each individual parquet file converted into a TFrecord file.
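
One way to at least keep track of where each row came from when reading with a wildcard is Spark's input_file_name() function, although this only tags rows with their source file rather than restoring one partition per file. A small sketch, assuming a SparkSession named spark:

from pyspark.sql.functions import input_file_name

# tag every row with the parquet file it was read from
df = spark.read.parquet('/path/*.parquet').withColumn('source_file', input_file_name())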

Any other suggestions?

Answer

Try spark-tfrecord.

Spark-TFRecord is a tool similar to spark-tensorflow-connector, but it supports partitionBy. The following example shows how to partition a dataset.

// in spark-shell; in a standalone app, also import spark.implicits._ for toDF
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

// create a dataframe
val df = Seq((8, "bat"), (8, "abc"), (1, "xyz"), (2, "aaa")).toDF("number", "word")
val tf_output_dir = "/tmp/tfrecord-test"

// dump the tfrecords to files, one sub-directory per value of "number"
df.repartition(3, col("number"))
  .write.mode(SaveMode.Overwrite)
  .partitionBy("number")
  .format("tfrecord")
  .option("recordType", "Example")
  .save(tf_output_dir)

More information can be found at the GitHub repo: https://github.com/linkedin/spark-tfrecord
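
The written records can also be read back with the same format, which is handy for verifying the output. A minimal PySpark sketch, assuming the spark-tfrecord package is on the classpath and reusing the /tmp/tfrecord-test path from the example above:

# read the TFRecord files written above back into a DataFrame
df_back = spark.read.format("tfrecord").option("recordType", "Example").load("/tmp/tfrecord-test")
df_back.show()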
