PySpark + Google Cloud Storage (wholeTextFiles)

Question

I am trying to parse about 1 million HTML files using PySpark (Google Dataproc) and write the relevant fields out to a condensed file. Each HTML file is about 200KB. Hence, all the data is about 200GB.

The code below works fine if I use a subset of the data, but runs for hours and then crashes when running on the whole dataset. Furthermore, the worker nodes are not utilized (<5% CPU) so I know there is some issue.

I believe the system is choking on ingesting the data from GCS. Is there a better way to do this? Also, when I use wholeTextFiles in this fashion, does the master attempt to download all the files and then send them to the executors, or does it let the executors download them?

def my_func(keyval):
   # wholeTextFiles yields (file name, file content) pairs; unpack the pair.
   file_name, file_str = keyval
   return parser(file_str).__dict__

data = sc.wholeTextFiles("gs://data/*")
output = data.map(my_func)
output.saveAsTextFile("gs://results/a")

Recommended answer

To answer your question: the master won't read all of the contained data, but it will fetch the status of every input file before beginning work. Dataproc sets the property "mapreduce.input.fileinputformat.list-status.num-threads" to 20 by default to speed up this lookup, but an RPC is still performed per file in GCS.

It seems you've found a case where even adding threads isn't helping very much and is just leading the driver to OOM faster.
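
For reference, the listing thread count mentioned above can be overridden per job. This is only a minimal sketch: it assumes Spark's convention of forwarding properties prefixed with `spark.hadoop.` into the Hadoop configuration, and the helper name `listing_conf_args` and the value 40 are made up for illustration. As noted above, more threads may simply get the driver to OOM sooner.

```python
LIST_STATUS_THREADS = "mapreduce.input.fileinputformat.list-status.num-threads"


def listing_conf_args(num_threads):
    """Build spark-submit arguments that override the listing thread count.

    Hypothetical helper, not part of Spark or Dataproc.
    """
    if num_threads < 1:
        raise ValueError("num_threads must be positive")
    # Assumption: Spark copies "spark.hadoop.*" properties into the
    # Hadoop Configuration that the input format reads.
    return ["--conf", "spark.hadoop.{}={}".format(LIST_STATUS_THREADS, num_threads)]


if __name__ == "__main__":
    print(" ".join(listing_conf_args(40)))
```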

Expanding on how to parallelize the read, I have two ideas.

But first, a bit of a warning: neither of these solutions, as written, is very robust to directories being included in the glob. You will probably want to guard against directories appearing in the list of files to read.
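
One way such a guard might look, as a rough sketch: it assumes the usual 8-column row layout of `hadoop fs -ls` output (permissions, replication, owner, group, size, date, time, path), where directory rows begin with `d`. The function name `files_only` and the sample listing are invented for illustration.

```python
# Made-up listing in the shape `hadoop fs -ls` prints; not real output.
SAMPLE_LS = (
    "Found 3 items\n"
    "-rwx------   3 user group     204800 2016-01-01 12:00 gs://data/a.html\n"
    "drwx------   - user group          0 2016-01-01 12:00 gs://data/sub\n"
    "-rwx------   3 user group     198000 2016-01-01 12:01 gs://data/b.html\n"
)


def files_only(ls_output):
    """Return file paths from `hadoop fs -ls` text, skipping directories."""
    paths = []
    for line in ls_output.splitlines():
        # Split into at most 8 fields so a path containing spaces stays whole.
        fields = line.split(None, 7)
        if len(fields) < 8:
            continue  # header such as "Found N items", or a blank line
        if fields[0].startswith("d"):
            continue  # permissions column marks a directory
        paths.append(fields[7])
    return paths


if __name__ == "__main__":
    print(files_only(SAMPLE_LS))
```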

The first is done with Python and the hadoop command line tools (this could also be done with gsutil). Below is an example of how it might look: it performs the file listing on the workers, reads file content into (file name, content) pairs, and finally computes pairs of (file name, file length):

from __future__ import print_function

from pyspark.rdd import RDD
from pyspark import SparkContext

import sys
import subprocess


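def hadoop_ls(file_glob):
  # check_output returns bytes on Python 3, so decode before splitting.
  output = subprocess.check_output(["/usr/bin/hadoop", "fs", "-ls", file_glob]).decode("utf-8")
  # Keep only full 8-column listing rows; this skips any "Found N items" header.
  rows = [line.split(None, 7) for line in output.split("\n")]
  return [row[7] for row in rows if len(row) == 8]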

def hadoop_cat(file):
  return subprocess.check_output(["/usr/bin/hadoop", "fs", "-cat", file]).decode("utf-8")

if __name__ == "__main__":
  if len(sys.argv) < 2:
    print("Provide a list of path globs to read.")
    exit(-1)

  sc = SparkContext()
  # This is just for testing. You'll want to generate a list 
  # of prefix globs instead of having a list passed in from the 
  # command line.
  globs = sys.argv[1:]
  # Desired listing partition count
  lpc = 100
  # Desired 'cat' partition count, should be less than total number of files
  cpc = 1000
  files = sc.parallelize(globs).repartition(lpc).flatMap(hadoop_ls)
  files_and_content = files.repartition(cpc).map(lambda f: [f, hadoop_cat(f)])
  files_and_char_count = files_and_content.map(lambda p: [p[0], len(p[1])])
  local = files_and_char_count.collect()
  for pair in local:
    print("File {} had {} chars".format(pair[0], pair[1]))

I would first start with this subprocess solution and play with the partitioning of hadoop_ls and hadoop_cat calls and see if you can get something that is acceptable.
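
As a starting point for that tuning, here is a hedged sketch of generating prefix globs (instead of passing them on the command line) and of picking a partition count. It assumes every file name begins with a lowercase letter or a digit, so files starting with any other character would be missed; `prefix_globs` and `partition_count` are hypothetical helper names.

```python
import string


def prefix_globs(base, alphabet=string.ascii_lowercase + string.digits):
    """Return one glob per leading character, e.g. gs://data/a*, gs://data/b*.

    Assumption: file names start with a character from `alphabet`.
    """
    base = base.rstrip("/")
    return ["{}/{}*".format(base, ch) for ch in alphabet]


def partition_count(num_files, files_per_partition):
    """Round up so each partition holds at most files_per_partition files."""
    return max(1, -(-num_files // files_per_partition))


if __name__ == "__main__":
    print(prefix_globs("gs://data", "abc"))
    print(partition_count(1000000, 1000))
```

The resulting list could replace `globs`, and the computed value could replace `cpc`, in the script above.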

The second solution is more complicated, but will probably yield a pipeline that is more performant by avoiding many, many exec calls.

In this second solution, we'll be compiling a special purpose helper jar, using an initialization action to copy that jar to all workers and finally making use of the helper from our driver.

The final directory structure of our Scala jar project will look something like this:

helper/src/main/scala/com/google/cloud/dataproc/support/PysparkHelper.scala
helper/build.sbt

In our PysparkHelper.scala file we will have a small scala class that functions much as our pure python solution above does. First we will create an RDD of file globs, then an RDD of file names and finally an RDD of file name and file content pairs.

package com.google.cloud.dataproc.support

import collection.JavaConversions._

import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext}

import java.util.ArrayList
import java.nio.charset.StandardCharsets

class PysparkHelper extends Serializable {
  def wholeTextFiles(
    context: JavaSparkContext,
    paths: ArrayList[String],
    partitions: Int): JavaPairRDD[String, String] = {

    val globRDD = context.sc.parallelize(paths).repartition(partitions)
    // map globs to file names:
    val filenameRDD = globRDD.flatMap(glob => {
      val path = new Path(glob)
      val fs: FileSystem = path.getFileSystem(new Configuration)
      val statuses = fs.globStatus(path)
      statuses.map(s => s.getPath.toString)
    })
    // Map file name to (name, content) pairs:
    // TODO: Consider adding a second partition count parameter to repartition before
    // the below map.
    val fileNameContentRDD = filenameRDD.map(f => {
      Pair(f, readPath(f, new Configuration))
    })

    new JavaPairRDD(fileNameContentRDD)
  }

  def readPath(file: String, conf: Configuration) = {
    val path = new Path(file)
    val fs: FileSystem = path.getFileSystem(conf)
    val stream = fs.open(path)
    try {
      IOUtils.toString(stream, StandardCharsets.UTF_8)
    } finally {
      stream.close()
    }
  }
}

The helper/build.sbt file would look something like this:

organization := "com.google.cloud.dataproc.support"
name := "pyspark_support"
version := "0.1"
scalaVersion := "2.10.5"
libraryDependencies +=  "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided"
libraryDependencies +=  "org.apache.hadoop" % "hadoop-common" % "2.7.1" % "provided"
exportJars := true

We can then build the helper with sbt:

$ cd helper && sbt package

The output helper jar should be target/scala-2.10/pyspark_support_2.10-0.1.jar

We now need to get this jar onto our cluster and to do this, we need to do two things: 1) upload the jar to GCS and 2) create an initialization action in GCS to copy the jar to cluster nodes.

For purposes of illustration, let's assume your bucket is named MY_BUCKET (insert appropriate walrus-related meme here).

$ gsutil cp target/scala-2.10/pyspark_support_2.10-0.1.jar gs://MY_BUCKET/pyspark_support.jar

Create an initialization action (let's call it pyspark_init_action.sh, replacing MY_BUCKET as needed):

#!/bin/bash

gsutil cp gs://MY_BUCKET/pyspark_support.jar /usr/lib/hadoop/lib/

Finally, upload the initialization action to GCS:

$ gsutil cp pyspark_init_action.sh gs://MY_BUCKET/pyspark_init_action.sh

A cluster can now be started by passing the following flags to gcloud:

--initialization-actions gs://MY_BUCKET/pyspark_init_action.sh

After building, uploading, and installing our new library we can finally make use of it from pyspark:

from __future__ import print_function

from pyspark.rdd import RDD
from pyspark import SparkContext
from pyspark.serializers import PairDeserializer, UTF8Deserializer

import sys

class DataprocUtils(object):

  @staticmethod
  def wholeTextFiles(sc, glob_list, partitions):
    """
    Read whole text file content from GCS.
    :param sc: Spark context
    :param glob_list: List of globs, each glob should be a prefix for part of the dataset.
    :param partitions: number of partitions to use when creating the RDD
    :return: RDD of filename, filecontent pairs.
    """
    helper = sc._jvm.com.google.cloud.dataproc.support.PysparkHelper()
    return RDD(helper.wholeTextFiles(sc._jsc, glob_list, partitions), sc,
               PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))

if __name__ == "__main__":
  if len(sys.argv) < 2:
    print("Provide a list of path globs to read.")
    exit(-1)

  sc = SparkContext()
  globs = sys.argv[1:]
  partitions = 10
  files_and_content = DataprocUtils.wholeTextFiles(sc, globs, partitions)
  files_and_char_count = files_and_content.map(lambda p: (p[0], len(p[1])))
  local = files_and_char_count.collect()
  for pair in local:
    print("File {} had {} chars".format(pair[0], pair[1]))
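
To tie this back to the original question, the character count could be swapped for the real parsing step. The sketch below is only an illustration: `parse_stub` is a hypothetical stand-in for the question's `parser`, and writing one JSON object per line is an assumption about the desired output format, not something the question specifies.

```python
import json


def parse_stub(html):
    """Hypothetical stand-in for the question's parser(); returns a dict of fields."""
    return {"length": len(html)}


def to_json_line(pair, parse=parse_stub):
    """Turn a (file name, content) pair into one JSON line."""
    file_name, content = pair
    record = dict(parse(content))
    # Keep the source file name alongside the parsed fields.
    record["file"] = file_name
    return json.dumps(record, sort_keys=True)


if __name__ == "__main__":
    print(to_json_line(("gs://data/a.html", "<html></html>")))
```

With a real parser passed in, the RDD returned above could then be written out with something like `files_and_content.map(to_json_line).saveAsTextFile("gs://results/a")`.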
