Can I use Spark DataFrame inside regular Spark map operation?


Problem description

I tried to use a Spark DataFrame, defined earlier, from inside a regular Spark map operation, as shown below:

import os

from pyspark.sql.functions import udf, lit
from pyspark.sql.types import BooleanType

businessJSON = os.path.join(targetDir, 'business.json')
businessDF = sqlContext.read.json(businessJSON)

reviewsJSON = os.path.join(targetDir, 'review.json')
reviewsDF = sqlContext.read.json(reviewsJSON)

# UDF that checks whether a value appears in an array column
contains = udf(lambda xs, val: val in xs, BooleanType())

def selectReviews(category):
    # Keep only businesses whose categories array contains the given category,
    # join them with the reviews, and return the review texts as an RDD.
    businessesByCategory = businessDF[contains(businessDF.categories, lit(category))]
    selectedReviewsDF = reviewsDF.join(businessesByCategory,
                                       businessesByCategory.business_id == reviewsDF.business_id)
    return selectedReviewsDF.select("text").map(lambda x: x.text)

categories = ['category1', 'category2']
rdd = (sc.parallelize(categories)
       .map(lambda c: (c, selectReviews(c)))
       )

rdd.take(1)

and I got a huge error message:

Py4JError                                 Traceback (most recent call last)
<ipython-input-346-051af5183a76> in <module>()
     12        )
     13 
---> 14 rdd.take(1)

/usr/local/Cellar/apache-spark/1.4.1/libexec/python/pyspark/rdd.pyc in take(self, num)
   1275 
   1276             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1277             res = self.context.runJob(self, takeUpToNumLeft, p, True)
   1278 
   1279             items += res

/usr/local/Cellar/apache-spark/1.4.1/libexec/python/pyspark/context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    894         # SparkContext#runJob.
    895         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 896         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions,
    897                                           allowLocal)
    898         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))

/usr/local/Cellar/apache-spark/1.4.1/libexec/python/pyspark/rdd.pyc in _jrdd(self)
   2361         command = (self.func, profiler, self._prev_jrdd_deserializer,
   2362                    self._jrdd_deserializer)
-> 2363         pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, command, self)
   2364         python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
   2365                                              bytearray(pickled_cmd),

 /usr/local/Cellar/apache-spark/1.4.1/libexec/python/pyspark/rdd.pyc in _prepare_for_python_RDD(sc, command, obj)
   2281     # the serialized command will be compressed by broadcast
   2282     ser = CloudPickleSerializer()
-> 2283     pickled_command = ser.dumps(command)
   2284     if len(pickled_command) > (1 << 20):  # 1M
   2285         # The broadcast will have same life cycle as created PythonRDD

 ...

/Users/igorsokolov/anaconda/lib/python2.7/pickle.pyc in save(self, obj)
    304             reduce = getattr(obj, "__reduce_ex__", None)
    305             if reduce:
--> 306                 rv = reduce(self.proto)
    307             else:
    308                 reduce = getattr(obj, "__reduce__", None)

/usr/local/Cellar/apache-spark/1.4.1/libexec/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/usr/local/Cellar/apache-spark/1.4.1/libexec/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    302                 raise Py4JError(
    303                     'An error occurred while calling {0}{1}{2}. Trace:\n{3}\n'.
--> 304                     format(target_id, '.', name, value))
    305         else:
    306             raise Py4JError(

Py4JError: An error occurred while calling o96495.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
at py4j.Gateway.invoke(Gateway.java:252)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)

I did some investigation to understand exactly which line leads to this error, and I found that the minimal code that reproduces it is:

def selectReviews(category):
    return reviewsDF.select("text")

rdd = (sc.parallelize(categories)
       .map(lambda c: (c, selectReviews(c)))
       )

rdd.take(1)

So I conclude that I am somehow using the DataFrame incorrectly, but exactly what is wrong is not clear from the Spark documentation. I suspect that reviewsDF should be distributed across all machines in the cluster, but I assumed that, since I created it with the SQLContext, it would already be available in the Spark context.

Thank you.

Recommended answer

Spark is not re-entrant. Specifically, workers cannot execute new RDD actions or transformations from within a step of another action or transformation.
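
For illustration, here is a minimal sketch of that rule, not taken from the question: the numbers and letters RDDs are hypothetical and an existing SparkContext sc is assumed. Referencing one RDD from inside another RDD's closure fails, while driving both from the driver works.

numbers = sc.parallelize([1, 2, 3])
letters = sc.parallelize(['a', 'b', 'c'])

# Fails: the lambda runs on workers, and a worker cannot trigger a new action
# (letters.count()) on another RDD; RDDs are driver-side handles, so the
# closure cannot even be serialized.
# numbers.map(lambda n: (n, letters.count())).collect()

# Works: run the inner action on the driver first and ship only plain data.
letter_count = letters.count()
numbers.map(lambda n: (n, letter_count)).collect()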

This issue occurs because selectReviews is called inside the map's lambda function, which runs on a worker node, and selectReviews requires executing .select() on the RDD backing reviewsDF.

The workaround is to replace sc.parallelize with a simple for loop (or similar) over categories, executed locally on the driver; see the sketch below. You still get a speedup from Spark in the DataFrame filtering that happens in each call to selectReviews.
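
A minimal sketch of that workaround, reusing selectReviews and categories from the question; the collect() call and the result dictionary are assumptions added here for illustration:

# Plain driver-side loop: each iteration still runs the DataFrame filter,
# join and select inside selectReviews as distributed Spark jobs.
reviewsByCategory = {}
for c in categories:
    # collect() (added for illustration) materializes the review texts
    # for this category on the driver.
    reviewsByCategory[c] = selectReviews(c).collect()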
