Java Redis+Spring-data-redis 队列 单机版
1、redis.properties
## redis IP 地址
#redis.host=10.14.2.212
redis.host=127.0.0.1
## redis 默认端口号
redis.port=6379
# redis 密码
redis.pass=a7217sec!@#
##redis.database=0 ## 指定使用第几个库
redis.maxIdle=300
redis.maxActive=600
redis.maxWait=1000
redis.testOnBorrow=true
redis.timeout=1000
redis.pool.maxActive=200
redis.pool.maxIdle=50
redis.pool.minIdle=0
redis.pool.maxWait=15000
## 向调用者输出链接资源时,是否检测是否有效,如果无效则从连接池中移除,尝试继续获取,默认为 false,建议保留默认值
redis.pool.testOnBorrow=false
## 向连接池“归还”链接时,是否检测“链接”对象的有效性。默认为 false。建议保持默认值.
redis.pool.testOnReturn=false
##maxActive: 链接池中最大连接数, 默认为 8.
##maxIdle: 链接池中最大空闲的连接数, 默认为 8.
##minIdle: 连接池中最少空闲的连接数, 默认为 0.
##maxWait: 当连接池资源耗尽时,调用者最大阻塞的时间,超时将抛出异常。单位,毫秒数; 默认为-1. 表示永不超时.
##minEvictableIdleTimeMillis: 连接空闲的最小时间,达到此值后空闲连接将可能会被移除。负值 (-1) 表示不移除。
##softMinEvictableIdleTimeMillis: 连接空闲的最小时间,达到此值后空闲链接将会被移除,且保留“minIdle”个空闲连接数。默认为-1.
##numTestsPerEvictionRun: 对于“空闲链接”检测线程而言,每次检测的链接资源的个数。默认为 3.
##testOnBorrow: 向调用者输出“链接”资源时,是否检测是否有效,如果无效则从连接池中移除,并尝试继续获取。默认为 false。建议保持默认值.
##testOnReturn: 向连接池“归还”链接时,是否检测“链接”对象的有效性。默认为 false。建议保持默认值.
##testWhileIdle:false 向调用者输出“链接”对象时,是否检测它的空闲超时;默认为 false。如果“链接”空闲超时,将会被移除。建议保持默认值.
##timeBetweenEvictionRunsMillis: “空闲链接”检测线程,检测的周期,毫秒数。如果为负值,表示不运行“检测线程”。默认为-1.
##whenExhaustedAction: 当“连接池”中 active 数量达到阈值时,即“链接”资源耗尽时,连接池需要采取的手段, 默认为 1:
## -> 0 : 抛出异常,
## -> 1 : 阻塞,直到有可用链接资源
## -> 2 : 强制创建新的链接资源
2、spring-redis.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-4.0.xsd">

    <context:annotation-config />

    <!-- Load the external properties file. -->
    <context:property-placeholder location="classpath*:redis.properties" ignore-unresolvable="true" />

    <!-- Register annotated components from the demo package. -->
    <context:component-scan base-package="com.xiaomei.queue.redis.demo01" />

    <!-- Jedis connection pool settings, resolved from redis.properties. -->
    <bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig">
        <!-- maxTotal is intentionally left disabled, as in the original configuration. -->
        <!--<property name="maxTotal" value="${redis.pool.maxActive}" />-->
        <!-- Maximum number of idle connections. -->
        <property name="maxIdle" value="${redis.pool.maxIdle}" />
        <!-- Minimum number of idle connections. -->
        <property name="minIdle" value="${redis.pool.minIdle}" />
        <!-- Maximum wait time when borrowing a connection. -->
        <property name="maxWaitMillis" value="${redis.pool.maxWait}" />
        <!-- Validate a connection when it is borrowed. -->
        <property name="testOnBorrow" value="${redis.pool.testOnBorrow}" />
        <!-- Validate a connection when it is returned. -->
        <property name="testOnReturn" value="${redis.pool.testOnReturn}" />
        <!-- Periodically validate idle connections. -->
        <property name="testWhileIdle" value="false" />
    </bean>

    <!-- NOTE(review): the password is a literal here although redis.properties also declares
         redis.pass; confirm which value is intended and consider using a placeholder. -->
    <bean id="connectionFactory"
          class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
          p:host-name="${redis.host}"
          p:port="${redis.port}"
          p:password="mxb123"
          p:pool-config-ref="poolConfig" />

    <bean id="redisTemplate" class="org.springframework.data.redis.core.StringRedisTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="keySerializer">
            <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" />
        </property>
        <property name="valueSerializer">
            <!-- Values use JDK serialization. -->
            <bean class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />
        </property>
    </bean>

    <!-- Queue used for sending e-mails; this is only an example. -->
    <bean id="sentEmailQueue" class="com.xiaomei.queue.redis.demo01.SendEmailQueue">
        <property name="redisQueue">
            <bean class="com.xiaomei.queue.redis.demo01.RedisQueue" destroy-method="destroy">
                <property name="redisTemplate" ref="redisTemplate" />
                <property name="queueName" value="email" />
            </bean>
        </property>
    </bean>
