redis实现消息队列-java代码实现
转:https://blog.csdn.net/qq_42175986/article/details/88576849
pom.xml
<!-- redis 依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> <!-- redis 依赖 -->
application.yml
spring: application: name: yys-redismq redis: host: 127.0.0.1 port: 6379 password: 123456 pool: max-active: 100 max-idle: 10 min-idle: 0 max-wait: 100000 timeout: 0 database: 2server:
port: 8081
tomcat:
uri-encoding: UTF-8
RedisConfig.java
package com.yys.demo.config;import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import redis.clients.jedis.JedisPoolConfig;/**
描述:redis 配置类
@author yys
@date 2019.03.15
*/
@Configuration
public class RedisConfig {/** redis 服务器地址 */
@Value("${spring.redis.host}")
private String host;/** redis 端口号 */
@Value("${spring.redis.port}")
private int port;/** redis 服务器密码 */
@Value("${spring.redis.password}")
private String password;/** redis 连接池最大连接数 (使用负值无限制) */
@Value("${spring.redis.pool.max-active}")
private int maxActive;/** redis 连接池最大空闲数 */
@Value("${spring.redis.pool.max-idle}")
private int maxIdle;/** redis 连接池小空闲数 */
@Value("${spring.redis.pool.min-idle}")
private int minIdle;/** redis 连接池最大阻塞等待时间 (负值无限制) */
@Value("${spring.redis.pool.max-wait}")
private int maxWait;/** redis 数据库索引 (默认 0) */
@Value("${spring.redis.database}")
private int database;/** redis 超时时间 */
@Value("${spring.redis.timeout}")
private int timeout;@Bean
public JedisPoolConfig getRedisConfig(){
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(maxActive);
config.setMaxIdle(maxIdle);
config.setMinIdle(minIdle);
config.setMaxWaitMillis(maxWait);
return config;
}@Bean
public JedisConnectionFactory getConnectionFactory() {
JedisConnectionFactory factory = new JedisConnectionFactory();
factory.setHostName(host);
factory.setPort(port);
factory.setPassword(password);
factory.setDatabase(database);
JedisPoolConfig config = getRedisConfig();
factory.setPoolConfig(config);
return factory;
}@Bean
public RedisTemplate<?, ?> getRedisTemplate() {
JedisConnectionFactory factory = getConnectionFactory();
RedisTemplate<?, ?> redisTemplate = new StringRedisTemplate(factory);
return redisTemplate;
}}
RedisClient.java
package com.yys.demo.config;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;/**
描述:redis 工具类 (本配置类只有 redis 消息队列相关命令方法)
@author yys
@date 2019.03.15
*/
@Component
public class RedisClient {@Autowired
private RedisTemplate<String, Object> redisTemplate;/ ---------------------------------- redis 消息队列 ---------------------------------- */
/
- 存值
- @param key 键
- @param value 值
- @return
*/
public boolean lpush(String key, Object value) {
try {
redisTemplate.opsForList().leftPush(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}/**
- 取值 - <rpop:非阻塞式 >
- @param key 键
- @return
*/
public Object rpop(String key) {
try {
return redisTemplate.opsForList().rightPop(key);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}/**
- 取值 - <brpop:阻塞式 > - 推荐使用
- @param key 键
- @param timeout 超时时间
- @param timeUnit 给定单元粒度的时间段
TimeUnit.DAYS //天
TimeUnit.HOURS //小时
TimeUnit.MINUTES //分钟
TimeUnit.SECONDS //秒
TimeUnit.MILLISECONDS //毫秒
- @return
*/
public Object brpop(String key, long timeout, TimeUnit timeUnit) {
try {
return redisTemplate.opsForList().rightPop(key, timeout, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}/**
- 查看值
- @param key 键
- @param start 开始
- @param end 结束 0 到 -1 代表所有值
- @return
*/
public List<Object> lrange(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}/** ---------------------------------- redis 消息队列 ---------------------------------- */
}
RedisProducerController.java
package com.yys.demo.controller;import com.yys.demo.config.RedisClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Arrays;
import java.util.List;/**
描述:生产者 (消息发送方)
@author yys
@date 2019.03.15
*/
@RestController
@RequestMapping("/producer")
public class RedisProducerController {@Autowired
RedisClient redisClient;/** 公共配置 */
private final static String SUCCESS = "success";
private final static String MESSAGE = "testmq";
private static final List<String> list;static {
list = Arrays.asList(new String[]{"猿医生", "CD", "yys"});
}/**
- 消息发送 API
- @return
*/
@RequestMapping("/sendMessage")
public String sendMessage() {
for (String message : list) {
redisClient.lpush(MESSAGE, message);
}
return SUCCESS;
}}
RedisConsumerController.java
package com.yys.demo.controller;import com.yys.demo.config.RedisClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.concurrent.TimeUnit;
/**
描述:消费者 (消息接收方)
@author yys
@date 2019.03.15
*/
@RestController
@RequestMapping("/consumer")
public class RedisConsumerController {@Autowired
RedisClient redisClient;/** 公共配置 */
private final static String MESSAGE = "testmq";/**
- 接收消息 API
- @return
*/
@RequestMapping("/receiveMessage")
public String sendMessage() {
return (String) redisClient.brpop(MESSAGE, 0, TimeUnit.SECONDS);
}}
注:批量消费消息例子如下
package com.cashloan.analytics.controller;import com.cashloan.analytics.model.DcProjectCount;
import com.cashloan.analytics.utils.RedisQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;@RestController
@RequestMapping("/test")
public class Test {
@Autowired
private RedisQueue redisQueue;</span><span style="color: rgba(0, 128, 0, 1)">/**</span><span style="color: rgba(0, 128, 0, 1)"> 公共配置 </span><span style="color: rgba(0, 128, 0, 1)">*/</span> <span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">final</span> <span style="color: rgba(0, 0, 255, 1)">static</span> String MESSAGE = "testmq"<span style="color: rgba(0, 0, 0, 1)">; @RequestMapping(</span>"/send"<span style="color: rgba(0, 0, 0, 1)">) </span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> String sendMessage(String[] strs) { List</span><String> str =<span style="color: rgba(0, 0, 0, 1)"> Arrays.asList(strs); </span><span style="color: rgba(0, 0, 255, 1)">for</span><span style="color: rgba(0, 0, 0, 1)"> (String msg : str) { DcProjectCount dcProjectCount </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> DcProjectCount(); dcProjectCount.setId(Long.valueOf(msg)); redisQueue.lpush(MESSAGE, dcProjectCount); } </span><span style="color: rgba(0, 0, 255, 1)">return</span> "1"<span style="color: rgba(0, 0, 0, 1)">; } @GetMapping(</span>"/see"<span style="color: rgba(0, 0, 0, 1)">) </span><span style="color: rgba(0, 0, 255, 1)">public</span> List<Object><span style="color: rgba(0, 0, 0, 1)"> see() { List</span><Object> lrange = redisQueue.lrange(MESSAGE, 0, -1<span style="color: rgba(0, 0, 0, 1)">); </span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> lrange; } @GetMapping(</span>"/get"<span style="color: rgba(0, 0, 0, 1)">) </span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> Object get() { List</span><Object> lrange = redisQueue.lrange(MESSAGE, 0, -1<span style="color: rgba(0, 0, 0, 1)">); </span><span style="color: rgba(0, 0, 255, 1)">int</span> size =<span style="color: rgba(0, 0, 0, 1)"> lrange.size(); System.out.println(</span>"size:" +<span style="color: rgba(0, 0, 0, 1)"> size); </span><span style="color: rgba(0, 0, 255, 1)">if</span> (size >= 10<span style="color: rgba(0, 0, 0, 1)">) { List</span><DcProjectCount> dcProjectCounts = <span style="color: rgba(0, 0, 255, 1)">new</span> ArrayList<><span style="color: rgba(0, 0, 0, 1)">(); </span><span style="color: rgba(0, 0, 255, 1)">for</span> (<span style="color: rgba(0, 0, 255, 1)">int</span> i = 0; i < size; i++<span style="color: rgba(0, 0, 0, 1)">) { Object brpop </span>= redisQueue.brpop(MESSAGE, 0<span style="color: rgba(0, 0, 0, 1)">, TimeUnit.SECONDS); </span><span style="color: rgba(0, 0, 255, 1)">if</span> (brpop != <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">) { dcProjectCounts.add((DcProjectCount) brpop); } } </span><span style="color: rgba(0, 0, 255, 1)">for</span><span style="color: rgba(0, 0, 0, 1)"> (DcProjectCount dcProjectCount : dcProjectCounts) { System.out.println(dcProjectCount.getId()); } } </span><span style="color: rgba(0, 0, 255, 1)">return</span> "true"<span style="color: rgba(0, 0, 0, 1)">; }
}
DcProjectCount 类只要设置 private Long id 并添加 set、get 方法即可!