Redis之上的分布式Java队列

最近学习的势头大涨,码了很多干货。分享给大家参考学习!

通过优锐课的 java 学习笔记中,了解到关于让我们使用 Redisson Java 框架讨论六种不同类型的基于 Redis 的分布式队列。

 

1、在Redis 中使用队列

Redis 是一个功能强大的工具,支持从字符串和列表到映射和流的许多不同类型的数据结构。 开发人员将 Redis 用于多种目的,包括用于数据库,缓存和消息代理。

像任何消息代理一样,Redis 需要以正确的顺序发送消息。 可以根据消息的年龄或某些其他预定义的优先级等级发送消息。

为了存储这些未决消息,Redis 开发人员需要队列数据结构。 Redisson 是使用 Redis 和 Java 进行分布式编程的框架,它提供了许多分布式数据结构(包括队列)的实现。

Redisson 通过提供 Java API 使 Redis 开发更加容易。 Redisson 不需要开发人员学习 Redis 命令,而是包括所有众所周知的 Java 接口,例如 Queue 和 BlockingQueue。 Redisson 还处理 Redis 中繁琐的幕后工作,例如连接管理,故障转移处理和数据序列化。

2、基于Redis 的分布式 Java 队列

Redisson 提供了 Java 中基本队列数据结构的多个基于 Redis 的实现,每种实现都有不同的功能。 这使可以选择最适合目的的队列类型。

下面,我们将使用Redisson Java 框架讨论六种不同类型的基于 Redis 的分布式队列。

3、队列

Redisson 中的 RQueue 对象实现了 java.util.Queue 接口。 队列用于需要从最早的最早的元素开始处理(也称为“先进先出”或 FIFO)的情况。

与普通Java 一样,可以使用 peek()方法检查 RQueue 的第一个元素,或者使用 poll()方法检查和删除 RQueue 的第一个元素:

1 RQueue<SomeObject> queue = redisson.getQueue("anyQueue");
2 
3 queue.add(new SomeObject());
4 
5 SomeObject obj = queue.peek();
6 
7 SomeObject someObj = queue.poll();

 

4、阻塞队列

Redisson 中的 RBlockingQueue 对象实现了 java.util.BlockingQueue 接口。

BlockingQueues 是阻塞线程的队列,这些线程试图从空队列中进行轮询,或者试图在已满的队列中插入元素。 该线程将被阻塞,直到另一个线程将一个元素插入到空队列中,或从完整队列中轮询第一个元素为止。

下面的示例代码演示了RBlockingQueue 的正确实例化和使用。 特别是,可以使用参数指定对象将等待线程变得可用的时间来调用poll()方法:

1 RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue");
2 
3 queue.offer(new SomeObject());
4 
5 SomeObject obj = queue.peek();
6 
7 SomeObject someObj = queue.poll();
8 
9 SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

 

在故障转移或重新连接到Redis 服务器的过程中,将自动重新预订 poll(),pollFromAny(),pollLastAndOfferFirstTo()和 take()Java 方法。

5、BoundedBlockingQueue

Redisson 中的 RBoundedBlockingQueue 对象实现了有界的阻塞队列结构。 有界阻塞队列是容量已受限制(即有限)的阻塞队列。

以下代码演示了如何在Redisson 中实例化和使用 RBoundedBlockingQueue。 trySetCapacity()方法用于尝试设置阻塞队列的容量。 trySetCapacity()返回布尔值“ true”或“ false”,这取决于是否成功设置了容量或是否已经设置了容量:

 1 RBoundedBlockingQueue<SomeObject> queue = redisson.getBoundedBlockingQueue("anyQueue");
 2 
 3 queue.trySetCapacity(2);
 4 
 5 queue.offer(new SomeObject(1));
 6 
 7 queue.offer(new SomeObject(2));
 8 
 9 // will be blocked until free space available in queue
10 
11 queue.put(new SomeObject());
12 
13 SomeObject obj = queue.peek();
14 
15 SomeObject someObj = queue.poll();
16 
17 SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

 

6、延迟排队

Redisson 中的 RDelayedQueue 对象允许Redis 中实现延迟队列。 当使用诸如指数补偿的策略将消息传递给消费者时,这可能会很有用。 每次尝试发送邮件失败后,重试之间的时间将成倍增加。

在与元素一起指定的延迟之后,延迟队列中的每个元素将被转移到目标队列。 此目标队列可以是实现RQueue 接口的任何队列,例如 RBlockingQueue 或 RBoundedBlockingQueue。

 1 RQueue<String> destinationQueue = redisson.getQueue("anyQueue");
 2 
 3 RDelayedQueue<String> delayedQueue = getDelayedQueue(destinationQueue);
 4 
 5 // move object to destinationQueue in 10 seconds
 6 
 7 delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
 8 
 9 // move object to destinationQueue in 1 minute
10 
11 delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);

 

在不再需要队列之后,通过使用destroy()方法销毁延迟的队列是一个好主意。 但是,如果要关闭 Redisson,则没有必要。

7、PriorityQueue

Redisson 中的 RPriorityQueue 对象实现了 java.util.Queue 接口。 优先级队列是不是按元素的使用期限而是按照与每个元素相关联的优先级排序的队列。

如下面的示例代码所示,RPriorityQueue 使用比较器对队列中的元素进行排序:

 1 RPriorityQueue<Integer> queue = redisson.getPriorityQueue("anyQueue");
 2 
 3 queue.trySetComparator(new MyComparator()); // set object comparator
 4 
 5 queue.add(3);
 6 
 7 queue.add(1);
 8 
 9 queue.add(2);
10 
11 queue.removeAsync(0);
12 
13 queue.addAsync(5);
14 
15 queue.poll();

 

8、PriorityBlockingQueue

Redisson 中的 RPriorityBlockingQueue 对象结合了 RPriorityQueue 和 RBlockingQueue 的功能。 与 RPriorityQueue 一样,RPriorityBlockingQueue 也使用 Comparator 对队列中的元素进行排序。

 1 RPriorityBlockingQueue<Integer> queue = redisson.getPriorityBlockingQueue("anyQueue");
 2 
 3 queue.trySetComparator(new MyComparator()); // set object comparator
 4 
 5 queue.add(3);
 6 
 7 queue.add(1);
 8 
 9 queue.add(2);
10 
11 queue.removeAsync(0);
12 
13 queue.addAsync(5);
14 
15 queue.take();

 

在故障转移或重新连接到Redis 服务器的过程中,将自动重新预订 poll(),pollLastAndOfferFirstTo()和 take()Java 方法。

 文章分享到这里,如有不足之处,欢迎补充评论!

抽丝剥茧,细说架构那些事!