</beans>
3、IRedisQueue.java
package com.xiaomei.queue.redis.demo01;import java.util.concurrent.TimeUnit;
/**
@author xiaomei
@version V1.0
@Title: IRedisQueue
@Package com.xiaomei.queue.redis.demo01
@Description:
@date 11/7/17
*/
public interface IRedisQueue<T> {

    /**
     * Takes an element from the head of the queue, waiting if none is available.
     *
     * @return the element taken from the head
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    T take() throws InterruptedException;

    /**
     * Takes an element from the tail of the queue, waiting if none is available.
     *
     * @return the element taken from the tail
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    T takeOpposite() throws InterruptedException;

    /**
     * Takes an element from the head of the queue, waiting up to {@code seconds} seconds
     * if none is available.
     *
     * @param seconds how long to wait, in seconds
     * @return the element taken from the head; what is returned when the wait elapses is
     *         implementation-defined (NOTE(review): presumably {@code null} - confirm)
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    T poll(int seconds) throws InterruptedException;

    /**
     * Takes an element from the tail of the queue, waiting up to {@code seconds} seconds
     * if none is available.
     *
     * @param seconds how long to wait, in seconds
     * @return the element taken from the tail; what is returned when the wait elapses is
     *         implementation-defined (NOTE(review): presumably {@code null} - confirm)
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    T pollOpposite(int seconds) throws InterruptedException;

    /**
     * Enqueues {@code value} at the tail of the queue.
     *
     * @param value the element to enqueue
     */
    void add(T value);

    /**
     * Enqueues {@code value} at the head of the queue.
     *
     * @param value the element to enqueue
     */
    void addOpposite(T value);

