java实现 redis的发布订阅(简单易懂)

redis 的应用场景实在太多了,现在介绍一下它的几大特性之一   发布订阅(pub/sub)。

特性介绍:

  什么是 redis 的发布订阅(pub/sub)?   Pub/Sub 功能(means Publish, Subscribe)即发布及订阅功能。基于事件的系统中,Pub/Sub 是目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:订阅者 (如客户端) 以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件;发布者 (如服务器) 可将订阅者感兴趣的事件随时通知相关订阅者。熟悉设计模式的朋友应该了解这与 23 种设计模式中的观察者模式极为相似。 

     同样,Redis 的 pub/sub 是一种消息通信模式,主要的目的是解除消息发布者和消息订阅者之间的耦合,  Redis 作为一个 pub/sub 的 server, 在订阅者和发布者之间起到了消息路由的功能。

  如果没听懂上述的专业解释,没关系,其实我也没太听懂。

  简单来讲,这里面还有个 channel 的概念,这里就是频道的意思,比如你订阅了银行的频道,当你的资金发生变动时,银行就会通过它的频道给你发送信息,在这里,你是属于被动接收的,而不是向银行索要信息,这个例子中,你就是 sub(订阅者),而银行就是 pub(发布者)。

项目运用场景:

  一直都认为你会一样技术之前,都必须先明白这样一种技术在哪些地方会被用到,不能盲目的学东西。

  看到发布订阅的特性,用来做一个简单的实时聊天系统再适合不过了。这是其中之一,当然这样的东西,我们开发中很少涉及到。再举一个常用的,在我们的分布式架构中,常常会遇到读写分离的场景,在写入的过程中,就可以使用 redis 发布订阅,使得写入值及时发布到各个读的程序中,就保证数据的完整一致性。再比如,在一个博客网站中,有 100 个粉丝订阅了你,当你发布新文章,就可以推送消息给粉丝们拉。总之场景很多,需要去挖掘。。

回顾 java 如何操作 redis:

  redis 是一种缓存数据库,它也是 C/S 的结构,也就是客户端和服务端,一般来说,在 java 中,我们通常使用 jedis(客户端)去操作 redis(服务端),这其中操作的时候,两者之间肯定要建立连接,就像数据库链接一样,在关系型数据库中,我们一般都维护一个连接池,以达到链接的复用,来省去建立连接和关闭连接的时间。所以在 jedis 中,同样也存在一个 jedispool(jedis 连接池)的概念,我们都是从池中去取连接使用。

上代码:

想使用 jedis 先引入依赖

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

 

建立一个 Publisher (发布者)

public class Publisher extends Thread{
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">final</span><span style="color: rgba(0, 0, 0, 1)"> JedisPool jedisPool;

</span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> Publisher(JedisPool jedisPool) {
    </span><span style="color: rgba(0, 0, 255, 1)">this</span>.jedisPool =<span style="color: rgba(0, 0, 0, 1)"> jedisPool;
}

@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> run() {
    BufferedReader reader </span>= <span style="color: rgba(0, 0, 255, 1)">new</span> BufferedReader(<span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> InputStreamReader(System.in));
    Jedis jedis </span>= jedisPool.getResource();   <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">连接池中取出一个连接</span>
    <span style="color: rgba(0, 0, 255, 1)">while</span> (<span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">) {
        String line </span>= <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">;
        </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> {
            line </span>=<span style="color: rgba(0, 0, 0, 1)"> reader.readLine();
            </span><span style="color: rgba(0, 0, 255, 1)">if</span> (!"quit"<span style="color: rgba(0, 0, 0, 1)">.equals(line)) {
                jedis.publish(</span>"mychannel", line);   <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">从 mychannel 的频道上推送消息</span>
            } <span style="color: rgba(0, 0, 255, 1)">else</span><span style="color: rgba(0, 0, 0, 1)"> {
                </span><span style="color: rgba(0, 0, 255, 1)">break</span><span style="color: rgba(0, 0, 0, 1)">;
            }
        } </span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (IOException e) {
            e.printStackTrace();
        }
    }
}

}

