ZooKeeper之Java客户端API使用—读取数据。

2022年9月15日11:13:31

  读取数据,包括子节点列表的获取和节点数据的获取。ZooKeeper分别提供了不同的API来获取数据。

getChildren

        客户端可以通过ZooKeeper的API来获取一个节点的所有子节点,有如下8个接口可供使用:

  • List<String> getChildren(final String path , Watcher watcher)
  • List<String> getChildren(String path , boolean watch)
  • void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx)
  • void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)
  • List<String> getChildren(final String path , Watcher watcher, Stat stat)
  • List<String> getChildren(String path , boolean watch, Stat stat)
  • void getChildren(final String path, Watcher watcher, Children2Callback cb, Object ctx)
  • void getChildren(String path, boolean watch, Children2Callback cb, Object ctx)

        这里列出的8个API包含了同步和异步的接口,API方法的参数说明如下表所示。

参数名 说明
path 指定数据节点的节点路径,即API调用的目的是获取该节点的子节点列表
watcher 注册的Watcher。一旦在本次子节点获取之后,子节点列表发生变更的话,那么就会向客户端发送通知。该参数允许传入null
watch 表明是否需要注册一个Watcher。这里就要使用默认Watcher了。如果这个参数是true,那么ZooKeeper客户端会自动使用上文中提到的那个默认Watcher;如果是false,表明不需要注册Watcher
cb 注册一个异步回调函数
ctx 用于传递上下文信息的对象
stat 指定数据节点的节点状态信息。用法是在接口中传入一个旧的stat变量,该stat变量会在方法执行执行过程中,被来自服务器响应的新stat对象替换

Watcher

        注册Watcher,如果ZooKeeper客户端在获取到指定节点的子节点列表后,还需要订阅这个子节点列表的变化通知,那么就可以通过注册一个Watcher来实现。当有子节点被添加或是删除时,服务器就会向客户端发送一个NodeChildren Change(EventType.NodeChildrenChanged)类型的事件通知。需要注意的是,在服务端发送给客户端的事件通知中,是不包含最新的节点列表的,客户端必须主动重新进行获取。通常客户端在收到这个事件通知后,就可以再次获取最新的子节点列表了。

stat

        描述节点状态信息的对象:stat。stat对象中记录了一个节点的基本属性信息,例如列电创建时事务ID(cZxid)、最后一次修改的事务ID(mZxid)和节点数据内容的长度(dataLength)等。有时候,我们不仅需要获取节点最新的子节点列表,还要获取这个节点最新的节点状态信息。对于这种情况,我们可以将一个旧的stat变量传入API接口,该stat变量会在方法执行过程中,被来自服务端响应的新stat对象替换。

使用同步API获取子节点列表

// ZooKeeper API 获取子节点列表,使用同步(sync)接口

