浅析Redis发布订阅机制及其Java实现

  Redis 是一个开源的内存数据库,它以键值对的形式存储数据。由于数据存储在内存中,因此 Redis 的速度很快,但是每次重启 Redis 服务时,其中的数据也会丢失,因此,Redis 也提供了持久化存储机制,将数据以某种形式保存在文件中,每次重启时,可以自动从文件加载数据到内存当中

  Redis 的架构包括两个部分:Redis Client 和 Redis Server。Redis 客户端负责向服务器端发送请求并接受来自服务器端的响应。服务器端负责处理客户端请求,例如,存储数据,修改数据等。 Redis 通常用作数据库,缓存以及消息系统。

一、Redis 发布订阅机制

1、发布订阅架构

  Redis 提供了发布订阅功能,可以用于消息的传输,Redis 的发布订阅机制包括三个部分:发布者,订阅者和 Channel。

  • PUBLISH 命令向通道发送信息,此客户端称为 publisher 发布者;
  • SUBSCRIBE 向命令通道订阅信息,此客户端称为 subscriber 订阅者;
  • redis 中 发布订阅模块的名字叫着 PubSub,也就是 PublisherSubscriber;
  • 一个发布者向一个通道发送消息,订阅者可以向多个通道订阅消息;当发布者向通道发布消息后,如果有订阅者订阅该通道,订阅者就会收到消息;这有点像电台,我收听了一个电台的频道,当频道发送消息后,我就能收到消息;

  发布者和订阅者都是 Redis 客户端,Channel 则为 Redis 服务器端,发布者将消息发送到某个的频道,订阅了这个频道的订阅者就能接收到这条消息。Redis 的这种发布订阅机制与基于主题的发布订阅类似,Channel 相当于主题。

2、PUBSub 模块命令

  • subscribe: 订阅一个或者多个频道;
  • unsubscribe: 退订一个或者多个频道;
  • publish: 向通道发送消息;
  • psubscribe: 订阅给定模式相匹配的所有频道;
  • punsubscribe: 退订 给定模式所有的频道,若未指定模式,退订所有频道;

  具体的命令使用方式 可以使用 help 命令,示例如下:help subscribe

3、发布订阅功能

(1)发送消息 :Redis 采用 PUBLISH 命令发送消息,其返回值为接收到该消息的订阅者的数量。

(2)订阅某个频道:Redis 采用 SUBSCRIBE 命令订阅某个频道,其返回值包括客户端订阅的频道,目前已订阅的频道数量,以及接收到的消息,其中 subscribe 表示已经成功订阅了某个频道。

(3)模式匹配 :模式匹配功能允许客户端订阅符合某个模式的频道,Redis 采用 PSUBSCRIBE 订阅符合某个模式所有频道,用“”表示模式,“”可以被任意值代替。

127.0.0.1:6379> publish news.1 first
(integer) 1
127.0.0.1:6379> publish news.2 second
(integer) 1

  假设客户端同时订阅了某种模式和符合该模式的某个频道,那么发送给这个频道的消息将被客户端接收到两次,只不过这两条消息的类型不同,一个是 message 类型,一个是 pmessage 类型,但其内容相同

(4)取消订阅 :Redis 采用 UNSUBSCRIBE 和 PUNSUBSCRIBE 命令取消订阅,其返回值与订阅类似。

  由于Redis 的订阅操作是阻塞式的,因此一旦客户端订阅了某个频道或模式,就将会一直处于订阅状态直到退出。在 SUBSCRIBE,PSUBSCRIBE,UNSUBSCRIBE 和 PUNSUBSCRIBE 命令中,其返回值都包含了该客户端当前订阅的频道和模式的数量,当这个数量变为 0 时,该客户端会自动退出订阅状态。

4、发布订阅实现

  由于 Redis 是一个开源的系统,因此我们可以通过其源代码查看内部的实现细节。

(1)SUBSCRIBE

  当客户端订阅某个频道时,Redis 需要将该频道和该客户端绑定。

  首先,在客户端结构体 client 中,有一个属性为 pubsub_channels,该属性表明了该客户端订阅的所有频道,它是一个字典类型,通过哈希表实现,其中的每个元素都包含了一个键值对以及指向下一个元素的指针,每次订阅都要向其中插入一个结点,键表示订阅的频道,值为空。

  然后,在表示服务器端的结构体 redisServer 中,也有一个属性为 pubsub_channels,但此处它表示的是该服务器端中的所有频道以及订阅了这个频道的客户端,它也是一个字典类型,插入结点时,键表示频道,值则是订阅了这个频道的所有客户端组成的链表。

  最后 Redis 通知客户端其订阅成功。

(2)PSUBSCRIBE

  当客户端订阅某个模式时,Redis 同样需要将该模式和该客户端绑定。

  首先,在结构体 client 中,有一个属性为 pubsub_patterns,该属性表示该客户端订阅的所有模式,它是一个链表类型,每个结点包括了订阅的模式和指向下一个结点的指针,每次订阅某个模式时,都要向其中插入一个结点。

  然后,在结构体 redisServer 中,有一个属性也叫 pubsub_patterns,它表示了该服务器端中的所有模式和订阅了这些模式的客户端,它也是一个链表类型,插入结点时,每个结点都要包含订阅的模式,以及订阅这个模式的客户端,和指向下一个结点的指针。

(3)PUBLISH

  当客户端向某个频道发送消息时,Redis 首先在结构体 redisServer 中的 pubsub_channels 中找出键为该频道的结点,遍历该结点的值,即遍历订阅了该频道的所有客户端,将消息发送给这些客户端。

  然后,遍历结构体 redisServer 中的 pubsub_patterns,找出包含该频道的模式的结点,将消息发送给订阅了该模式的客户端。

5、发布订阅在 Redis 中的应用

  Redis 的发布订阅功能与 Redis 中的数据存储是无关的,它不会影响 Redis 的 key space,即不会影响 Redis 中存储的数据,但通过发布订阅机制,Redis 还提供了另一个功能,即 Keyspace Notification,允许客户端通过订阅特定的频道,从而得知是否有改变 Redis 中的数据的事件。

  例如,有一个客户端删除了 Redis 中键为 mykey 的数据,该操作会触发两条消息,mykey del 和 del mykey,前者属于频道 keysapce,表示 keyspace 发生的变化,后者属于频道 keyevent,表示执行的操作。

6、Redis 发布订阅与 ActiveMQ 的比较

(1)ActiveMQ 支持多种消息协议,包括 AMQP,MQTT,Stomp 等,并且支持 JMS 规范,但 Redis 没有提供对这些协议的支持;

(2)ActiveMQ 提供持久化功能,但 Redis 无法对消息持久化存储,一旦消息被发送,如果没有订阅者接收,那么消息就会丢失;

(3)ActiveMQ 提供了消息传输保障,当客户端连接超时或事务回滚等情况发生时,消息会被重新发送给客户端,Redis 没有提供消息传输保障。

  总之,ActiveMQ 所提供的功能远比 Redis 发布订阅要复杂,毕竟 Redis 不是专门做发布订阅的,但是如果系统中已经有了 Redis,并且需要基本的发布订阅功能,就没有必要再安装 ActiveMQ 了,因为可能 ActiveMQ 提供的功能大部分都用不到,而 Redis 的发布订阅机制就能满足需求。

二、Java 实现

  定义 2 个订阅者用于订阅频道的消息,在使用 Jedis 时需要继承 JedisPubSub 类, 并重写 onMessage 方法; 订阅者可以在该方法里面进行消息的业务逻辑处理;

  注意 redis 的 发布订阅模式 是阻塞模式 ,一个订阅者需要 重新起一个线程

  缺点:

(1)PubSub 的生产者来一个消息会直接传递给消费者。如果没有消费者,消息会直接丢弃。如果有多个消费者,一个消费者突然挂掉,生产者会继续发送消息,另外的消费者可以持续收到消息。但是挂掉的消费者重新连上后,断连期间的消息会彻底丢失;

(2)如果 Redis 停机重启,PubSub 的消息是不会持久化的。

  首先我们创建两个客户端执行体:第 2 个是一样的创建即可

package com.example.redisdemo.service;
import redis.clients.jedis.JedisPubSub;

// 订阅消息消费体
public class OneJedisPubSub extends JedisPubSub {
//接收到消息时执行
@Override
public void onMessage(String channel, String message){
System.out.println(
"oneJedisPubSub message is" + message);
}
//接收到模式消息时执行
@Override
public void onPMessage(String pattern, String channel, String message){
System.out.println(
"oneJedisPubSub pattern 是"+pattern+"channel 是"+channel + "message 是" + message);
}
//订阅时执行
@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println(
"oneJedisPubSub 订阅成功");
}
//取消订阅时执行
@Override
public void onUnsubscribe(String channel, int subscribedChannels){
System.out.println(
"oneJedisPubSub 取消订阅"+channel);
}
//取消模式订阅时执行
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
System.out.println(
"oneJedisPubSub 取消多订阅"+pattern);
}
}

  然后我们开始给这两个客户端订阅消息

@RestController
@RequestMapping("test")
@Slf4j
public class TestController {
    @Autowired
    private RedisClient redisClient;
    private final OneJedisPubSub oneJedisPubSub = new OneJedisPubSub();
    private final SecondJedisPubSub secondJedisPubSub = new SecondJedisPubSub();
@PostMapping(</span>"subscribe"<span style="color: rgba(0, 0, 0, 1)">)
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)">  subscribe(@RequestBody QueueTest queueTest){
    </span><span style="color: rgba(0, 0, 255, 1)">new</span> Thread(<span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Runnable() {
        @Override
        </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> run() {
            </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> {
                </span><span style="color: rgba(0, 0, 255, 1)">if</span>("1"<span style="color: rgba(0, 0, 0, 1)">.equals(queueTest.getTopic())){
                    redisClient.subscribe(oneJedisPubSub,</span>"topic1","topic2"<span style="color: rgba(0, 0, 0, 1)">);
                }
                </span><span style="color: rgba(0, 0, 255, 1)">if</span>("2"<span style="color: rgba(0, 0, 0, 1)">.equals(queueTest.getTopic())){
                    redisClient.subscribe(secondJedisPubSub,</span>"topic2"<span style="color: rgba(0, 0, 0, 1)">);
                }
            } </span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (Exception e) {
                e.printStackTrace();
            }
        }
    }).start();
}

}

  请求情况如下图:

  请求结果如图:

  可以看出我们将两个客户端都订阅了一定 channel,此时 OneJedisPubSub 订阅了 topic1 和 topic2,SecondJedisPubSub 订阅了 topic2,我们尝试推送消息,demo 如下:

    @PostMapping("push")
    public void push(@RequestBody QueueTest queueTest){
        log.info("发布一条消息");
        Long publish = redisClient.publish(queueTest.getTopic(), queueTest.getName());
        System.out.println("消费者数量"+publish);}

  可以看到我们往 topic1 发布了消息只有 OneJedisPubSub 接收到了消息,接下来我们往 topic2 发布消息

  可以看到此时两个客户端都接收到了消息。

  在测试完毕客户端接收消息的能力,我们这时取消 SecondJedisPubSub 订阅 topic2,demo 如下:

    @PostMapping("unno")
    public void  unno(@RequestBody QueueTest queueTest){
        log.info("取消订阅消息");
        try {secondJedisPubSub.unsubscribe(queueTest.getTopic());
        } catch (Exception e) {e.printStackTrace();
        }
    }

  在取消后我们再往 topic2 推送消息,可以看到只有一个客户端接收消息。

  至此我们实验了大部分场景,至于模式订阅由于贴图太麻烦,我就将代码提供出来,大家可以自己实验:

@PostMapping("subscribe")
    public void  subscribe(@RequestBody QueueTest queueTest){
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    if("1".equals(queueTest.getTopic())){
                        redisClient.pubsubPattern(oneJedisPubSub,"topic*");}
                    if("2".equals(queueTest.getTopic())){
                        redisClient.subscribe(secondJedisPubSub,"topic2");}
                } catch (Exception e) {e.printStackTrace();
                }
            }
        }).start();
    }
@PostMapping("unno")
    public void  unno(@RequestBody QueueTest queueTest){
        log.info("取消模式订阅消息");
        try {secondJedisPubSub.punsubscribe(queueTest.getTopic());
        } catch (Exception e) {e.printStackTrace();
        }
    }

  最后将 redisclient 的代码提供给大家

