redis----java操作redis
添加 jar 包
1 2 3 4 5 | <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version> 3.0 . 1 </version> </dependency> |
简单操作
1 2 3 4 5 6 7 8 9 10 11 | public class Myredis { public static void main(String[] args) { Jedis jedis = new Jedis( "127.0.0.1" , 6379 ); jedis.set( "v5" , "k5" ); //事务 Transaction multi = jedis.multi(); multi.set( "k1" , "k2" ); multi.exec(); } } |
首先不使用 watch
1 2 3 4 5 6 7 8 9 10 11 12 | public class Myredis { public static Boolean transfer( int transfernum){ Jedis jedis = new Jedis( "127.0.0.1" , 6379 ); Transaction multi = jedis.multi(); multi.decrBy( "num" ,transfernum); multi.exec(); return true ; } public static void main(String[] args) { Myredis.transfer( 20 ); } } |
测试
1、首先 debug 代码,停留在了 exec() 之前,还没有提交事务
2、查询到 redis 中的 num 为 80
3、手动修改 num 为 200
4、放行代码,执行 java 全部的代码
5、发现 num 是按照 200 来有重新计算的
6、总结,当执行 exec() 时,消息队列中的代码才真正被执行,注意 redis 不保证原子性,进入队列的代码分开执行,不同队列的代码执行报错对其他队列中的代码不影响。
所以在执行 exec() 过程中,如果出现 num 被修改了,就会发生数据不对的问题,所以我们需要使用 watch
乐观锁
使用 watch 来监听 key
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | public class Myredis { private static int x = 10 ; private static Jedis jedis = new Jedis( "127.0.0.1" , 6379 ); public static Boolean transfer(Long transfernum){ jedis.watch( "num" ); Transaction multi = jedis.multi(); multi.decrBy( "num" ,transfernum); List<Object> exec = multi.exec(); //如果返回值是null,表示事务被终止 System.out.println(exec); jedis.close(); return true ; } public static void main(String[] args) { Myredis.transfer(20L); } } |
测试结果,如果外部对 num 进行修改,watch 就会监听到,直接终止事务提交.
可以采用循环来重复获取新的值(可以设置可以循环次数,如果在循环次数内没有成功,就退出)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | public class Myredis { private static int x = 10 ; private static Jedis jedis = new Jedis( "127.0.0.1" , 6379 ); public static Boolean transfer(Long transfernum){ jedis.watch( "num" ); List<Object> exec = null ; while (exec== null ||exec.size()== 0 ){ Transaction multi = jedis.multi(); multi.decrBy( "num" ,transfernum); exec = multi.exec(); } return true ; } public static void main(String[] args) { Myredis.transfer(20L); } } |
读写分离
1 2 3 4 5 6 7 8 9 10 11 | public class Myredis { public static void main(String[] args) { Jedis jedis_Master = new Jedis( "127.0.0.1" , 6379 ); Jedis jedis_Slave = new Jedis( "127.0.0.1" , 6379 ); jedis_Slave.slaveof( "127.0.0.1" , 6379 ); //主机用来写 jedis_Master.set( "k1" , "v1" ); //从机用来读 String k1 = jedis_Slave.get( "k1" ); } } |
JedisPool
JedisPool 的配置参数大部分是由 JedisPoolConfig 的对应项来赋值的。
maxActive:控制一个 pool 可分配多少个 jedis 实例,通过 pool.getResource()来获取;如果赋值为 -1,则表示不限制;如果 pool 已经分配了 maxActiye 个 jedis 实例,则此时 pool 的状态为 exhausted。
maxIdle:控制一个 pool 最少有多少个状态为 idle(空闲)的 jedis 实例;whenExhaustedAction:表示当 pool 中的 jedis 实例都被 allocated 完时,pool 要采取的操作;默认有三种。
WHEN_EXHAUSTED_FAIL--> 表示无 jedis 实例时,直接抛出 NoSuchElementException;WHEN_EXHAUSTED_BLOCK--> 则表示阻塞住,或者达到 maxWait 时抛出 JedisConnectionException;WHEN_EXHAUSTED_GROW--> 则表示新建一个 jedis 实例,也就说设置的 maxActive 无用;
maxwait:表示当 borrow 一个 jedis 实例时,最大的等待时间,如果超过等待时间,则直接抛 jedisConnectionException;
testOnBorrow:获得一个 jedis 实例的时候是否检查连接可用性(ping());如果为 true,则得到的 jedis 实例均是可用的;
testonReturn:return 一个 jedis 实例给 pool 时,是否检查连接可用性(ping());
简单版
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | public class Myredis { public static void main(String[] args) { JedisPool jedisPoll = MyJedisPoll.getJedisPoll(); Jedis jedis = jedisPoll.getResource(); MyJedisPoll.release(jedis); } } class MyJedisPoll{ private static volatile JedisPool jedisPool= null ; //获取连接池 public static JedisPool getJedisPoll(){ if (jedisPool== null ){ synchronized (MyJedisPoll. class ){ if (jedisPool== null ){ JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMaxTotal( 1000 ); jedisPoolConfig.setMaxIdle( 4 ); jedisPoolConfig.setMaxWaitMillis( 100 * 1000 ); jedisPoolConfig.setTestOnBorrow( true ); jedisPool = new JedisPool(jedisPoolConfig, "127.0.0.1" , 6379 ); } } return jedisPool; } else { return jedisPool; } } //释放连接 public static void release(Jedis jedis){ //如果连接池已经关闭了,则返回-1,最大活跃数不会超过MAX_ACTIVE,最大空闲数不会超过MAX_OLDE System.out.println(jedisPool.getNumWaiters()+ "链接归还前活跃数:" +jedisPool.getNumActive()+ "空闲连接数:" +jedisPool.getNumIdle()); jedis.close(); System.out.println(jedisPool.getNumWaiters()+ "链接归还后活跃数:" +jedisPool.getNumActive()+ "空闲连接数:" +jedisPool.getNumIdle()); } } |
复杂版本
下面的版本我觉得有一点需要改进 getJedis 的时候锁加的位置不对!只需要锁住实例化 initpoll,注意双重判断(性能问题)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 | public class RedisConnectPollUtil{ private static final Log LOG = LogFactory.getLog(RedisConnectPollUtil. class ); //redis获取链接的并发锁 private static ReentrantLock redisPollLock= new ReentrantLock(); //连接redis实例的ip private static final String REDIS_ADDRESS = "localhost" ; //连接redis实例的端口 private static final int PORT = 6379 ; //多线程环境中,连接实例的最大数,如果设为-1则无上线,建议设置,否则有可能导致资源耗尽 private static final int MAX_ACTIVE = 8 ; //在多线程环境中,连接池中最大空闲连接数,单线程环境没有实际意义 private static final int MAX_OLDE = 4 ; //在多线程环境中,连接池中最小空闲连接数 private static final int MIN_OLDE = 1 ; //多长时间将空闲线程进行回收,单位毫秒 private static final int METM = 2000 ; //对象空闲多久后逐出, 当空闲时间>该值 且 空闲连接>最大空闲数 时直接逐出,不再根据MinEvictableIdleTimeMillis判断 (默认逐出策略) private static final int SMETM = 2000 ; //逐出扫描的时间间隔(毫秒) 如果为负数,则不运行逐出线程, 默认-1,只有运行了此线程,MIN_OLDE METM/SMETM才会起作用 private static final int TBERM = 1000 ; //当连接池中连接不够用时,等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException; private static final int MAX_WAIT = 1000 ; //超时时间,单位毫秒 private static final int TIME_OUT = 5000 ; //在借用一个jedis连接实例时,是否提前进行有效性确认操作;如果为true,则得到的jedis实例均是可用的; private static final boolean TEST_ON_BORROW = false ; //连接池实例 private static JedisPool jedisPool = null ; //初始化连接池,有好多重载的构造函数,根据自己业务实际需要来实例化JedisPoll private static void initPoll() { try { JedisPoolConfig config = new JedisPoolConfig(); config.setMaxTotal(MAX_ACTIVE); config.setMaxIdle(MAX_OLDE); config.setMaxWaitMillis(MAX_WAIT); config.setTestOnBorrow(TEST_ON_BORROW); config.setMinIdle(MIN_OLDE); // config.setMinEvictableIdleTimeMillis(METM); config.setSoftMinEvictableIdleTimeMillis(SMETM); config.setTimeBetweenEvictionRunsMillis(TBERM); jedisPool = new JedisPool(config, REDIS_ADDRESS, PORT, TIME_OUT); } catch (Exception e) { LOG.error( "initial JedisPoll fail:" ,e); } } //获取jedis连接实例 public static Jedis getJedis() { redisPollLock.lock(); if (jedisPool == null ) { initPoll(); } Jedis jedis = null ; try { if (jedisPool != null ) { jedis = jedisPool.getResource(); } } catch (Exception e) { LOG.error( "get jedis fail:" ,e); } finally { redisPollLock.unlock(); } return jedis; } //归还jedis实例,2.9版本后jedisPool.returnResource(jedis);过期,被close替代,源码如下 /* @Override public void close() { if (dataSource != null) { if (client.isBroken()) { this.dataSource.returnBrokenResource(this); } else { this.dataSource.returnResource(this); } } else { client.close(); } } */ //如果每次获取了jedis连接后不进行归还,redis不会自动回收,那么获取的最多连接数量为MAX_ACTIVE //超出数量则会抛出异常redis.clients.jedis.exceptions.JedisException: Could not get a resource from the pool public static void returnSource(Jedis jedis) { if (jedis != null ) { //如果连接池已经关闭了,则返回-1,最大活跃数不会超过MAX_ACTIVE,最大空闲数不会超过MAX_OLDE System.out.println(jedisPool.getNumWaiters()+ "链接归还前活跃数:" +jedisPool.getNumActive()+ "空闲连接数:" +jedisPool.getNumIdle()); jedis.close(); System.out.println(jedisPool.getNumWaiters()+ "链接归还后活跃数:" +jedisPool.getNumActive()+ "空闲连接数:" +jedisPool.getNumIdle()); } } private static int k = 0 ; public static void main(String[] args) { for ( int i= 0 ;i< 20 ;i++) { new Thread( new Runnable() { @Override public void run() { Jedis jedis = getJedis(); System.out.println( "第" +(k++)+ "次" +jedis.lpop( "pageList" )); returnSource(jedis); //判断此连接是否还有效,有效返回true,否则返回false //连接归还后,将不可用,会抛出redis.clients.jedis.exceptions.JedisConnectionException: Unexpected end of stream. if (!jedis.isConnected()) { jedis.lpop( "pageList" ); } // jedisPool.close(); jedisPoll关闭后将导致池不可用 // System.out.println("jedispoll是否关闭了?"+jedisPool.isClosed()); } }).start(); } try { //主线程等待一定时间,否则会发生线程执行时效错乱问题 Thread.currentThread().sleep( 15000 ); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(jedisPool.getNumWaiters()+ "最终链接归还后活跃数:" +jedisPool.getNumActive()+ "空闲连接数:" +jedisPool.getNumIdle()); destroy(); } //在运用正常运行时,通常是不会手动调用jedisPool.close();池内将保持最大空闲数的连接,如果设置了逐出策略 //那么池内就会保留最小空闲连接,如果应用突然关闭,我们需要在bean销毁时将连接池销毁. public static void destroy(){ if (jedisPool != null ) { try { jedisPool.destroy(); } catch (Exception e) { LOG.error( "jedisPool destroy fail " ,e); } } } } |
最后任务
之后有时间需要看源码了解 watch 是如何实现监控的,和 exec 如果实现执行队列中的代码的......