Elasticsearch analyze() not compatible with Spark in Python?


Question


I'm using the elasticsearch-py client within PySpark using Python 3 and I'm running into a problem using the analyze() function with ES in conjunction with an RDD. In particular, each record in my RDD is a string of text and I'm trying to analyze it to get out the token information, but I'm getting an error when trying to use it within a map function in Spark.


For example, this works perfectly fine:

from elasticsearch import Elasticsearch
es = Elasticsearch()
t = 'the quick brown fox'
es.indices.analyze(text=t)['tokens'][0]

{'end_offset': 3,
'position': 1,
'start_offset': 0,
'token': 'the',
'type': '<ALPHANUM>'}


However, when I try this:

trdd = sc.parallelize(['the quick brown fox'])
trdd.map(lambda x: es.indices.analyze(text=x)['tokens'][0]).collect()


I get a really really long error message related to pickling (Here's the end of it):

    109             if 'recursion' in e.args[0]:
    110                 msg = """Could not pickle object as excessively deep recursion required."""
--> 111                 raise pickle.PicklingError(msg)
    112
    113     def save_memoryview(self, obj):

PicklingError: Could not pickle object as excessively deep recursion required.


I'm not sure what the error means. Am I doing something wrong? Is there a way to map the ES analyze function onto records of an RDD?

Edit: I'm also getting this behavior when applying other functions from elasticsearch-py as well (for example, es.termvector()).

Answer


Essentially the Elasticsearch client is not serializable. So what you need to do is create an instance of the client for each partition, and process them:

def get_tokens(partition):
    es = Elasticsearch()
    yield [es.indices.analyze(text=x)['tokens'][0] for x in partition]

rdd = sc.parallelize(['the quick brown fox', 'brown quick dog'], numSlices=2)
rdd.mapPartitions(lambda p: get_tokens(p)).collect()

Should give the following result:

Out[17]:
[[{u'end_offset': 3,
   u'position': 1,
   u'start_offset': 0,
   u'token': u'the',
   u'type': u'<ALPHANUM>'}],
 [{u'end_offset': 5,
   u'position': 1,
   u'start_offset': 0,
   u'token': u'brown',
   u'type': u'<ALPHANUM>'}]]



Note that for large data sets, this is going to be very inefficient as it involves a REST call to ES for each element in the dataset.
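One way to reduce the number of calls (a sketch, not part of the original answer) is to chunk each partition into batches and analyze several texts per request, assuming your Elasticsearch version's `_analyze` endpoint accepts a list of strings for `text`. The batching helper itself is plain Python:

```python
from itertools import islice

def chunked(iterable, size):
    # Yield successive lists of at most `size` items from `iterable`.
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch

# Inside get_tokens, each batch could then be sent in one call, e.g.
# es.indices.analyze(text=batch), instead of one REST call per record.
print(list(chunked(['the quick', 'brown fox', 'jumps', 'over', 'the dog'], 2)))
# → [['the quick', 'brown fox'], ['jumps', 'over'], ['the dog']]
```

The trade-off is the usual one: larger batches mean fewer round trips but bigger request bodies and coarser failure units.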

