【ZooKeeper】Java客户端API使用介绍。

2022-09-07 10:37:30

创建会话。

本人使用的是3.4.11版本,也是目前的稳定版本,先给出官方的API地址:http://zookeeper.apache.org/doc/r3.4.11/api/index.html

客户端可以通过创建一个ZooKeeper(org.apache.zookeeper.ZooKeeper)实例来连接ZooKeeper服务器,API一共提供了四种构造方法:

ZooKeeper(java.lang.String connectString, int sessionTimeout, Watcher watcher)

ZooKeeper(java.lang.String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)

ZooKeeper(java.lang.String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)

ZooKeeper(java.lang.String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)

这里面的参数含义:

  • connectString:代表着ZooKeeper服务器地址列表,使用的就是host:post的形式,如果有多个服务器的话中间使用英文逗号隔开就好,而且在地址后面还可以指定一个根目录,例如:centos3:2181/test ,这样所有的操作都是基于test这个根目录的。

  • sessionTimeout:顾名思义就是会话的超时时间,单位是毫秒。在ZooKeeper服务器和客户端之间通过心跳检测的形式来判断客户端是否可用,如果在设定的sessionTimeout时间内没有进行有效的心跳检测,会话就失效了。

  • watcher:这是一个实现了Watcher接口的类,用来作为ZooKeeper中的时间通知处理器,比如当指定节点的数据发生变化时就会回调该方法。

  • canBeReadOnly:一个boolean类型的参数,这是标识当前的会话是否支持“read-only”也就是所谓的只读模式。在ZooKeeper集群中,一个机器如果和集群中过半及以上的机器失去了网络连接(网络分区情况),那么这个机器将不再处理客户端请求,读写请求都不会处理。但是有些时候出现这种故障之后,我们还是希望此事能够ZooKeeper服务器能够提供读服务,这就是“read-only”模式。

  • sessionId和sessionPasswd:分别代表会话的ID和会话秘钥。这两个参数能够确定唯一的会话,同时客户端使用这两个参数可以实现客户端会话复用,从而达到恢复会话的效果。

参数中含义已经说得很清楚了,需要注意的是ZooKeeper会话的创建是一个异步的过程,也就是说虽然new了一个ZooKeeper实例并且也能处理完客户端初始化工作之后得到返回,但是此时还不能算是真正建立还一个可用的会话,此时会话处于“CONNECTING”状态。

zooKeeper =new ZooKeeper(zkAddress,sessionTimeout,new ZKServer());//初始化一个ZK对象
System.out.println(zooKeeper.getState());

比例上述代码,输出的内容就是:CONNECTING。当ZooKeeper服务器真正创建完一个会话后,会向服务端发送一个时间通知,客户端接收到了这个通知之后才能算真正创建完一个会话,这时是“CONNECTED”状态。

也可以在创建完ZooKeeper实例之后,使用ZooKeeper的getSessionId()和getSessionPasswd()方法得到对应会话的ID和秘钥以便后面用来恢复会话,注意当ZooKeeper处于“CONNECTING”状态时,接收到的ID为0,秘钥虽然不为0但是与完全创建完会话得到的秘钥不一致,显然也是不正确的。如果使用不正确的秘钥的话会接收到服务器一个Expired通知。


创建节点。

当我们拿到一个会话之后,就可以对ZooKeeper服务器进行操作了。可以通过以下两种构造方法创建一个数据节点:

create(java.lang.String path, byte[] data, java.util.List acl, CreateMode createMode)

create(java.lang.String path, byte[] data, java.util.List acl, CreateMode createMode, AsyncCallback.StringCallback cb, java.lang.Object ctx)

参数含义:

  • path:需要创建的节点路径,注意如果必须保证父节点存在,否则抛出NodeExistsExcepstion异常。

  • data:需要存入节点的数据,需要是byte数组类型。

  • acl:代表节点的ACL(访问控制列表)策略。

  • createMode:节点类型,是一个枚举类型,通常有4中可选的节点。

  • cb:异步时的回调函数,需要实现StringCallback接口。

  • ctx:传递一个对象,可以在回调方法执行的时候使用,通常放一个上下文信息。

很显然第一个构造方法是同步的,这时候会使用该函数会返回一个String类型的数据,这个就是所创建节点的路径。

其中的acl参数可以使用以下几种:

