PySpark + Google Cloud Storage (wholeTextFiles)

Problem Description

I am trying to parse about 1 million HTML files using PySpark (Google Dataproc) and write the relevant fields out to a condensed file. Each HTML file is about 200KB. Hence, all the data is about 200GB.

The code below works fine if I use a subset of the data, but runs for hours and then crashes when running on the whole dataset. Furthermore, the worker nodes are not utilized (<5% CPU) so I know there is some issue.

I believe the system is choking on ingesting the data from GCS. Is there a better way to do this? Also, when I use wholeTextFiles in this fashion, does the master attempt to download all the files and then send them to the executors, or does it let the executors download them?

def my_func(keyval):
   # wholeTextFiles yields (file name, file content) pairs; unpack them here
   file_name, file_str = keyval
   return parser(file_str).__dict__

data = sc.wholeTextFiles("gs://data/*")
output = data.map(my_func)
output.saveAsTextFile("gs://results/a")

Recommended Answer

To answer your question: the master won't read all of the contained data, but it will fetch the status of all input files before beginning work. Dataproc sets the property "mapreduce.input.fileinputformat.list-status.num-threads" to 20 by default to help improve the time of this lookup, but an RPC is still performed per file in GCS.
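For completeness, here is a minimal sketch (an illustration, not a recommendation) of how that property could be raised from the PySpark driver before building the RDD; whether it helps depends on the input format in use, and as noted next it may not help here at all:

# Illustrative sketch only: bump the listing thread count on the driver's
# Hadoop configuration before calling wholeTextFiles. The value 40 is an
# arbitrary example.
sc._jsc.hadoopConfiguration().set(
    "mapreduce.input.fileinputformat.list-status.num-threads", "40")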

It seems you've found a case where even adding threads isn't helping very much and is just leading the driver to OOM faster.

Expanding on how to parallelize the read, I have two ideas.

But first, a word of warning: neither of these solutions, as written, is very robust to directories being included in the glob. You will probably want to guard against directories appearing in the list of files to read.

The first is done with Python and the Hadoop command-line tools (this could also be done with gsutil). The example below shows how it might look: it performs the file listing on the workers, reads each file's content into a pair, and finally computes (file name, file length) pairs:

from __future__ import print_function

from pyspark.rdd import RDD
from pyspark import SparkContext

import sys
import subprocess


def hadoop_ls(file_glob):
  lines = subprocess.check_output(["/usr/bin/hadoop", "fs", "-ls", file_glob]).split("\n")
  files = [line.split()[7] for line in lines if len(line) > 0]
  return files

def hadoop_cat(file):
  return subprocess.check_output(["/usr/bin/hadoop", "fs", "-cat", file]).decode("utf-8")

if __name__ == "__main__":
  if len(sys.argv) < 2:
    print("Provide a list of path globs to read.")
    exit(-1)

  sc = SparkContext()
  # This is just for testing. You'll want to generate a list 
  # of prefix globs instead of having a list passed in from the 
  # command line.
  globs = sys.argv[1:]
  # Desired listing partition count
  lpc = 100
  # Desired 'cat' partition count, should be less than total number of files
  cpc = 1000
  files = sc.parallelize(globs).repartition(lpc).flatMap(hadoop_ls)
  files_and_content = files.repartition(cpc).map(lambda f: [f, hadoop_cat(f)])
  files_and_char_count = files_and_content.map(lambda p: [p[0], len(p[1])])
  local = files_and_char_count.collect()
  for pair in local:
    print("File {} had {} chars".format(pair[0], pair[1]))

I would start with this subprocess solution, experiment with the partitioning of the hadoop_ls and hadoop_cat calls, and see whether you can get something acceptable.
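The TODO comment in the script also suggests generating prefix globs rather than passing them in on the command line. A hypothetical sketch, assuming your file names start with a digit or lowercase letter (adjust to your actual naming scheme):

import string

def make_prefix_globs(base="gs://data"):
  # One glob per leading character, e.g. gs://data/0*, gs://data/a*, ...
  # Assumes file names start with a digit or lowercase letter.
  return ["{}/{}*".format(base, c) for c in string.digits + string.ascii_lowercase]

globs = make_prefix_globs()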

The second solution is more complicated, but will probably yield a pipeline that is more performant by avoiding many, many exec calls.

In this second solution, we'll be compiling a special purpose helper jar, using an initialization action to copy that jar to all workers and finally making use of the helper from our driver.

The final directory structure of the Scala jar project will look something like this:

helper/src/main/scala/com/google/cloud/dataproc/support/PysparkHelper.scala
helper/build.sbt

In our PysparkHelper.scala file we will have a small Scala class that functions much like the pure Python solution above: first we create an RDD of file globs, then an RDD of file names, and finally an RDD of (file name, file content) pairs.

package com.google.cloud.dataproc.support

import collection.JavaConversions._

import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext}

import java.util.ArrayList
import java.nio.charset.StandardCharsets

class PysparkHelper extends Serializable {
  def wholeTextFiles(
    context: JavaSparkContext,
    paths: ArrayList[String],
    partitions: Int): JavaPairRDD[String, String] = {

    val globRDD = context.sc.parallelize(paths).repartition(partitions)
    // map globs to file names:
    val filenameRDD = globRDD.flatMap(glob => {
      val path = new Path(glob)
      val fs: FileSystem = path.getFileSystem(new Configuration)
      val statuses = fs.globStatus(path)
      statuses.map(s => s.getPath.toString)
    })
    // Map file name to (name, content) pairs:
    // TODO: Consider adding a second parititon count parameter to repartition before
    // the below map.
    val fileNameContentRDD = filenameRDD.map(f => {
      Pair(f, readPath(f, new Configuration))
    })

    new JavaPairRDD(fileNameContentRDD)
  }

  def readPath(file: String, conf: Configuration) = {
    val path = new Path(file)
    val fs: FileSystem = path.getFileSystem(conf)
    val stream = fs.open(path)
    try {
      IOUtils.toString(stream, StandardCharsets.UTF_8)
    } finally {
      stream.close()
    }
  }
}

The helper/build.sbt file would look something like this:

organization := "com.google.cloud.dataproc.support"
name := "pyspark_support"
version := "0.1"
scalaVersion := "2.10.5"
libraryDependencies +=  "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided"
libraryDependencies +=  "org.apache.hadoop" % "hadoop-common" % "2.7.1" % "provided"
exportJars := true

We can then build the helper with sbt:

$ cd helper && sbt package

The output helper jar should be target/scala-2.10/pyspark_support_2.10-0.1.jar.

We now need to get this jar onto our cluster and to do this, we need to do two things: 1) upload the jar to GCS and 2) create an initialization action in GCS to copy the jar to cluster nodes.

For purposes of illustration, let's assume your bucket is named MY_BUCKET (insert appropriate walrus-related meme here).

$ gsutil cp target/scala-2.10/pyspark_support_2.10-0.1.jar gs://MY_BUCKET/pyspark_support.jar

Create an initialization action (let's call it pyspark_init_action.sh, replacing MY_BUCKET as needed):

#!/bin/bash

gsutil cp gs://MY_BUCKET/pyspark_support.jar /usr/lib/hadoop/lib/

Finally, upload the initialization action to GCS:

$ gsutil cp pyspark_init_action.sh gs://MY_BUCKET/pyspark_init_action.sh

A cluster can now be started by passing the following flags to gcloud:

--initialization-actions gs://MY_BUCKET/pyspark_init_action.sh

After building, uploading, and installing our new library we can finally make use of it from pyspark:

from __future__ import print_function

from pyspark.rdd import RDD
from pyspark import SparkContext
from pyspark.serializers import PairDeserializer, UTF8Deserializer

import sys

class DataprocUtils(object):

  @staticmethod
  def wholeTextFiles(sc, glob_list, partitions):
    """
    Read whole text file content from GCS.
    :param sc: Spark context
    :param glob_list: List of globs, each glob should be a prefix for part of the dataset.
    :param partitions: number of partitions to use when creating the RDD
    :return: RDD of filename, filecontent pairs.
    """
    helper = sc._jvm.com.google.cloud.dataproc.support.PysparkHelper()
    return RDD(helper.wholeTextFiles(sc._jsc, glob_list, partitions), sc,
               PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))

if __name__ == "__main__":
  if len(sys.argv) < 2:
    print("Provide a list of path globs to read.")
    exit(-1)

  sc = SparkContext()
  globs = sys.argv[1:]
  partitions = 10
  files_and_content = DataprocUtils.wholeTextFiles(sc, globs, partitions)
  files_and_char_count = files_and_content.map(lambda p: (p[0], len(p[1])))
  local = files_and_char_count.collect()
  for pair in local:
    print("File {} had {} chars".format(pair[0], pair[1]))
