分布式微服务-ZooKeeper的客户端常用命令-Java-API-操作
目录
分布式微服务–ZooKeeper的客户端常用命令 & Java API 操作
一、ZooKeeper 客户端常用命令
1. 启动与退出
bin/zkCli.sh -server 127.0.0.1:2181 # 连接客户端 quit # 退出客户端
2. 节点操作
# 查看子节点 ls / ls -s / ls /app # 查看节点详细信息 ls2 /app stat /app
# 创建节点 create /node1 "hello" # 持久节点 create -e /node2 "temp" # 临时节点 create -s /node3 "seq" # 顺序节点 create -e -s /node4 "tempSeq" # 临时顺序节点 创建node1下的子节点,不能node1和node1一起创建,必须创建了node1才能执行下面的否则报错 create /node1/node11 "hello1" #子节点
# 获取/修改数据 get /node1 set /node1 "world"
# 删除节点 delete /node1 # 删除无子节点的 deleteall /节点path #删除带有子节点的节点 rmr /node1 # 递归删除
3. Watch 监听
get /node1 true # 监听数据变化 ls / true # 监听子节点变化
⚠️ 监听是一次性的,触发后失效。
4. ACL 权限控制
getAcl /node1 setAcl /node1 world:anyone:rw
权限:
r
=读,w
=写,c
=创建,d
=删除,a
=管理。
模式:world
(所有人)、auth
、digest
(用户名密码)、ip
。5. 辅助命令
help # 查看帮助 history # 查看历史命令 redo <id> # 重做历史命令
二、ZooKeeper Java API 操作
1. 原生 API(org.apache.zookeeper.ZooKeeper)
需要的依赖
<!-- 原生 ZooKeeper 依赖 --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.8.3</version> </dependency>
代码实现
import org.apache.zookeeper.*; public class ZkDemo { public static void main(String[] args) throws Exception { // 1. 连接 ZooKeeper ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, event -> { System.out.println("收到事件:" + event); }); // 2. 创建节点 zk.create("/node1", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // 3. 获取节点数据 byte[] data = zk.getData("/node1", false, null); System.out.println("节点数据:" + new String(data)); // 4. 修改节点数据 zk.setData("/node1", "world".getBytes(), -1); // 5. 获取子节点 System.out.println("子节点:" + zk.getChildren("/", false)); // 6. 删除节点 zk.delete("/node1", -1); // 7. 关闭连接 zk.close(); } }
2. Curator 客户端(推荐,简化 API)
需要的依赖
<!--curator--> <!-- Curator(如果你要用 Curator 封装的API) --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency>
⚠️
curator-framework
和curator-recipes
内部会依赖zookeeper
,所以一般不用单独引入zookeeper
依赖,除非你要控制 zk 版本。2.1 使用Curator实现增删改查的api
Curator 方法 ZooKeeper CLI 类似命令 create().forPath("/node")
create /node
getData().forPath("/node")
get /node
getChildren().forPath("/")
ls /
getData().storingStatIn(stat)
ls -s /node
setData().forPath("/node", data)
set /node data
delete().forPath("/node")
delete /node
delete().deletingChildrenIfNeeded()
rmr /node
代码实现:
public class CuratorTest { private CuratorFramework client; // Curator 客户端对象 /** * 建立连接 */ @Before public void testConnect() { /* * * @param connectString 连接字符串。zk server 地址和端口 "127.0.0.1:2181,127.0.0.1:2181" * @param sessionTimeoutMs 会话超时时间 单位ms * @param connectionTimeoutMs 连接超时时间 单位ms * @param retryPolicy 重试策略 */ /* //重试策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10); //1.第一种方式 CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181", 60 * 1000, 15 * 1000, retryPolicy);*/ //重试策略:初始等待3秒,重试10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10); //2.第二种方式:通过builder方式构建客户端 //CuratorFrameworkFactory.builder(); client = CuratorFrameworkFactory.builder() .connectString("127.0.0.1:2181") // 设置ZK地址 .sessionTimeoutMs(60 * 1000) // 会话超时时间 .connectionTimeoutMs(15 * 1000) // 连接超时时间 .retryPolicy(retryPolicy) // 设置重试策略 // namespace("czq") // 设置命名空间,将 "czq" 作为客户端操作的根路径 // 这样之后创建节点时无需每次都写 /czq 前缀 // 例如:client.create().forPath("/node11", "hello1".getBytes()) // 实际会在 ZooKeeper 中创建 /czq/node11 节点 .namespace("czq") .build(); //开启连接 client.start(); } //==============================create============================================================================= /** * 创建节点:create 持久 临时 顺序 数据 * 1. 基本创建 :create().forPath("") * 2. 创建节点 带有数据:create().forPath("",data) * 3. 设置节点的类型:create().withMode().forPath("",data) * 4. 创建多级节点 /app1/p1 :create().creatingParentsIfNeeded().forPath("",data) */ @Test public void testCreate() throws Exception { //2. 创建节点 带有数据 //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储 String path = client.create().forPath("/app2", "hehe".getBytes()); System.out.println(path); } @Test public void testCreate2() throws Exception { //1. 基本创建 //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储 String path = client.create().forPath("/app1"); System.out.println(path); } @Test public void testCreate3() throws Exception { //3. 设置节点的类型 //默认类型:持久化 String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3"); System.out.println(path); } @Test public void testCreate4() throws Exception { //4. 创建多级节点 /app1/p1 //creatingParentsIfNeeded():如果父节点不存在,则创建父节点 String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1"); System.out.println(path); } //===========================get================================================================================ /** * 查询节点: * 1. 查询数据:get: getData().forPath() * 2. 查询子节点: ls /: getChildren().forPath() * 3. 查询节点状态信息:ls -s /:getData().storingStatIn(状态对象).forPath() */ @Test public void testGet1() throws Exception { //1. 查询数据:get byte[] data = client.getData().forPath("/app1"); System.out.println(new String(data)); } @Test public void testGet2() throws Exception { // 2. 查询子节点: ls / List<String> path = client.getChildren().forPath("/"); System.out.println(path); } @Test public void testGet3() throws Exception { Stat status = new Stat(); // 状态对象,用来存储节点元信息 System.out.println(status); //3. 查询节点状态信息:ls -s / //.storingStatIn(status)代表存储状态信息到status对象中 client.getData().storingStatIn(status).forPath("/app1"); System.out.println(status); } //===========================set================================================================================ /** * 修改数据 * 1. 基本修改数据:setData().forPath() * 2. 根据版本修改: setData().withVersion().forPath() * * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。 * * @throws Exception */ @Test public void testSet() throws Exception { // 基本修改节点数据 client.setData().forPath("/app1", "itcast".getBytes()); } @Test public void testSetForVersion() throws Exception { Stat status = new Stat(); //3. 查询节点状态信息:ls -s //.storingStatIn(status)代表存储状态信息到status对象中 client.getData().storingStatIn(status).forPath("/app1"); int version = status.getVersion();//查询出来的节点版本 System.out.println(version); // 根据版本修改节点数据,保证并发安全 client.setData().withVersion(version).forPath("/app1", "hehe".getBytes()); } //===========================delete================================================================================ /** * 删除节点: delete deleteall * 1. 删除单个节点:delete().forPath("/app1"); * 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1"); * 3. 必须成功的删除:为了防止网络抖动。本质就是重试。 client.delete().guaranteed().forPath("/app2"); * 4. 回调:inBackground * @throws Exception */ @Test public void testDelete() throws Exception { // 1. 删除单个节点 client.delete().forPath("/app1"); } @Test public void testDelete2() throws Exception { //2. 删除带有子节点的节点 client.delete().deletingChildrenIfNeeded().forPath("/app4"); } @Test public void testDelete3() throws Exception { //3. 必须成功的删除(自动重试保证删除成功) client.delete().guaranteed().forPath("/app2"); } @Test public void testDelete4() throws Exception { //4. 回调异步删除 client.delete().guaranteed().inBackground(new BackgroundCallback(){ @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println("我被删除了~"); System.out.println(event); } }).forPath("/app1"); } @After public void close() { // 关闭客户端连接 if (client != null) { client.close(); } } }
2.2 使用Curator实现Watch事件监听的api
(1)Curator 2.x/4.x 常见的写法
使用者三个类(
NodeCache
、PathChildrenCache
、TreeCache
)
NodeCache
(监听一个节点自己)PathChildrenCache
(监听某个节点的直接子节点)TreeCache
(监听某个节点和所有子节点)从 Curator 5.x 开始,这三个类(
NodeCache
、PathChildrenCache
、TreeCache
)已经被 统一弃用,官方推荐用CuratorCache
来代替。
监听器 监听范围 典型应用场景 NodeCache
单个节点的数据变化 监听配置节点变化 PathChildrenCache
子节点的增删改,不监听本节点 监听服务节点上下线 TreeCache
节点及其所有子节点 全量配置或服务树监控 代码实现
public class CuratorWatcherTest { private CuratorFramework client; // Curator 客户端对象 /** * 建立连接 */ @Before public void testConnect() { /* * * @param connectString 连接字符串。zk server 地址和端口 "127.0.0.1:2181,127.0.0.1:2181" * @param sessionTimeoutMs 会话超时时间 单位ms * @param connectionTimeoutMs 连接超时时间 单位ms * @param retryPolicy 重试策略 */ /* //重试策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10); //1.第一种方式 CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181", 60 * 1000, 15 * 1000, retryPolicy);*/ //重试策略:初始等待3秒,重试10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10); //2.第二种方式:通过builder方式构建客户端 //CuratorFrameworkFactory.builder(); client = CuratorFrameworkFactory.builder() .connectString("127.0.0.1:2181") // 设置ZK地址 .sessionTimeoutMs(60 * 1000) // 会话超时时间 .connectionTimeoutMs(15 * 1000) // 连接超时时间 .retryPolicy(retryPolicy) // 设置重试策略 // namespace("czq") // 设置命名空间,将 "czq" 作为客户端操作的根路径 // 这样之后创建节点时无需每次都写 /czq 前缀 // 例如:client.create().forPath("/node11", "hello1".getBytes()) // 实际会在 ZooKeeper 中创建 /czq/node11 节点 .namespace("czq") .build(); //开启连接 client.start(); } @After public void close() { if (client != null) { client.close(); // 关闭客户端连接 } } /** * 演示 NodeCache:给指定一个节点注册监听器 * NodeCache 只能监听某个具体节点的数据变化(新增/修改/删除) */ @Test public void testNodeCache() throws Exception { // 1. 创建 NodeCache 对象,监听 /app1 节点 final NodeCache nodeCache = new NodeCache(client,"/app1"); // 2. 注册监听器 nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("节点变化了~"); // 获取修改后的节点数据(如果节点被删除,这里可能会是 null) byte[] data = nodeCache.getCurrentData().getData(); System.out.println(new String(data)); } }); // 3. 开启监听 // 参数 true 表示在启动监听时,立即加载一次缓存数据 nodeCache.start(true); // 阻塞住主线程,保证监听器一直生效 while (true){ } } /** * 演示 PathChildrenCache:监听某个节点的所有子节点 * 只能监听子节点的变化(新增/修改/删除),不能监听当前节点本身 */ @Test public void testPathChildrenCache() throws Exception { // 1. 创建 PathChildrenCache 监听对象 // 参数 true 表示对子节点数据进行缓存 PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true); // 2. 绑定监听器 pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { System.out.println("子节点变化了~"); System.out.println(event); // 监听子节点数据变更 PathChildrenCacheEvent.Type type = event.getType(); if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){ System.out.println("数据变了!!!"); byte[] data = event.getData().getData(); System.out.println(new String(data)); } } }); // 3. 开启监听 pathChildrenCache.start(); // 阻塞主线程,保证监听器一直生效 while (true){ } } /** * 演示 TreeCache:监听某个节点自己和它的所有子节点 * 相当于 NodeCache + PathChildrenCache 的结合体 */ @Test public void testTreeCache() throws Exception { // 1. 创建 TreeCache 监听对象 TreeCache treeCache = new TreeCache(client,"/app2"); // 2. 注册监听器 treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { System.out.println("节点变化了"); System.out.println(event); } }); // 3. 开启监听 treeCache.start(); // 阻塞主线程,保证监听器一直生效 while (true){ } } }
(2)Curator 5.x以上常见的写法
从 Curator 5.x 开始,这三个类(
NodeCache
、PathChildrenCache
、TreeCache
)已经被 统一弃用,官方推荐用CuratorCache
来代替。(新版本)
方法名 对应旧API 监听范围 应用场景 testCuratorCacheNode
NodeCache 单节点数据变化 单个配置项 testCuratorCacheChildren
PathChildrenCache 子节点(不含父节点) 服务注册/发现 testCuratorCacheTree
TreeCache 节点 + 全部子节点 配置中心、全量监控 代码实现
public class CuratorWatcherTest { // Curator 客户端对象,用于操作 ZooKeeper private CuratorFramework client; /** * 建立连接 */ @Before public void testConnect() { /* * * @param connectString 连接字符串。zk server 地址和端口 "127.0.0.1:2181,127.0.0.1:2181" * @param sessionTimeoutMs 会话超时时间 单位ms * @param connectionTimeoutMs 连接超时时间 单位ms * @param retryPolicy 重试策略 */ /* //重试策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10); //1.第一种方式 CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181", 60 * 1000, 15 * 1000, retryPolicy);*/ //重试策略:初始等待3秒,重试10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10); //2.第二种方式:通过builder方式构建客户端 //CuratorFrameworkFactory.builder(); client = CuratorFrameworkFactory.builder() .connectString("127.0.0.1:2181") // 设置ZK地址 .sessionTimeoutMs(60 * 1000) // 会话超时时间 .connectionTimeoutMs(15 * 1000) // 连接超时时间 .retryPolicy(retryPolicy) // 设置重试策略 // namespace("czq") // 设置命名空间,将 "czq" 作为客户端操作的根路径 // 这样之后创建节点时无需每次都写 /czq 前缀 // 例如:client.create().forPath("/node11", "hello1".getBytes()) // 实际会在 ZooKeeper 中创建 /czq/node11 节点 .namespace("czq") .build(); //开启连接 client.start(); } @After public void close() { // 测试完成后关闭客户端,释放资源 if (client != null) { client.close(); } } // ==============================替代 NodeCache============================================================================= /** * 1. 监听单个节点(替代 NodeCache) * 使用 CuratorCache + CuratorCacheListener 来代替旧的 NodeCache */ @Test public void testCuratorCacheNode() throws Exception { // 创建 CuratorCache,监听 /app1 节点 CuratorCache cache = CuratorCache.build(client, "/app1"); // 定义监听器,使用 forNodeCache 模式,只监听该节点数据变化 CuratorCacheListener listener = CuratorCacheListener.builder() .forNodeCache(new Runnable() { @Override public void run() { System.out.println("节点变化了~"); try { // 获取节点最新数据并打印 byte[] data = client.getData().forPath("/app1"); System.out.println("最新数据:" + new String(data)); } catch (Exception e) { e.printStackTrace(); } } }) .build(); // 将监听器绑定到缓存 cache.listenable().addListener(listener); // 开启缓存(监听) cache.start(); // 阻塞主线程,保证监听器一直运行 Thread.sleep(Long.MAX_VALUE); } // ==============================替代 PathChildrenCache============================================================================= /** * 2. 监听子节点变化(替代 PathChildrenCache) * 使用 CuratorCache + PathChildrenCacheListener 监听某节点的所有子节点 */ @Test public void testCuratorCacheChildren() throws Exception { // 创建 CuratorCache,监听 /app2 节点的子节点 CuratorCache cache = CuratorCache.build(client, "/app2"); // 定义监听器,forPathChildrenCache 表示只监听子节点的变化 CuratorCacheListener listener = CuratorCacheListener.builder() .forPathChildrenCache("/app2", client, new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { System.out.println("子节点变化了~"); System.out.println("类型:" + event.getType()); if (event.getData() != null) { // 打印节点路径和数据 System.out.println("节点:" + event.getData().getPath()); System.out.println("数据:" + new String(event.getData().getData())); } } }) .build(); // 将监听器绑定到缓存 cache.listenable().addListener(listener); // 开启缓存 cache.start(); // 阻塞主线程,保证监听器一直运行 Thread.sleep(Long.MAX_VALUE); } // ==============================替代 TreeCache============================================================================= /** * 3. 监听节点及其所有子节点(替代 TreeCache) * 使用 CuratorCache + TreeCacheListener 监听整个节点树 */ @Test public void testCuratorCacheTree() throws Exception { // 创建 CuratorCache,监听 /app2 节点及其子节点 CuratorCache cache = CuratorCache.build(client, "/app2"); // 定义监听器,forTreeCache 表示节点本身和子节点都监听 CuratorCacheListener listener = CuratorCacheListener.builder() .forTreeCache(client, new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { System.out.println("树节点变化了~"); System.out.println("类型:" + event.getType()); if (event.getData() != null) { // 打印节点路径和数据 System.out.println("节点:" + event.getData().getPath()); System.out.println("数据:" + new String(event.getData().getData())); } } }) .build(); // 将监听器绑定到缓存 cache.listenable().addListener(listener); // 开启缓存 cache.start(); // 阻塞主线程,保证监听器一直运行 Thread.sleep(Long.MAX_VALUE); } }
2.3 实现分布式锁
这里不做过多讲解详细去看我的另一篇博客: