Java利用Redis实现消息队列
应用场景
- 为什么要用 redis?
二进制存储、java 序列化传输、IO 连接数高、连接频繁
一、序列化
这里编写了一个 java 序列化的工具, 主要是将对象转化为 byte 数组, 和根据 byte 数组反序列化成 java 对象; 主要是用到了 ByteArrayOutputStream 和 ByteArrayInputStream; 注意: 每个需要序列化的对象都要实现 Serializable 接口;
其代码如下:
1 package Utils; 2 import java.io.*; 3 /** 4 * Created by Kinglf on 2016/10/17. 5 */ 6 public class ObjectUtil { 7 /** 8 * 对象转 byte[] 9 * @param obj 10 * @return 11 * @throws IOException 12 */ 13 public static byte[] object2Bytes(Object obj) throws IOException{14 ByteArrayOutputStream bo=new ByteArrayOutputStream(); 15 ObjectOutputStream oo=new ObjectOutputStream(bo); 16 oo.writeObject(obj); 17 byte[] bytes=bo.toByteArray(); 18 bo.close(); 19 oo.close(); 20 return bytes; 21 } 22 /** 23 * byte[] 转对象 24 * @param bytes 25 * @return 26 * @throws Exception 27 */ 28 public static Object bytes2Object(byte[] bytes) throws Exception{29 ByteArrayInputStream in=new ByteArrayInputStream(bytes); 30 ObjectInputStream sIn=new ObjectInputStream(in); 31 return sIn.readObject(); 32 } 33 }
二、消息类 (实现 Serializable 接口)
package Model;import java.io.Serializable;
/**
-
Created by Kinglf on 2016/10/17.
*/
public class Message implements Serializable {private static final long serialVersionUID = -389326121047047723L;
private int id;
private String content;
public Message(int id, String content) {
this.id = id;
this.content = content;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
三、Redis 的操作
利用 redis 做队列, 我们采用的是 redis 中 list 的 push 和 pop 操作;
结合队列的特点: 只允许在一端插入新元素只能在队列的尾部FIFO:先进先出原则
Redis 中 lpush头入
(rpop尾出
) 或 rpush尾入
(lpop头出
)可以满足要求, 而 Redis 中 list 药 push 或 pop 的对象仅需要转换成 byte[] 即可 java采用Jedis进行Redis的存储和Redis的连接池设置
上代码:
package Utils; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; import java.util.List; import java.util.Map; import java.util.Set; /** * Created by Kinglf on 2016/10/17. */ public class JedisUtil { private static String JEDIS_IP; private static int JEDIS_PORT; private static String JEDIS_PASSWORD; private static JedisPool jedisPool; static { //Configuration 自行写的配置文件解析类, 继承自 Properties Configuration conf=Configuration.getInstance(); JEDIS_IP=conf.getString("jedis.ip","127.0.0.1"); JEDIS_PORT=conf.getInt("jedis.port",6379); JEDIS_PASSWORD=conf.getString("jedis.password",null); JedisPoolConfig config=new JedisPoolConfig(); config.setMaxActive(5000); config.setMaxIdle(256); config.setMaxWait(5000L); config.setTestOnBorrow(true); config.setTestOnReturn(true); config.setTestWhileIdle(true); config.setMinEvictableIdleTimeMillis(60000L); config.setTimeBetweenEvictionRunsMillis(3000L); config.setNumTestsPerEvictionRun(-1); jedisPool=new JedisPool(config,JEDIS_IP,JEDIS_PORT,60000); } /** * 获取数据 * @param key * @return */ public static String get(String key){ String value=null; Jedis jedis=null; try{jedis=jedisPool.getResource(); value=jedis.get(key); }catch (Exception e){jedisPool.returnBrokenResource(jedis); e.printStackTrace();}finally {close(jedis); } return value; }private static void close(Jedis jedis) { try{ jedisPool.returnResource(jedis); }catch (Exception e){ if(jedis.isConnected()){ jedis.quit(); jedis.disconnect(); } } } public static byte[] get(byte[] key){ byte[] value = null; Jedis jedis = null; try { jedis = jedisPool.getResource(); value = jedis.get(key); } catch (Exception e) { //释放redis对象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返还到连接池 close(jedis); } return value; } public static void set(byte[] key, byte[] value) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.set(key, value); } catch (Exception e) { //释放redis对象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返还到连接池 close(jedis); } } public static void set(byte[] key, byte[] value, int time) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.set(key, value); jedis.expire(key, time); } catch (Exception e) { //释放redis对象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返还到连接池 close(jedis); } } public static void hset(byte[] key, byte[] field, byte[] value) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.hset(key, field, value); } catch (Exception e) { //释放redis对象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返还到连接池 close(jedis); } } public static void hset(String key, String field, String value) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.hset(key, field, value); } catch (Exception e) { //释放redis对象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返还到连接池 close(jedis); } } /** * 获取数据 * * @param key * @return */ public static String hget(String key, String field) { String value = null; Jedis jedis = null; try { jedis = jedisPool.getResource(); value = jedis.hget(key, field); } catch (Exception e) { //释放redis对象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返还到连接池 close(jedis); } return value; } /** * 获取数据 * * @param key * @return */ public static byte[] hget(byte[] key, byte[] field) { byte[] value = null; Jedis jedis = null; try { jedis = jedisPool.getResource(); value = jedis.hget(key, field); } catch (Exception e) { //释放redis对象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返还到连接池 close(jedis); } return value; } public static void hdel(byte[] key, byte[] field) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.hdel(key, field); } catch (Exception e) { //释放redis对象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返还到连接池 close(jedis); } } /** * 存储REDIS队列 顺序存储 * @param key reids键名 * @param value 键值 */ public static void lpush(byte[] key, byte[] value) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.lpush(key, value); } catch (Exception e) { //释放redis对象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返还到连接池 close(jedis); } } /** * 存储REDIS队列 反向存储 * @param key reids键名 * @param value 键值 */ public static void rpush(byte[] key, byte[] value) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.rpush(key, value); } catch (Exception e) { //释放redis对象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返还到连接池 close(jedis); } } /** * 将列表 source 中的最后一个元素(尾元素)弹出,并返回给客户端 * @param key reids键名 * @param destination 键值 */ public static void rpoplpush(byte[] key, byte[] destination) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.rpoplpush(key, destination); } catch (Exception e) { //释放redis对象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返还到连接池 close(jedis); } } /** * 获取队列数据 * @param key 键名 * @return */ public static List lpopList(byte[] key) { List list = null; Jedis jedis = null; try { jedis = jedisPool.getResource(); list = jedis.lrange(key, 0, -1); } catch (Exception e) { //释放redis对象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返还到连接池 close(jedis); } return list; } /** * 获取队列数据 * @param key 键名 * @return */ public static byte[] rpop(byte[] key) { byte[] bytes = null; Jedis jedis = null; try { jedis = jedisPool.getResource(); bytes = jedis.rpop(key); } catch (Exception e) { //释放redis对象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返还到连接池 close(jedis); } return bytes; } public static void hmset(Object key, Map hash) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.hmset(key.toString(), hash); } catch (Exception e) { //释放redis对象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返还到连接池 close(jedis); } } public static void hmset(Object key, Map hash, int time) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.hmset(key.toString(), hash); jedis.expire(key.toString(), time); } catch (Exception e) { //释放redis对象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返还到连接池 close(jedis); } } public static List hmget(Object key, String... fields) { List result = null; Jedis jedis = null; try { jedis = jedisPool.getResource(); result = jedis.hmget(key.toString(), fields); } catch (Exception e) { //释放redis对象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返还到连接池 close(jedis); } return result; } public static Set hkeys(String key) { Set result = null; Jedis jedis = null; try { jedis = jedisPool.getResource(); result = jedis.hkeys(key); } catch (Exception e) { //释放redis对象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返还到连接池 close(jedis); } return result; } public static List lrange(byte[] key, int from, int to) { List result = null; Jedis jedis = null; try { jedis = jedisPool.getResource(); result = jedis.lrange(key, from, to); } catch (Exception e) { //释放redis对象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返还到连接池 close(jedis); } return result; } public static Map hgetAll(byte[] key) { Map result = null; Jedis jedis = null; try { jedis = jedisPool.getResource(); result = jedis.hgetAll(key); } catch (Exception e) { //释放redis对象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返还到连接池 close(jedis); } return result; } public static void del(byte[] key) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.del(key); } catch (Exception e) { //释放redis对象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返还到连接池 close(jedis); } } public static long llen(byte[] key) { long len = 0; Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.llen(key); } catch (Exception e) { //释放redis对象 jedisPool.returnBrokenResource(jedis); e.printStackTrace(); } finally { //返还到连接池 close(jedis); } return len; }
}
四、Configuration 主要用于读取 Redis 的配置信息
package Utils;import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;/**
-
Created by Kinglf on 2016/10/17.
*/
public class Configuration extends Properties {private static final long serialVersionUID = -2296275030489943706L;
private static Configuration instance = null;public static synchronized Configuration getInstance() {
if (instance == null) {
instance = new Configuration();
}
return instance;
}public String getProperty(String key, String defaultValue) {
String val = getProperty(key);
return (val == null || val.isEmpty()) ? defaultValue : val;
}public String getString(String name, String defaultValue) {
return this.getProperty(name, defaultValue);
}public int getInt(String name, int defaultValue) {
String val = this.getProperty(name);
return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);
}public long getLong(String name, long defaultValue) {
String val = this.getProperty(name);
return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);
}public float getFloat(String name, float defaultValue) {
String val = this.getProperty(name);
return (val == null || val.isEmpty()) ? defaultValue : Float.parseFloat(val);
}public double getDouble(String name, double defaultValue) {
String val = this.getProperty(name);
return (val == null || val.isEmpty()) ? defaultValue : Double.parseDouble(val);
}public byte getByte(String name, byte defaultValue) {
String val = this.getProperty(name);
return (val == null || val.isEmpty()) ? defaultValue : Byte.parseByte(val);
}public Configuration() {
InputStream in = ClassLoader.getSystemClassLoader().getResourceAsStream("config.xml");
try {
this.loadFromXML(in);
in.close();
} catch (IOException ioe) {}
}
}
五、测试
import Model.Message; import Utils.JedisUtil; import Utils.ObjectUtil; import redis.clients.jedis.Jedis;import java.io.IOException;
/**
-
Created by Kinglf on 2016/10/17.
*/
public class TestRedisQueue {
public static byte[] redisKey = "key".getBytes();
static {
try {
init();
} catch (IOException e) {
e.printStackTrace();
}
}private static void init() throws IOException {
for (int i = 0; i < 1000000; i++) {
Message message = new Message(i, "这是第" + i + "个内容");
JedisUtil.lpush(redisKey, ObjectUtil.object2Bytes(message));
}}
public static void main(String[] args) {
try {
pop();
} catch (Exception e) {
e.printStackTrace();
}
}private static void pop() throws Exception {
byte[] bytes = JedisUtil.rpop(redisKey);
Message msg = (Message) ObjectUtil.bytes2Object(bytes);
if (msg != null) {
System.out.println(msg.getId() + "----" + msg.getContent());
}
}
}
每执行一次 pop() 方法, 结果如下:
<br>1---- 这是第 1 个内容
<br>2---- 这是第 2 个内容
<br>3---- 这是第 3 个内容
<br>4---- 这是第 4 个内容
总结
至此,整个Redis消息队列的生产者和消费者代码已经完成
- Message
需要传送的实体类(需实现Serializable接口)
- Configuration
Redis的配置读取类,继承自Properties
- ObjectUtil
将对象和byte数组双向转换的工具类
- Jedis
通过消息队列的先进先出(FIFO)的特点结合Redis的list中的push和pop操作进行封装的工具类