java redis使用之利用jedis实现redis消息队列

应用场景

对于数据库查询的 IO 连接数高、连接频繁的情况,可以考虑使用缓存实现。

从网上了解到 redis 可以对所有的内容进行二进制的存储,而 java 是可以对所有对象进行序列化的,序列化的方法会在下面的代码中提供实现。

序列化

这里我编写了一个 java 序列化的工具,主要是对对象转换成 byte[],和根据 byte[] 数组反序列化成 java 对象;

主要是用到了 ByteArrayOutputStream 和 ByteArrayInputStream;

需要注意的是每个自定义的需要序列化的对象都要实现 Serializable 接口;

其代码如下:

 1 package com.bean.util;
 2 
 3 import java.io.ByteArrayInputStream;
 4 import java.io.ByteArrayOutputStream;
 5 import java.io.IOException;
 6 import java.io.ObjectInputStream;
 7 import java.io.ObjectOutputStream;
 8 public class ObjectUtil {
 9     /** 对象转 byte[]
10      * @param obj
11      * @return
12      * @throws IOException
13      */
14     public static byte[] objectToBytes(Object obj) throws Exception{
15         ByteArrayOutputStream bo = new ByteArrayOutputStream();
16         ObjectOutputStream oo = new ObjectOutputStream(bo);
17         oo.writeObject(obj);
18         byte[] bytes = bo.toByteArray();
19         bo.close();
20         oo.close();
21         return bytes;
22     }
23     /**byte[] 转对象
24      * @param bytes
25      * @return
26      * @throws Exception
27      */
28     public static Object bytesToObject(byte[] bytes) throws Exception{
29         ByteArrayInputStream in = new ByteArrayInputStream(bytes);
30         ObjectInputStream sIn = new ObjectInputStream(in);
31         return sIn.readObject();
32     }
33 }

定义一个消息类,主要用于接收消息内容和消息下表的设置。

 1 package com.bean;
 2 
 3 import java.io.Serializable;
 4 
 5 /** 定义消息类接收消息内容和设置消息的下标
 6  * @author lenovo
 7  *
 8  */
 9 public class Message implements Serializable{
10     private static final long serialVersionUID = 7792729L;
11     private int id;
12     private String content;
13     public int getId() {
14         return id;
15     }
16     public void setId(int id) {
17         this.id = id;
18     }
19     public String getContent() {
20         return content;
21     }
22     public void setContent(String content) {
23         this.content = content;
24     }
25 }

用 redis 做队列,我们采用的是 redis 中 list 的 push 和 pop 操作;

结合队列的特点:

只允许在一端插入新元素只能在队列的尾部 FIFO:先进先出原则

redis 中 lpush(rpop)或 rpush(lpop) 可以满足要求,而 redis 中 list 里要 push 或 pop 的对象仅需要转换成 byte[] 即可

