使用Rest High Level Client检索数据或将数据插入Elastic Search时发生SocketTimeoutException [英] SocketTimeoutException while retrieving or inserting data into Elastic Search by using Rest High Level Client

查看:1159
本文介绍了使用Rest High Level Client检索数据或将数据插入Elastic Search时发生SocketTimeoutException的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在从Elastic检索数据/向Elastic插入数据时,我正面对SocketTimeoutException.当10-30 request/second附近时,就会发生这种情况.这些请求是get/put的组合.

I'm facing SocketTimeoutException while retrieving/inserting data from/to elastic. This is happening when there are around 10-30 request/second. These requests are combination of get/put.

这是我的弹性配置:

  • 3 master nodes每个4GB RAM
  • 2 data nodes每个8GM RAM
  • 连接到上述数据节点的Azure负载均衡器(似乎仅打开了9200端口).而且Java客户端因为只公开而连接到此负载均衡器.
  • 弹性版本:7.2.0
  • 其他高级客户端:

  • 3 master nodes each of 4GB RAM
  • 2 data nodes each of 8GM RAM
  • Azure load balancer which connects to above data node (seems only 9200 port is opened on it). And java client connects to this load balancer as it's only exposed.
  • Elastic Version: 7.2.0
  • Rest High Level Client:

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.2.0</version>
</dependency>

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.2.0</version>
</dependency>

