Elasticsearch analyze() not compatible with Spark in Python?
Question
I'm using the elasticsearch-py client within PySpark using Python 3 and I'm running into a problem using the analyze() function with ES in conjunction with an RDD. In particular, each record in my RDD is a string of text and I'm trying to analyze it to get out the token information, but I'm getting an error when trying to use it within a map function in Spark.
For example, this works perfectly fine:
from elasticsearch import Elasticsearch
es = Elasticsearch()
t = 'the quick brown fox'
es.indices.analyze(text=t)['tokens'][0]
{'end_offset': 3,
'position': 1,
'start_offset': 0,
'token': 'the',
'type': '<ALPHANUM>'}
However, when I try this:
trdd = sc.parallelize(['the quick brown fox'])
trdd.map(lambda x: es.indices.analyze(text=x)['tokens'][0]).collect()
I get a really long error message related to pickling (here's the end of it):
    109         if 'recursion' in e.args[0]:
    110             msg = """Could not pickle object as excessively deep recursion required."""
--> 111             raise pickle.PicklingError(msg)
    112
    113     def save_memoryview(self, obj):

PicklingError: Could not pickle object as excessively deep recursion required.
I'm not sure what the error means. Am I doing something wrong? Is there a way to map the ES analyze function onto records of an RDD?
Edit: I'm also getting this behavior when applying other functions from elasticsearch-py (for example, es.termvector()).
Answer
Essentially, the Elasticsearch client is not serializable. So what you need to do is create an instance of the client for each partition, and process the records there:
def get_tokens(part):
    es = Elasticsearch()
    yield [es.indices.analyze(text=x)['tokens'][0] for x in part]

rdd = sc.parallelize(['the quick brown fox', 'brown quick dog'], numSlices=2)
rdd.mapPartitions(lambda p: get_tokens(p)).collect()
This should give the following result:
Out[17]:
[[{u'end_offset': 3,
u'position': 1,
u'start_offset': 0,
u'token': u'the',
u'type': u'<ALPHANUM>'}],
[{u'end_offset': 5,
u'position': 1,
u'start_offset': 0,
u'token': u'brown',
u'type': u'<ALPHANUM>'}]]
Note that for large data sets, this is going to be very inefficient as it involves a REST call to ES for each element in the dataset.
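One way to reduce the number of round trips is to group each partition into fixed-size chunks and issue one request per chunk instead of one per record. The chunked helper below is plain Python; the commented usage is only a sketch, since whether the _analyze endpoint accepts multiple texts in a single call depends on your Elasticsearch version:

```python
from itertools import islice

def chunked(iterable, size):
    """Yield lists of up to `size` items from any iterable."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch

# Hypothetical per-partition usage (assumes a batch-capable analyze call
# on your Elasticsearch version):
# def get_tokens(part):
#     es = Elasticsearch()
#     for batch in chunked(part, 100):
#         yield es.indices.analyze(text=batch)['tokens']

print(list(chunked(range(5), 2)))  # → [[0, 1], [2, 3], [4]]
```

With 100 records per request, a partition of 10,000 records costs 100 REST calls instead of 10,000, at the price of slightly more bookkeeping to match tokens back to their source strings.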