publicinterfaceIds {/**
         * This Id represents anyone.
         */publicfinal Id ANYONE_ID_UNSAFE =new Id("world","anyone");/**
         * This Id is only usable to set ACLs. It will get substituted with the
         * Id's the client authenticated with.
         */publicfinal Id AUTH_IDS =new Id("auth","");/**
         * This is a completely open ACL .
         */publicfinal ArrayList<ACL> OPEN_ACL_UNSAFE =new ArrayList<ACL>(
                Collections.singletonList(new ACL(Perms.ALL, ANYONE_ID_UNSAFE)));/**
         * This ACL gives the creators authentication id's all permissions.
         */publicfinal ArrayList<ACL> CREATOR_ALL_ACL =new ArrayList<ACL>(
                Collections.singletonList(new ACL(Perms.ALL, AUTH_IDS)));/**
         * This ACL gives the world the ability to read.
         */publicfinal ArrayList<ACL> READ_ACL_UNSAFE =new ArrayList<ACL>(
                Collections
                        .singletonList(new ACL(Perms.READ, ANYONE_ID_UNSAFE)));
    }

通常如果应用场景没有太高的权限要求的话,使用其中的OPEN_ACL_UNSAFE就可以了,这样其他地方对这个节点的任何操作都不需要权限控制。

createMode的四种类型分别如下:

  • 持久(PERSISTENT)
  • 持久顺序(PERSISTENT_SEQUENTIAL)
  • 临时(EPHEMERAL)
  • 临时顺序(EPHEMERAL_SEQUENTIAL)

其中这临时节点下面是不可以创建任何节点的。如果是顺序的话,那么会在最后的节点名称后面加上0000000000、0000000001等等递增字符,并且只要父节点一样的话,那么这个顺序序号就会持续增加,除非将父节点删除重新创建才会从0开始。

在异步使用的时候,我们需要传入一个实现StringCallback接口的类。其中主要是完成processResult的重写:

publicvoidprocessResult(int rc, String path, Object ctx, String name)

这个重写方法中第一个rc参数是服务端响应吗,代表着本次执行结果,常见的响应吗有以下四种:

  • 0:OK,接口调用成功。
  • -4:ConnectionLoss,客户端和服务端连接已断开。
  • -110:NodeExists,指定节点已经存在了。
  • -112:SessionExpired,会话已过期。

第二个参数是刚才调用create时传入的路径参数。第三个参数是刚才调用create时传入的ctx参数值。第四个是服务端创建的节点名,如果是顺序节点的话返回的就是添加序号的路径。

同步创建:

String name = zooKeeper.create(parentNode,"ParentZNode".getBytes(), Ids.CREATOR_ALL_ACL,CreateMode.PERSISTENT);//创建一个持久类型的父节点,同步的方式

System.out.println("ParentNode Create Success!" + name);

异步创建:

zooKeeper.create(parentNode +"/" + serverName,("This is " + serverName).getBytes(), Ids.CREATOR_ALL_ACL,CreateMode.EPHEMERAL_SEQUENTIAL,new ZKServer(),"AsyncCallback Create.");//防止重名增加序号/** 异步创建节点时的回调
     * @param rc 服务器响应吗
     * @param path 接口调用时传入API的数据节点路径参数
     * @param ctx 接口调用时传入API的CTX参数值
     * @param name 完整节点的路径
     */publicvoidprocessResult(int rc, String path, Object ctx, String name) {//System.out.println(zooKeeper.getState());switch (rc) {case0:
                System.out.println(name +"创建成功!");
                System.out.println("RC:" + rc +",path:" + path +",ctx:" + ctx +",name:" + name);//输出相应信息
                otherSomething();//创建成功之后执行其他业务break;case -110:
                System.out.println("节点已存在!");break;
        }
        System.out.println(zooKeeper.getSessionId() +"," + zooKeeper.getSessionPasswd() +"," + zooKeeper.getState());
    }

读取节点。

读取节点的话API提供了两种,一种是读取某个节点下的所有子节点:getChildren,另一种是获取节点中的数据:getData。

1 . getChildren。

getChildren共有八个接口,可以分为四类:

(1)

java.util.List<java.lang.String> getChildren(java.lang.String path, boolean watch)

void getChildren(java.lang.String path, boolean watch, AsyncCallback.ChildrenCallback cb, java.lang.Object ctx)//上面的异步版本

参数含义:

  • path:指定数据节点的路径,就是API调用的目的是获取该节点的子节点列表。

  • watch:标明是否需要注册一个Watcher。如果这个参数是true,那么客户端就会自动使用上文中的默认的Watcher(也就是创建会话传入的Watcher);如果是false,标明不需要注册Watcher。

  • cb:这是用来异步回调的。

  • ctx:用来传递上下文信息的对象。

(2)

java.util.List<java.lang.String> getChildren(java.lang.String path, boolean watch, Stat stat)

void getChildren(java.lang.String path, boolean watch, AsyncCallback.Children2Callback cb, java.lang.Object ctx)//上面的异步版本

path、watch与上面的都一致,这里有个新参数:stat,顾名思义就是一些该节点的统计信息。可以从这个stat中看到该节点的一些基本属性。

注意这里的异步版本的回调函数,与上面的那个是不一样的,这里中间有着2这个数字。这种的比上述的会多一个stat对象。