索引信息:

  • 索引分片:2
  • 索引副本:1
  • 索引总数字段:10000
  • 来自基巴纳语的索引大小:总计-27.2 MB& Primaries: 12.2MB
  • 索引结构:
  • Index shards: 2
  • Index replica: 1
  • Index total fields: 10000
  • Size of index from kibana: Total-27.2 MB & Primaries: 12.2MB
  • Index structure:
{
  "dev-index": {
    "mappings": {
      "properties": {
        "dataObj": {
          "type": "object",
          "enabled": false
        },
        "generatedID": {
          "type": "keyword"
        },
        "transNames": { //it's array of string
          "type": "keyword"
        }
      }
    }
  }
}

  • 动态映射已禁用.
  • 以下是我的elastic Config文件.这里有两个连接bean,一个用于读取&另一个用于写弹性.

    Following is my elastic Config file. Here I've two connection bean, one is for read & another for write to elastic.

    ElasticConfig.java:

    @Configuration
    public class ElasticConfig {
    
        @Value("${elastic.host}")
        private String elasticHost;
    
        @Value("${elastic.port}")
        private int elasticPort;
    
        @Value("${elastic.user}")
        private String elasticUser;
    
        @Value("${elastic.pass}")
        private String elasticPass;
    
        @Value("${elastic-timeout:20}")
        private int timeout;
    
        @Bean(destroyMethod = "close")
        @Qualifier("readClient")
        public RestHighLevelClient readClient(){
    
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(elasticUser, elasticPass));
    
            RestClientBuilder builder = RestClient
                    .builder(new HttpHost(elasticHost, elasticPort))
                    .setHttpClientConfigCallback(httpClientBuilder -> 
                            httpClientBuilder
                                    .setDefaultCredentialsProvider(credentialsProvider)
                                    .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(5).build())
                    );
    
            builder.setRequestConfigCallback(requestConfigBuilder -> 
                    requestConfigBuilder
                            .setConnectTimeout(10000)
                            .setSocketTimeout(60000)
                            .setConnectionRequestTimeout(0)
            );
    
            RestHighLevelClient restClient = new RestHighLevelClient(builder);
            return restClient;
        }
    
        @Bean(destroyMethod = "close")
        @Qualifier("writeClient")
        public RestHighLevelClient writeClient(){
    
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(elasticUser, elasticPass));
    
            RestClientBuilder builder = RestClient
                    .builder(new HttpHost(elasticHost, elasticPort))
                    .setHttpClientConfigCallback(httpClientBuilder -> 
                            httpClientBuilder
                                    .setDefaultCredentialsProvider(credentialsProvider)
                                    .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(5).build())
                    );
    
            builder.setRequestConfigCallback(requestConfigBuilder -> 
                    requestConfigBuilder
                            .setConnectTimeout(10000)
                            .setSocketTimeout(60000)
                            .setConnectionRequestTimeout(0)
            );
    
            RestHighLevelClient restClient = new RestHighLevelClient(builder);
            return restClient;
        }
    
    }
    

    这里是调用弹性的函数,如果弹性中的数据可用,它将接走它,否则将生成数据&变得富有弹性.

    Here is the function which makes a call to elastic, if data is available in elastic it will take it else it will generate data & put into elastic.

    public Object getData(Request request) {
    
        DataObj elasticResult = elasticService.getData(request);
        if(elasticResult!=null){
            return elasticResult;
        }
        else{
            //code to generate data
            DataObj generatedData = getData();//some function which will generated data
            //put above data into elastic by Async call.
            elasticAsync.putData(generatedData);
            return generatedData;
        }
    }
    

    ElasticService.java getData函数:

    @Service
    public class ElasticService {
    
        @Value("${elastic.index}")
        private String elasticIndex;
    
        @Autowired
        @Qualifier("readClient")
        private RestHighLevelClient readClient;
    
        public DataObj getData(Request request){
            String generatedId = request.getGeneratedID();
    
            GetRequest getRequest = new GetRequest()
                    .index(elasticIndex)   //elastic index name
                    .id(generatedId);   //retrieving by index id from elastic _id field (as key-value)
    
            DataObj result = null;
            try {
                GetResponse response = readClient.get(getRequest, RequestOptions.DEFAULT);
                if(response.isExists()) {
                    ObjectMapper objectMapper = new ObjectMapper();
                    result = objectMapper.readValue(response.getSourceAsString(), DataObj.class);
                }
            }  catch (Exception e) {
                LOGGER.error("Exception occurred during  fetch from elastic !!!! " + ,e);
            }
            return result;
        }
    
    }
    

    ElasticAsync.java异步放置数据功能:

    @Service
    public class ElasticAsync {
    
        private static final Logger LOGGER = Logger.getLogger(ElasticAsync.class.getName());
    
        @Value("${elastic.index}")
        private String elasticIndex;
    
        @Autowired
        @Qualifier("writeClient")
        private RestHighLevelClient writeClient;
    
        @Async
        public void putData(DataObj generatedData){
         ElasticVO updatedRequest = toElasticVO(generatedData);//ElasticVO matches to the structure of index given above.
    
            try {
                ObjectMapper objectMapper = new ObjectMapper();
                String jsonString = objectMapper.writeValueAsString(updatedRequest);
    
                IndexRequest request = new IndexRequest(elasticIndex);
                request.id(generatedData.getGeneratedID());
                request.source(jsonString, XContentType.JSON);
                request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
                request.timeout(TimeValue.timeValueSeconds(5));
                IndexResponse indexResponse = writeClient.index(request, RequestOptions.DEFAULT);
                LOGGER.info("response id: " + indexResponse.getId());
    
                }
    
            } catch (Exception e) {
                LOGGER.error("Exception occurred during saving into elastic !!!!",e);
            }
    
    
        }
    
    }
    

    在将数据保存到弹性模块期间发生异常时,这是堆栈跟踪的一部分:

    Here is the some part of the stack trace when exception is occurred during saving data into elastic:

    2019-07-19 07:32:19.997 ERROR [data-retrieval,341e6ecc5b10f3be,1eeb0722983062b2,true] 1 --- [askExecutor-894] a.c.s.a.service.impl.ElasticAsync        : Exception occurred during saving into elastic !!!!
    
    java.net.SocketTimeoutException: 60,000 milliseconds timeout on connection http-outgoing-34 [ACTIVE]
    at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:789) ~[elasticsearch-rest-client-7.2.0.jar!/:7.2.0]
        at org.elasticsearch.client.RestClient.performRequest(RestClient.java:225) ~[elasticsearch-rest-client-7.2.0.jar!/:7.2.0]
        at org.elasticsearch.client.RestClient.performRequest(RestClient.java:212) ~[elasticsearch-rest-client-7.2.0.jar!/:7.2.0]
        at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1448) ~[elasticsearch-rest-high-level-client-7.2.0.jar!/:7.2.0]
        at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1418) ~[elasticsearch-rest-high-level-client-7.2.0.jar!/:7.2.0]
        at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1388) ~[elasticsearch-rest-high-level-client-7.2.0.jar!/:7.2.0]
        at org.elasticsearch.client.RestHighLevelClient.index(RestHighLevelClient.java:836) ~[elasticsearch-rest-high-level-client-7.2.0.jar!/:7.2.0]
    
    
    Caused by: java.net.SocketTimeoutException: 60,000 milliseconds timeout on connection http-outgoing-34 [ACTIVE]
        at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92) ~[httpasyncclient-4.1.3.jar!/:4.1.3]
        at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39) ~[httpasyncclient-4.1.3.jar!/:4.1.3]
        at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        ... 1 common frames omitted
    

    在将数据检索到弹性期间发生异常时,这是堆栈跟踪的一部分:

    Here is the some part of the stack trace when exception is occurred during retrieving data into elastic:

    2019-07-19 07:22:37.844 ERROR [data-retrieval,104cf6b2ab5b3349,b302d3d3cd7ebc84,true] 1 --- [o-8080-exec-346] a.c.s.a.service.impl.ElasticService      : Exception occurred during  fetch from elastic !!!! 
    
    java.net.SocketTimeoutException: 60,000 milliseconds timeout on connection http-outgoing-30 [ACTIVE]
        at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:789) ~[elasticsearch-rest-client-7.1.1.jar!/:7.1.1]
        at org.elasticsearch.client.RestClient.performRequest(RestClient.java:225) ~[elasticsearch-rest-client-7.1.1.jar!/:7.1.1]
        at org.elasticsearch.client.RestClient.performRequest(RestClient.java:212) ~[elasticsearch-rest-client-7.1.1.jar!/:7.1.1]
        at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1433) ~[elasticsearch-rest-high-level-client-7.1.1.jar!/:7.1.1]
        at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1403) ~[elasticsearch-rest-high-level-client-7.1.1.jar!/:7.1.1]
        at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1373) ~[elasticsearch-rest-high-level-client-7.1.1.jar!/:7.1.1]
        at org.elasticsearch.client.RestHighLevelClient.get(RestHighLevelClient.java:699) ~[elasticsearch-rest-high-level-client-7.1.1.jar!/:7.1.1]
    
    
    
    Caused by: java.net.SocketTimeoutException: 60,000 milliseconds timeout on connection http-outgoing-30 [ACTIVE]
            at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92) ~[httpasyncclient-4.1.3.jar!/:4.1.3]
        at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39) ~[httpasyncclient-4.1.3.jar!/:4.1.3]
        at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        ... 1 common frames omitted
    

    我已经经历过几次stackoverflow& elastic相关博客中提到此问题的原因可能是RAM& cluster弹性配置.然后,我将分片从5更改为2,因为只有两个数据节点.还将Data节点的内存从4GB增加到8GB,因为我知道弹性将仅使用RAM总数中的50%.异常的发生率有所降低,但问题仍然存在.

    I've gone through couple of stackoverflow & elastic related blogs where they have mentioned this issue could be due to RAM & cluster configuration of elastic. Then I've changed my shards from 5 to 2 as there were only two data nodes. Also increased ram of Data nodes from 4GB to 8GB, as I get to know that elastic will use only 50% of total RAM. The occurrences of exception have decreased but problem still persist.

    解决此问题的可能方法是什么?从Java/弹性配置的角度来看,我经常缺少这种SocketTimeoutException的东西是什么?如果您需要有关该配置的更多详细信息,请告诉我.

    What could be possible ways to solve this problem ? What I'm missing from java/elastic configuration point of view which frequently throwing this kind of SocketTimeoutException ? Let me know if you require any more details regarding the configuration.

    推荐答案

    我们遇到了同样的问题,经过一番挖掘后,我发现了根本原因:客户端与弹性服务器内核配置之间的防火墙配置不匹配为TCP保持生命.

    We've had the same issue and after quite some digging I found the root cause: a config mismatch of the firewall between the client and the elastic servers kernel config for tcp keep alive.

    防火墙在3600秒后丢弃空闲连接.问题是tcp keep alive的内核参数设置为7200秒(RedHat 6.x/7.x中的默认设置):

    The firewall drops idle connections after 3600 seconds. The problem was that the kernel parameter for the tcp keep alive was set to 7200 seconds (default in RedHat 6.x/7.x):

    sysctl -n net.ipv4.tcp_keepalive_time
    7200
    

    因此,在发送保持活动探测之前,连接已断开.弹性http客户端中的asyncHttpClient似乎不能很好地处理掉线连接,它只是等待套接字超时.

    So the connections are dropped before a keep alive probe is being sent. The asyncHttpClient in the elastic http client doesn't seem to handle dropped connections very well, it just waits until the socket timeout.

    因此,请检查客户端和服务器之间是否有会话超时或类似的网络设备(负载均衡器,防火墙,代理等),并增加超时时间或降低tcp_keep_alive内核参数.

    So check whether you have any network device (Loadbalancer, Firewall, Proxy etc.) between your client and server which has a session timeout or similar and either increase that timeout or lower the tcp_keep_alive kernel parameter.

    这篇关于使用Rest High Level Client检索数据或将数据插入Elastic Search时发生SocketTimeoutException的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

    查看全文
    登录 关闭
    扫码关注1秒登录
    发送“验证码”获取 | 15天全站免登陆