redis实现消息队列-java代码实现

转:https://blog.csdn.net/qq_42175986/article/details/88576849

pom.xml

<!-- redis 依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>
<!-- redis 依赖 -->

application.yml

spring:
  application:
      name: yys-redismq
  redis:
    host: 127.0.0.1
    port: 6379
    password: 123456
    pool:
      max-active: 100
      max-idle: 10
      min-idle: 0
      max-wait: 100000
    timeout: 0
    database: 2

server:
  port: 8081
  tomcat:
    uri-encoding: UTF-8

RedisConfig.java

package com.yys.demo.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import redis.clients.jedis.JedisPoolConfig;

/**
 * Redis configuration class: builds the Jedis pool configuration, the connection
 * factory and the template bean from the {@code spring.redis.*} properties.
 *
 * @author yys
 * @date 2019.03.15
 */
@Configuration
public class RedisConfig {

    /** Redis server host. */
    @Value("${spring.redis.host}")
    private String host;

    /** Redis server port. */
    @Value("${spring.redis.port}")
    private int port;

    /** Redis server password. */
    @Value("${spring.redis.password}")
    private String password;

    /** Redis database index (default 0). */
    @Value("${spring.redis.database}")
    private int database;

    /**
     * Redis timeout.
     * NOTE(review): this value is injected but never applied to the connection
     * factory anywhere in this class - confirm whether that is intentional.
     */
    @Value("${spring.redis.timeout}")
    private int timeout;

    /** Maximum number of connections in the pool (a negative value means no limit). */
    @Value("${spring.redis.pool.max-active}")
    private int maxActive;

    /** Maximum number of idle connections in the pool. */
    @Value("${spring.redis.pool.max-idle}")
    private int maxIdle;

    /** Minimum number of idle connections in the pool. */
    @Value("${spring.redis.pool.min-idle}")
    private int minIdle;

    /** Maximum time to block waiting for a pooled connection (a negative value means no limit). */
    @Value("${spring.redis.pool.max-wait}")
    private int maxWait;

    /**
     * Creates the Jedis pool configuration from the injected pool properties.
     *
     * @return the populated pool configuration
     */
    @Bean
    public JedisPoolConfig getRedisConfig() {
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMinIdle(minIdle);
        poolConfig.setMaxIdle(maxIdle);
        poolConfig.setMaxTotal(maxActive);
        poolConfig.setMaxWaitMillis(maxWait);
        return poolConfig;
    }

    /**
     * Creates the Jedis connection factory using the injected host, port, password
     * and database index, backed by the pool configuration from {@link #getRedisConfig()}.
     *
     * @return the configured connection factory
     */
    @Bean
    public JedisConnectionFactory getConnectionFactory() {
        JedisConnectionFactory connectionFactory = new JedisConnectionFactory();
        connectionFactory.setPoolConfig(getRedisConfig());
        connectionFactory.setDatabase(database);
        connectionFactory.setPassword(password);
        connectionFactory.setPort(port);
        connectionFactory.setHostName(host);
        return connectionFactory;
    }

    /**
     * Creates the template bean; the concrete instance is a {@link StringRedisTemplate}
     * built on the factory from {@link #getConnectionFactory()}.
     *
     * @return the Redis template
     */
    @Bean
    public RedisTemplate<?, ?> getRedisTemplate() {
        return new StringRedisTemplate(getConnectionFactory());
    }

}

RedisClient.java

package com.yys.demo.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**

  • 描述:redis 工具类 (本配置类只有 redis 消息队列相关命令方法)

  • @author yys

  • @date 2019.03.15
    */
    @Component
    public class RedisClient {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    / ---------------------------------- redis 消息队列 ---------------------------------- */
    /

    • 存值
    • @param key 键
    • @param value 值
    • @return
      */
      public boolean lpush(String key, Object value) {
      try {
      redisTemplate.opsForList().leftPush(key, value);
      return true;
      }
      catch (Exception e) {
      e.printStackTrace();
      return false;
      }
      }

    /**

    • 取值 - <rpop:非阻塞式 >
    • @param key 键
    • @return
      */
      public Object rpop(String key) {
      try {
      return redisTemplate.opsForList().rightPop(key);
      }
      catch (Exception e) {
      e.printStackTrace();
      return null;
      }
      }

    /**

    • 取值 - <brpop:阻塞式 > - 推荐使用
    • @param key 键
    • @param timeout 超时时间
    • @param timeUnit 给定单元粒度的时间段
    •             TimeUnit.DAYS          //天
      
    •             TimeUnit.HOURS         //小时
      
    •             TimeUnit.MINUTES       //分钟
      
    •             TimeUnit.SECONDS       //秒
      
    •             TimeUnit.MILLISECONDS  //毫秒
      
    • @return
      */
      public Object brpop(String key, long timeout, TimeUnit timeUnit) {
      try {
      return redisTemplate.opsForList().rightPop(key, timeout, TimeUnit.SECONDS);
      }
      catch (Exception e) {
      e.printStackTrace();
      return null;
      }
      }

    /**

    • 查看值
    • @param key 键
    • @param start 开始
    • @param end 结束 0 到 -1 代表所有值
    • @return
      */
      public List<Object> lrange(String key, long start, long end) {
      try {
      return redisTemplate.opsForList().range(key, start, end);
      }
      catch (Exception e) {
      e.printStackTrace();
      return null;
      }
      }

    /** ---------------------------------- redis 消息队列 ---------------------------------- */

}

RedisProducerController.java

package com.yys.demo.controller;

import com.yys.demo.config.RedisClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Arrays;
import java.util.List;

/**
 * Producer (message sender) REST controller, mapped under {@code /producer}.
 *
 * @author yys
 * @date 2019.03.15
 */
