Elasticsearch/dataflow - connection timeout after ~60 concurrent connections


Problem Description

We host an Elasticsearch cluster on Elastic Cloud and call it from Dataflow (GCP). The job works fine in dev, but when we deploy to prod we're seeing lots of connection timeouts on the client side.

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 570, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "main.py", line 159, in process
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/client/utils.py", line 152, in _wrapped
    return func(*args, params=params, headers=headers, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/client/__init__.py", line 1617, in search
    body=body,
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/transport.py", line 390, in perform_request
    raise e
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/transport.py", line 365, in perform_request
    timeout=timeout,
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/connection/http_urllib3.py", line 258, in perform_request
    raise ConnectionError("N/A", str(e), e)
elasticsearch.exceptions.ConnectionError: ConnectionError(<urllib3.connection.HTTPSConnection object at 0x7fe5d04e5690>: Failed to establish a new connection: [Errno 110] Connection timed out) caused by: NewConnectionError(<urllib3.connection.HTTPSConnection object at 0x7fe5d04e5690>: Failed to establish a new connection: [Errno 110] Connection timed out)

I increased the timeout setting in the Elasticsearch client to 300s, as below, but it didn't seem to help.

self.elasticsearch = Elasticsearch([es_host], http_auth=http_auth, timeout=300)
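
(For reference, elasticsearch-py also accepts the timeout on a per-request basis via the standard request_timeout parameter; a minimal sketch against the same client, with an illustrative index name and query:)

    # Per-request timeout override; request_timeout is a standard
    # elasticsearch-py parameter. 'sessions'/'userId' are illustrative.
    resp = self.elasticsearch.search(
        index="sessions",
        body={"query": {"match": {"userId": user_id}}},
        request_timeout=300,
    )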

Looking at the deployment metrics at https://cloud.elastic.co/deployments//metrics, CPU and memory usage are very low (below 10%), and search response time is on the order of 200ms. What could be the bottleneck here, and how can we avoid such timeouts?

In the logs, most requests fail with a connection timeout, while successful requests receive a response very quickly.

I tried SSHing into the VM where we experience the connection errors. netstat showed about 60 ESTABLISHED connections to the Elasticsearch IP address. When I curl from the VM to the Elasticsearch address, I can reproduce the timeout. I can curl other URLs from the VM fine, and I can also curl Elasticsearch fine from my local machine, so the issue is only the connection between the VM and the Elasticsearch server.

Does Dataflow (Compute Engine) or Elasticsearch have a limit on the number of concurrent connections? I could not find any information online.

Solution

I did a little bit of research about the connector for ElasticSearch. There are two principles that you may want to apply to ensure your connector is as efficient as possible.

Note: Setting a maximum number of workers, as suggested in the other answer, will probably not help much (for now). Let's first improve utilization of your Beam and Elastic cluster resources; if we start hitting limits on either, then we can consider restricting the number of workers. For now, you can try to improve your connector.

Using bulk requests to external services

The code you provided issues an individual search request for every element coming into the DoFn. As you've noted, this works, but it causes your pipeline to spend too much time waiting on an external request for each element, so your round-trip wait grows as O(n) in the number of elements.
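
For context, here is a hypothetical reconstruction of the one-request-per-element pattern being replaced (the sessions index and userId field come from your snippet; the element layout matches the batched code below):

    # Hypothetical reconstruction of the one-request-per-element pattern:
    def process(self, element):
        cid, did, user_id = element
        resp = self.elasticsearch.search(
            index="sessions",
            body={"query": {"match": {"userId": user_id}}})
        # ... handle resp: one HTTP round trip per element, i.e. O(n) waits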

Fortunately, the Elasticsearch client has an msearch method, which allows you to perform searches in bulk. You can do something like this:

import json

import apache_beam as beam

BATCH_SIZE = 100  # tune to your workload


class PredictionFn(beam.DoFn):
    def __init__(self):
        self.buffer = []

    def process(self, element):
        self.buffer.append(element)
        if len(self.buffer) >= BATCH_SIZE:
            return self.flush()

    def flush(self):
        result = []

        # Perform the search requests for all buffered user ids in one round trip
        user_ids = [uid for cid, did, uid in self.buffer]
        user_ids_request = self._build_uid_reqs(user_ids)

        # self.client is the shared Elasticsearch client (see the next section)
        resp = self.client.msearch(body=user_ids_request)

        user_id_and_device_id_lists = []
        for r, elm in zip(resp['responses'], self.buffer):
            if len(r["hits"]["hits"]) == 0:
                continue
            # Get the new device_id_list out of the hits
            # (how exactly depends on your document schema)
            device_id_list = r["hits"]["hits"]
            user_id_and_device_id_lists.append((elm[2],  # User ID
                                                device_id_list))

        device_id_lists = [elm[1] for elm in user_id_and_device_id_lists]
        device_ids_request = self._build_device_id_reqs(device_id_lists)

        resp = self.client.msearch(body=device_ids_request)
        # Handle the result, append anything necessary to `result`

        self.buffer = []  # clear the batch once it has been processed
        return result

    def _build_uid_reqs(self, uids):
        # Relying on this answer: https://stackoverflow.com/questions/28546253/how-to-create-request-body-for-python-elasticsearch-msearch/37187352
        res = []
        for uid in uids:
            res.append(json.dumps({'index': 'sessions'}))  # Request HEAD
            res.append(json.dumps({"query": {"match": {"userId": uid}}}))  # Request BODY

        return '\n'.join(res)
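
One caveat with buffering like this: elements still sitting in the buffer when a bundle ends would never be flushed. A minimal sketch of handling that in finish_bundle (assuming the global window, and that flush() returns the values to emit):

    from apache_beam.transforms.window import GlobalWindow
    from apache_beam.utils.windowed_value import WindowedValue

    def finish_bundle(self):
        # Flush whatever is left in the buffer at the end of the bundle.
        # Outputs emitted from finish_bundle must be explicitly windowed.
        if self.buffer:
            for value in self.flush():
                yield WindowedValue(value, GlobalWindow().max_timestamp(),
                                    [GlobalWindow()])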

Reusing the client as it's thread-safe

The Elasticsearch client is also thread-safe!

So rather than creating a new one every time, you can do something like this:

from elasticsearch import Elasticsearch


class PredictionFn(beam.DoFn):
    CLIENT = None

    def init_elasticsearch(self):
        if PredictionFn.CLIENT is not None:
            return PredictionFn.CLIENT
        es_host = fetch_host()
        http_auth = fetch_auth()
        PredictionFn.CLIENT = Elasticsearch(
            [es_host], http_auth=http_auth,
            timeout=300, sniff_on_connection_fail=True,
            retry_on_timeout=True, max_retries=2,
            maxsize=5)  # 5 connections per client
        return PredictionFn.CLIENT

This should ensure that you keep a single client per worker, so you won't be creating so many connections to ElasticSearch, and thus should stop getting the rejection messages.
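
To tie the two snippets together: one option (a sketch, not the only way) is to grab the shared client in the DoFn's setup method, so the self.client used by flush() above is populated once per worker:

    def setup(self):
        # Runs once per DoFn instance, before any bundles are processed;
        # reuses the class-level client across bundles.
        self.client = self.init_elasticsearch()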

Let me know if these two help, or if we need to try further improvements!
