服务器与Jersey发送事件:客户端丢弃后,EventOutput未关闭 [英] Server sent event with Jersey: EventOutput is not closed after client drops

查看:184
本文介绍了服务器与Jersey发送事件:客户端丢弃后,EventOutput未关闭的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用jersey来实现SSE场景。

I am using jersey to implement a SSE scenario.

服务器保持连接活动。并定期将数据推送到客户端。

The server keeps connections alive. And push data to clients periodically.

在我的方案中,存在连接限制,只有一定数量的客户端可以同时订阅服务器。

In my scenario, there is a connection limit, only a certain number of clients can subscribe to the server at the same time.

因此,当新客户端尝试订阅时,我会进行检查(EventOutput.isClosed)以查看是否有任何旧连接不再处于活动状态,因此可以为新客户腾出空间连接。

So when a new client is trying to subscribe, I do a check(EventOutput.isClosed) to see if any old connections are not active anymore, so they can make room for new connections.

但EventOutput.isClosed的结果始终为false,除非客户端显式调用EventSource的close。这意味着如果客户意外掉线(断电或互联网截止),它仍然会占用连接,新客户无法订阅。

But the result of EventOutput.isClosed is always false, unless the client explicitly calls close of EventSource. This means that if a client drops accidentally(power outage or internet cutoff), it's still hogging the connection, and new clients can not subscribe.

是否有解决方法这个?

推荐答案

@CuiPengFei,

@CuiPengFei,

所以在我的旅行中尝试为了找到自己的答案,我偶然发现了一个存储库,它解释了如何正常处理来自断开连接的客户端的连接。

So in my travels trying to find an answer to this myself I stumbled upon a repository that explains how to handle gracefully cleaning up the connections from disconnected clients.

将所有SSE EventOutput逻辑封装到服务/管理器中。在这里,他们启动一个线程,检查客户端是否已关闭EventOutput。如果是这样,他们正式关闭连接(EventOutput#close())。如果不是,他们会尝试写入流。如果它抛出一个Exception,那么客户端在没有关闭的情况下断开连接并处理它的关闭。如果写入成功,则EventOutput将返回到池,因为它仍然是活动连接。

The encapsulate all of the SSE EventOutput logic into a Service/Manager. In this they spin up a thread that checks to see if the EventOutput has been closed by the client. If so they formally close the connection (EventOutput#close()). If not they try to write to the stream. If it throws an Exception then the client has disconnected without closing and it handles closing it. If the write is successful then the EventOutput is returned to the pool as it is still an active connection.

回购(及实际类别)可用此处。如果删除了回购,我还包括没有进口的类。

The repo (and the actual class) are available here. Ive also included the class without imports below in case the repo is ever removed.

请注意,它们将此绑定到Singleton。该商店应该是全球唯一的。

public class SseWriteManager {

private final ConcurrentHashMap<String, EventOutput> connectionMap = new ConcurrentHashMap<>();

private final ScheduledExecutorService messageExecutorService;

private final Logger logger = LoggerFactory.getLogger(SseWriteManager.class);

public SseWriteManager() {
    messageExecutorService = Executors.newScheduledThreadPool(1);
    messageExecutorService.scheduleWithFixedDelay(new messageProcessor(), 0, 5, TimeUnit.SECONDS);
}

public void addSseConnection(String id, EventOutput eventOutput) {
    logger.info("adding connection for id={}.", id);
    connectionMap.put(id, eventOutput);
}

private class messageProcessor implements Runnable {
    @Override
    public void run() {
        try {
            Iterator<Map.Entry<String, EventOutput>> iterator = connectionMap.entrySet().iterator();
            while (iterator.hasNext()) {
                boolean remove = false;
                Map.Entry<String, EventOutput> entry = iterator.next();
                EventOutput eventOutput = entry.getValue();
                if (eventOutput != null) {
                    if (eventOutput.isClosed()) {
                        remove = true;
                    } else {
                        try {
                            logger.info("writing to id={}.", entry.getKey());
                            eventOutput.write(new OutboundEvent.Builder().name("custom-message").data(String.class, "EOM").build());
                        } catch (Exception ex) {
                            logger.info(String.format("write failed to id=%s.", entry.getKey()), ex);
                            remove = true;
                        }
                    }
                }
                if (remove) {
                    // we are removing the eventOutput. close it is if it not already closed.
                    if (!eventOutput.isClosed()) {
                        try {
                            eventOutput.close();
                        } catch (Exception ex) {
                            // do nothing.
                        }
                    }
                    iterator.remove();
                }
            }
        } catch (Exception ex) {
            logger.error("messageProcessor.run threw exception.", ex);
        }
    }
}

public void shutdown() {
    if (messageExecutorService != null && !messageExecutorService.isShutdown()) {
        logger.info("SseWriteManager.shutdown: calling messageExecutorService.shutdown.");
        messageExecutorService.shutdown();
    } else {
        logger.info("SseWriteManager.shutdown: messageExecutorService == null || messageExecutorService.isShutdown().");
    }

}} 

这篇关于服务器与Jersey发送事件:客户端丢弃后,EventOutput未关闭的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