Java: Optimizing an application using asynchronous programming


Question

I have to modify a dropwizard application to improve its running time. Basically, this application receives around 3 million URLs daily and downloads and parses them to detect for malicious content. The problem is that the application is only able to process 1 million URLs. When I looked at the application I found that it is making a lot of sequential calls. I would like some suggestion on how can I improve the application by making it asynchronous or other techniques.

The relevant code is given below:

/* Scheduler */
private long triggerDetection(String startDate, String endDate) {
    for (UrlRequest request : urlRequests) {
        if (!validateRequests.isWhitelisted(request)) {
            ContentDetectionClient.detectContent(request);
        }
    }
}

/* Client */
public void detectContent(UrlRequest urlRequest) {
    Client client = new Client();
    URI uri = buildUrl(); /* It returns the URL of this dropwizard application's resource method provided below */

    ClientResponse response = client.resource(uri)
            .type(MediaType.APPLICATION_JSON_TYPE)
            .post(ClientResponse.class, urlRequest);

    int status = response.getStatus();
    if (status >= 200 && status < 300) {
        log.info("Completed request for url: {}", urlRequest.getUrl());
    } else {
        log.error("request failed for url: {}", urlRequest.getUrl());
    }
}

private URI buildUrl() {
    return UriBuilder
            .fromPath(uriConfiguration.getUrl())
            .build();
}

/* Resource Method */
/**
 * Receives the url of the publisher, crawls the content of that url,
 * and applies a detector to check if the content is malicious.
 * @return the probability of the page being malicious
 * @throws Exception if the crawl call failed
 */
@POST
@Path("/pageDetection")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public DetectionScore detectContent(UrlRequest urlRequest) throws Exception {
    return contentAnalysisOrchestrator.detectContentPage(urlRequest);
}

/* Orchestrator */
public DetectionScore detectContentPage(UrlRequest urlRequest) {
    try {
        Pair<Integer, HtmlPage> response = crawler.rawLoad(urlRequest.getUrl());
        String content = response.getValue().text();

        DetectionScore detectionScore = detector.getProbability(urlRequest.getUrl(), content);
        contentDetectionResultDao.insert(urlRequest.getAffiliateId(), urlRequest.getUrl(),
                detectionScore.getProbability() * 1000, detectionScore.getRecommendation(),
                urlRequest.getRequestsPerUrl(), -1, urlRequest.getCreatedAt());

        return detectionScore;

    } catch (IOException e) {
        log.error("Error while analyzing the url: {}", e);
        throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
    }
}

I was thinking of the following approaches:

  • Instead of calling the dropwizard resource method through a POST call, call orchestrator.detectContent(urlRequest) directly from the scheduler.

  • The orchestrator can return the detectionScore; I'll store all the detection scores in a map/table and perform a batch database insertion instead of the individual insertions in the present code.
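The second approach could be sketched with CompletableFuture: fan out one task per URL, collect the scores, and do a single batch insert at the end. This is only a sketch under assumptions: `detectContentPage` below is a stand-in for the real orchestrator call (here it just fakes a probability), and `insertBatch` is a hypothetical DAO method, shown commented out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BatchDetection {

    // Stand-in for the real orchestrator.detectContentPage(urlRequest) call;
    // returns a fake probability derived from the URL length.
    static double detectContentPage(String url) {
        return url.length() % 10 / 10.0;
    }

    public static void main(String[] args) {
        List<String> urls = List.of("http://a.example", "http://bb.example", "http://ccc.example");
        ExecutorService exec = Executors.newFixedThreadPool(15);

        // Fan out: one future per URL, each calling the orchestrator directly
        // instead of going through the resource method over HTTP.
        List<CompletableFuture<Double>> futures = new ArrayList<>();
        for (String url : urls) {
            futures.add(CompletableFuture.supplyAsync(() -> detectContentPage(url), exec));
        }

        // Join on all futures and collect the scores, then perform ONE
        // batch insert instead of an insert per URL.
        List<Double> scores = new ArrayList<>();
        for (CompletableFuture<Double> f : futures) {
            scores.add(f.join());
        }
        exec.shutdown();

        // contentDetectionResultDao.insertBatch(scores);  // hypothetical batch DAO call
        System.out.println("collected " + scores.size() + " scores");
    }
}
```

Since the work is IO-bound, the fixed pool caps how many downloads run at once while still keeping many in flight; the batch insert then amortizes the round-trip cost to the remote database.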

I would like some comments on the above approaches, and on other techniques by which I can improve the running time. Also, I just read about Java asynchronous programming but can't seem to understand how to use it in the above code, so I would like some help with this as well.

Thanks.

I can think of two bottlenecks:

  • Downloading the web pages
  • Inserting the results into the database (the database is located on another system)
  • It appears that 1 URL is processed at a time

The system has 8 GB of memory out of which 4 GB appears to be free

$ free -m
             total       used       free     shared    buffers     cached
Mem:          7843       4496       3346          0        193       2339
-/+ buffers/cache:       1964       5879 
Swap:         1952        489       1463 

CPU usage is also minimal:

top - 13:31:19 up 19 days, 15:39,  3 users,  load average: 0.00, 0.00, 0.00
Tasks: 215 total,   1 running, 214 sleeping,   0 stopped,   0 zombie
Cpu(s):  0.5%us,  0.0%sy,  0.0%ni, 99.4%id,  0.1%wa,  0.0%hi,  0.0%si,  0.0%st
Mem:   8031412k total,  4605196k used,  3426216k free,   198040k buffers
Swap:  1999868k total,   501020k used,  1498848k free,  2395344k cached

Answer

Inspired by Davide's (great) answer, here is an easy way to parallelise this using simple-react (a library I wrote). Note it is slightly different, using the client to drive the concurrency on the server.

Example

LazyReact streamBuilder = new LazyReact(15,15);

streamBuilder.fromIterable(urlRequests)
      .filter(urlReq->!validateRequests.isWhitelisted(urlReq))
      .forEach(request -> {
           ContentDetectionClient.detectContent(request);
       });

Explanation

It looks like you can drive the concurrency from the client, which means you can distribute the work across threads on the server side with no additional work. In this example we are making 15 concurrent requests, but you could set it close to the maximum the server can handle. Your application is IO-bound, so you can use a lot of threads to drive performance.

simple-react works as a Stream of Futures. So here we create an async task for each call to the ContentDetection client. We have 15 threads available, so 15 calls can be made to the server at once.

Java 7

There is a backport of the JDK 8 functionality to Java 7 called StreamSupport, and you can also backport lambda expressions via RetroLambda.

To implement the same solution with CompletableFutures, we can create a future task for each eligible URL. UPDATE: I don't think we need to batch them; we can use the Executor to limit the number of active futures. We merely need to join on them all at the end.

Executor exec = Executors.newFixedThreadPool(maxActive); // 15 threads

List<CompletableFuture<Void>> futures = new ArrayList<>();
for (UrlRequest request : urlRequests) {
    if (!validateRequests.isWhitelisted(request)) {
        futures.add(CompletableFuture.runAsync(
                () -> ContentDetectionClient.detectContent(request), exec));
    }
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                 .join();
