Java Redis+Spring-data-redis 队列 单机版

1、redis.properties

##redisIP 地址
#redis.host=10.14.2.212
redis.host=127.0.0.1
##redis 默认端口号
redis.port=6379
#redis 密码
redis.pass=a7217sec!@#

##redis.database=0 ## 指定使用第几个库

redis.maxIdle=300
redis.maxActive
=600
redis.maxWait
=1000
redis.testOnBorrow
=true
redis.timeout
=1000

redis.pool.maxActive=200
redis.pool.maxIdle
=50
redis.pool.minIdle
=0
redis.pool.maxWait
=15000
## 向调用者输出链接资源时,是否检测是否有效,如果无效则从连接池中移除,尝试继续获取,默认为 false,建议保留默认值
redis.pool.testOnBorrow
=false
## 向连接池“归还”链接时,是否检测“链接”对象的有效性。默认为 false。建议保持默认值.
redis.pool.testOnReturn
=false
##maxActive: 链接池中最大连接数, 默认为 8.
##maxIdle: 链接池中最大空闲的连接数, 默认为 8.
##minIdle: 连接池中最少空闲的连接数, 默认为 0.
##maxWait: 当连接池资源耗尽时,调用者最大阻塞的时间,超时将跑出异常。单位,毫秒数; 默认为
-1. 表示永不超时.
##minEvictableIdleTimeMillis: 连接空闲的最小时间,达到此值后空闲连接将可能会被移除。负值 (
-1) 表示不移除。
##softMinEvictableIdleTimeMillis: 连接空闲的最小时间,达到此值后空闲链接将会被移除,且保留“minIdle”个空闲连接数。默认为
-1.
##numTestsPerEvictionRun: 对于“空闲链接”检测线程而言,每次检测的链接资源的个数。默认为 3.
##testOnBorrow: 向调用者输出“链接”资源时,是否检测是有有效,如果无效则从连接池中移除,并尝试获取继续获取。默认为 false。建议保持默认值.
##testOnReturn: 向连接池“归还”链接时,是否检测“链接”对象的有效性。默认为 false。建议保持默认值.
##testWhileIdle:
false 向调用者输出“链接”对象时,是否检测它的空闲超时;默认为 false。如果“链接”空闲超时,将会被移除。建议保持默认值.
##timeBetweenEvictionRunsMillis: “空闲链接”检测线程,检测的周期,毫秒数。如果为负值,表示不运行“检测线程”。默认为
-1.
##whenExhaustedAction: 当“连接池”中 active 数量达到阀值时,即“链接”资源耗尽时,连接池需要采取的手段, 默认为 1:

-> 0 : 抛出异常,

-> 1 : 阻塞,直到有可用链接资源

-> 2 : 强制创建新的链接资源

2、spring-redis.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd">
&lt;context:annotation-config /&gt;

&lt;!-- 引入外部属性文件. --&gt;
&lt;context:property-placeholder location="classpath*:redis.properties" ignore-unresolvable="true" /&gt;

&lt;!-- 注册 --&gt;
&lt;context:component-scan base-<span style="color: rgba(0, 0, 255, 1)">package</span>="com.xiaomei.queue.redis.demo01"/&gt;

&lt;bean id="poolConfig" <span style="color: rgba(0, 0, 255, 1)">class</span>="redis.clients.jedis.JedisPoolConfig"&gt;
    &lt;!--&lt;property name="maxTotal" value="${redis.pool.maxActive}" /&gt;--&gt;
    &lt;!--最大空闲连接数 --&gt;
    &lt;property name="maxIdle" value="${redis.pool.maxIdle}" /&gt;
    &lt;!--初始化连接数 --&gt;
    &lt;property name="minIdle" value="${redis.pool.minIdle}" /&gt;
    &lt;!--最大等待时间 --&gt;
    &lt;property name="maxWaitMillis" value="${redis.pool.maxWait}" /&gt;
    &lt;!--对拿到的connection进行validateObject校验 --&gt;
    &lt;property name="testOnBorrow" value="${redis.pool.testOnBorrow}" /&gt;
    &lt;!--在进行returnObject对返回的connection进行validateObject校验 --&gt;
    &lt;property name="testOnReturn" value="${redis.pool.testOnReturn}" /&gt;
    &lt;!--定时对线程池中空闲的链接进行validateObject校验 --&gt;
    &lt;property name="testWhileIdle" value="false" /&gt;
&lt;/bean&gt;

&lt;bean id="connectionFactory" <span style="color: rgba(0, 0, 255, 1)">class</span>="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"<span style="color: rgba(0, 0, 0, 1)">
      p:host</span>-name="${redis.host}" p:port="${redis.port}" p:password="mxb123"  p:pool-config-ref="poolConfig"/&gt;

