How to save data frame using saveAsHadoopFile or MultiTextOutputFormat


Question

Basically I need to create output files based on the DataPartition column, which is the last column in the data frame.

So the first and last rows will be saved in Fundamental.Fundamental.Fundamental.Japan.1.2018-09-24-0937.Full.txt, and the middle row will be saved in Fundamental.Fundamental.Fundamental.ThirdParty.1.2018-09-24-0937.Full.txt:

+--------------------------------+--------------+---------------------------+-------------------------+--------+------------------+--------+-----------------+---------------+--------------------------+---------------------------+---------------+-------------------------+-----------------------------+-----------------------------------+-----------------------+----------------------------+----------------------------------+--------------------+----------+-----------+------------------------------------------------------------------------------------------------------------------------------------------------+
|Fundamental_uniqueFundamentalSet|OrganizationId|OrganizationId_objectTypeId|OrganizationId_objectType|GaapCode|ConsolidationBasis|IsFiling|NonFilingDateTime|NonFilingReason|PrimaryReportingEntityCode|TotalPrimaryReportingShares|LocalLanguageId|Fundamental_effectiveFrom|Fundamental_effectiveFromPlus|Fundamental_effectiveFromPlusNaCode|Fundamental_effectiveTo|Fundamental_effectiveToMinus|Fundamental_effectiveToMinusNACode|ConsolidationBasisId|GaapCodeId|FFAction|!||DataPartition                                                                                                                                   |
+--------------------------------+--------------+---------------------------+-------------------------+--------+------------------+--------+-----------------+---------------+--------------------------+---------------------------+---------------+-------------------------+-----------------------------+-----------------------------------+-----------------------+----------------------------+----------------------------------+--------------------+----------+-----------+------------------------------------------------------------------------------------------------------------------------------------------------+
|192730241374                    |4295877894    |404010                     |Organization             |JPG     |Consolidated      |true    |                 |               |A51EF                     |117588807.00000            |505126         |2013-06-29T00:55:15Z     |                             |                                   |9999-12-31T00:00:00Z   |                            |                                  |3013598             |3011577   |I|!|       |file:/C:/Users/u6034690/Desktop/SPARK/trfsmallfffile/Fundamental/FINALSPARK/Fundamental.Fundamental.Fundamental.Japan.1.2018-09-24-0937.Full.txt|
|192730391384                    |4295877894    |404010                     |Organization             |AOG     |Consolidated      |true    |                 |               |A51EF                     |117588807.00000            |505126         |2018-09-19T09:51:46Z     |                             |                                   |9999-12-31T00:00:00Z   |                            |                                  |3013598             |1003042842|I|!|       |file:/C:/Users/u6034690/Desktop/SPARK/trfsmallfffile/Fundamental/FINALSPARK/Fundamental.Fundamental.Fundamental.ThirdParty.1.2018-09-24-0937.Full.txt|
|192730241373                    |4295877894    |404010                     |Organization             |JPG     |Parent            |true    |                 |               |A51EF                     |117588807.00000            |505126         |2013-06-29T00:55:15Z     |                             |                                   |9999-12-31T00:00:00Z   |                            |                                  |3013599             |3011577   |I|!|       |file:/C:/Users/u6034690/Desktop/SPARK/trfsmallfffile/Fundamental/FINALSPARK/Fundamental.Fundamental.Fundamental.Japan.1.2018-09-24-0937.Full.txt|
+--------------------------------+--------------+---------------------------+-------------------------+--------+------------------+--------+-----------------+---------------+--------------------------+---------------------------+---------------+-------------------------+-----------------------------+-----------------------------------+-----------------------+----------------------------+----------------------------------+--------------------+----------+-----------+------------------------------------------------------------------------------------------------------------------------------------------------+
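As an aside, Spark's built-in DataFrameWriter can also split output by a column, but it creates one sub-directory per key value rather than letting you control the file names, which is why a custom MultipleTextOutputFormat is needed here. A minimal sketch of the built-in approach (the output path is hypothetical):

```scala
// Built-in alternative: writes one sub-directory per DataPartition value
// (e.g. <output>/DataPartition=.../part-*). The part-file names inside
// cannot be customised, and the multi-character |^| delimiter is not
// supported by the CSV writer, so this does not match the requirement.
dataframe.write
  .partitionBy("DataPartition")
  .option("compression", "gzip")
  .csv("/path/to/output") // hypothetical output path
```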

Something like this is what I am looking for, but it does not work:

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.HashPartitioner
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RddMultiTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // Write only the value into the file; drop the key from the output.
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
  // Use the key itself as the output file name.
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = key.asInstanceOf[String]
}

dataframe.partitionBy(new HashPartitioner(noOfHashPartitioner)).saveAsHadoopFile(output, classOf[String], classOf[String], classOf[RddMultiTextOutputFormat], classOf[GzipCodec])

Expected output:

Fundamental.uniqueFundamentalSet|^|OrganizationId|^|OrganizationId.objectTypeId|^|OrganizationId.objectType|^|GaapCode|^|ConsolidationBasis|^|IsFiling|^|NonFilingDateTime|^|NonFilingReason|^|PrimaryReportingEntityCode|^|TotalPrimaryReportingShares|^|LocalLanguageId|^|Fundamental.effectiveFrom|^|Fundamental.effectiveFromPlus|^|Fundamental.effectiveFromPlusNaCode|^|Fundamental.effectiveTo|^|Fundamental.effectiveToMinus|^|Fundamental.effectiveToMinusNACode|^|ConsolidationBasisId|^|GaapCodeId|^|FFAction|!|
192730241373|^|4295877894|^|404010|^|Organization|^|JPG|^|Parent|^|True|^||^||^|A51EF|^|117588807.00000|^|505126|^|2013-06-29T00:55:15Z|^||^||^|9999-12-31T00:00:00Z|^||^||^|3013599|^|3011577|^|I|!|
192730241374|^|4295877894|^|404010|^|Organization|^|JPG|^|Consolidated|^|True|^||^||^|A51EF|^|117588807.00000|^|505126|^|2013-06-29T00:55:15Z|^||^||^|9999-12-31T00:00:00Z|^||^||^|3013598|^|3011577|^|I|!|

Answer

You need to create a PairRDD with the key being your output file name and the value being the record; then you can call saveAsHadoopFile() to save the files the way you are looking for.

import org.apache.spark.HashPartitioner
import org.json.JSONObject

val dataframe = .... //this is the dataframe that you want to save

// Key each record by its DataPartition value; the key becomes the file name
// via generateFileNameForKeyValue() in RddMultiTextOutputFormat.
val pairedRDD = dataframe.toJSON.rdd.map(row => {
    val record = new JSONObject(row)
    val key = record.getString("DataPartition")

    (key, row)
})

pairedRDD.partitionBy(new HashPartitioner(noOfHashPartitioner))
    .saveAsHadoopFile("", classOf[String], classOf[String], classOf[RddMultiTextOutputFormat])
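Note that toJSON serialises each record as a JSON string, whereas the expected output uses the |^| delimiter. One way to keep the original delimiter (a sketch, assuming the columns shown above and a hypothetical output path) is to build the value string yourself with concat_ws and key the pair RDD on the DataPartition column directly:

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.sql.functions.concat_ws

// Build "value" as the |^|-joined record (all columns except DataPartition)
// and keep DataPartition as the key that names the output file.
val dataCols = dataframe.columns.filter(_ != "DataPartition")

val pairedRDD = dataframe
  .withColumn("value", concat_ws("|^|", dataCols.map(dataframe(_)): _*))
  .select("DataPartition", "value")
  .rdd
  .map(r => (r.getString(0), r.getString(1)))

pairedRDD.partitionBy(new HashPartitioner(noOfHashPartitioner))
  .saveAsHadoopFile("/path/to/output", classOf[String], classOf[String],
    classOf[RddMultiTextOutputFormat]) // hypothetical output path
```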

This will give you your desired output.