    /**
     * Removes all elements from the queue.
     */
    void clearAll();
}
4、RedisQueue.java
package com.xiaomei.queue.redis.demo01;import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.BoundListOperations;
import org.springframework.data.redis.core.RedisConnectionUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.CollectionUtils;import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;/**
@author xiaomei
@version V1.0
@Title: RedisQueue
@Package com.xiaomei.queue.redis.demo01
@Description:
@date 11/7/17
*/
public class RedisQueue<T> implements InitializingBean,DisposableBean,IRedisQueue<T> {//队列名称
public String queueName;
//原生 key
private byte[] rawKey;private RedisTemplate redisTemplate;
private RedisConnectionFactory factory;
private RedisConnection connection; //为了堵塞
private BoundListOperations<String, T> listOperations;
private Lock lock = new ReentrantLock();//基于底层 IO 阻塞考虑 如果分布式的话,就是用分式式的锁@Override </span><span style="color: rgba(0, 0, 255, 1)">public</span> T take() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> InterruptedException { </span><span style="color: rgba(0, 0, 255, 1)">return</span> poll(0<span style="color: rgba(0, 0, 0, 1)">); } @Override </span><span style="color: rgba(0, 0, 255, 1)">public</span> T takeOpposite() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> InterruptedException { </span><span style="color: rgba(0, 0, 255, 1)">return</span> pollOpposite(0<span style="color: rgba(0, 0, 0, 1)">); } @Override </span><span style="color: rgba(0, 0, 255, 1)">public</span> T poll(<span style="color: rgba(0, 0, 255, 1)">int</span> seconds) <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> InterruptedException { lock.lockInterruptibly(); </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)">{ List</span><<span style="color: rgba(0, 0, 255, 1)">byte</span>[]> results =<span style="color: rgba(0, 0, 0, 1)"> connection.bRPop(seconds, rawKey); </span><span style="color: rgba(0, 0, 255, 1)">if</span><span style="color: rgba(0, 0, 0, 1)">(CollectionUtils.isEmpty(results)){ </span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">; } </span><span style="color: rgba(0, 0, 255, 1)">return</span> (T)redisTemplate.getValueSerializer().deserialize(results.get(1<span style="color: rgba(0, 0, 0, 1)">)); }</span><span style="color: rgba(0, 0, 255, 1)">finally</span><span style="color: rgba(0, 0, 0, 1)">{ lock.unlock(); } } @Override </span><span style="color: rgba(0, 0, 255, 1)">public</span> T pollOpposite(<span style="color: rgba(0, 0, 255, 1)">int</span> seconds) <span style="color: rgba(0, 0, 255, 1)">throws</span><span 
style="color: rgba(0, 0, 0, 1)"> InterruptedException { lock.lockInterruptibly(); </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)">{ List</span><<span style="color: rgba(0, 0, 255, 1)">byte</span>[]> results =<span style="color: rgba(0, 0, 0, 1)"> connection.bLPop(seconds, rawKey); </span><span style="color: rgba(0, 0, 255, 1)">if</span><span style="color: rgba(0, 0, 0, 1)">(CollectionUtils.isEmpty(results)){ </span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">; } </span><span style="color: rgba(0, 0, 255, 1)">return</span> (T)redisTemplate.getValueSerializer().deserialize(results.get(1<span style="color: rgba(0, 0, 0, 1)">)); }</span><span style="color: rgba(0, 0, 255, 1)">finally</span><span style="color: rgba(0, 0, 0, 1)">{ lock.unlock(); } } @Override </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> add(T value) { listOperations.rightPush(value); } @Override </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> addOpposite(T value) { listOperations.leftPush(value); } @Override </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> clearAll() {
// listOperations.
}@Override </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> afterPropertiesSet() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception { factory </span>=<span style="color: rgba(0, 0, 0, 1)"> redisTemplate.getConnectionFactory(); connection </span>=<span style="color: rgba(0, 0, 0, 1)"> RedisConnectionUtils.getConnection(factory); rawKey </span>=<span style="color: rgba(0, 0, 0, 1)"> redisTemplate.getKeySerializer().serialize(queueName); listOperations </span>=<span style="color: rgba(0, 0, 0, 1)"> redisTemplate.boundListOps(queueName); } @Override </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> destroy() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {</span>
RedisConnectionUtils.releaseConnection(connection, factory);
}</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> setQueueName(String queueName) { </span><span style="color: rgba(0, 0, 255, 1)">this</span>.queueName =<span style="color: rgba(0, 0, 0, 1)"> queueName; } </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> setRedisTemplate(RedisTemplate redisTemplate) { </span><span style="color: rgba(0, 0, 255, 1)">this</span>.redisTemplate =<span style="color: rgba(0, 0, 0, 1)"> redisTemplate; }
}
5、SendEmailQueue
package com.xiaomei.queue.redis.demo01;import com.xiaomei.queue.redis.demo01.entity.EmailEntity;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.data.redis.core.RedisTemplate;/**
@author xiaomei
@version V1.0
@Title: SendEmailQueue
@Package com.xiaomei.queue.redis.demo01
@Description:
@date 11/7/17
*/
/**
 * Demo producer/consumer for {@link EmailEntity} messages on top of an {@link IRedisQueue}.
 *
 * <p>{@link #run()} consumes in a loop; {@link #main(String[])} wires the bean from
 * {@code spring-redis.xml} and starts consuming on the calling thread.
 */