java 采用 Jedis 进行 redis 的存储和 redis 的连接池设置。

  1 package com.redis.util;
  2 
  3 import java.util.List;
  4 import java.util.Map;
  5 import java.util.Set;
  6 
  7 import redis.clients.jedis.Jedis;
  8 import redis.clients.jedis.JedisPool;
  9 import redis.clients.jedis.JedisPoolConfig;
 10 
 11 public class JedisUtil {
 12 
 13     private static String JEDIS_IP;
 14     private static int JEDIS_PORT;
 15     private static String JEDIS_PASSWORD;
 16     //private static String JEDIS_SLAVE;
 17 
 18     private static JedisPool jedisPool;
 19 
 20     static {
 21         Configuration conf = Configuration.getInstance();
 22         JEDIS_IP = conf.getString("jedis.ip", "127.0.0.1");
 23         JEDIS_PORT = conf.getInt("jedis.port", 6379);
 24         JEDIS_PASSWORD = conf.getString("jedis.password", null);
 25         JedisPoolConfig config = new JedisPoolConfig();
 26         config.setMaxActive(5000);
 27         config.setMaxIdle(256);//20
 28         config.setMaxWait(5000L);
 29         config.setTestOnBorrow(true);
 30         config.setTestOnReturn(true);
 31         config.setTestWhileIdle(true);
 32         config.setMinEvictableIdleTimeMillis(60000l);
 33         config.setTimeBetweenEvictionRunsMillis(3000l);
 34         config.setNumTestsPerEvictionRun(-1);
 35         jedisPool = new JedisPool(config, JEDIS_IP, JEDIS_PORT, 60000);
 36     }
 37 
 38     /**
 39      * 获取数据
 40      * @param key
 41      * @return
 42      */
 43     public static String get(String key) {
 44 
 45         String value = null;
 46         Jedis jedis = null;
 47         try {
 48             jedis = jedisPool.getResource();
 49             value = jedis.get(key);
 50         } catch (Exception e) {
 51             //释放 redis 对象
 52             jedisPool.returnBrokenResource(jedis);
 53             e.printStackTrace();
 54         } finally {
 55             //返还到连接池
 56             close(jedis);
 57         }
 58 
 59         return value;
 60     }
 61 
 62     public static void close(Jedis jedis) {
 63         try {
 64             jedisPool.returnResource(jedis);
 65 
 66         } catch (Exception e) {
 67             if (jedis.isConnected()) {
 68                 jedis.quit();
 69                 jedis.disconnect();
 70             }
 71         }
 72     }
 73 
 74     /**
 75      * 获取数据
 76      * 
 77      * @param key
 78      * @return
 79      */
 80     public static byte[] get(byte[] key) {
 81 
 82         byte[] value = null;
 83         Jedis jedis = null;
 84         try {
 85             jedis = jedisPool.getResource();
 86             value = jedis.get(key);
 87         } catch (Exception e) {
 88             //释放 redis 对象
 89             jedisPool.returnBrokenResource(jedis);
 90             e.printStackTrace();
 91         } finally {
 92             //返还到连接池
 93             close(jedis);
 94         }
 95 
 96         return value;
 97     }
 98 
 99     public static void set(byte[] key, byte[] value) {
100 
101         Jedis jedis = null;
102         try {
103             jedis = jedisPool.getResource();
104             jedis.set(key, value);
105         } catch (Exception e) {
106             //释放 redis 对象
107             jedisPool.returnBrokenResource(jedis);
108             e.printStackTrace();
109         } finally {
110             //返还到连接池
111             close(jedis);
112         }
113     }
114 
115     public static void set(byte[] key, byte[] value, int time) {
116 
117         Jedis jedis = null;
118         try {
119             jedis = jedisPool.getResource();
120             jedis.set(key, value);
121             jedis.expire(key, time);
122         } catch (Exception e) {
123             //释放 redis 对象
124             jedisPool.returnBrokenResource(jedis);
125             e.printStackTrace();
126         } finally {
127             //返还到连接池
128             close(jedis);
129         }
130     }
131 
132     public static void hset(byte[] key, byte[] field, byte[] value) {
133         Jedis jedis = null;
134         try {
135             jedis = jedisPool.getResource();
136             jedis.hset(key, field, value);
137         } catch (Exception e) {
138             //释放 redis 对象
139             jedisPool.returnBrokenResource(jedis);
140             e.printStackTrace();
141         } finally {
142             //返还到连接池
143             close(jedis);
144         }
145     }
146 
147     public static void hset(String key, String field, String value) {
148         Jedis jedis = null;
149         try {
150             jedis = jedisPool.getResource();
151             jedis.hset(key, field, value);
152         } catch (Exception e) {
153             //释放 redis 对象
154             jedisPool.returnBrokenResource(jedis);
155             e.printStackTrace();
156         } finally {
157             //返还到连接池
158             close(jedis);
159         }
160     }
161 
162     /**
163      * 获取数据
164      * 
165      * @param key
166      * @return
167      */
168     public static String hget(String key, String field) {
169 
170         String value = null;
171         Jedis jedis = null;
172         try {
173             jedis = jedisPool.getResource();
174             value = jedis.hget(key, field);
175         } catch (Exception e) {
176             //释放 redis 对象
177             jedisPool.returnBrokenResource(jedis);
178             e.printStackTrace();
179         } finally {
180             //返还到连接池
181             close(jedis);
182         }
183 
184         return value;
185     }
186 
187     /**
188      * 获取数据
189      * 
190      * @param key
191      * @return
192      */
193     public static byte[] hget(byte[] key, byte[] field) {
194 
195         byte[] value = null;
196         Jedis jedis = null;
197         try {
198             jedis = jedisPool.getResource();
199             value = jedis.hget(key, field);
200         } catch (Exception e) {
201             //释放 redis 对象
202             jedisPool.returnBrokenResource(jedis);
203             e.printStackTrace();
204         } finally {
205             //返还到连接池
206             close(jedis);
207         }
208 
209         return value;
210     }
211 
212     public static void hdel(byte[] key, byte[] field) {
213 
214         Jedis jedis = null;
215         try {
216             jedis = jedisPool.getResource();
217             jedis.hdel(key, field);
218         } catch (Exception e) {
219             //释放 redis 对象
220             jedisPool.returnBrokenResource(jedis);
221             e.printStackTrace();
222         } finally {
223             //返还到连接池
224             close(jedis);
225         }
226     }
227 
228     /**
229      * 存储 REDIS 队列 顺序存储
230      * @param byte[] key reids 键名
231      * @param byte[] value 键值
232      */
233     public static void lpush(byte[] key, byte[] value) {
234 
235         Jedis jedis = null;
236         try {
237 
238             jedis = jedisPool.getResource();
239             jedis.lpush(key, value);
240 
241         } catch (Exception e) {
242 
243             //释放 redis 对象
244             jedisPool.returnBrokenResource(jedis);
245             e.printStackTrace();
246 
247         } finally {
248 
249             //返还到连接池
250             close(jedis);
251 
252         }
253     }
254 
255     /**
256      * 存储 REDIS 队列 反向存储
257      * @param byte[] key reids 键名
258      * @param byte[] value 键值
259      */
260     public static void rpush(byte[] key, byte[] value) {
261 
262         Jedis jedis = null;
263         try {
264 
265             jedis = jedisPool.getResource();
266             jedis.rpush(key, value);
267 
268         } catch (Exception e) {
269 
270             //释放 redis 对象
271             jedisPool.returnBrokenResource(jedis);
272             e.printStackTrace();
273 
274         } finally {
275 
276             //返还到连接池
277             close(jedis);
278 
279         }
280     }
281 
282     /**
283      * 将列表 source 中的最后一个元素 (尾元素) 弹出,并返回给客户端
284      * @param byte[] key reids 键名
285      * @param byte[] value 键值
286      */
287     public static void rpoplpush(byte[] key, byte[] destination) {
288 
289         Jedis jedis = null;
290         try {
291 
292             jedis = jedisPool.getResource();
293             jedis.rpoplpush(key, destination);
294 
295         } catch (Exception e) {
296 
297             //释放 redis 对象
298             jedisPool.returnBrokenResource(jedis);
299             e.printStackTrace();
300 
301         } finally {
302 
303             //返还到连接池
304             close(jedis);
305 
306         }
307     }
308 
309     /**
310      * 获取队列数据
311      * @param byte[] key 键名
312      * @return
313      */
314     public static List lpopList(byte[] key) {
315 
316         List list = null;
317         Jedis jedis = null;
318         try {
319 
320             jedis = jedisPool.getResource();
321             list = jedis.lrange(key, 0, -1);
322 
323         } catch (Exception e) {
324 
325             //释放 redis 对象
326             jedisPool.returnBrokenResource(jedis);
327             e.printStackTrace();
328 
329         } finally {
330 
331             //返还到连接池
332             close(jedis);
333 
334         }
335         return list;
336     }
337 
338     /**
339      * 获取队列数据
340      * @param byte[] key 键名
341      * @return
342      */
343     public static byte[] rpop(byte[] key) {
344 
345         byte[] bytes = null;
346         Jedis jedis = null;
347         try {
348 
349             jedis = jedisPool.getResource();
350             bytes = jedis.rpop(key);
351 
352         } catch (Exception e) {
353 
354             //释放 redis 对象
355             jedisPool.returnBrokenResource(jedis);
356             e.printStackTrace();
357 
358         } finally {
359 
360             //返还到连接池
361             close(jedis);
362 
363         }
364         return bytes;
365     }
366 
367     public static void hmset(Object key, Map hash) {
368         Jedis jedis = null;
369         try {
370             jedis = jedisPool.getResource();
371             jedis.hmset(key.toString(), hash);
372         } catch (Exception e) {
373             //释放 redis 对象
374             jedisPool.returnBrokenResource(jedis);
375             e.printStackTrace();
376 
377         } finally {
378             //返还到连接池
379             close(jedis);
380 
381         }
382     }
383 
384     public static void hmset(Object key, Map hash, int time) {
385         Jedis jedis = null;
386         try {
387 
388             jedis = jedisPool.getResource();
389             jedis.hmset(key.toString(), hash);
390             jedis.expire(key.toString(), time);
391         } catch (Exception e) {
392             //释放 redis 对象
393             jedisPool.returnBrokenResource(jedis);
394             e.printStackTrace();
395 
396         } finally {
397             //返还到连接池
398             close(jedis);
399 
400         }
401     }
402 
403     public static List hmget(Object key, String... fields) {
404         List result = null;
405         Jedis jedis = null;
406         try {
407 
408             jedis = jedisPool.getResource();
409             result = jedis.hmget(key.toString(), fields);
410 
411         } catch (Exception e) {
412             //释放 redis 对象
413             jedisPool.returnBrokenResource(jedis);
414             e.printStackTrace();
415 
416         } finally {
417             //返还到连接池
418             close(jedis);
419 
420         }
421         return result;
422     }
423 
424     public static Set hkeys(String key) {
425         Set result = null;
426         Jedis jedis = null;
427         try {
428             jedis = jedisPool.getResource();
429             result = jedis.hkeys(key);
430 
431         } catch (Exception e) {
432             //释放 redis 对象
433             jedisPool.returnBrokenResource(jedis);
434             e.printStackTrace();
435 
436         } finally {
437             //返还到连接池
438             close(jedis);
439 
440         }
441         return result;
442     }
443 
444     public static List lrange(byte[] key, int from, int to) {
445         List result = null;
446         Jedis jedis = null;
447         try {
448             jedis = jedisPool.getResource();
449             result = jedis.lrange(key, from, to);
450 
451         } catch (Exception e) {
452             //释放 redis 对象
453             jedisPool.returnBrokenResource(jedis);
454             e.printStackTrace();
455 
456         } finally {
457             //返还到连接池
458             close(jedis);
459 
460         }
461         return result;
462     }
463 
464     public static Map hgetAll(byte[] key) {
465         Map result = null;
466         Jedis jedis = null;
467         try {
468             jedis = jedisPool.getResource();
469             result = jedis.hgetAll(key);
470         } catch (Exception e) {
471             //释放 redis 对象
472             jedisPool.returnBrokenResource(jedis);
473             e.printStackTrace();
474 
475         } finally {
476             //返还到连接池
477             close(jedis);
478         }
479         return result;
480     }
481 
482     public static void del(byte[] key) {
483 
484         Jedis jedis = null;
485         try {
486             jedis = jedisPool.getResource();
487             jedis.del(key);
488         } catch (Exception e) {
489             //释放 redis 对象
490             jedisPool.returnBrokenResource(jedis);
491             e.printStackTrace();
492         } finally {
493             //返还到连接池
494             close(jedis);
495         }
496     }
497 
498     public static long llen(byte[] key) {
499 
500         long len = 0;
501         Jedis jedis = null;
502         try {
503             jedis = jedisPool.getResource();
504             jedis.llen(key);
505         } catch (Exception e) {
506             //释放 redis 对象
507             jedisPool.returnBrokenResource(jedis);
508             e.printStackTrace();
509         } finally {
510             //返还到连接池
511             close(jedis);
512         }
513         return len;
514     }
515 
516 }