@RestController
@RequestMapping("/producer")
public class RedisProducerController {

    /** Key of the Redis list the messages are pushed to. */
    private final static String MESSAGE = "testmq";

    /** Response body returned by {@link #sendMessage()}. */
    private final static String SUCCESS = "success";

    /** Fixed set of sample messages sent on every call. */
    private static final List<String> list = Arrays.asList("猿医生", "CD", "yys");

    @Autowired
    RedisClient redisClient;

    /**
     * Message sending API: pushes each sample message onto the queue, in list order.
     * The result of each individual push is not inspected.
     *
     * @return always {@code "success"}
     */
    @RequestMapping("/sendMessage")
    public String sendMessage() {
        for (int index = 0; index < list.size(); index++) {
            redisClient.lpush(MESSAGE, list.get(index));
        }
        return SUCCESS;
    }

}

RedisConsumerController.java

package com.yys.demo.controller;

import com.yys.demo.config.RedisClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.TimeUnit;

/**
 * Consumer (message receiver) REST controller, mapped under {@code /consumer}.
 *
 * @author yys
 * @date 2019.03.15
 */
@RestController
@RequestMapping("/consumer")
public class RedisConsumerController {

    /** Key of the Redis list the messages are read from. */
    private final static String MESSAGE = "testmq";

    @Autowired
    RedisClient redisClient;

    /**
     * Message receiving API: performs one blocking pop on the queue with a timeout
     * argument of 0 seconds and returns the result cast to {@link String}.
     *
     * @return the popped message, or {@code null} if the pop returned {@code null}
     */
    @RequestMapping("/receiveMessage")
    public String sendMessage() {
        Object popped = redisClient.brpop(MESSAGE, 0, TimeUnit.SECONDS);
        return (String) popped;
    }

}

 

注:批量消费消息例子如下

package com.cashloan.analytics.controller;

import com.cashloan.analytics.model.DcProjectCount;
import com.cashloan.analytics.utils.RedisQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

@RestController
@RequestMapping("/test")
public class Test {
@Autowired
private RedisQueue redisQueue;

</span><span style="color: rgba(0, 128, 0, 1)">/**</span><span style="color: rgba(0, 128, 0, 1)"> 公共配置 </span><span style="color: rgba(0, 128, 0, 1)">*/</span>
<span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">final</span> <span style="color: rgba(0, 0, 255, 1)">static</span> String MESSAGE = "testmq"<span style="color: rgba(0, 0, 0, 1)">;

@RequestMapping(</span>"/send"<span style="color: rgba(0, 0, 0, 1)">)
</span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> String sendMessage(String[] strs) {
    List</span>&lt;String&gt; str =<span style="color: rgba(0, 0, 0, 1)"> Arrays.asList(strs);
    </span><span style="color: rgba(0, 0, 255, 1)">for</span><span style="color: rgba(0, 0, 0, 1)"> (String msg : str) {
        DcProjectCount dcProjectCount </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> DcProjectCount();
        dcProjectCount.setId(Long.valueOf(msg));
        redisQueue.lpush(MESSAGE, dcProjectCount);
    }
    </span><span style="color: rgba(0, 0, 255, 1)">return</span> "1"<span style="color: rgba(0, 0, 0, 1)">;
}

@GetMapping(</span>"/see"<span style="color: rgba(0, 0, 0, 1)">)
</span><span style="color: rgba(0, 0, 255, 1)">public</span> List&lt;Object&gt;<span style="color: rgba(0, 0, 0, 1)"> see() {
    List</span>&lt;Object&gt; lrange = redisQueue.lrange(MESSAGE, 0, -1<span style="color: rgba(0, 0, 0, 1)">);
    </span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> lrange;
}

@GetMapping(</span>"/get"<span style="color: rgba(0, 0, 0, 1)">)
</span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> Object get() {
    List</span>&lt;Object&gt; lrange = redisQueue.lrange(MESSAGE, 0, -1<span style="color: rgba(0, 0, 0, 1)">);
    </span><span style="color: rgba(0, 0, 255, 1)">int</span> size =<span style="color: rgba(0, 0, 0, 1)"> lrange.size();
    System.out.println(</span>"size:" +<span style="color: rgba(0, 0, 0, 1)"> size);
    </span><span style="color: rgba(0, 0, 255, 1)">if</span> (size &gt;= 10<span style="color: rgba(0, 0, 0, 1)">) {
        List</span>&lt;DcProjectCount&gt; dcProjectCounts = <span style="color: rgba(0, 0, 255, 1)">new</span> ArrayList&lt;&gt;<span style="color: rgba(0, 0, 0, 1)">();
        </span><span style="color: rgba(0, 0, 255, 1)">for</span> (<span style="color: rgba(0, 0, 255, 1)">int</span> i = 0; i &lt; size; i++<span style="color: rgba(0, 0, 0, 1)">) {
            Object brpop </span>= redisQueue.brpop(MESSAGE, 0<span style="color: rgba(0, 0, 0, 1)">, TimeUnit.SECONDS);
            </span><span style="color: rgba(0, 0, 255, 1)">if</span> (brpop != <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">) {
                dcProjectCounts.add((DcProjectCount) brpop);
            }
        }
        </span><span style="color: rgba(0, 0, 255, 1)">for</span><span style="color: rgba(0, 0, 0, 1)"> (DcProjectCount dcProjectCount : dcProjectCounts) {
            System.out.println(dcProjectCount.getId());
        }
    }
    </span><span style="color: rgba(0, 0, 255, 1)">return</span> "true"<span style="color: rgba(0, 0, 0, 1)">;
}

}

 DcProjectCount 类只要设置 private Long id 并添加 set、get 方法即可!