Java Redis Pipeline 使用示例
1. 参考的优秀文章
2. 来源
原来,系统中一个树结构的数据来源是 Redis,由于数据增多、业务复杂,查询速度并不快。究其原因,是单次查询的数量太多了,一个树结构,大概要几万次 Redis 的交互。于是,尝试用 Redis 的 Pipelining 特性。
3. 测试 Pipelining 使用与否的差别
3.1. 不使用 pipelining
首先,不使用 pipelining,插入 10w 条记录,再删除 10w 条记录,看看需要多久。
首先来个小程序,用于计算程序消耗的时间:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | import java.util.Date; import java.util.concurrent.TimeUnit; public class TimeLag { private Date start; private Date end; public TimeLag() { start = new Date(); } public String cost() { end = new Date(); long c = end.getTime() - start.getTime(); String s = new StringBuffer().append( "cost " ).append(c).append( " milliseconds (" ).append(c / 1000 ).append( " seconds)." ).toString(); return s; } public static void main(String[] args) throws InterruptedException { TimeLag t = new TimeLag(); TimeUnit.SECONDS.sleep( 2 ); System.out.println(t.cost()); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | package com.nicchagil.study.jedis; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; public class HowToTest { public static void main(String[] args) { // 连接池 JedisPool jedisPool = new JedisPool( "192.168.1.9" , 6379 ); /* 操作Redis */ Jedis jedis = null ; try { jedis = jedisPool.getResource(); TimeLag t = new TimeLag(); System.out.println( "操作前,全部Key值:" + jedis.keys( "*" )); /* 插入多条数据 */ for (Integer i = 0 ; i < 100000 ; i++) { jedis.set(i.toString(), i.toString()); } /* 删除多条数据 */ for (Integer i = 0 ; i < 100000 ; i++) { jedis.del(i.toString()); } System.out.println( "操作前,全部Key值:" + jedis.keys( "*" )); System.out.println(t.cost()); } finally { if (jedis != null ) { jedis.close(); } } } } |
日志,Key 值“user_001”是我的 Redis 存量的值,忽略即可:
1 2 3 | 操作前,全部Key值:[user_001] 操作前,全部Key值:[user_001] cost 35997 milliseconds ( 35 seconds). |
3.2. 使用 pipelining
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 | package com.nicchagil.study.jedis; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.Pipeline; public class HowToTest { public static void main(String[] args) { // 连接池 JedisPool jedisPool = new JedisPool( "192.168.1.9" , 6379 ); /* 操作Redis */ Jedis jedis = null ; try { jedis = jedisPool.getResource(); TimeLag t = new TimeLag(); System.out.println( "操作前,全部Key值:" + jedis.keys( "*" )); Pipeline p = jedis.pipelined(); /* 插入多条数据 */ for (Integer i = 0 ; i < 100000 ; i++) { p.set(i.toString(), i.toString()); } /* 删除多条数据 */ for (Integer i = 0 ; i < 100000 ; i++) { p.del(i.toString()); } p.sync(); System.out.println( "操作前,全部Key值:" + jedis.keys( "*" )); System.out.println(t.cost()); } finally { if (jedis != null ) { jedis.close(); } } } } |
日志:
1 2 3 | 操作前,全部Key值:[user_001] 操作前,全部Key值:[user_001] cost 629 milliseconds ( 0 seconds). |
4. 为什么 Pipelining 这么快?
先看看原来的多条命令,是如何执行的:
1 2 3 4 5 6 7 | sequenceDiagram Redis Client->>Redis Server: 发送第 1 个命令 Redis Server->>Redis Client: 响应第 1 个命令 Redis Client->>Redis Server: 发送第 2 个命令 Redis Server->>Redis Client: 响应第 2 个命令 Redis Client->>Redis Server: 发送第n个命令 Redis Server->>Redis Client: 响应第n个命令 |
Pipeling 机制是怎样的呢:
1 2 3 4 5 6 | sequenceDiagram Redis Client->>Redis Server: 发送第 1 个命令(缓存在Redis Client,未即时发送) Redis Client->>Redis Server: 发送第 2 个命令(缓存在Redis Client,未即时发送) Redis Client->>Redis Server: 发送第n个命令(缓存在Redis Client,未即时发送) Redis Client->>Redis Server: 发送累积的命令 Redis Server->>Redis Client: 响应第 1 、 2 、n个命令 |
5. Pipelining 的局限性(重要!)
基于其特性,它有两个明显的局限性:
- 鉴于 Pipepining 发送命令的特性,Redis 服务器是以队列来存储准备执行的命令,而队列是存放在有限的内存中的,所以不宜一次性发送过多的命令。如果需要大量的命令,可分批进行,效率不会相差太远滴,总好过内存溢出嘛 ~~
- 由于 pipeline 的原理是收集需执行的命令,到最后才一次性执行。所以无法在中途立即查得数据的结果(需待 pipelining 完毕后才能查得结果),这样会使得无法立即查得数据进行条件判断(比如判断是非继续插入记录)。
比如,以下代码中,response.get()
在p.sync();
完毕前无法执行,否则,会报异常
1 | redis.clients.jedis.exceptions.JedisDataException: Please close pipeline or multi block before calling this method. |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 | package com.nicchagil.study.jedis; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.Pipeline; import redis.clients.jedis.Response; public class HowToTest { public static void main(String[] args) { // 连接池 JedisPool jedisPool = new JedisPool( "192.168.1.9" , 6379 ); /* 操作Redis */ Jedis jedis = null ; try { jedis = jedisPool.getResource(); TimeLag t = new TimeLag(); System.out.println( "操作前,全部Key值:" + jedis.keys( "*" )); Pipeline p = jedis.pipelined(); /* 插入多条数据 */ for (Integer i = 0 ; i < 100000 ; i++) { p.set(i.toString(), i.toString()); } Response<String> response = p.get( "999" ); // System.out.println(response.get()); // 执行报异常:redis.clients.jedis.exceptions.JedisDataException: Please close pipeline or multi block before calling this method. /* 删除多条数据 */ for (Integer i = 0 ; i < 100000 ; i++) { p.del(i.toString()); } p.sync(); System.out.println(response.get()); System.out.println( "操作前,全部Key值:" + jedis.keys( "*" )); System.out.println(t.cost()); } finally { if (jedis != null ) { jedis.close(); } } } } |
6. 如何使用 Pipelining 查询大量数据
用Map<String, Response<String>>
先将Response
缓存起来再使用就 OK 了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | package com.nicchagil.study.jedis; import java.util.HashMap; import java.util.Map; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.Pipeline; import redis.clients.jedis.Response; public class GetMultiRecordWithPipelining { public static void main(String[] args) { // 连接池 JedisPool jedisPool = new JedisPool( "192.168.1.9" , 6379 ); /* 操作Redis */ Jedis jedis = null ; Map<String, Response<String>> map = new HashMap<String, Response<String>>(); try { jedis = jedisPool.getResource(); TimeLag t = new TimeLag(); // 开始计算时间 Pipeline p = jedis.pipelined(); /* 插入多条数据 */ for (Integer i = 0 ; i < 100000 ; i++) { if (i % 2 == 1 ) { map.put(i.toString(), p.get(i.toString())); } } p.sync(); /* 由Response对象获取对应的值 */ Map<String, String> resultMap = new HashMap<String, String>(); String result = null ; for (String key : map.keySet()) { result = map.get(key).get(); if (result != null && result.length() > 0 ) { resultMap.put(key, result); } } System.out.println( "get record num : " + resultMap.size()); System.out.println(t.cost()); // 计时结束 } finally { if (jedis != null ) { jedis.close(); } } } } |