Configuration 主要用于读取 redis 配置信息

package com.redis.util;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class Configuration extends Properties {

</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">static</span> final <span style="color: rgba(0, 0, 255, 1)">long</span> serialVersionUID = <span style="color: rgba(128, 0, 128, 1)">50440463580273222L</span><span style="color: rgba(0, 0, 0, 1)">;

</span><span style="color: rgba(0, 0, 255, 1)">private</span> <span style="color: rgba(0, 0, 255, 1)">static</span> Configuration instance = <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">;

</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">static</span><span style="color: rgba(0, 0, 0, 1)"> synchronized Configuration getInstance() {
    </span><span style="color: rgba(0, 0, 255, 1)">if</span> (instance == <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">) {
        instance </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Configuration();
    }
    </span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> instance;
}

</span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> String getProperty(String key, String defaultValue) {
    String val </span>=<span style="color: rgba(0, 0, 0, 1)"> getProperty(key);
    </span><span style="color: rgba(0, 0, 255, 1)">return</span> (val == <span style="color: rgba(0, 0, 255, 1)">null</span> || val.isEmpty()) ?<span style="color: rgba(0, 0, 0, 1)"> defaultValue : val;
}

</span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> String getString(String name, String defaultValue) {
    </span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">this</span><span style="color: rgba(0, 0, 0, 1)">.getProperty(name, defaultValue);
}

