创建会话。
本人使用的是3.4.11版本,也是目前的稳定版本,先给出官方的API地址:http://zookeeper.apache.org/doc/r3.4.11/api/index.html
客户端可以通过创建一个ZooKeeper(org.apache.zookeeper.ZooKeeper)实例来连接ZooKeeper服务器,API一共提供了四种构造方法:
ZooKeeper(java.lang.String connectString, int sessionTimeout, Watcher watcher)
ZooKeeper(java.lang.String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
ZooKeeper(java.lang.String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)
ZooKeeper(java.lang.String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
这里面的参数含义:
connectString:代表着ZooKeeper服务器地址列表,使用的就是host:post的形式,如果有多个服务器的话中间使用英文逗号隔开就好,而且在地址后面还可以指定一个根目录,例如:centos3:2181/test ,这样所有的操作都是基于test这个根目录的。
sessionTimeout:顾名思义就是会话的超时时间,单位是毫秒。在ZooKeeper服务器和客户端之间通过心跳检测的形式来判断客户端是否可用,如果在设定的sessionTimeout时间内没有进行有效的心跳检测,会话就失效了。
watcher:这是一个实现了Watcher接口的类,用来作为ZooKeeper中的时间通知处理器,比如当指定节点的数据发生变化时就会回调该方法。
canBeReadOnly:一个boolean类型的参数,这是标识当前的会话是否支持“read-only”也就是所谓的只读模式。在ZooKeeper集群中,一个机器如果和集群中过半及以上的机器失去了网络连接(网络分区情况),那么这个机器将不再处理客户端请求,读写请求都不会处理。但是有些时候出现这种故障之后,我们还是希望此事能够ZooKeeper服务器能够提供读服务,这就是“read-only”模式。
sessionId和sessionPasswd:分别代表会话的ID和会话秘钥。这两个参数能够确定唯一的会话,同时客户端使用这两个参数可以实现客户端会话复用,从而达到恢复会话的效果。
参数中含义已经说得很清楚了,需要注意的是ZooKeeper会话的创建是一个异步的过程,也就是说虽然new了一个ZooKeeper实例并且也能处理完客户端初始化工作之后得到返回,但是此时还不能算是真正建立还一个可用的会话,此时会话处于“CONNECTING”状态。
zooKeeper =new ZooKeeper(zkAddress,sessionTimeout,new ZKServer());//初始化一个ZK对象
System.out.println(zooKeeper.getState());
比例上述代码,输出的内容就是:CONNECTING。当ZooKeeper服务器真正创建完一个会话后,会向服务端发送一个时间通知,客户端接收到了这个通知之后才能算真正创建完一个会话,这时是“CONNECTED”状态。
也可以在创建完ZooKeeper实例之后,使用ZooKeeper的getSessionId()和getSessionPasswd()方法得到对应会话的ID和秘钥以便后面用来恢复会话,注意当ZooKeeper处于“CONNECTING”状态时,接收到的ID为0,秘钥虽然不为0但是与完全创建完会话得到的秘钥不一致,显然也是不正确的。如果使用不正确的秘钥的话会接收到服务器一个Expired通知。
创建节点。
当我们拿到一个会话之后,就可以对ZooKeeper服务器进行操作了。可以通过以下两种构造方法创建一个数据节点:
create(java.lang.String path, byte[] data, java.util.List acl, CreateMode createMode)
create(java.lang.String path, byte[] data, java.util.List acl, CreateMode createMode, AsyncCallback.StringCallback cb, java.lang.Object ctx)
参数含义:
path:需要创建的节点路径,注意如果必须保证父节点存在,否则抛出NodeExistsExcepstion异常。
data:需要存入节点的数据,需要是byte数组类型。
acl:代表节点的ACL(访问控制列表)策略。
createMode:节点类型,是一个枚举类型,通常有4中可选的节点。
cb:异步时的回调函数,需要实现StringCallback接口。
ctx:传递一个对象,可以在回调方法执行的时候使用,通常放一个上下文信息。
很显然第一个构造方法是同步的,这时候会使用该函数会返回一个String类型的数据,这个就是所创建节点的路径。
其中的acl参数可以使用以下几种:
publicinterfaceIds {/**
* This Id represents anyone.
*/publicfinal Id ANYONE_ID_UNSAFE =new Id("world","anyone");/**
* This Id is only usable to set ACLs. It will get substituted with the
* Id's the client authenticated with.
*/publicfinal Id AUTH_IDS =new Id("auth","");/**
* This is a completely open ACL .
*/publicfinal ArrayList<ACL> OPEN_ACL_UNSAFE =new ArrayList<ACL>(
Collections.singletonList(new ACL(Perms.ALL, ANYONE_ID_UNSAFE)));/**
* This ACL gives the creators authentication id's all permissions.
*/publicfinal ArrayList<ACL> CREATOR_ALL_ACL =new ArrayList<ACL>(
Collections.singletonList(new ACL(Perms.ALL, AUTH_IDS)));/**
* This ACL gives the world the ability to read.
*/publicfinal ArrayList<ACL> READ_ACL_UNSAFE =new ArrayList<ACL>(
Collections
.singletonList(new ACL(Perms.READ, ANYONE_ID_UNSAFE)));
}
通常如果应用场景没有太高的权限要求的话,使用其中的OPEN_ACL_UNSAFE就可以了,这样其他地方对这个节点的任何操作都不需要权限控制。
createMode的四种类型分别如下:
- 持久(PERSISTENT)
- 持久顺序(PERSISTENT_SEQUENTIAL)
- 临时(EPHEMERAL)
- 临时顺序(EPHEMERAL_SEQUENTIAL)
其中这临时节点下面是不可以创建任何节点的。如果是顺序的话,那么会在最后的节点名称后面加上0000000000、0000000001等等递增字符,并且只要父节点一样的话,那么这个顺序序号就会持续增加,除非将父节点删除重新创建才会从0开始。
在异步使用的时候,我们需要传入一个实现StringCallback接口的类。其中主要是完成processResult的重写:
publicvoidprocessResult(int rc, String path, Object ctx, String name)
这个重写方法中第一个rc参数是服务端响应吗,代表着本次执行结果,常见的响应吗有以下四种:
- 0:OK,接口调用成功。
- -4:ConnectionLoss,客户端和服务端连接已断开。
- -110:NodeExists,指定节点已经存在了。
- -112:SessionExpired,会话已过期。
第二个参数是刚才调用create时传入的路径参数。第三个参数是刚才调用create时传入的ctx参数值。第四个是服务端创建的节点名,如果是顺序节点的话返回的就是添加序号的路径。
同步创建:
String name = zooKeeper.create(parentNode,"ParentZNode".getBytes(), Ids.CREATOR_ALL_ACL,CreateMode.PERSISTENT);//创建一个持久类型的父节点,同步的方式
System.out.println("ParentNode Create Success!" + name);
异步创建:
zooKeeper.create(parentNode +"/" + serverName,("This is " + serverName).getBytes(), Ids.CREATOR_ALL_ACL,CreateMode.EPHEMERAL_SEQUENTIAL,new ZKServer(),"AsyncCallback Create.");//防止重名增加序号/** 异步创建节点时的回调
* @param rc 服务器响应吗
* @param path 接口调用时传入API的数据节点路径参数
* @param ctx 接口调用时传入API的CTX参数值
* @param name 完整节点的路径
*/publicvoidprocessResult(int rc, String path, Object ctx, String name) {//System.out.println(zooKeeper.getState());switch (rc) {case0:
System.out.println(name +"创建成功!");
System.out.println("RC:" + rc +",path:" + path +",ctx:" + ctx +",name:" + name);//输出相应信息
otherSomething();//创建成功之后执行其他业务break;case -110:
System.out.println("节点已存在!");break;
}
System.out.println(zooKeeper.getSessionId() +"," + zooKeeper.getSessionPasswd() +"," + zooKeeper.getState());
}
读取节点。
读取节点的话API提供了两种,一种是读取某个节点下的所有子节点:getChildren,另一种是获取节点中的数据:getData。
1 . getChildren。
getChildren共有八个接口,可以分为四类:
(1)
java.util.List<java.lang.String> getChildren(java.lang.String path, boolean watch)
void getChildren(java.lang.String path, boolean watch, AsyncCallback.ChildrenCallback cb, java.lang.Object ctx)//上面的异步版本
参数含义:
path:指定数据节点的路径,就是API调用的目的是获取该节点的子节点列表。
watch:标明是否需要注册一个Watcher。如果这个参数是true,那么客户端就会自动使用上文中的默认的Watcher(也就是创建会话传入的Watcher);如果是false,标明不需要注册Watcher。
cb:这是用来异步回调的。
ctx:用来传递上下文信息的对象。
(2)
java.util.List<java.lang.String> getChildren(java.lang.String path, boolean watch, Stat stat)
void getChildren(java.lang.String path, boolean watch, AsyncCallback.Children2Callback cb, java.lang.Object ctx)//上面的异步版本
path、watch与上面的都一致,这里有个新参数:stat,顾名思义就是一些该节点的统计信息。可以从这个stat中看到该节点的一些基本属性。
注意这里的异步版本的回调函数,与上面的那个是不一样的,这里中间有着2这个数字。这种的比上述的会多一个stat对象。
(3)
java.util.List<java.lang.String> getChildren(java.lang.String path, Watcher watcher)
void getChildren(java.lang.String path, Watcher watcher, AsyncCallback.ChildrenCallback cb, java.lang.Object ctx)//上面的异步版本
这里的watcher参数就是注册的Watcher,一旦本次节点获取之后,子节点列表发生变更的话,那么就会像客户端发送通知,也就会回调该方法,不过只能回调一次,所以需要反复注册。可以传入null。
(4)
java.util.List<java.lang.String> getChildren(java.lang.String path, Watcher watcher, Stat stat)
void getChildren(java.lang.String path, Watcher watcher, AsyncCallback.Children2Callback cb, java.lang.Object ctx)
与上面的一样,不过多了一个stat参数,所以对应的回调函数使用的就是Children2Callback。
2 . getData。
使用getData可以获取一个节点的数据内容,有四个接口:
byte[] getData(java.lang.String path, Watcher watcher, Stat stat)
void getData(java.lang.String path, Watcher watcher, AsyncCallback.DataCallback cb, java.lang.Object ctx)//上面的异步版本
byte[] getData(java.lang.String path, boolean watch, Stat stat)
void getData(java.lang.String path, boolean watch, AsyncCallback.DataCallback cb, java.lang.Object ctx)//上面的异步版本
同步版本直接返回一个byte数组,异步的话在回调函数中会传入byte数组。
说是有四个构造方法,其实也就两类,剩下的都是对应的异步版本。一类就是直接注册的Watcher,另一类就是使用boolean参数来标志是否需要注册一个Watcher。
例如可以在Watcher回调中使用getChildren进行重新注册监听,传入true参数使用默认的Watcher(也就是创建会话传入的Watcher)。
注意这里返回的数据内容是byte数组,所以需要做对应的转换。
nodes = zooKeeper.getChildren(parentNode, true,stat);//再次监听
for (String node : nodes) {
System.out.println(node);
}
System.out.println("stat:" + stat.getAversion() +"," + stat.getDataLength() +"," + stat.getMtime());
修改节点。
这里的修改节点指的是修改节点中的数据内容,有两个接口:
Stat setData(java.lang.String path,byte[] data,intversion)void setData(java.lang.String path,byte[] data,intversion, AsyncCallback.StatCallback cb, java.lang.Object ctx)
可以看到,如果是同步的话会返回一个Stat对象,异步的话在回调函数中会传入对应的Stat对象。
参数含义:
path:指定数据节点的节点路径。
data[]:一个字节数组,即需要使用该数据内容覆盖节点现在的内容。
version:指定节点的数据版本,即表明本次更新是针对该数据版本进行的。
cb:回调函数。
ctx:用来传递上下文的对象。
这里的其他参数上面的各个API都有介绍,但是其中的version需要注意。为什么需要version这个参数呢?上面的getData的时候也没有需要指定version,那么这个version究竟是什么意思?
这里有一个CAS(Compare and Swap)的概念,大概意思就是:“对于值V,每次更新前都会比对其值是否是预期值A,只有符合预期,才会将V原子化的更新到新值B”。
在ZooKeeper上创建节点的时候会自动的添加version版本,每次更新的操作都会重新更新这个version号。假如新创建了一个节点:test,这时的version值为0,如果我们想要通过客户端来更新这个test节点的数据,我们需要指定此刻的version值也就是0。但是恰好在我们进行更新之前,有另一个客户端对这个节点进行了更新,那么此时的version值就会发生变化成为1,那么我们的客户端携带的version是0,根据CAS的概念就是没有达到预期的值,所以就会更新失败。
这个version也可以传入-1,在ZooKeeper中版本号都是从0开始的,这个-1就是一个标识符,用来说明对最新的版本号进行更新就可以了。
删除节点。
删除节点与上面的修改节点类似,也是有两个接口:
voiddelete(java.lang.String path,intversion)voiddelete(java.lang.String path,intversion, AsyncCallback.VoidCallback cb, java.lang.Object ctx)
一个同步版本,一个异步版本,与上面的修改节点操作基本一致。需要注意的就是只允许删除叶子节点,如果想要删除的节点下面有子节点的话就会删除失败。
检测节点。
检测节点是否存在一共有四个接口:
Stat exists(java.lang.String path, boolean watch)
void exists(java.lang.String path, boolean watch, AsyncCallback.StatCallback cb, java.lang.Object ctx)
Stat exists(java.lang.String path, Watcher watcher)
void exists(java.lang.String path, Watcher watcher, AsyncCallback.StatCallback cb, java.lang.Object ctx)
参数与上面的都类似,如果是同步版本的话会返回Stat对象,异步的话会在回调函数中传入Stat对象。如果节点不存在的话返回的Stat对象就是null的。
示例。
假如我们有多个后台服务的话,我们想要检测到这些后台服务的列表情况,我们可以在这个服务运行之前将它们自己注册到ZooKeeper中,然后通过回调函数来检测列表的变化情况。这里统一将服务的名称添加到一个父节点中,一旦有其他服务器注册到了这个父节点下面,所有的服务器都回接到通知。
注意这里使用了digest权限,如果想要在后台查看ZooKeeper列表的话需要使用addauth digest leafage:true添加权限认证。
package com.leafage.zk;import org.apache.zookeeper.*;import org.apache.zookeeper.data.Stat;import java.io.IOException;import java.util.List;import java.util.concurrent.CountDownLatch;importstatic org.apache.zookeeper.ZooDefs.*;/**
* @Author Leafage
* @Date 2017/12/1 16:15
**/publicclassZKServerimplementsWatcher,AsyncCallback.StringCallback {privatestaticfinal String zkAddress ="centos3:2181";//ZK的地址privatestaticfinalint sessionTimeout =2000;//链接超时时间privatestaticfinal String parentNode ="/ZKServer";//存储在ZK中的一个统一的父级ZNodeprivatestatic String serverName =null;privatestatic ZooKeeper zooKeeper =null;/** 主函数,传入服务器名称
* @param args
*/publicstaticvoidmain(String[] args)throws IOException, KeeperException, InterruptedException {
serverName = args[0];//得到服务器名称
initZK();//初始化ZK内容
registerServer();//注册服务器
}/**
* 初始化Zookeeper对象
*/publicstaticvoidinitZK()throws IOException, InterruptedException {
zooKeeper =new ZooKeeper(zkAddress,sessionTimeout,new ZKServer());//初始化一个ZK对象//System.out.println(zooKeeper.getSessionId() + "," + zooKeeper.getSessionPasswd() +"," + zooKeeper.getState());
zooKeeper.addAuthInfo("digest","leafage:true".getBytes());//增加权限
}/**
* 将自己的Server注册到ZK中
*/publicstaticvoidregisterServer()throws KeeperException, InterruptedException {if (zooKeeper.exists(parentNode,true) ==null) {//如果父节点不存在则新建父节点
String name = zooKeeper.create(parentNode,"ParentZNode".getBytes(), Ids.CREATOR_ALL_ACL,CreateMode.PERSISTENT);//创建一个持久类型的父节点,同步的方式
System.out.println("ParentNode Create Success!" + name);
}//zooKeeper.exists(parentNode + "/" + serverName, true);//判断节点是否存在
zooKeeper.create(parentNode +"/" + serverName,("This is " + serverName).getBytes(), Ids.CREATOR_ALL_ACL,CreateMode.EPHEMERAL_SEQUENTIAL,new ZKServer(),"AsyncCallback Create.");//防止重名增加序号
Thread.sleep(Integer.MAX_VALUE);
}/** 服务器监控ZK的程序
* @param event
*/publicvoidprocess(WatchedEvent event) {
System.out.println("==========" + event.getType() +"==========" + event);
List<String> nodes =null;
Stat stat =new Stat();try {
nodes = zooKeeper.getChildren(parentNode,true,stat);//再次监听
System.out.println("stat:" + stat.getAversion() +"," + stat.getDataLength() +"," + stat.getMtime());
}catch (KeeperException e) {
System.out.println("e1");
e.printStackTrace();
}catch (InterruptedException e) {
System.out.println("e2");
e.printStackTrace();
}if (event.getType() == Event.EventType.NodeChildrenChanged) {
System.out.println(event.getPath() +"的节点如下:");for (String node : nodes) {
System.out.println(node);
}
}
}/** 异步创建节点时的回调
* @param rc 服务器响应吗
* @param path 接口调用时传入API的数据节点路径参数
* @param ctx 接口调用时传入API的CTX参数值
* @param name 完整节点的路径
*/publicvoidprocessResult(int rc, String path, Object ctx, String name) {//System.out.println(zooKeeper.getState());switch (rc) {case0:
System.out.println(name +"创建成功!");
System.out.println("RC:" + rc +",path:" + path +",ctx:" + ctx +",name:" + name);//输出相应信息
otherSomething();//创建成功之后执行其他业务break;case -110:
System.out.println("节点已存在!");break;
}//System.out.println(zooKeeper.getSessionId() + "," + zooKeeper.getSessionPasswd() +"," + zooKeeper.getState());
}/**
* 处理一些业务内容
*/publicvoidotherSomething() {
System.out.println("Do Something!");
}
}