@Component("redisClient")
@Slf4j
public class RedisClient {
    @Resource
    private JedisPool jedisPool;
   </span><span style="color: rgba(0, 128, 0, 1)">/**</span><span style="color: rgba(0, 128, 0, 1)">
 * 发布消息
 * </span><span style="color: rgba(128, 128, 128, 1)">@param</span><span style="color: rgba(0, 128, 0, 1)"> topic
 * </span><span style="color: rgba(128, 128, 128, 1)">@param</span><span style="color: rgba(0, 128, 0, 1)"> message
 </span><span style="color: rgba(0, 128, 0, 1)">*/</span>
<span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> Long publish(String topic,String message){
    Jedis jedis </span>= <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">;
    </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> {
        jedis </span>=<span style="color: rgba(0, 0, 0, 1)"> jedisPool.getResource();
        </span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> jedis.publish(topic, message);
    } </span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (Exception e) {
        </span><span style="color: rgba(0, 0, 255, 1)">throw</span><span style="color: rgba(0, 0, 0, 1)"> e;
    } </span><span style="color: rgba(0, 0, 255, 1)">finally</span><span style="color: rgba(0, 0, 0, 1)"> {
        </span><span style="color: rgba(0, 0, 255, 1)">if</span>(jedis != <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">){
            jedis.close();
        }
    }
}

</span><span style="color: rgba(0, 128, 0, 1)">/**</span><span style="color: rgba(0, 128, 0, 1)">
 * 订阅消息
 * </span><span style="color: rgba(128, 128, 128, 1)">@param</span><span style="color: rgba(0, 128, 0, 1)"> jedisPubSub
 * </span><span style="color: rgba(128, 128, 128, 1)">@param</span><span style="color: rgba(0, 128, 0, 1)"> topics
 </span><span style="color: rgba(0, 128, 0, 1)">*/</span>
<span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> subscribe(JedisPubSub jedisPubSub, String... topics) <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
    Jedis jedis </span>= <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">;
    </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> {
        jedis </span>=<span style="color: rgba(0, 0, 0, 1)"> jedisPool.getResource();
        jedis.subscribe(jedisPubSub,topics);
    } </span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (Exception e) {
        </span><span style="color: rgba(0, 0, 255, 1)">throw</span><span style="color: rgba(0, 0, 0, 1)"> e;
    } </span><span style="color: rgba(0, 0, 255, 1)">finally</span><span style="color: rgba(0, 0, 0, 1)"> {
        </span><span style="color: rgba(0, 0, 255, 1)">if</span>(jedis != <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">){
            jedis.close();
        }

    }
}

</span><span style="color: rgba(0, 128, 0, 1)">/**</span><span style="color: rgba(0, 128, 0, 1)">
 * 模式匹配订阅消息
 * </span><span style="color: rgba(128, 128, 128, 1)">@param</span><span style="color: rgba(0, 128, 0, 1)"> topic
 </span><span style="color: rgba(0, 128, 0, 1)">*/</span>
<span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> pubsubPattern(JedisPubSub jedisPubSub,String topic){
    Jedis jedis </span>= <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">;
    </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> {
        jedis </span>=<span style="color: rgba(0, 0, 0, 1)"> jedisPool.getResource();
        jedis.psubscribe(jedisPubSub,topic);
    } </span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (Exception e) {
        </span><span style="color: rgba(0, 0, 255, 1)">throw</span><span style="color: rgba(0, 0, 0, 1)"> e;
    } </span><span style="color: rgba(0, 0, 255, 1)">finally</span><span style="color: rgba(0, 0, 0, 1)"> {
        </span><span style="color: rgba(0, 0, 255, 1)">if</span>(jedis != <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">){
            jedis.close();
        }
    }
}

}

  以上就是 reids 的发布订阅功能,代码部分来自文章:https://zhuanlan.zhihu.com/p/136484218

三、主要命令及其原理

  首先介绍一下实现功能的主要几个命令:

  1. SUBSCRIBE 命令,这个命令可以让我们订阅任意数量的频道
  2. PUBLISH 命令,此命令是用来发布消息
  3. PSUBSCRIBE 命令,此命令用来支持模糊订阅的功能

  在展示具体的 demo 之前,我们先简单了解下这其中的原理:

  在 redisServer 结构中的其中一个属性 pubsub_channels 是用来记录 channel 和客户端之间的关系,是使用 key-->List 的数据格式。如图:

  在我们使用 SUBSCRIBE 命令在客户端 client10086 订阅了 channel1 channel2,channel3

1、订阅:SUBSCRIBE channel1 channel2,channel3

  这时 pubsub_channels 的数据将会变为,如图:

  这就可以看出来执行 SUBSCRIBE 命令就是将客户端信息添加到对应的 channel 对应列表的尾部。

2、模式订阅: 模式订阅设计到 redisServer 的另一个属性 pubsub_patterns,也是一个链表,里面存储着客户端订阅的所有模式。结构如下图:

  当客户端订阅了一个模式,此时结构变为:

3、发布:PUBLISH 命令发布消息将消息推送到对应的客户端

  在执行 PUBLISH 命令发布消息的时候,首先会在 pubsub_channels 上找到对应的 channel, 遍历其中所有的 client 信息,将消息发送到所有 client;同时也会在 pubsub_patterns 上遍历找到匹配的模式,发给对应的客户端

4、取消订阅:UNSUBSCRIBE 命令取消对应客户端的订阅

  当执行 UNSUBSCRIBE 命令时则将对应的 client 从 channel 列表中移除。