&lt;bean id="redisTemplate" <span style="color: rgba(0, 0, 255, 1)">class</span>="org.springframework.data.redis.core.StringRedisTemplate"&gt;
    &lt;property name="connectionFactory"   ref="connectionFactory" /&gt;
    &lt;property name="keySerializer"&gt;
        &lt;bean <span style="color: rgba(0, 0, 255, 1)">class</span>="org.springframework.data.redis.serializer.StringRedisSerializer" /&gt;
    &lt;/property&gt;
    &lt;property name="valueSerializer"&gt;<br>        &lt;!-- 采用jdk序列 --&gt;
        &lt;bean <span style="color: rgba(0, 0, 255, 1)">class</span>="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" /&gt;
    &lt;/property&gt;
&lt;/bean&gt;

  <!-- 发送邮件的队列, 这边只是个例子 -->
<bean id="sentEmailQueue" class="com.xiaomei.queue.redis.demo01.SendEmailQueue">
<property name="redisQueue" >
<bean class="com.xiaomei.queue.redis.demo01.RedisQueue" destroy-method="destroy">
<property name="redisTemplate" ref="redisTemplate"></property>
<property name="queueName" value="email"></property>
</bean>
</property>
</bean>

</beans>

3、IRedisQueue.java

package com.xiaomei.queue.redis.demo01;

import java.util.concurrent.TimeUnit;

/**

  • @author xiaomei

  • @version V1.0

  • @Title: IRedisQueue

  • @Package com.xiaomei.queue.redis.demo01

  • @Description:

  • @date 11/7/17
    */
    public interface IRedisQueue<T> {

    /**

    • 从头开始拿
    • 拿出,没有就等待
    • @return
    • @throws InterruptedException
      */
      public T take() throws InterruptedException;

    /**

    • 从尾开始拿
    • 拿出,没有就等待
    • @return
    • @throws InterruptedException
      */
      public T takeOpposite() throws InterruptedException;

    /**

    • 从头开始拿
    • 拿出,没有就等待 seconds 秒
    • @param seconds
    • @return
    • @throws InterruptedException
      */
      public T poll(int seconds) throws InterruptedException;

    /**

    • 从头开始拿
    • 拿出,没有就等待 seconds 秒
    • @param seconds
    • @return
    • @throws InterruptedException
      */
      public T pollOpposite(int seconds) throws InterruptedException;

    /**

    • 从尾开始放
    • 入队
    • @param value
    • @return
    • @throws InterruptedException
      */
      public void add(T value);

    /**

    • 从头开始放
    • 入队
    • @param value
    • @return
    • @throws InterruptedException
      */
      public void addOpposite(T value);

//
// /**
// * 从头开始删除
// * @return
// /
// public T remove();
//
//
// /
*
// * 从尾开始删除
// * @return
// /
// public T removeOpposite();
//

/**
* 删除所有
/
public void clearAll();

}

4、RedisQueue.java

package com.xiaomei.queue.redis.demo01;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.BoundListOperations;
import org.springframework.data.redis.core.RedisConnectionUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**

  • @author xiaomei

  • @version V1.0

  • @Title: RedisQueue

  • @Package com.xiaomei.queue.redis.demo01

  • @Description:

  • @date 11/7/17
    */
    public class RedisQueue<T> implements InitializingBean,DisposableBean,IRedisQueue<T> {

    //队列名称
    public String queueName;
    //原生 key
    private byte[] rawKey;

    private RedisTemplate redisTemplate;
    private RedisConnectionFactory factory;
    private RedisConnection connection; //为了堵塞
    private BoundListOperations<String, T> listOperations;
    private Lock lock = new ReentrantLock();//基于底层 IO 阻塞考虑 如果分布式的话,就是用分式式的锁

@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> T take() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> InterruptedException {
    </span><span style="color: rgba(0, 0, 255, 1)">return</span> poll(0<span style="color: rgba(0, 0, 0, 1)">);
}

@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> T takeOpposite() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> InterruptedException {
    </span><span style="color: rgba(0, 0, 255, 1)">return</span> pollOpposite(0<span style="color: rgba(0, 0, 0, 1)">);
}

@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> T poll(<span style="color: rgba(0, 0, 255, 1)">int</span> seconds) <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> InterruptedException {
    lock.lockInterruptibly();
    </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)">{
        List</span>&lt;<span style="color: rgba(0, 0, 255, 1)">byte</span>[]&gt; results =<span style="color: rgba(0, 0, 0, 1)"> connection.bRPop(seconds, rawKey);
        </span><span style="color: rgba(0, 0, 255, 1)">if</span><span style="color: rgba(0, 0, 0, 1)">(CollectionUtils.isEmpty(results)){
            </span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">;
        }
        </span><span style="color: rgba(0, 0, 255, 1)">return</span> (T)redisTemplate.getValueSerializer().deserialize(results.get(1<span style="color: rgba(0, 0, 0, 1)">));
    }</span><span style="color: rgba(0, 0, 255, 1)">finally</span><span style="color: rgba(0, 0, 0, 1)">{
        lock.unlock();
    }
}

