Elasticsearch/dataflow - connection timeout after ~60 concurrent connections

Question

We host an Elasticsearch cluster on Elastic Cloud and call it from Dataflow (GCP). The job works fine in dev, but when we deploy to prod we see lots of connection timeouts on the client side.

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 570, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "main.py", line 159, in process
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/client/utils.py", line 152, in _wrapped
    return func(*args, params=params, headers=headers, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/client/__init__.py", line 1617, in search
    body=body,
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/transport.py", line 390, in perform_request
    raise e
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/transport.py", line 365, in perform_request
    timeout=timeout,
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/connection/http_urllib3.py", line 258, in perform_request
    raise ConnectionError("N/A", str(e), e)
elasticsearch.exceptions.ConnectionError: ConnectionError(<urllib3.connection.HTTPSConnection object at 0x7fe5d04e5690>: Failed to establish a new connection: [Errno 110] Connection timed out) caused by: NewConnectionError(<urllib3.connection.HTTPSConnection object at 0x7fe5d04e5690>: Failed to establish a new connection: [Errno 110] Connection timed out)

I increased the timeout setting in the Elasticsearch client to 300s as below, but it didn't seem to help.

self.elasticsearch = Elasticsearch([es_host], http_auth=http_auth, timeout=300)

Looking at the deployment at https://cloud.elastic.co/deployments//metrics, CPU and memory usage are very low (below 10%) and search response time is on the order of 200ms. What could be the bottleneck here, and how can we avoid such timeouts?

As seen in the log below, most requests fail with a connection timeout, while the successful requests receive a response very quickly:

I tried SSHing into the VM where we experience the connection errors. netstat showed about 60 ESTABLISHED connections to the Elasticsearch IP address. When I curl from the VM to the Elasticsearch address I can reproduce the timeout, while curling other URLs works fine. I can also curl Elasticsearch fine from my local machine, so the issue is only the connection between the VM and the Elasticsearch server.
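
Here is a minimal sketch (not from the original post) of how one could reproduce this kind of connection ceiling from the VM: open many sockets to the cluster in parallel and watch when connects start failing with [Errno 110]. The host and port below are placeholders for the Elastic Cloud endpoint.

import socket
from concurrent.futures import ThreadPoolExecutor

ES_HOST = "my-deployment.es.us-central1.gcp.cloud.es.io"  # placeholder endpoint
ES_PORT = 9243  # HTTPS port used by Elastic Cloud deployments

open_sockets = []  # hold references so connections stay ESTABLISHED

def try_connect(i):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(15)
    try:
        s.connect((ES_HOST, ES_PORT))
        open_sockets.append(s)
        return (i, "connected")
    except OSError as e:  # connect timeouts surface here, e.g. [Errno 110]
        return (i, "failed: %s" % e)

with ThreadPoolExecutor(max_workers=100) as pool:
    for i, status in pool.map(try_connect, range(100)):
        print(i, status)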

Does Dataflow (Compute Engine) or Elasticsearch have a limit on the number of concurrent connections? I could not find any information online.

Solution

I did a little bit of research about the connector for Elasticsearch. There are two principles that you may want to apply to ensure your connector is as efficient as possible.

Note: Setting a maximum number of workers, as suggested in the other answer, will probably not help as much (for now) - let's first improve utilization of your Beam/Elastic cluster resources, and if we start hitting limits on either, then we can consider restricting the number of workers - but right now, you can try to improve your connector.

Using bulk requests to external services

The code you provided issues an individual search request for every element coming into the DoFn. As you've noted, this works, but it will cause your pipeline to spend too much time waiting on an external request for each element - the time spent waiting on round trips grows as O(n).

Luckily, the Elasticsearch client has an msearch method, which should allow you to perform searches in bulk. You can do something like this:

import json

import apache_beam as beam

BATCH_SIZE = 100  # tune to your workload

class PredictionFn(beam.DoFn):
    def __init__(self, ...):
        self.buffer = []
    ...
    def process(self, element):
        self.buffer.append(element)
        if len(self.buffer) >= BATCH_SIZE:
            return self.flush()

    def finish_bundle(self):
        # Flush whatever is still buffered so the tail of each bundle is not lost
        if self.buffer:
            return self.flush()

    def flush(self):
        results = []
        es = self.init_elasticsearch()  # shared client - see the snippet below

        # One msearch round trip for all buffered user ids
        user_ids = [uid for cid, did, uid in self.buffer]
        user_ids_request = self._build_uid_reqs(user_ids)

        resp = es.msearch(body=user_ids_request)

        # msearch returns one response per sub-request, in order, so the
        # responses can be zipped back together with the buffered elements
        user_id_and_device_id_lists = []
        for r, elm in zip(resp['responses'], self.buffer):
            if len(r["hits"]["hits"]) == 0:
                continue
            # Get the new device_id_list from the hits (schema-specific)
            device_id_list = [hit["_source"] for hit in r["hits"]["hits"]]
            user_id_and_device_id_lists.append((elm[2],  # User ID
                                                device_id_list))

        # Second msearch round trip for all the device id lists at once
        device_id_lists = [elm[1] for elm in user_id_and_device_id_lists]
        device_ids_request = self._build_device_id_reqs(device_id_lists)

        resp = es.msearch(body=device_ids_request)

        # Handle the result, output anything necessary
        self.buffer = []  # reset the buffer for the next batch
        return results

    def _build_uid_reqs(self, uids):
        # Relying on this answer: https://stackoverflow.com/questions/28546253/how-to-create-request-body-for-python-elasticsearch-msearch/37187352
        res = []
        for uid in uids:
            res.append(json.dumps({'index': 'sessions'}))  # Request HEAD
            res.append(json.dumps({"query": {"match": {"userId": uid}}}))  # Request BODY

        return '\n'.join(res)
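
For clarity, the request body that _build_uid_reqs returns is newline-delimited JSON - one header line and one query line per search. For two hypothetical user ids it would look like:

{"index": "sessions"}
{"query": {"match": {"userId": "user-1"}}}
{"index": "sessions"}
{"query": {"match": {"userId": "user-2"}}}

and resp['responses'] comes back as a list with one entry per query, in the same order.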

Reusing the client as it's thread-safe

The Elasticsearch client is also thread-safe!

So rather than creating a new one every time, you can do something like this:

class PredictionFn(beam.DoFn):
    CLIENT = None

    def init_elasticsearch(self):
        if PredictionFn.CLIENT is not None:
          return PredictionFn.CLIENT
        es_host = fetch_host()
        http_auth = fetch_auth()
        PredictionFn.CLIENT = Elasticsearch([es_host], http_auth=http_auth, 
            timeout=300, sniff_on_connection_fail=True,
            retry_on_timeout=True, max_retries=2,
            maxsize=5) # 5 connections per client
        return PredictionFn.CLIENT

This should ensure that you keep a single client for each worker, and you won't be creating so many connections to ElasticSearch - and thus not getting the rejection messages.
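
If you'd rather create the client once per worker up front, a minimal sketch of wiring this in - assuming a Beam Python SDK recent enough to have DoFn.setup - would be:

class PredictionFn(beam.DoFn):
    def setup(self):
        # Runs once per DoFn instance on the worker, so every bundle this
        # instance processes reuses the same pooled client
        self.client = self.init_elasticsearch()

    def process(self, element):
        ...  # use self.client instead of constructing a new Elasticsearch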

Let me know if these two help, or if we need to try further improvements!