</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">int</span> getInt(String name, <span style="color: rgba(0, 0, 255, 1)">int</span><span style="color: rgba(0, 0, 0, 1)"> defaultValue) {
    String val </span>= <span style="color: rgba(0, 0, 255, 1)">this</span><span style="color: rgba(0, 0, 0, 1)">.getProperty(name);
    </span><span style="color: rgba(0, 0, 255, 1)">return</span> (val == <span style="color: rgba(0, 0, 255, 1)">null</span> || val.isEmpty()) ?<span style="color: rgba(0, 0, 0, 1)"> defaultValue : Integer.parseInt(val);
}

</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">long</span> getLong(String name, <span style="color: rgba(0, 0, 255, 1)">long</span><span style="color: rgba(0, 0, 0, 1)"> defaultValue) {
    String val </span>= <span style="color: rgba(0, 0, 255, 1)">this</span><span style="color: rgba(0, 0, 0, 1)">.getProperty(name);
    </span><span style="color: rgba(0, 0, 255, 1)">return</span> (val == <span style="color: rgba(0, 0, 255, 1)">null</span> || val.isEmpty()) ?<span style="color: rgba(0, 0, 0, 1)"> defaultValue : Integer.parseInt(val);
}

</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">float</span> getFloat(String name, <span style="color: rgba(0, 0, 255, 1)">float</span><span style="color: rgba(0, 0, 0, 1)"> defaultValue) {
    String val </span>= <span style="color: rgba(0, 0, 255, 1)">this</span><span style="color: rgba(0, 0, 0, 1)">.getProperty(name);
    </span><span style="color: rgba(0, 0, 255, 1)">return</span> (val == <span style="color: rgba(0, 0, 255, 1)">null</span> || val.isEmpty()) ?<span style="color: rgba(0, 0, 0, 1)"> defaultValue : Float.parseFloat(val);
}