@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> T pollOpposite(<span style="color: rgba(0, 0, 255, 1)">int</span> seconds) <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> InterruptedException {
    lock.lockInterruptibly();
    </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)">{
        List</span>&lt;<span style="color: rgba(0, 0, 255, 1)">byte</span>[]&gt; results =<span style="color: rgba(0, 0, 0, 1)"> connection.bLPop(seconds, rawKey);
        </span><span style="color: rgba(0, 0, 255, 1)">if</span><span style="color: rgba(0, 0, 0, 1)">(CollectionUtils.isEmpty(results)){
            </span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">;
        }
        </span><span style="color: rgba(0, 0, 255, 1)">return</span> (T)redisTemplate.getValueSerializer().deserialize(results.get(1<span style="color: rgba(0, 0, 0, 1)">));
    }</span><span style="color: rgba(0, 0, 255, 1)">finally</span><span style="color: rgba(0, 0, 0, 1)">{
        lock.unlock();
    }
}

@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> add(T value) {
    listOperations.rightPush(value);
}

@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> addOpposite(T value) {
    listOperations.leftPush(value);
}

@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> clearAll() {

// listOperations.
}

@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> afterPropertiesSet() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
    factory </span>=<span style="color: rgba(0, 0, 0, 1)"> redisTemplate.getConnectionFactory();
    connection </span>=<span style="color: rgba(0, 0, 0, 1)"> RedisConnectionUtils.getConnection(factory);
    rawKey </span>=<span style="color: rgba(0, 0, 0, 1)"> redisTemplate.getKeySerializer().serialize(queueName);
    listOperations </span>=<span style="color: rgba(0, 0, 0, 1)"> redisTemplate.boundListOps(queueName);
}


@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> destroy() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {</span>

RedisConnectionUtils.releaseConnection(connection, factory);
}

</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> setQueueName(String queueName) {
    </span><span style="color: rgba(0, 0, 255, 1)">this</span>.queueName =<span style="color: rgba(0, 0, 0, 1)"> queueName;
}

</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> setRedisTemplate(RedisTemplate redisTemplate) {
    </span><span style="color: rgba(0, 0, 255, 1)">this</span>.redisTemplate =<span style="color: rgba(0, 0, 0, 1)"> redisTemplate;
}

}

5、SendEmailQueue

package com.xiaomei.queue.redis.demo01;

import com.xiaomei.queue.redis.demo01.entity.EmailEntity;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.data.redis.core.RedisTemplate;

/**

  • @author xiaomei

  • @version V1.0

  • @Title: SendEmailQueue

  • @Package com.xiaomei.queue.redis.demo01

  • @Description:

  • @date 11/7/17
    */
    public class SendEmailQueue implements Runnable{

    //@Autowired
    private IRedisQueue<EmailEntity> redisQueue;

    public IRedisQueue<EmailEntity> getRedisQueue() {
    return redisQueue;
    }

    public void setRedisQueue(IRedisQueue<EmailEntity> redisQueue) {
    this.redisQueue = redisQueue;
    }

    public void sentEmail(EmailEntity emailEntity) {
    redisQueue.add(emailEntity);
    }

    public EmailEntity getEmail() throws InterruptedException {
    return redisQueue.poll(1);
    }

    @Override
    public void run() {
    try {
    while (true){
    System.out.println(
    "take"+getEmail());
    }
    }
    catch (InterruptedException e) {
    e.printStackTrace();
    }
    }

    public static void main(String[] args) {
    ClassPathXmlApplicationContext applicationContext
    = new ClassPathXmlApplicationContext("spring-redis.xml");
    SendEmailQueue sendEmailQueue
    = (SendEmailQueue) applicationContext.getBean("sentEmailQueue");
    // Thread thread = new Thread(){
    // @Override
    // public void run() {
    // for (int i = 0 ; i <100 ; i++){
    // EmailEntity emailEntity = new EmailEntity();
    // emailEntity.setEmailAddr(i+"-- - @qq.com");
    // emailEntity.setUserName("name:"+i);
    // sendEmailQueue.sentEmail(emailEntity);
    // }
    // }
    // };
    // thread.run();
    sendEmailQueue.run();
    }
    }

6、EmailEntity.java   注意必须 实现 Serializeable 不然不能序列化

package com.xiaomei.queue.redis.demo01.entity;

import java.io.Serializable;

/**

  • @author 梅谢兵

  • @version V1.0

  • @Title: EmailEntity

  • @Package com.xiaomei.queue.redis.demo01.entity

  • @Description:

  • @date 11/7/17
    */
    public class EmailEntity implements Serializable {

    private String userName;
    private String emailAddr;

    public String getUserName() {
    return userName;
    }

    public void setUserName(String userName) {
    this.userName = userName;
    }

    public String getEmailAddr() {
    return emailAddr;
    }

    public void setEmailAddr(String emailAddr) {
    this.emailAddr = emailAddr;
    }

    @Override
    public String toString() {
    return "EmailEntity{" +
    "userName='"+ userName +''' +
    ", emailAddr='"+ emailAddr +''' +
    '}';
    }
    }

采用的是生产者和消费者的模式。

最后加上一直图