如何为客户端添加隐式批处理? [英] How to add batching implicit for client?

查看:109
本文介绍了如何为客户端添加隐式批处理?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

让我们考虑以下代码:

客户代码:

public class MyClient {
    private final MyClientSideService myClientSideService;

    public MyClient(MyClientSideService myClientSideService) {
        this.myClientSideService = myClientSideService;
    }

    public String requestRow(Integer req) {
        return myClientSideService.requestSingleRow(req);
    }
}

客户端服务:

public class MyClientSideService {
    private final MyServerSideService myServerSideService;

    public MyClientSideService(MyServerSideService myServerSideService) {
        this.myServerSideService = myServerSideService;
    }

    public String requestSingleRow(int req) {
        return myServerSideService.requestRowBatch(Arrays.asList(req)).get(0);
    }
}

服务器端服务:

@Slf4j
public class MyServerSideService {
    //single threaded bottleneck service
    public synchronized List<String> requestRowBatch(List<Integer> batchReq) {
        log.info("Req for {} started");
        try {
            Thread.sleep(100);
            return batchReq.stream().map(String::valueOf).collect(Collectors.toList());

        } catch (InterruptedException e) {
            return null;
        } finally {
            log.info("Req for {} finished");

        }
    }
}

主要:

@Slf4j
public class MainClass {
    public static void main(String[] args) {
        MyClient myClient = new MyClient(new MyClientSideService(new MyServerSideService()));
        for (int i = 0; i < 20; i++) {
            new Thread(() -> {
                for (int m = 0; m < 100; m++) {
                    int k = m;
                    log.info("Response is {}", myClient.requestRow(k));
                }
            }).start();
        }
    }
}

根据日志,大约需要4分22秒,但这太费时间了.我认为它可能会大大改善.我想实现隐式批处理.因此,MyClientSideService应该收集请求,当它变为50(它是预先配置的批处理大小)或某个预先配置的超时到期时,则请求MyServerSideService并将结果返回给客户端.协议应该是同步的,因此必须阻止客户端,直到获得结果为止.

According the logs it takes approximately 4 min 22 sec but it too much. Ithink it might be improved dramatically. I would like to implement implicit batching. So MyClientSideService should collect requests and when it becomes 50(it is preconfigured batch size) or some preconfigured timeout expired then to request MyServerSideService and back route result to the clients. Protocol should be synchronous so clients must be blocked until result getting.

我尝试使用CountDownLatch es和CyclicBarrier s编写代码,但是我的尝试远未成功.

I tried to write code using CountDownLatches and CyclicBarriers but my attempts were far from success.

我如何实现我的目标?

如果要替换requestRowBatch,则将类型List<String>从转换为Map<Integer, String>,以将请求和响应映射委派给服务器,并带有限制.仅当我发送< = 25请求

If to replace requestRowBatch return type List<String> from to Map<Integer, String> to delegate request and response mapping to server following works with limititations. It works only if I send <=25 requests

@Slf4j
public class MyClientSideService {
    private final Integer batchSize = 25;
    private final Integer maxTimeoutMillis = 5000;
    private final MyServerSideService myServerSideService;
    private final Queue<Integer> queue = new ArrayBlockingQueue(batchSize);
    private final Map<Integer, String> responseMap = new ConcurrentHashMap();
    private final AtomicBoolean started = new AtomicBoolean();

    private CountDownLatch startBatchRequestLatch = new CountDownLatch(batchSize);
    private CountDownLatch awaitBatchResponseLatch = new CountDownLatch(1);


    public MyClientSideService(MyServerSideService myServerSideService) {
        this.myServerSideService = myServerSideService;
    }

    public String requestSingleRow(int req) {
        queue.offer(req);
        if (!started.compareAndExchange(false, true)) {
            log.info("Start batch collecting");
            startBatchCollecting();
        }
        startBatchRequestLatch.countDown();
        try {
            log.info("Awaiting batch response latch for {}...", req);
            awaitBatchResponseLatch.await();
            log.info("Finished awaiting batch response latch for {}...", req);
            return responseMap.get(req);
        } catch (InterruptedException e) {
            e.printStackTrace();
            return "EXCEPTION";
        }
    }

