Read a distributed Tab delimited CSV


Problem Description

    Inspired by this question, I wrote some code to store an RDD (read from a Parquet file) with a schema of (photo_id, data) as tab-delimited pairs and, just as a detail, base64-encode the data, like this:

    # imports needed by the snippet below
    import base64
    import cPickle

    def do_pipeline(itr):
       ...
       item_id = x.photo_id
    
    def toTabCSVLine(data):
      return '\t'.join(str(d) for d in data)
    
    serialize_vec_b64pkl = lambda x: (x[0], base64.b64encode(cPickle.dumps(x[1])))
    
    def format(data):
        return toTabCSVLine(serialize_vec_b64pkl(data))
    
    dataset = sqlContext.read.parquet('mydir')
    lines = dataset.map(format)
    lines.saveAsTextFile('outdir')
    

    So now, the point of interest: how can I read that dataset back and print, for example, its deserialized data?

    I am using Python 2.6.6.


    My attempt is below; just to verify that everything can be done, I wrote this code:

    deserialize_vec_b64pkl = lambda x: (x[0], cPickle.loads(base64.b64decode(x[1])))
    
    base64_dataset = sc.textFile('outdir')
    collected_base64_dataset = base64_dataset.collect()
    print(deserialize_vec_b64pkl(collected_base64_dataset[0].split('\t')))
    

    which calls collect(); that is OK for testing, but in a real-world scenario it would struggle...
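
    For reference, a collect()-free variant of the same check might look like the sketch below; it reuses base64_dataset and deserialize_vec_b64pkl from above, splits with a lambda, and brings only a single record back to the driver with first():

    # Sketch: split and deserialize on the workers, fetch one record for inspection
    sample = (base64_dataset
              .map(lambda line: line.split('\t'))
              .map(deserialize_vec_b64pkl)
              .first())
    print(sample)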


    Edit:

    When I tried zero323's suggestion:

    foo = (base64_dataset.map(str.split).map(deserialize_vec_b64pkl)).collect()
    

    I got this error, which boils down to this:

    PythonRDD[2] at RDD at PythonRDD.scala:43
    16/08/04 18:32:30 WARN TaskSetManager: Lost task 4.0 in stage 0.0 (TID 4, gsta31695.tan.ygrid.yahoo.com): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/grid/0/tmp/yarn-local/usercache/gsamaras/appcache/application_1470212406507_56888/container_e04_1470212406507_56888_01_000009/pyspark.zip/pyspark/worker.py", line 98, in main
        command = pickleSer._read_with_length(infile)
      File "/grid/0/tmp/yarn-local/usercache/gsamaras/appcache/application_1470212406507_56888/container_e04_1470212406507_56888_01_000009/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
        return self.loads(obj)
      File "/grid/0/tmp/yarn-local/usercache/gsamaras/appcache/application_1470212406507_56888/container_e04_1470212406507_56888_01_000009/pyspark.zip/pyspark/serializers.py", line 422, in loads
        return pickle.loads(obj)
    UnpicklingError: NEWOBJ class argument has NULL tp_new
    
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
        at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
        at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    
    16/08/04 18:32:30 ERROR TaskSetManager: Task 12 in stage 0.0 failed 4 times; aborting job
    16/08/04 18:32:31 WARN TaskSetManager: Lost task 14.3 in stage 0.0 (TID 38, gsta31695.tan.ygrid.yahoo.com): TaskKilled (killed intentionally)
    16/08/04 18:32:31 WARN TaskSetManager: Lost task 13.3 in stage 0.0 (TID 39, gsta31695.tan.ygrid.yahoo.com): TaskKilled (killed intentionally)
    16/08/04 18:32:31 WARN TaskSetManager: Lost task 16.3 in stage 0.0 (TID 42, gsta31695.tan.ygrid.yahoo.com): TaskKilled (killed intentionally)
    ---------------------------------------------------------------------------
    Py4JJavaError                             Traceback (most recent call last)
    /homes/gsamaras/code/read_and_print.py in <module>()
         17     print(base64_dataset.map(str.split).map(deserialize_vec_b64pkl))
         18 
    ---> 19     foo = (base64_dataset.map(str.split).map(deserialize_vec_b64pkl)).collect()
         20     print(foo)
    
    /home/gs/spark/current/python/lib/pyspark.zip/pyspark/rdd.py in collect(self)
        769         """
        770         with SCCallSiteSync(self.context) as css:
    --> 771             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
        772         return list(_load_from_socket(port, self._jrdd_deserializer))
        773 
    
    /home/gs/spark/current/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
        811         answer = self.gateway_client.send_command(command)
        812         return_value = get_return_value(
    --> 813             answer, self.gateway_client, self.target_id, self.name)
        814 
        815         for temp_arg in temp_args:
    
    /home/gs/spark/current/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
        306                 raise Py4JJavaError(
        307                     "An error occurred while calling {0}{1}{2}.\n".
    --> 308                     format(target_id, ".", name), value)
        309             else:
        310                 raise Py4JError(
    
    Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
    

Solution

    Let's try a simple example. For convenience I'll be using the handy toolz library, but it is not really required here.

    import sys
    import base64
    
    if sys.version_info < (3, ):
        import cPickle as pickle
    else:
        import pickle
    
    
    from toolz.functoolz import compose
    
    rdd = sc.parallelize([(1, {"foo": "bar"}), (2, {"bar": "foo"})])
    

    Now, your code is not exactly portable. In Python 2 base64.b64encode returns str, while in Python 3 it returns bytes. Let's illustrate that:

    • Python 2

      type(base64.b64encode(pickle.dumps({"foo": "bar"})))
      ## str
      

    • Python 3

      type(base64.b64encode(pickle.dumps({"foo": "bar"})))
      ## bytes
      

    So let's add decoding to the pipeline:

    # Equivalent to 
    # def pickle_and_b64(x):
    #     return base64.b64encode(pickle.dumps(x)).decode("ascii")
    
    pickle_and_b64 = compose(
        lambda x: x.decode("ascii"),
        base64.b64encode,
        pickle.dumps
    )
    
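
    As a quick check (a sketch; {"foo": "bar"} is just the sample value used above), calling the helper directly on a single value returns a text string (unicode on Python 2, str on Python 3), which is exactly what ends up stored per record:

    # Sketch: the helper applied to one value (Python 2 output shown; the exact payload differs on Python 3)
    pickle_and_b64({"foo": "bar"})
    ## u'KGRwMApTJ2ZvbycKcDEKUydiYXInCnAyCnMu'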

    Please note that this doesn't assume any particular shape of the data. Because of that, we'll use mapValues to serialize only the values:

    serialized = rdd.mapValues(pickle_and_b64)
    serialized.first()
    ## (1, u'KGRwMApTJ2ZvbycKcDEKUydiYXInCnAyCnMu')
    

    Now we can follow it with a simple format and save:

    from tempfile import mkdtemp
    import os
    
    outdir = os.path.join(mkdtemp(), "foo")
    
    serialized.map(lambda x: "{0}\t{1}".format(*x)).saveAsTextFile(outdir)
    
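
    If you want to eyeball the files before reading them back, a single saved line can be inspected directly; this sketch reuses outdir from above, and the expected line follows from the "{0}\t{1}" format and the serialized.first() output:

    # Sketch: peek at one saved line; it should be the tab-separated pair from above
    sc.textFile(outdir).first()
    ## u'1\tKGRwMApTJ2ZvbycKcDEKUydiYXInCnAyCnMu'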

    To read the file we reverse the process:

    # Equivalent to
    # def  b64_and_unpickle(x):
    #     return pickle.loads(base64.b64decode(x))
    
    b64_and_unpickle = compose(
        pickle.loads,
        base64.b64decode
    )
    
    decoded = (sc.textFile(outdir)
        .map(lambda x: x.split("\t"))  # In Python 3 we could simply use str.split
        .mapValues(b64_and_unpickle))
    
    decoded.first()
    ## (u'1', {'foo': 'bar'})
    
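
    Finally, a round-trip sanity check could look like the sketch below (all names reused from the snippets above). Note that after textFile the keys come back as strings, so they are cast to int before comparing; collect() is fine here only because the example data is tiny:

    # Sketch: cast keys back to int and compare with the original rdd
    roundtrip = decoded.map(lambda kv: (int(kv[0]), kv[1]))
    assert sorted(roundtrip.collect()) == sorted(rdd.collect())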
