ZooKeeper 学习
阅读<从 Paxos="" 到ZooKeeper="" 分布式一致性原理与实践=""> 做的笔记从>
集群安装
-
1、修改 zoo.cfg
修改%ZK_HOME%/conf/zoo.cfg
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/usr/local/zookeeper-3.4.5/zkData clientPort=2181 server.1=s1:2888:3888 server.2=s2:2888:3888 server.3=s3:2888:3888
-
2、在 dataDir 中创建 myid
即在/usr/local/zookeeper-3.4.5/zkData下创建 myid。
- myid 文件中只有一个数字,server.1的 myid 文件内容是 “1”。
- myid 文件中的数字和 zoo.cfg 中 server.id=host:port:port的 id 一致。
- id 的范围为 1~255。
-
3、为其他机器都配置上 zoo.cfg和 myid 文件。
-
4、启动
在%ZK_HOME%/bin 目录下执行下面命令
./zkServer.sh start 看到下面内容 JMX enabled by default Using config: /usr/local/zookeeper-3.4.5/bin/../conf/zoo.cfg Starting zookeeper ... STARTED
-
5、验证
使用 telnet 和 stat 验证是否正常启动。
telnet 192.168.7.30 2181 Trying 192.168.7.30... Connected to 192.168.7.30. Escape character is '^]'. stat Zookeeper version: 3.4.5-1392090, built on 09/30/2012 17:52 GMT Clients: /192.168.7.209:50054[0](queued=0,recved=1,sent=0) Latency min/avg/max: 0/0/0 Received: 1 Sent: 0 Connections: 1 Outstanding: 0 Zxid: 0x10000000d Mode: follower Node count: 4 Connection closed by foreign host.
单机模式安装
-
1、修改 zoo.cfg
修改%ZK_HOME%/conf/zoo.cfg
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/usr/local/zookeeper-3.4.5/zkData clientPort=2181 server.1=127.0.0.1:2888:3888
-
2、在 dataDir 中创建 myid
即在/usr/local/zookeeper-3.4.5/zkData下创建 myid。
- myid 文件中只有一个数字,server.1的 myid 文件内容是 “1”。
-
3、启动
在%ZK_HOME%/bin 目录下执行下面命令
./zkServer.sh start 看到下面内容 JMX enabled by default Using config: /usr/local/zookeeper-3.4.5/bin/../conf/zoo.cfg Starting zookeeper ... STARTED
-
4、验证
使用 telnet 和 stat 验证是否正常启动。
telnet 127.0.0.1 2181 Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. stat Zookeeper version: 3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 04:05 GMT Clients: /127.0.0.1:50073[0](queued=0,recved=1,sent=0) Latency min/avg/max: 0/0/0 Received: 1 Sent: 0 Connections: 1 Outstanding: 0 Zxid: 0x1f Mode: standalone Node count: 5 Connection closed by foreign host.
客户端脚本
- 连接脚本
./zkCli.sh
连接指定的 zk 服务器
./zkCli.sh -server 192.168.7.30:2181
-
创建
使用 create 命令,创建一个 zk 节点
create [-s] [-e] path data acl
-s 或 -e 分别指定节点特性:顺序或临时节点。默认情况下,创建的是持久节点。
[zk: 192.168.7.30:2181(CONNECTED) 0] create /zk-book 123 Created /zk-book
-
读取
使用 ls 命令和 get 命令
使用 ls,读取 zk 指定节点下的所有子节点。
ls path [watch]
[zk: 192.168.7.30:2181(CONNECTED) 1] ls / [zk-book, zookeeper]
使用 get,获取指定节点的数据内容和属性信息
get path [watch]
[zk: 192.168.7.30:2181(CONNECTED) 2] get /zk-book 123 cZxid = 0x300000004 ctime = Sun Dec 22 21:31:03 CST 2019 mZxid = 0x300000004 mtime = Sun Dec 22 21:31:03 CST 2019 pZxid = 0x300000004 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 3 numChildren = 0
-
更新
使用 set 命令,更新指定节点的数据内容。
set path data [version]
[zk: 192.168.7.30:2181(CONNECTED) 3] set /zk-book 456 cZxid = 0x300000004 ctime = Sun Dec 22 21:31:03 CST 2019 mZxid = 0x300000005 mtime = Sun Dec 22 21:37:33 CST 2019 pZxid = 0x300000004 cversion = 0 dataVersion = 1 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 3 numChildren = 0
-
删除
使用 delete命令,删除指定节点。
delete path [version]
[zk: 192.168.7.30:2181(CONNECTED) 4] delete /zk-book
删除某个节点,该节点必须没有子节点。
[zk: 192.168.7.30:2181(CONNECTED) 8] create /zk-book 123 Created /zk-book [zk: 192.168.7.30:2181(CONNECTED) 9] create /zk-book/child 345 Created /zk-book/child [zk: 192.168.7.30:2181(CONNECTED) 10] delete /zk-book Node not empty: /zk-book [zk: 192.168.7.30:2181(CONNECTED) 11] delete /zk-book/child [zk: 192.168.7.30:2181(CONNECTED) 12] delete /zk-book
Java客户端 API 使用
-
创建一个最基本的 zk 会话实例
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.9</version> </dependency>
package chap5; import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooKeeper; public class Zk_Usage_Simple implements Watcher { private static CountDownLatch connectedSemaphore = new CountDownLatch(1); public static void main(String[] args) throws IOException { ZooKeeper zooKeeper = new ZooKeeper("192.168.7.30:2181", 5000, new Zk_Usage_Simple()); System.out.println(zooKeeper.getState()); try { connectedSemaphore.await(); } catch (InterruptedException e) { System.out.println("zk session established"); } } public void process(WatchedEvent event) { System.out.println("Received watched event: " + event); if (KeeperState.SyncConnected == event.getState()) { connectedSemaphore.countDown(); } } }
-
同步 API 创建节点
package chap5; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; public class Zk_Create_Sync_Usage implements Watcher { private static CountDownLatch connectedSemaphore = new CountDownLatch(1); public static void main(String[] args) throws Exception { ZooKeeper zooKeeper = new ZooKeeper("192.168.7.30:2181", 5000, new Zk_Create_Sync_Usage()); connectedSemaphore.await(); // 创建临时节点 String path = zooKeeper.create("/zk-test-ephemeral-", "zk-test-ephemeral-".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("Success create zNode :" + path); // 创建临时节点并自动后缀加上数字 String path2 = zooKeeper.create("/zk-test-ephemeral-", "zk-test-ephemeral-".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("Success create zNode :" + path2); } public void process(WatchedEvent event) { if (event.getState() == KeeperState.SyncConnected) { connectedSemaphore.countDown(); } } }
Success create zNode :/zk-test-ephemeral- Success create zNode :/zk-test-ephemeral-0000000007
-
异步 API 创建节点
package chap5; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; public class Zk_Create_ASync_Usage implements Watcher { private static CountDownLatch connectedSemaphore = new CountDownLatch(1); public static void main(String[] args) throws Exception { ZooKeeper zooKeeper = new ZooKeeper("192.168.7.30:2181", 5000, new Zk_Create_ASync_Usage()); connectedSemaphore.await(); zooKeeper.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new IStringCallback(), "I am context"); zooKeeper.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new IStringCallback(), "I am context"); zooKeeper.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new IStringCallback(), "I am context"); Thread.sleep(Integer.MAX_VALUE); } public void process(WatchedEvent event) { if (event.getState() == KeeperState.SyncConnected) { connectedSemaphore.countDown(); } } } class IStringCallback implements AsyncCallback.StringCallback { public void processResult(int rc, String path, Object ctx, String name) { System.out.println("Create path result : [" + rc + ", " + path + ", " + ctx + ", " + name); } }
Create path result : [0, /zk-test-ephemeral-, I am context, /zk-test-ephemeral- Create path result : [-110, /zk-test-ephemeral-, I am context, null Create path result : [-110, /zk-test-ephemeral-, I am context, null
和同步接口方法创建节点最大的区别在于,节点的创建过程(包括网络通信和服务端的节点创建过程)是异步的。并且,异步接口不会抛出异常,所有的异常都在回调中通过 Result Code 来体现。
-
删除节点
package chap5; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; public class ZKDeleteSyncUsage implements Watcher { private static final CountDownLatch connectedSemaphore = new CountDownLatch(1); public static void main(String[] args) throws Exception { ZooKeeper zk = new ZooKeeper("192.168.7.30:2181", 5000, new ZKDeleteSyncUsage()); connectedSemaphore.await(); String path = "/zk-delete-node"; zk.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); // if the given version is -1, it matches any node's versions zk.delete(path, -1); Thread.sleep(Integer.MAX_VALUE); } public void process(WatchedEvent event) { if (event.getState() == KeeperState.SyncConnected) { connectedSemaphore.countDown(); } } }
在 zk 中,只允许删除子节点,如果一个节点有子节点,必须先删除掉所有子节点才能删除。
-
读取数据
读取数据,包括子节点列表的获取和节点数据的获取。
package chap5; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; public class ZKGetChildrenSyncUsage implements Watcher { private static final CountDownLatch connectedSemaphore = new CountDownLatch(1); private static ZooKeeper zk = null; public static void main(String[] args) throws Exception { zk = new ZooKeeper("192.168.7.30:2181", 5000, new ZKGetChildrenSyncUsage()); connectedSemaphore.await(); String path = "/zk-book"; zk.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk.create(path + "/c1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); List<String> childrenList = zk.getChildren(path, true); System.out.println("childrenList :" + childrenList); zk.create(path + "/c2", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); Thread.sleep(Integer.MAX_VALUE); } public void process(WatchedEvent event) { if (event.getState() == KeeperState.SyncConnected) { if (EventType.None == event.getType() && event.getPath() == null) { connectedSemaphore.countDown(); } else if (event.getType() == EventType.NodeChildrenChanged) { try { System.out.println("ReGet child:" + zk.getChildren(event.getPath(), true)); } catch (Exception e) { } } } } }
获取数据
package chap5; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; public class ZKGetDataSyncUsage implements Watcher { private static final CountDownLatch connectedSemaphore = new CountDownLatch(1); private static ZooKeeper zk = null; private static Stat stat = new Stat(); public static void main(String[] args) throws Exception { zk = new ZooKeeper("192.168.7.30:2181", 5000, new ZKGetDataSyncUsage()); connectedSemaphore.await(); String path = "/zk-book"; zk.create(path, "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println(new String(zk.getData(path, true, stat))); System.out.println(stat.getCzxid() + " " + stat.getMzxid() + " " + stat.getVersion()); zk.setData(path, "123".getBytes(), -1); Thread.sleep(Integer.MAX_VALUE); } public void process(WatchedEvent event) { if (event.getState() == KeeperState.SyncConnected) { if (EventType.None == event.getType() && event.getPath() == null) { connectedSemaphore.countDown(); } else if (event.getType() == EventType.NodeDataChanged) { try { System.out.println(new String(zk.getData(event.getPath(), true, stat))); System.out.println(stat.getCzxid() + " " + stat.getMzxid() + " " + stat.getVersion()); } catch (Exception e) { // EMPTY } } } } }
节点的内容或是节点数据版本变化,都被看作节点变化
-
更新数据
package chap5; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; public class ZKSetDataSyncUsage implements Watcher { private static CountDownLatch connectedSemaphore = new CountDownLatch(1); private static ZooKeeper zk = null; public static void main(String[] args) throws Exception { zk = new ZooKeeper("192.168.7.30:2181", 5000, new ZKSetDataSyncUsage()); connectedSemaphore.await(); String path = "/zk-book"; zk.create(path, "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); zk.getData(path, true, null); Stat stat = zk.setData(path, "456".getBytes(), -1); System.out.println( "Czxid = " + stat.getCzxid() + " Mzxid = " + stat.getMzxid() + " Version = " + stat.getVersion()); Stat stat2 = zk.setData(path, "456".getBytes(), stat.getVersion()); System.out.println( "Czxid = " + stat.getCzxid() + " Mzxid = " + stat.getMzxid() + " Version = " + stat2.getVersion()); try { zk.setData(path, "456".getBytes(), stat.getVersion()); } catch (KeeperException e) { System.out.println("code = " + e.getCode() + "message = " + e.getMessage()); } Thread.sleep(Integer.MAX_VALUE); } public void process(WatchedEvent event) { if (KeeperState.SyncConnected == event.getState()) { if (EventType.None == event.getType() && null == event.getPath()) { connectedSemaphore.countDown(); } } } }
基于 version 参数,可以很好的控制 ZooKeeper 上节点数据的原子性操作。
-
检测节点是否存在
package chap5; import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; public class ZKExistSyncUsage implements Watcher { private static final CountDownLatch connectedSemaphore = new CountDownLatch(1); private static ZooKeeper zk; public static void main(String[] args) throws IOException, InterruptedException, KeeperException { String path = "/zk-book"; zk = new ZooKeeper("192.168.7.30:2181", 5000, new ZKExistSyncUsage()); connectedSemaphore.await(); zk.exists(path, true); zk.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk.setData(path, "123".getBytes(), -1); zk.create(path + "/c1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk.delete(path + "/c1", -1); zk.delete(path, -1); Thread.sleep(Integer.MAX_VALUE); } public void process(WatchedEvent event) { try { if (event.getState() == KeeperState.SyncConnected) { if (event.getType() == EventType.None && null == event.getPath()) { connectedSemaphore.countDown(); } else if (event.getType() == EventType.NodeCreated) { System.out.println("Node:" + event.getPath() + " Created"); zk.exists(event.getPath(), true); } else if (event.getType() == EventType.NodeDeleted) { System.out.println("Node:" + event.getPath() + " Deleted"); zk.exists(event.getPath(), true); } else if (event.getType() == EventType.NodeDataChanged) { System.out.println("Node:" + event.getPath() + " DataChanged"); zk.exists(event.getPath(), true); } } } catch (Exception e) { System.out.println(e); } } }
Node:/zk-book Created Node:/zk-book DataChanged Node:/zk-book Deleted
- 无论节点是否存在,通过 exists 接口都可以注册 watcher。
- exists 接口中注册的 watcher,能过对节点创建、节点删除、节点数据更新进行监听。
- 对于指定节点的子节点的各种变化,都不会通知客户端。
-
权限控制
package chap5; import java.io.IOException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; public class ZKAuthUsage { public static void main(String[] args) throws IOException, KeeperException, InterruptedException { String path = "/zk-book-auth"; String path2 = path + "/c1"; ZooKeeper zk = new ZooKeeper("192.168.7.30:2181", 5000, new ZKCreateSyncUsage()); zk.addAuthInfo("digest", "foo:foo".getBytes()); zk.create(path, "123".getBytes(), Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT); zk.create(path2, "123".getBytes(), Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL); ZooKeeper zk2 = new ZooKeeper("192.168.7.30:2181", 5000, new ZKCreateSyncUsage()); zk2.addAuthInfo("digest", "foo:foo".getBytes()); // byte[] data = zk2.getData(path, false, null); // System.out.println(new String(data)); zk2.delete(path2, -1); ZooKeeper zk3 = new ZooKeeper("192.168.7.30:2181", 5000, new ZKCreateSyncUsage()); zk3.delete(path, -1); Thread.sleep(Integer.MAX_VALUE); } }
当客户端对一个节点添加权限信息后,其作用范围是节点,也就是说当我们对一个数据节点添加权限信息后,依然可以自由删除这个节点。但这个节点的子节点,就必须使用相应的权限信息才能删除。
典型使用场景
ZooKeeper 是一个高可用的分布式数据管理和协调框架,基于对 ZAB 算法的实现,该框架能够很好的保证分布式环境中数据的一致性。
-
数据发布/订阅
数据发布/订阅系统,即所谓的配置中心。发布者将数据发布到 zk 的一个或一系列节点上,供订阅者进行数据订阅,实现配置信息的集中式管理和数据的动态更新。
zk 采用推拉结合的方式,客户端向服务端注册自己关心的节点,一旦该节点的数据发生变更,那么服务端就向相应的服务端