用相同的线程Jedis订阅多个频道 [英] Subscribe to multiple channels with same thread Jedis

查看:574
本文介绍了用相同的线程Jedis订阅多个频道的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个使用Redis发布/订阅在Java中使用Jedis客户端在客户端之间传输消息的应用程序.我希望能够在用户键入命令时在运行时订阅频道,但是由于订阅是一项阻塞操作,因为它在调用订阅的线程上进行侦听,所以我不确定以后如何订阅其他频道在原始线程上.

I have an application that uses Redis publish/subscribe to transfer messages between clients using the Jedis client in Java. I want to be able to subscribe to channels at runtime when the user types a command but as subscribe is a blocking operation as it listens on the thread that calls subscribe I'm not sure how to go about subscribing to other channels at a later date on the original thread.

示例:

private PubSubListener psl = new PubSubListener();

public void onCommand(String[] args) {
    subscribe(args[0]);
}

public void subscribe(String channel) {
    Jedis jedis = jedisPool.getResource();

    jedis.subscribe(psl, channel);
}

这将起作用,除了用于调度该命令的线程将用于轮询Redis,而我将无法使用该线程订阅更多的频道.

This would work except the thread that dispatches the command would then be used to poll Redis and I wouldn't be able to subscribe to any more channels with that thread.

推荐答案

我发现了同样的问题,即订阅后订阅线程就会阻塞.为了解决这个问题,我使用 Netty 实现了一个优化的发布/订阅客户端,并将其合并到Jedis分支

I observed the same problem, namely that the subscribing thread blocks once you subscribe. To address this, I implemented an optimized pub/sub client using Netty and incorporated it into a Jedis fork here. It's not a comprehensive solution and I have not had time to really finish it up, but it works for basic channel and pattern subscriptions.The basics are:

使用以下命令获取pubsub实例:

Acquire a pubsub instance using:

public static OptimizedPubSub getInstance(String host, int port, String auth, long timeout)

使用以下方式发布/取消模式订阅:

Issue/Cancel pattern subscriptions using:

public ChannelFuture psubscribe(String... patterns)
public ChannelFuture punsubscribe(String... patterns)

您可以忽略返回的ChannelFuture,除非您想100%确定您的请求通过了(这是紧急的).

you can ignore the returned ChannelFuture unless you want to make 100% certain your request gets through (it's asynch).

使用以下方式发布/取消频道订阅:

Issue/Cancel channel subscriptions using:

public ChannelFuture subscribe(String... channels)
public ChannelFuture unsubscribe(String... channels)

然后实现SubListener实例:

Then implement SubListener instances:

public interface SubListener {
    /**
     * Callback when a message is published on a subscribed channel
     * @param channel The channel the message was received on
     * @param message The received message
     */
    public void onChannelMessage(String channel, String message);

    /**
     * Callback when a message is published on a subscribed channel matching a subscribed pattern
     * @param pattern The pattern that the channel matched
     * @param channel The channel the message was received on
     * @param message The received message
     */
    public void onPatternMessage(String pattern, String channel, String message);
}

并使用以下命令注册/取消注册侦听器:

and register/unregister the listeners using:

public void registerListener(SubListener listener)
public void unregisterListener(SubListener listener)

OptimizedPubSub从不阻止,并且事件不会异步传递到已注册的SubListeners.

OptimizedPubSub never blocks and events are delivered to the registered SubListeners asynchronously.

fork现在有点旧了,因此它在当前形式中可能对您没有用,但是您可以轻松地从该包中提取源代码并独立构建它.依赖关系是Jedis和Netty.

The fork is a bit old now, so it may not be useful to you in its current form, but you can easily pull the source in that package and build it stand-alone. The dependencies are Jedis and Netty.

对不起,我没有更全面的解决方案.

Sorry I did not have a more comprehensive solution.

这篇关于用相同的线程Jedis订阅多个频道的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