(3)

java.util.List<java.lang.String> getChildren(java.lang.String path, Watcher watcher)

void getChildren(java.lang.String path, Watcher watcher, AsyncCallback.ChildrenCallback cb, java.lang.Object ctx)//上面的异步版本

这里的watcher参数就是注册的Watcher,一旦本次节点获取之后,子节点列表发生变更的话,那么就会像客户端发送通知,也就会回调该方法,不过只能回调一次,所以需要反复注册。可以传入null。

(4)

java.util.List<java.lang.String> getChildren(java.lang.String path, Watcher watcher, Stat stat)

void getChildren(java.lang.String path, Watcher watcher, AsyncCallback.Children2Callback cb, java.lang.Object ctx)

与上面的一样,不过多了一个stat参数,所以对应的回调函数使用的就是Children2Callback。

2 . getData。

使用getData可以获取一个节点的数据内容,有四个接口:

byte[] getData(java.lang.String path, Watcher watcher, Stat stat)

void getData(java.lang.String path, Watcher watcher, AsyncCallback.DataCallback cb, java.lang.Object ctx)//上面的异步版本

byte[] getData(java.lang.String path, boolean watch, Stat stat)

void getData(java.lang.String path, boolean watch, AsyncCallback.DataCallback cb, java.lang.Object ctx)//上面的异步版本

同步版本直接返回一个byte数组,异步的话在回调函数中会传入byte数组。

说是有四个构造方法,其实也就两类,剩下的都是对应的异步版本。一类就是直接注册的Watcher,另一类就是使用boolean参数来标志是否需要注册一个Watcher。

例如可以在Watcher回调中使用getChildren进行重新注册监听,传入true参数使用默认的Watcher(也就是创建会话传入的Watcher)。

注意这里返回的数据内容是byte数组,所以需要做对应的转换。

nodes = zooKeeper.getChildren(parentNode, true,stat);//再次监听

for (String node : nodes) {
    System.out.println(node);
}

System.out.println("stat:" + stat.getAversion() +"," + stat.getDataLength() +"," + stat.getMtime());

修改节点。

这里的修改节点指的是修改节点中的数据内容,有两个接口:

Stat setData(java.lang.String path,byte[] data,intversion)void setData(java.lang.String path,byte[] data,intversion, AsyncCallback.StatCallback cb, java.lang.Object ctx)

可以看到,如果是同步的话会返回一个Stat对象,异步的话在回调函数中会传入对应的Stat对象。

参数含义:

  • path:指定数据节点的节点路径。

  • data[]:一个字节数组,即需要使用该数据内容覆盖节点现在的内容。

  • version:指定节点的数据版本,即表明本次更新是针对该数据版本进行的。

  • cb:回调函数。

  • ctx:用来传递上下文的对象。

这里的其他参数上面的各个API都有介绍,但是其中的version需要注意。为什么需要version这个参数呢?上面的getData的时候也没有需要指定version,那么这个version究竟是什么意思?

这里有一个CAS(Compare and Swap)的概念,大概意思就是:“对于值V,每次更新前都会比对其值是否是预期值A,只有符合预期,才会将V原子化的更新到新值B”。

在ZooKeeper上创建节点的时候会自动的添加version版本,每次更新的操作都会重新更新这个version号。假如新创建了一个节点:test,这时的version值为0,如果我们想要通过客户端来更新这个test节点的数据,我们需要指定此刻的version值也就是0。但是恰好在我们进行更新之前,有另一个客户端对这个节点进行了更新,那么此时的version值就会发生变化成为1,那么我们的客户端携带的version是0,根据CAS的概念就是没有达到预期的值,所以就会更新失败。

这个version也可以传入-1,在ZooKeeper中版本号都是从0开始的,这个-1就是一个标识符,用来说明对最新的版本号进行更新就可以了。


删除节点。

删除节点与上面的修改节点类似,也是有两个接口:

voiddelete(java.lang.String path,intversion)voiddelete(java.lang.String path,intversion, AsyncCallback.VoidCallback cb, java.lang.Object ctx)

一个同步版本,一个异步版本,与上面的修改节点操作基本一致。需要注意的就是只允许删除叶子节点,如果想要删除的节点下面有子节点的话就会删除失败。

检测节点。

检测节点是否存在一共有四个接口:

Stat exists(java.lang.String path, boolean watch)

void exists(java.lang.String path, boolean watch, AsyncCallback.StatCallback cb, java.lang.Object ctx)

Stat exists(java.lang.String path, Watcher watcher)

void exists(java.lang.String path, Watcher watcher, AsyncCallback.StatCallback cb, java.lang.Object ctx)

参数与上面的都类似,如果是同步版本的话会返回Stat对象,异步的话会在回调函数中传入Stat对象。如果节点不存在的话返回的Stat对象就是null的。


示例。