public class ZooKeeper_GetChildren_API_Sync_Usage implements Watcher {

private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

pirvate static ZooKeeper zk = null;

public static void main(String[] args) throws Exception {

String path = "/zk-book";

zk = new ZooKeeper("domain1.book.zookeeper:2181", 5000, new ZooKeeper_GetChildren_API_Sync_Usage());

connectedSemaphore.await();

zk.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

zk.create(path+"/c1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

List<String> childrenList = zk.getChildren(path , true);

System.out.println(childrenList);

zk.create(path+"/c2", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

Thread.sleep(Integer.MAX_VALUE);

}

public void process(WatchedEvent event) {

if (KeeperState.SyncConnected == event.getState()) {

if (EventType.None == event.getType() && null == event.getPath) {

connectedSemaphore.countDown();

} else if (event.getType() == EventType.NodeChildrenChanged) {

try {

System.out.println("ReGet Child:" + zk.getChildren(event.getPath(), true));

} catch (Exception e) {}

}

}

}

}

        运行程序,输出结果如下:

ZooKeeper之Java客户端API使用—读取数据。

        在上面这个程序中,我们首先创建了一个父节点/zk-book,以及一个子节点/zk-book/c1。然后调用getChildren的同步接口来获取/zk-book节点下的所有子节点,同时在接口调用的时候注册了一个Watcher。之后,我们继续向/zk-book节点创建子节点/zk-book/c2。由于之前我们对/zk-book节点注册了一个Watcher,因此,一旦此时有子节点被创建,ZooKeeper服务端就会客户端发出一个“子节点变更”的事件通知,于是,客户但在收到这个事件通知之后就可以再次调用getChildren方法来获取新的子节点列表。

        另外,从输出结果中我们还可以发现,调用getChildren获取到节点列表,都是数据节点的相对节点路径,例如上面输出结果中的c1和c2,事实上,完整的ZNode路径应该是/zk-book/c1和zk-book/c2。

        关于Watcher,这里简单提一点,ZooKeeper服务端在向客户端发送Watcher “NodeChildrenChanged”事件通知的时候,仅仅只会发出一个通知,而不会把节点的变化情况发送给客户端,需要客户端自己重新获取。另外,由于Watcher通知是一次性的,即一旦触发一次通知后,该Watcher就失效了,因此客户端需要反复注册Watcher。

使用异步API获取子节点列表

// ZooKeeper API 获取子节点列表,使用异步(async)接口

public class ZooKeeper_GetChildren_API_ASync_Usage implements Watcher {

private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

private static ZooKeeper zk = null;

public static void main(String[] args) throws Exception {

String path = "/zk-book";

zk = new ZooKeeper("domain1.book.zookeeper:2181", 5000, new ZooKeeper_GetChildren_API_ASync_Usage());

connectedSemaphore.await();

zk.create(path , "".getBytes() , Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

zk.create(path+"/c1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

zk.getChildren(path, true, new IChildren2Callback(), null);

zk.create(path + "/c2", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

Thread.sleep(Integer.MAX_VALUE);

}

public void process(WatchedEvent event) {

if(KeeperState.SyncConnected == event.getState()) {

if(KeeperState.SyncConnected == event.getState()) {

if(EventType.None == event.getType() && null == event.getPath()) {

connectedSemaphore.countDown();

} else if (event.getType == EventType.NodeChildrenChanged) {

try {

System.out.println("ReGet Child:" + zk.getChildren(event.getPath(), true));

} catch (Exception e) {}

}

}

}

}

}

class IChildren2Callback implements AsyncCallback.Children2Callback {

public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {

System.out.println("Get Children znode result:[response code:" + rc + ", param path:" + path + ", ctx: " + ", children list: " + ", stat:" + stat);

}

}

        运行程序,输出结果如下:

ZooKeeper之Java客户端API使用—读取数据。

        在上面这个程序中,我们将子节点列表的获取逻辑进行了异步化。异步接口通常会应用在这样的使用场景中:应用启动的时候,会获取一些配置信息,例如“机器列表”,这些配置通常比较大,并且不希望配置的获取影响应用的主流程。

getData

        客户端可以通过ZooKeeper的API来获取一个节点的数据内容,有如下4个接口:

  • byte[] getData(final String path, Watcher watcher, Stat stat)
  • byte[] getData(String path, boolean watch, Stat stat)
  • void getData(final String path, Watcher watcher, DataCallback cb, Object ctx)
  • void getData(String path, boolean watch, DataCallback cb, Object ctx)

        这里列出的4个API包含了同步和异步的接口,API方法的参数说明如下表所示。

参数名 说明
path 指定数据节点的节点路径,即API调用的目的是获取该节点的数据内容
watcher 注册的Watcher。一旦之后节点内容有变更,就会向客户端发送通知。该参数允许传入null
stat 指定数据节点的节点状态信息。用法是在接口中传入一个旧的stat变量,该stat变量会在方法执行过程中,被来自服务端响应的新stat对象替换。
watch 表明是否需要注册一个Watcher。这里使用到默认Watcher。如果这个参数是true,那么ZooKeeper客户端会自动使用上文中提到的那个默认Watcher:如果是false,表明不需要注册Watcher
cb 注册一个异步回调函数
ctx 用于传递上下文信息的对象

        getData接口和上文中的getChildren接口的用法基本相同,这里主要看一看注册的Watcher有什么不同之处。客户端在获取一个节点的数据内容的时候,是可以进行Watcher注册的,这样一来,一旦该节点的状态发生变更,那么ZooKeeper服务端就会向客户端发送一个NodeDataChanged(EventType.NodeDataChanged) 的事件通知。

        另外,API返回结果的类型是byte[],目前ZooKeeper只支持这种类型的数据存储,所以在获取数据的时候也是返回此类型。

使用同步API获取节点数据内容

// ZooKeeper API 获取节点数据内容,使用同步(sync)接口

public class ZooKeeper_GetData_API_Sync_Usage implements Watcher {

private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

pirvate static ZooKeeper zk = null;

public static void main(String[] args) throws Exception {

String path = "/zk-book";

zk = new ZooKeeper("domain1.book.zookeeper:2181", 5000, new ZooKeeper_GetData_API_Sync_Usage());

connectedSemaphore.await();

zk.create(path, "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

System.out.println(new String(zk.getData(path, true, stat)));
System.out.println(stat.getCzxid() + "," + stat.getMzxid() + "," + stat.getVersion());
zk.setData(path, "123".Bytes(), -1);

Thread.sleep(Integer.MAX_VALUE);

}

public void process(WatchedEvent event) {

if (KeeperState.SyncConnected == event.getState()) {

if (EventType.None == event.getType() && null == event.getPath) {

connectedSemaphore.countDown();

} else if (event.getType() == EventType.NodeChildrenChanged) {

try {

System.out.println(new String(zk.getData(event.getPath(), true, stat)));

System.out.println(stat.getCzxid() + "," + stat.getMzxid() + "," + stat.getVersion());

} catch (Exception e) {}

}

}

}

}

        运行程序,输出结果如下:

ZooKeeper之Java客户端API使用—读取数据。

        在上面这个程序中,我们首先创建了一个节点/zk-book,并初始化其数据内容为“123”.然后调用getData的同步接口来获取/zk-book节点的数据内容,调用的同时注册了一个Watcher。之后,我们同样以“123”去更新将该节点的数据内容,此时,由于我们之前在该节点上注册了一个Watcher,因此,一旦该节点的数据发生变化,ZooKeeper服务端就会向客户端发出一个“数据变更”的事件通知,于是,客户端可以收到这个事件通知后,再次调用getData接口来获取新的数据内容。

        另外,在调用getData接口的同时,我们传入了一个stat变量,在ZooKeeper客户端的内部实现中,会从服务端的响应中获取到数据节点的最新节点状态信息,来替换这个客户端的旧状态。

        我们重点再来看下运行上面这个程序的输出结果中,前后两次调用getData接口的返回值。第一次的输出结果如下:

ZooKeeper之Java客户端API使用—读取数据。

       第二次的输出结果如下:

ZooKeeper之Java客户端API使用—读取数据。

        第一次是客户端主动调用getData接口来获取数据;第二次则是节点数据变更后,服务端发送Watcher事件通知给客户端后,客户端再次调用getData接口来获取数据。两次调用的输出结果中,节点数据内容的值并没有变化。既然节点的数据内容并没有变化,那么ZooKeeper服务端为什么会向客户端发送Watcher事件通知呢。这里,我们必须明确一个概念:节点的数据内容或是节点的数据版本变化,都被看作是ZooKeeper节点的变化。明白这个概念后,再回过头看上面的结果输出,可以看出,该节点在Zxid为“253404961568”时被创建,在Zxid为“253404961576”时被更新,于是节点的数据版本从“0”变化到“1”.所以,这里我们要明确的一点是,蜀军欸容或是数据版本变化,都会触发服务端的NodeDataChanged通知。

使用异步API获取节点数据内容

// ZooKeeper API 获取节点数据内容,使用异步(async)接口

public class GetData_API_ASync_Usage implements Watcher {

private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

private static ZooKeeper zk = null;

public static void main(String[] args) throws Exception {

String path = "/zk-book";

zk = new ZooKeeper("domain1.book.zookeeper:2181", 5000, new GetData_API_ASync_Usage());

connectedSemaphore.await();

zk.create(path , "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode, EPHEMERAL);
zk.getData(path, true, new IDataCallback(), null);
zk.setData(path, "123".getBytes(), -1);

Thread.sleep(Integer.MAX_VALUE);

}

public void process(WatchedEvent event) {

if(KeeperState.SyncConnected == event.getState()) {

if(KeeperState.SyncConnected == event.getState()) {

if(EventType.None == event.getType() && null == event.getPath()) {

connectedSemaphore.countDown();

} else if (event.getType == EventType.NodeChildrenChanged) {

try {

zk.getData(event.getPath(), true, new IDataCallback(), null);

} catch (Exception e) {}

}

}

}

}

}

class IChildren2Callback implements AsyncCallback.Children2Callback {

public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {

System.out.println(rc +", " + path + ", " + new String(data));

System.out.println(stat.getCzxid() + "," + stat.getVersion());

}

}

        运行程序,输出结果如下:

ZooKeeper之Java客户端API使用—读取数据。

        上面就是使用getData的异步接口来获取节点数据内容的示例程序。

  • 作者:孤芳不自賞
  • 原文链接:https://blog.csdn.net/en_joker/article/details/78716170
    更新时间:2022年9月15日11:13:31 ,共 8695 字。