Read a bytes column in Spark

Problem description

I have a data set that contains an ID field in an unknown (and not friendly) encoding. I can read the single column using plain Python and verify that the values are distinct and consistent across multiple data sets (i.e., the field can be used as a primary key for joining).

When loading the file using spark.read.csv, Spark appears to decode the column as UTF-8; however, some of the multibyte sequences are converted to the Unicode character U+FFFD REPLACEMENT CHARACTER (EF BF BD in hex).
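
To see where the replacement character comes from, the ID bytes can be decoded as UTF-8 in plain Python. This illustration is not part of the original post; it uses the first sample ID from the code further down.

raw = b'\xba\xed\x85\x8e\x91\xd4\xc7\xb0'
print(raw.decode('utf-8', errors='replace'))
# '�텎��ǰ' - the invalid bytes 0xBA, 0x91 and 0xD4 each become U+FFFD,
# whose UTF-8 encoding is EF BF BD, matching the hex output shown below.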

Is there a way to force Spark to read the column as bytes and not as a string?

Here is some code that can be used to recreate my issue (let column a be the ID field):

Create a file with sample data

# Note: this snippet is Python 2, where bytes and str are the same type;
# on Python 3 the header and the joined values would all need to be bytes.
data = [
    (bytes(b'\xba\xed\x85\x8e\x91\xd4\xc7\xb0'), '1', 'a'),
    (bytes(b'\xba\xed\x85\x8e\x91\xd4\xc7\xb1'), '2', 'b'),
    (bytes(b'\xba\xed\x85\x8e\x91\xd4\xc7\xb2'), '3', 'c')
]

with open('sample.csv', 'wb') as f:
    header = ["a", "b", "c"]
    f.write(",".join(header) + "\n")
    for d in data:
        f.write(",".join(d) + "\n")

Read it with pandas

import pandas as pd

# Python 2: str.encode('hex') hex-encodes the raw bytes of column a
# (this call does not exist on Python 3, where bytes.hex() is the equivalent).
df = pd.read_csv("sample.csv", converters={"a": lambda x: x.encode('hex')})
print(df)
#                  a  b  c
#0  baed858e91d4c7b0  1  a
#1  baed858e91d4c7b1  2  b
#2  baed858e91d4c7b2  3  c

Try to read the same file with Spark

spark_df = spark.read.csv("sample.csv", header=True)
spark_df.show()
#+-----+---+---+
#|a    |b  |c  |
#+-----+---+---+
#|�텎��ǰ|1  |a  |
#|�텎��DZ|2  |b  |
#|�텎��Dz|3  |c  |
#+-----+---+---+

Yikes! OK, so how about converting to hex?

import pyspark.sql.functions as f
spark_df.withColumn("a", f.hex("a")).show(truncate=False)
#+----------------------------+---+---+
#|a                           |b  |c  |
#+----------------------------+---+---+
#|EFBFBDED858EEFBFBDEFBFBDC7B0|1  |a  |
#|EFBFBDED858EEFBFBDEFBFBDC7B1|2  |b  |
#|EFBFBDED858EEFBFBDEFBFBDC7B2|3  |c  |
#+----------------------------+---+---+

(In this example the values are distinct, but that's not true in my larger file)

As you can see, the values are close, but some of the bytes have been replaced by EFBFBD.

Is there any way to read the file in Spark (maybe using an RDD?) so that my output looks like the pandas version:

#+----------------+---+---+
#|a               |b  |c  |
#+----------------+---+---+
#|baed858e91d4c7b0|1  |a  |
#|baed858e91d4c7b1|2  |b  |
#|baed858e91d4c7b2|3  |c  |
#+----------------+---+---+
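
One way to get that output (a sketch only, not from the original post) is to read the raw lines with the RDD API, which can keep them as bytes, and hex-encode the ID before building a DataFrame. It assumes Python 3.5+ (for bytes.hex()), an active SparkSession named spark, and that the binary IDs never contain a comma or newline byte:

# Read lines as raw bytes instead of decoded strings.
raw = spark.sparkContext.textFile("sample.csv", use_unicode=False)
header = raw.first()                      # b'a,b,c'
rows = (raw.filter(lambda line: line != header)
           .map(lambda line: line.split(b","))
           .map(lambda cols: (cols[0].hex(),            # hex-encode the ID bytes
                              cols[1].decode("utf-8"),
                              cols[2].decode("utf-8"))))
rows.toDF(["a", "b", "c"]).show(truncate=False)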

I've tried casting to byte and specifying the schema so that this column is ByteType(), but that didn't work.

Edit

I am using Spark v2.1.

Answer

How about storing the ID base64-encoded and decoding it when reading?

import base64

data = [
    (base64.b64encode(bytes(b'\xba\xed\x85\x8e\x91\xd4\xc7\xb0')), '1', 'a'),
    (base64.b64encode(bytes(b'\xba\xed\x85\x8e\x91\xd4\xc7\xb1')), '2', 'b'),
    (base64.b64encode(bytes(b'\xba\xed\x85\x8e\x91\xd4\xc7\xb2')), '3', 'c')
]

with open('sample.csv', 'wb') as f:
    header = ["a", "b", "c"]
    f.write(",".join(header)+"\n")
    for d in data:
        f.write(",".join(d) + "\n")

Read it back

import pyspark.sql.functions as f

# Decode column "a" back to raw bytes with Spark's built-in unbase64 function
# (a driver-side base64.b64decode cannot be applied to a Column).
spark_df = spark.read.csv("sample.csv", header=True)
spark_df = spark_df.withColumn("a", f.unbase64("a"))
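
As a quick sanity check (not in the original answer), the decoded column can be hex-encoded and compared with the pandas output above; Spark's hex() returns uppercase, so lower-case it for a direct comparison:

spark_df.withColumn("a", f.lower(f.hex("a"))).show(truncate=False)
#+----------------+---+---+
#|a               |b  |c  |
#+----------------+---+---+
#|baed858e91d4c7b0|1  |a  |
#|baed858e91d4c7b1|2  |b  |
#|baed858e91d4c7b2|3  |c  |
#+----------------+---+---+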
