PySpark UDF optimization challenge

Problem description

I am trying to optimize the code below. When run with 1,000 rows of data it takes about 12 minutes to complete. Our use case requires data sizes of around 25K-50K rows, which would make this implementation completely infeasible.

import pyspark.sql.types as Types
import numpy
import spacy
from pyspark.sql.functions import udf

inputPath = "s3://myData/part-*.parquet"
df = spark.read.parquet(inputPath)

test_df = df.select('uid', 'content').limit(1000).repartition(10)

# print(df.rdd.getNumPartitions()) -> 4
# print(test_df.rdd.getNumPartitions()) -> 1

def load_glove(fn):
    vector_dict = {}
    count = 0
    with open(fn) as inf:
        for line in inf:
            count += 1
            eles = line.strip().split()
            token = eles[0]
            try:
                vector_dict[token] = numpy.array([float(x) for x in eles[1:]])
                assert len(vector_dict[token]) == 300
            except Exception:
                print("Exception in load_glove")
    return vector_dict

# Returning an array of floats from the UDF
@udf(returnType=Types.ArrayType(Types.FloatType()))
def generateVectorRepresentation(text):
  # TODO: move the load function out if possible, and remove unused modules
  # nlp = spacy.load('en', disable=['parser', 'tagger'])
  nlp = spacy.load('en', max_length=6000000)
  gloveEmbeddingsPath = "/home/hadoop/short_glove_1000.300d.txt"
  glove_embeddings_dict = load_glove(gloveEmbeddingsPath)
  spacy_doc = nlp(text)
  doc_vec = numpy.array([0.0] * 300)
  doc_vec = numpy.float32(doc_vec)
  wordcount = 0
  for sentence_id, sentence in enumerate(spacy_doc.sents):
      for word in sentence:
          if word.text in glove_embeddings_dict:
              # Pre-convert to glove dictionary to float32 representations
              doc_vec += numpy.float32(glove_embeddings_dict[word.text])
              wordcount += 1

  # Document Vector is the average of all word vectors in the document
  doc_vec = doc_vec/(1.0 * wordcount)
  return doc_vec.tolist()

spark.udf.register("generateVectorRepresentation", generateVectorRepresentation)

document_vector_df = test_df.withColumn("Glove Document Vector", generateVectorRepresentation('content'))

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_document_vector_df = document_vector_df.toPandas()

# print(pandas_document_vector_df)
pandas_document_vector_df.head()

I was wondering if you could help answer the questions below.

Are the spacy.load() and load_glove() methods invoked on every iteration? Is there a way to prepare the load_glove() data once per worker node instead of once per row of data? The load_glove method returns a dictionary object that could be as large as 5 GB. Is there a way to prepare that on the master node and then pass it as a parameter to the UDF?

Appreciate your suggestions. Thanks in advance!

Recommended answer

Yes, in the current implementation all of the model-loading code is executed every time the function runs, which is far from optimal. There is no way to pass the loaded objects from the driver to the worker nodes directly, but there is a similar approach: initialize the model on each worker, and do it only once. To achieve that, you will have to use a lazily initialized function that loads the model only when the actual result is required, which happens on the workers.

Try something like this:

# The spaCy model is not loaded at import time; only the worker code invokes
# this routine to get the spaCy object. This means each executor loads the
# model once and then reuses it for every row it processes.
SPACY_MODEL = None
def get_spacy_model():
    global SPACY_MODEL
    if SPACY_MODEL is None:
        SPACY_MODEL = spacy.load('en', max_length=6000000)
    return SPACY_MODEL

@udf(returnType=Types.ArrayType(Types.FloatType()))
def generateVectorRepresentation(text):
    # nlp = spacy.load('en', disable=['parser', 'tagger'])
    nlp = get_spacy_model()
    # your further processing

I think you can try adding the GloVe loading code into a similar function.
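
For illustration, here is a minimal sketch of how that could look. It reuses load_glove, the embeddings path, numpy, Types, and udf from the question, plus get_spacy_model from above; the GLOVE_EMBEDDINGS global and the get_glove_embeddings helper are names introduced here for the sketch, not part of the original code:

# Lazily load the GloVe dictionary once per executor, mirroring the spaCy pattern.
# GLOVE_EMBEDDINGS and get_glove_embeddings are illustrative names; load_glove and
# the embeddings path come from the question code.
GLOVE_EMBEDDINGS = None
def get_glove_embeddings():
    global GLOVE_EMBEDDINGS
    if GLOVE_EMBEDDINGS is None:
        GLOVE_EMBEDDINGS = load_glove("/home/hadoop/short_glove_1000.300d.txt")
    return GLOVE_EMBEDDINGS

@udf(returnType=Types.ArrayType(Types.FloatType()))
def generateVectorRepresentation(text):
    # Both heavy objects are fetched from the per-executor cache, not reloaded per row
    nlp = get_spacy_model()
    glove_embeddings_dict = get_glove_embeddings()
    spacy_doc = nlp(text)
    doc_vec = numpy.zeros(300, dtype=numpy.float32)
    wordcount = 0
    for sentence in spacy_doc.sents:
        for word in sentence:
            if word.text in glove_embeddings_dict:
                doc_vec += numpy.float32(glove_embeddings_dict[word.text])
                wordcount += 1
    # Document vector is the average of all word vectors in the document.
    # The max(wordcount, 1) guard (not in the original code) avoids division by
    # zero when no tokens match the GloVe vocabulary.
    doc_vec = doc_vec / (1.0 * max(wordcount, 1))
    return doc_vec.tolist()

The per-row work is unchanged from the question; only the expensive loading is moved behind the lazily initialized getters, so each executor pays the loading cost once instead of once per row.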

You can read more about this approach here: https://haridas.in/run-spacy-jobs-on-apache-spark.html (this is not my blog; I just found it while trying to do the same thing you need with a spaCy model).