public class SendEmailQueue implements Runnable {

    /** Number of sample messages enqueued when the {@code produce} argument is given. */
    private static final int SAMPLE_COUNT = 100;

    /** Queue to use; injected through {@link #setRedisQueue(IRedisQueue)}. */
    private IRedisQueue<EmailEntity> redisQueue;

    public IRedisQueue<EmailEntity> getRedisQueue() {
        return redisQueue;
    }

    public void setRedisQueue(IRedisQueue<EmailEntity> redisQueue) {
        this.redisQueue = redisQueue;
    }

    /**
     * Enqueues an e-mail.
     *
     * @param emailEntity the message to enqueue
     */
    public void sentEmail(EmailEntity emailEntity) {
        redisQueue.add(emailEntity);
    }

    /**
     * Polls the queue with a one-second wait.
     *
     * @return whatever {@code redisQueue.poll(1)} returns
     * @throws InterruptedException if the poll is interrupted
     */
    public EmailEntity getEmail() throws InterruptedException {
        return redisQueue.poll(1);
    }

    /** Prints each polled message until the thread is interrupted. */
    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                System.out.println("take" + getEmail());
            }
        } catch (InterruptedException e) {
            // Interruption is the stop signal: restore the flag for the caller instead of
            // swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Starts the consumer. Pass {@code produce} as the first argument to enqueue sample
     * messages before consuming.
     *
     * @param args optional; {@code args[0] == "produce"} enables the sample producer
     */
    public static void main(String[] args) {
        ClassPathXmlApplicationContext applicationContext =
                new ClassPathXmlApplicationContext("spring-redis.xml");
        // Ensure bean destroy callbacks run if the JVM is terminated while the consumer loops.
        applicationContext.registerShutdownHook();
        try {
            SendEmailQueue sendEmailQueue =
                    (SendEmailQueue) applicationContext.getBean("sentEmailQueue");
            if (args != null && args.length > 0 && "produce".equals(args[0])) {
                enqueueSampleEmails(sendEmailQueue, SAMPLE_COUNT);
            }
            sendEmailQueue.run();
        } finally {
            applicationContext.close();
        }
    }

    /** Enqueues {@code count} sample messages through {@code queue}. */
    private static void enqueueSampleEmails(SendEmailQueue queue, int count) {
        for (int i = 0; i < count; i++) {
            EmailEntity emailEntity = new EmailEntity();
            emailEntity.setEmailAddr(i + "-- - @qq.com");
            emailEntity.setUserName("name:" + i);
            queue.sentEmail(emailEntity);
        }
    }
}
6、EmailEntity.java 注意必须实现 Serializable,不然不能序列化
package com.xiaomei.queue.redis.demo01.entity;import java.io.Serializable;
/**
@author 梅谢兵
@version V1.0
@Title: EmailEntity
@Package com.xiaomei.queue.redis.demo01.entity
@Description:
@date 11/7/17
*/
/**
 * Message payload carrying a user name and an e-mail address. Implements
 * {@link Serializable} so that it can be serialized.
 */
public class EmailEntity implements Serializable {

    private String userName;
    private String emailAddr;

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getEmailAddr() {
        return emailAddr;
    }

    public void setEmailAddr(String emailAddr) {
        this.emailAddr = emailAddr;
    }

    /**
     * @return a string of the form {@code EmailEntity{userName='...', emailAddr='...'}}
     */
    @Override
    public String toString() {
        return "EmailEntity{"
                + "userName='" + userName + '\''
                + ", emailAddr='" + emailAddr + '\''
                + '}';
    }
}
采用的是生产者和消费者的模式。
最后加上一张图