假如我们有多个后台服务的话,我们想要检测到这些后台服务的列表情况,我们可以在这个服务运行之前将它们自己注册到ZooKeeper中,然后通过回调函数来检测列表的变化情况。这里统一将服务的名称添加到一个父节点中,一旦有其他服务器注册到了这个父节点下面,所有的服务器都回接到通知。

注意这里使用了digest权限,如果想要在后台查看ZooKeeper列表的话需要使用addauth digest leafage:true添加权限认证。

package com.leafage.zk;import org.apache.zookeeper.*;import org.apache.zookeeper.data.Stat;import java.io.IOException;import java.util.List;import java.util.concurrent.CountDownLatch;importstatic org.apache.zookeeper.ZooDefs.*;/**
 * @Author Leafage
 * @Date 2017/12/1 16:15
 **/publicclassZKServerimplementsWatcher,AsyncCallback.StringCallback {privatestaticfinal String zkAddress ="centos3:2181";//ZK的地址privatestaticfinalint sessionTimeout =2000;//链接超时时间privatestaticfinal String parentNode ="/ZKServer";//存储在ZK中的一个统一的父级ZNodeprivatestatic String serverName =null;privatestatic ZooKeeper zooKeeper =null;/** 主函数,传入服务器名称
     * @param args
     */publicstaticvoidmain(String[] args)throws IOException, KeeperException, InterruptedException {
        serverName = args[0];//得到服务器名称
        initZK();//初始化ZK内容
        registerServer();//注册服务器
    }/**
     * 初始化Zookeeper对象
     */publicstaticvoidinitZK()throws IOException, InterruptedException {
        zooKeeper =new ZooKeeper(zkAddress,sessionTimeout,new ZKServer());//初始化一个ZK对象//System.out.println(zooKeeper.getSessionId() + "," + zooKeeper.getSessionPasswd() +"," + zooKeeper.getState());
        zooKeeper.addAuthInfo("digest","leafage:true".getBytes());//增加权限
    }/**
     * 将自己的Server注册到ZK中
     */publicstaticvoidregisterServer()throws KeeperException, InterruptedException {if (zooKeeper.exists(parentNode,true) ==null) {//如果父节点不存在则新建父节点
            String name = zooKeeper.create(parentNode,"ParentZNode".getBytes(), Ids.CREATOR_ALL_ACL,CreateMode.PERSISTENT);//创建一个持久类型的父节点,同步的方式
            System.out.println("ParentNode Create Success!" + name);
        }//zooKeeper.exists(parentNode + "/" + serverName, true);//判断节点是否存在
        zooKeeper.create(parentNode +"/" + serverName,("This is " + serverName).getBytes(), Ids.CREATOR_ALL_ACL,CreateMode.EPHEMERAL_SEQUENTIAL,new ZKServer(),"AsyncCallback Create.");//防止重名增加序号
        Thread.sleep(Integer.MAX_VALUE);
    }/** 服务器监控ZK的程序
     * @param event
     */publicvoidprocess(WatchedEvent event) {
        System.out.println("==========" + event.getType() +"==========" + event);
        List<String> nodes =null;
        Stat stat =new Stat();try {
            nodes = zooKeeper.getChildren(parentNode,true,stat);//再次监听
            System.out.println("stat:" + stat.getAversion() +"," + stat.getDataLength() +"," + stat.getMtime());
        }catch (KeeperException e) {
            System.out.println("e1");
            e.printStackTrace();
        }catch (InterruptedException e) {
            System.out.println("e2");
            e.printStackTrace();
        }if (event.getType() == Event.EventType.NodeChildrenChanged) {
            System.out.println(event.getPath() +"的节点如下:");for (String node : nodes) {
                System.out.println(node);
            }
        }
    }/** 异步创建节点时的回调
     * @param rc 服务器响应吗
     * @param path 接口调用时传入API的数据节点路径参数
     * @param ctx 接口调用时传入API的CTX参数值
     * @param name 完整节点的路径
     */publicvoidprocessResult(int rc, String path, Object ctx, String name) {//System.out.println(zooKeeper.getState());switch (rc) {case0:
                System.out.println(name +"创建成功!");
                System.out.println("RC:" + rc +",path:" + path +",ctx:" + ctx +",name:" + name);//输出相应信息
                otherSomething();//创建成功之后执行其他业务break;case -110:
                System.out.println("节点已存在!");break;
        }//System.out.println(zooKeeper.getSessionId() + "," + zooKeeper.getSessionPasswd() +"," + zooKeeper.getState());
    }/**
     * 处理一些业务内容
     */publicvoidotherSomething() {
        System.out.println("Do Something!");
    }
}
  • 作者:繁城落叶
  • 原文链接:https://blog.csdn.net/Leafage_M/article/details/78698310
    更新时间:2022-09-07 10:37:30