再建立一个订阅者

public class Subscriber extends JedisPubSub {
</span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> Subscriber(){}
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> onMessage(String channel, String message) {       <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">收到消息会调用</span>
    System.out.println(String.format("receive redis published message, channel %s, message %s"<span style="color: rgba(0, 0, 0, 1)">, channel, message));
}
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> onSubscribe(String channel, <span style="color: rgba(0, 0, 255, 1)">int</span> subscribedChannels) {    <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">订阅了频道会调用</span>
    System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d"<span style="color: rgba(0, 0, 0, 1)">,
            channel, subscribedChannels));
}
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> onUnsubscribe(String channel, <span style="color: rgba(0, 0, 255, 1)">int</span> subscribedChannels) {   <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">取消订阅 会调用</span>
    System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d"<span style="color: rgba(0, 0, 0, 1)">,
            channel, subscribedChannels));

}

}

这里订阅者需要继承 JedisPubSub,来重写它的三个方法。用途 注释上已经写了,很简单。

我们这里只是定义了一个订阅者,下面去订阅频道。

public class SubThread extends Thread {
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">final</span><span style="color: rgba(0, 0, 0, 1)"> JedisPool jedisPool;
</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">final</span> Subscriber subscriber = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Subscriber();

</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">final</span> String channel = "mychannel"<span style="color: rgba(0, 0, 0, 1)">;

</span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> SubThread(JedisPool jedisPool) {
    </span><span style="color: rgba(0, 0, 255, 1)">super</span>("SubThread"<span style="color: rgba(0, 0, 0, 1)">);
    </span><span style="color: rgba(0, 0, 255, 1)">this</span>.jedisPool =<span style="color: rgba(0, 0, 0, 1)"> jedisPool;
}

@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> run() {
    System.out.println(String.format(</span>"subscribe redis, channel %s, thread will be blocked"<span style="color: rgba(0, 0, 0, 1)">, channel));
    Jedis jedis </span>= <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">;
    </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> {
        jedis </span>= jedisPool.getResource();   <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">取出一个连接</span>
        jedis.subscribe(subscriber, channel);    <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">通过subscribe 的api去订阅,入参是订阅者和频道名</span>
    } <span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (Exception e) {
        System.out.println(String.format(</span>"subsrcibe channel error, %s"<span style="color: rgba(0, 0, 0, 1)">, e));
    } </span><span style="color: rgba(0, 0, 255, 1)">finally</span><span style="color: rgba(0, 0, 0, 1)"> {
        </span><span style="color: rgba(0, 0, 255, 1)">if</span> (jedis != <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">) {
            jedis.close();
        }
    }
}

}

最后,再写一个测试类去跑一下。键盘输入消息,订阅者就会触发 onMessage 方法

public class PubSubDemo {
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">static</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> main( String[] args )
{
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 连接redis服务端</span>
    JedisPool jedisPool = <span style="color: rgba(0, 0, 255, 1)">new</span> JedisPool(<span style="color: rgba(0, 0, 255, 1)">new</span> JedisPoolConfig(), "127.0.0.1", 6379<span style="color: rgba(0, 0, 0, 1)">);
    
    System.out.println(String.format(</span>"redis pool is starting, redis ip %s, redis port %d", "127.0.0.1", 6379<span style="color: rgba(0, 0, 0, 1)">));

    SubThread subThread </span>= <span style="color: rgba(0, 0, 255, 1)">new</span> SubThread(jedisPool);  <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">订阅者</span>

subThread.start();

    Publisher publisher </span>= <span style="color: rgba(0, 0, 255, 1)">new</span> Publisher(jedisPool);    <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">发布者</span>

publisher.start();
}
}

看打印结果