一、前言
基于redis的客户端jedis分别基于其setnx(首次赋值返回1,其余的情况返回0的方式,且redis服务器端操作都是单线程队列操作的)、multi事务、watch监控器三种不同方式实现乐观锁,应用于在分布式高并发处理等相关场景。
二、代码示例
1. RedisLock类 - 其中 lock是基于setnx实现加锁、lock_2是基于multi事务的方式、lock_3是watch加Transaction,具体代码如下
import java.util.Random; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.Transaction; public class RedisLock { //加锁标志 public static final String LOCKED = "TRUE" ; public static final long ONE_MILLI_NANOS = 1000000L; //默认超时时间(毫秒) public static final long DEFAULT_TIME_OUT = 3000 ; public static JedisPool pool; public static final Random r = new Random(); //锁的超时时间(秒),过期删除 public static final int EXPIRE = 5 * 60 ; private Jedis jedis; private String key; //锁状态标志 private boolean locked = false ; public RedisLock(String key) { this .key = key; this .jedis= new Jedis( "127.0.0.1" , 6379 , 60000 ); } /** * 通过jedis.setnx实现锁 * @param timeout * @return */ public boolean lock( long timeout) { long nano = System.nanoTime(); timeout *= ONE_MILLI_NANOS; try { while ((System.nanoTime() - nano) < timeout) { if (jedis.setnx(key, LOCKED) == 1 ) { jedis.expire(key, EXPIRE); locked = true ; return locked; } // 短暂休眠,nano避免出现活锁 Thread.sleep( 3 , r.nextInt( 500 )); } } catch (Exception e) { } return false ; } /** * 事务和管道都是异步模式。在事务和管道中不能同步查询结果,因此下面 t.getSet(key, LOCKED);只能被一个线程查询 * 否则线程获取不到 * @param timeout * @return */ public boolean lock_2( long timeout) { long nano = System.nanoTime(); timeout *= ONE_MILLI_NANOS; try { while ((System.nanoTime() - nano) < timeout) { Transaction t = jedis.multi(); // 开启事务,当server端收到multi指令 // 会将该client的命令放入一个队列,然后依次执行,知道收到exec指令 t.getSet(key, LOCKED); t.expire(key, EXPIRE); String ret = (String) t.exec().get( 0 ); System.out.println(Thread.currentThread()+ " ========== " +ret); if (ret!= null &&ret.equalsIgnoreCase( "TRUE" )) return true ; // if (ret == null || ret.equals("UNLOCK")) { // return true; // } // 短暂休眠,nano避免出现活锁 Thread.sleep( 3 , r.nextInt( 500 )); } } catch (Exception e) { } return false ; } public boolean lock_3( long timeout) { long nano = System.nanoTime(); timeout *= ONE_MILLI_NANOS; try { while ((System.nanoTime() - nano) < timeout) { jedis.watch(key); // 开启watch之后,如果key的值被修改,则事务失败,exec方法返回null String value = jedis.get(key); if (value == null || value.equals( "UNLOCK" )) { Transaction t = jedis.multi(); t.setex(key, EXPIRE, LOCKED); if (t.exec() != null ) { return true ; } } jedis.unwatch(); // 短暂休眠,nano避免出现活锁 Thread.sleep( 3 , r.nextInt( 500 )); } } catch (Exception e) { } return false ; } public boolean lock() { return lock(DEFAULT_TIME_OUT); } // 无论是否加锁成功,必须调用 public void unlock() { if (locked) jedis.del(key); } } |
2. RedisLockTest类 - 通过三个线程同时进行add的递减,通过加锁可以控制add按照顺序递减10,9,8,7..3,2,1,否则在多线程上下文切换的情况下无法正常打印
2.1 - 如果没有加锁测试类,代码如下
public class RedisLockTest { private static volatile int add= 10 ; public static void main(String[] args) { Runnable handler= new Runnable(){ @Override public void run() { while (add> 0 ){ System.out.println(Thread.currentThread().toString()+ " ———————— add@" +add); add--; try { Thread.sleep( 500 ); } catch (InterruptedException e) { } } }}; new Thread(handler).start(); new Thread(handler).start(); new Thread(handler).start(); } } |
控制台打印结果
Thread[Thread-0,5,main] ———————— add@10 Thread[Thread-1,5,main] ———————— add@10 Thread[Thread-2,5,main] ———————— add@8 Thread[Thread-0,5,main] ———————— add@7 Thread[Thread-1,5,main] ———————— add@6 Thread[Thread-2,5,main] ———————— add@6 Thread[Thread-0,5,main] ———————— add@4 Thread[Thread-1,5,main] ———————— add@4 Thread[Thread-2,5,main] ———————— add@4 Thread[Thread-1,5,main] ———————— add@1 Thread[Thread-0,5,main] ———————— add@1 |
2.2 setnx方式配置lock测试类,代码如下
public class RedisLockTest { private static volatile int add= 10 ; public static void main(String[] args) { Runnable handler= new Runnable(){ @Override public void run() { RedisLock mylock= new RedisLock( "testlock1" ); if (mylock.lock( 300000 )){ while (add> 0 ){ System.out.println(Thread.currentThread().toString()+ " ———————— add@" +add); add--; try { Thread.sleep( 500 ); } catch (InterruptedException e) { } } } mylock.unlock(); }}; new Thread(handler).start(); new Thread(handler).start(); new Thread(handler).start(); } } |
控制台打印结果如下
Thread[Thread-0,5,main] ———————— add@10 Thread[Thread-0,5,main] ———————— add@9 Thread[Thread-0,5,main] ———————— add@8 Thread[Thread-0,5,main] ———————— add@7 Thread[Thread-0,5,main] ———————— add@6 Thread[Thread-0,5,main] ———————— add@5 Thread[Thread-0,5,main] ———————— add@4 Thread[Thread-0,5,main] ———————— add@3 Thread[Thread-0,5,main] ———————— add@2 Thread[Thread-0,5,main] ———————— add@1 |
2.3 lock_3是watch加Transaction的方式测试类
public class RedisLockTest { private static volatile int add= 10 ; public static void main(String[] args) { Runnable handler= new Runnable(){ @Override public void run() { RedisLock mylock= new RedisLock( "testlock3" ); if (mylock.lock_3( 300000 )){ while (add> 0 ){ System.out.println(Thread.currentThread().toString()+ " ———————— add@" +add); add--; try { Thread.sleep( 500 ); } catch (InterruptedException e) { } } } mylock.unlock(); }}; new Thread(handler).start(); new Thread(handler).start(); new Thread(handler).start(); } } |
控制台结果
Thread[Thread-2,5,main] ———————— add@10 Thread[Thread-2,5,main] ———————— add@9 Thread[Thread-2,5,main] ———————— add@8 Thread[Thread-2,5,main] ———————— add@7 Thread[Thread-2,5,main] ———————— add@6 Thread[Thread-2,5,main] ———————— add@5 Thread[Thread-2,5,main] ———————— add@4 Thread[Thread-2,5,main] ———————— add@3 Thread[Thread-2,5,main] ———————— add@2 Thread[Thread-2,5,main] ———————— add@1 |