先复习一下 ZK 实现分布式锁的原理:
每个客户端对某个方法加锁时,在 zookeeper 上的与该方法对应的指定节点的目录下,生成一个唯一的瞬时有序节点。 判断是否获取锁的方式很简单,只需要判断有序节点中序号最小的一个。 当释放锁的时候,只需将这个瞬时节点删除即可。同时,其可以避免服务宕机导致的锁无法释放,而产生的死锁问题。
通过代码验证是否生成了瞬时的有序节点
package com.jv.zookeeper.curator;
import java.util.concurrent.TimeUnit;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class TestInterProcessMutex {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.245.101:2181", retryPolicy);
client.start();
InterProcessMutex lock = new InterProcessMutex(client, "/mylock");
//lock.acquire(1000, TimeUnit.MILLISECONDS) 获取锁,超时时间为1000毫秒
if ( lock.acquire(1000, TimeUnit.MILLISECONDS) )
{
try
{
System.out.println("得到锁,并执行");
//模拟线程需要执行很长时间,观察ZK中/mylock下的临时ZNODE情况
Thread.sleep(10000000);
}
finally
{
lock.release();
System.out.println("释放锁");
}
}
}
}
package com.jv.zookeeper.curator;
import java.util.concurrent.TimeUnit;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class TestInterProcessMutex2 {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.245.101:2181", retryPolicy);
client.start();
InterProcessMutex lock = new InterProcessMutex(client, "/mylock");
//将超时时间设置足够长,观察ZK中ZNODE的情况,以验证分布式锁的原理是否是使用建立临时顺序ZNODE实现的
if ( lock.acquire(1000000, TimeUnit.MILLISECONDS) )
{
try
{
System.out.println("得到锁,并执行");
Thread.sleep(10000000);
}
finally
{
lock.release();
System.out.println("释放锁");
}
}
}
}
要把代码跑起来,在 pom.xml 中加入如下依赖
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
先运行 TestInterProcessMutex,在运行 TestInterProcessMutex2
使用 xshell 或者 securityCRT 登录 zookeeper 主机
进入到 zookeeper 的安装目录 /bin
./zkCli.sh
ls /mylock
可以看到确实生成了两个瞬时有序节点,并且序号小的客户端获得了锁
curator 封装过后使用确实很方便
补充一点,curator 还可以很方便的实现选举
LeaderSelectorListener listener = new LeaderSelectorListenerAdapter()
{
public void takeLeadership(CuratorFramework client) throws Exception
{
// 这是你变成leader时执行的方法,你可以在这里执行leader的所有操作
// 如果你想放弃leader,你必须退出此方法
}
}
LeaderSelector selector = new LeaderSelector(client, path, listener);
selector.autoRequeue(); // not required, but this is behavior that you will probably expect
selector.start();
它的原理就是包装了 InterProcessMutex,然后 LeaderSelector 跑起来之后就去获取锁,一旦获取到锁就调用 listener.takeLeadership 方法
这种选举还是有点太简单了,没有去考虑资源、数据问题。zk 本身的选举就需要考虑参考事务 ID 的大小,拥有最大事务 ID 的服务器才能是 leader,然后 follower 同步 leader 中比自己更大的事务,达到数据一致
实际应用的话,需要考虑分布式组件的情况,选择是否使用 ZK 提供的简单选举策略