    private void startBatchCollecting() {
        new Thread(() -> {
            try {
                log.info("Await startBatchRequestLatch");
                startBatchRequestLatch.await(maxTimeoutMillis, TimeUnit.MILLISECONDS);
                log.info("await of startBatchRequestLatch finished");

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            responseMap.putAll(requestBatch(queue));
            log.info("Releasing batch response latch");
            awaitBatchResponseLatch.countDown();

        }).start();
    }

    public Map<Integer, String> requestBatch(Collection<Integer> requestList) {

        return myServerSideService.requestRowBatch(requestList);
    }
}

更新

根据麦芽的答案,我能够得出以下结论:

Update

According Malt answer I was able to develop following:

@Slf4j
public class MyClientSideServiceCompletableFuture {
    private final Integer batchSize = 25;
    private final Integer maxTimeoutMillis = 5000;
    private final MyServerSideService myServerSideService;
    private final Queue<Pair<Integer, CompletableFuture>> queue = new ArrayBlockingQueue(batchSize);
    private final AtomicInteger counter = new AtomicInteger(0);
    private final Lock lock = new ReentrantLock();

    public MyClientSideServiceCompletableFuture(MyServerSideService myServerSideService) {
        this.myServerSideService = myServerSideService;
    }

    public String requestSingleRow(int req) {
        CompletableFuture<String> future = new CompletableFuture<>();
        lock.lock();
        try {
            queue.offer(Pair.of(req, future));
            int counter = this.counter.incrementAndGet();
            if (counter != 0 && counter % batchSize == 0) {
                log.info("request");
                List<Integer> requests = queue.stream().map(p -> p.getKey()).collect(Collectors.toList());
                Map<Integer, String> serverResponseMap = requestBatch(requests);
                queue.forEach(pair -> {
                    String response = serverResponseMap.get(pair.getKey());
                    CompletableFuture<String> value = pair.getValue();
                    value.complete(response);
                });
                queue.clear();
            }
        } finally {
            lock.unlock();
        }
        try {
            return future.get();
        } catch (Exception e) {
            return "Exception";
        }
    }


    public Map<Integer, String> requestBatch(Collection<Integer> requestList) {

        return myServerSideService.requestRowBatch(requestList);
    }
}

但是,如果大小不是批处理大小的倍数,则无法使用

But it doesn't work if size is not multiple of batch size

推荐答案

您可以使用CompletableFuture.
让调用MyClientSideService的线程将其请求放在Queue(可能是BlockingQueue)中,并获得新的CompletableFuture作为返回.调用线程可以调用CompletableFuture.get()进行阻塞,直到结果准备就绪或继续执行其他东西.

You could use CompletableFuture.
Have threads calling MyClientSideService put their request in a Queue (possibly BlockingQueue, and get a new CompletableFuture in return. The calling thread can call CompletableFuture.get() to block until a result is ready, or go on doing other things.

CompletableFuture将与请求一起存储在MyClientSideService中.当您收到50个请求(因此有50个CompletableFuture实例)时,请客户端服务发送批处理请求.

That CompletableFuture will be stored together with the request in MyClientSideService. When you reach 50 requests (and therefore 50 CompletableFuture instances), have the client service send the batch request.

请求完成后,使用队列中每个ComplatableFuture实例的CompletableFuture.complete(value)方法来通知客户端线程响应已准备就绪.如果客户端调用了CompletableFuture.get()之类的阻塞方法,则将取消阻塞该客户端,或者如果稍后调用该客户端,将使它立即返回值.

When the request is complete, use the CompletableFuture.complete(value) method of each ComplatableFuture instance in the queue to notify the client thread that the response is ready. This will unblock the client if it has called blocking method like CompletableFuture.get(), or make it return instantly with value if called later.

这篇关于如何为客户端添加隐式批处理?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