</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">double</span> getDouble(String name, <span style="color: rgba(0, 0, 255, 1)">double</span><span style="color: rgba(0, 0, 0, 1)"> defaultValue) {
    String val </span>= <span style="color: rgba(0, 0, 255, 1)">this</span><span style="color: rgba(0, 0, 0, 1)">.getProperty(name);
    </span><span style="color: rgba(0, 0, 255, 1)">return</span> (val == <span style="color: rgba(0, 0, 255, 1)">null</span> || val.isEmpty()) ?<span style="color: rgba(0, 0, 0, 1)"> defaultValue : Double.parseDouble(val);
}

</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">byte</span> getByte(String name, <span style="color: rgba(0, 0, 255, 1)">byte</span><span style="color: rgba(0, 0, 0, 1)"> defaultValue) {
    String val </span>= <span style="color: rgba(0, 0, 255, 1)">this</span><span style="color: rgba(0, 0, 0, 1)">.getProperty(name);
    </span><span style="color: rgba(0, 0, 255, 1)">return</span> (val == <span style="color: rgba(0, 0, 255, 1)">null</span> || val.isEmpty()) ?<span style="color: rgba(0, 0, 0, 1)"> defaultValue : Byte.parseByte(val);
}

</span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> Configuration() {
    InputStream </span><span style="color: rgba(0, 0, 255, 1)">in</span> = ClassLoader.getSystemClassLoader().getResourceAsStream(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">config.xml</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
    </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> {
        </span><span style="color: rgba(0, 0, 255, 1)">this</span>.loadFromXML(<span style="color: rgba(0, 0, 255, 1)">in</span><span style="color: rgba(0, 0, 0, 1)">);
        </span><span style="color: rgba(0, 0, 255, 1)">in</span><span style="color: rgba(0, 0, 0, 1)">.close();
    } </span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (IOException e) {
    }
}

}

测试 redis 队列

 1 package com.quene.test;
 2 
 3 import com.bean.Message;
 4 import com.bean.util.ObjectUtil;
 5 import com.redis.util.JedisUtil;
 6 
 7 public class TestRedisQuene {
 8     public static byte[] redisKey = "key".getBytes();
 9     static{
10         init();
11     }
12     public static void main(String[] args) {
13         pop();
14     }
15 
16     private static void pop() {
17         byte[] bytes = JedisUtil.rpop(redisKey);
18         Message msg = (Message) ObjectUtil.bytesToObject(bytes);
19         if(msg != null){
20             System.out.println(msg.getId()+"   "+msg.getContent());
21         }
22     }
23 
24     private static void init() {
25         Message msg1 = new Message(1, "内容 1");
26         JedisUtil.lpush(redisKey, ObjectUtil.objectToBytes(msg1));
27         Message msg2 = new Message(2, "内容 2");
28         JedisUtil.lpush(redisKey, ObjectUtil.objectToBytes(msg2));
29         Message msg3 = new Message(3, "内容 3");
30         JedisUtil.lpush(redisKey, ObjectUtil.objectToBytes(msg3));
31     }
32 
33 }
1
测试结果如下:
1   内容 1
2   内容 2
3   内容 3