ZooKeeper C语言API简单试用

  ZooKeeper有官方标准C语言绑定客户端却没有C++版本的,反正身为一个见多识广的C++程序员对此已经习以为常、生无可恋了。其实,ZooKeeper最流行的客户端绑定是Curator,不过它是Java语言的,这个库极为的流行好用,于是乎GitHub上面有很多模仿Curator的接口,然后基于C语言绑定库,封装实现出C++语言的ZooKeeper客户端,不过貌似都是小作坊之手笔(没有太多的Star),所以也不敢直接上。
  C语言的客户端库咋看之下会比较的复杂,因为异步的方式将操作和回调割裂开来,代码会比较碎片化。通过XMind将这些接口梳理一下,其实也挺容易的。

一、ZooKeepr C语言客户端基本使用

  在之前介绍编译C客户端的时候,会同时生成C语言开发所需的库文件。我们通常会使用其多线程异步版本的库,该版本库在使用的时候,会自动创建单独的IO线程、事件线程用于处理连接和事件回调操作。
  下面,首先将zookeeper.h头文件中的重要函数接口罗列出来,其实对照前面一篇文章看来,这些函数的意义也是清晰可辨的,而且接口的风格比较统一。基本有些函数名会有一个额外添加w标记的版本(没办法,C语言不支持重载),可以用于设置watch event,当侦听的事件发生后收到notification时,就会使用事先设置参数去调用指定的回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ZOOAPI zhandle_t *zookeeper_init(...);
ZOOAPI int zookeeper_close(zhandle_t *zh);
ZOOAPI const clientid_t *zoo_client_id(zhandle_t *zh);

ZOOAPI int zoo_acreate(zhandle_t *zh, const char *path, ...);
ZOOAPI int zoo_adelete(zhandle_t *zh, const char *path, ...);

ZOOAPI int zoo_aexists(zhandle_t *zh, const char *path, int watch, ... );
ZOOAPI int zoo_awexists(zhandle_t *zh, const char *path, watcher_fn watcher, ...);

ZOOAPI int zoo_aget(zhandle_t *zh, const char *path, ...
ZOOAPI int zoo_awget(zhandle_t *zh, const char *path, watcher_fn watcher, ...);
ZOOAPI int zoo_aset(zhandle_t *zh, const char *path, ...);

ZOOAPI int zoo_aget_children(zhandle_t *zh, const char *path, ...);
ZOOAPI int zoo_awget_children(zhandle_t *zh, const char *path, watcher_fn watcher, ...);

ZOOAPI int zoo_async(zhandle_t *zh, const char *path, string_completion_t completion, ...);

  为了看起来更加的直观,同时也方便查找和备忘,我重要的五个函数整理到一个xmind的文件中了:
zookeeper

1.1 会话的建立

  下面所有的操作接口都需要一个zhandle_t的句柄参数,这个句柄最初通过zookeeper_init调用返回得到的。

1
zhandle_t *zookeeper_init(const char *host, watcher_fn fn, int recv_timeout, const clientid_t *clientid, void *context, int flags);

  host 使用host:port组成,使用逗号分隔的所有集群的主机列表;
  fn 是会话建立的回调函数,因为会话建立的过程是异步的,上述函数会在会话建立之前就会返回,所以需要在这个回调函数中检查会话状态,而且必要的时候需要进行重试操作;
  recv_timeout 会话超时时间,使用ms作为单位的;
  clientid 主要是用于会话恢复的时候使用,已经建立的会话可以使用zoo_client_id得到,如果是新建会话就将其置为NULL。
  context 是一个用户自定义的数据指针,必要时可以通过zoo_get_context返回,这个东西是用户自己使用的,ZooKeeper库本身不会用到他;
  flags 被保留未使用,置0。

1
typedef void (*watcher_fn)(zhandle_t *zh, int type, int state, const char *path,void *watcherCtx);

  watch_fn可以算得上是一个十分重要的函数原型了,不仅在创建会话的时候,后面读节点需要设置watch event的时候都需要这个东西。 
  type 表示事件类型,包括ZOO_CREATED_EVENT、ZOO_DELETED_EVENT、ZOO_CHANGED_EVENT、ZOO_CHILD_EVENT、ZOO_SESSION_EVENT;
  state 表示session的状态,包含ZOO_CONNECTING_STATE、ZOO_CONNECTED_STATE等状态,代码逻辑中需要根据对应的状态做重连、数据读取等对应的操作;
  path 事件相关的path,在创建连接的时候是SESSION_EVENT类型的事件,其值为NULL;
  watcherCtx 在会话创建的时候该值为NULL,而后续awget、awexists等操作的时候,可以额外的传递一个用户自定义参数,会提现在watcherCtx这里。
  在上面会话建立的过程当中,如果回调函数收到的状态为ZOO_CONNECTED_STATE,则表示会话已经建立成功,ZOO_NOTCONNECTED_STATE则表示会话没有建立,而ZOO_EXPIRED_SESSION_STATE表示会话超时,此时需要调用zookeeper_close关闭这个超时会话,再做其他处理。

1.2 创建znode节点

  znode节点是ZooKeeper中的基本元素,通过调用zoo_acreate可以用异步的方式创建znode节点,其函数签名为:

1
int zoo_acreate(zhandle_t *zh, const char *path, const char *value, int valuelen, const struct ACL_vector *acl, int flags, string_completion_t completion, const void *data);

  zh 是后面所有调用操作都需要使用一个会话句柄信息,path 则是需要创建的目的路径;
  valuevaluelen 表明节点的数据内容,后面可以使用zoo_awget、zoo_set等接口读取访问和设置。通过valuelen,所以其保存的数据可以是任意的裸字节序列;
  acl 主要是安全和管理需要的,对于可信环境而言,其值可以设置为ZOO_OPEN_ACL_UNSAFE,但是这个参数不能传递空地址哦;
  flag 是创建节点需要附带的参数,前面说到创建ZOO_EPHEMERAL(临时)和ZOO_SEQUENCE(顺序)节点的方式,就是通过这个flag来设置指定的;
  string_completion_t 是在异步创建完成后进行回调通知使用的,data则是作为参数供这个回调参数使用。

1
2
3
4
typedef void (*void_completion_t)(int rc, const void *data);
typedef void (*string_completion_t)(int rc, const char *value, const void *data);
typedef void (*strings_completion_t)(int rc, const struct String_vector *strings, const void *data);
typedef void (*data_completion_t)(int rc, const char *value, int value_len, const struct Stat *stat, const void *data);

  上面几个是常用的回调函数之原型,他们会在很多地方被使用到。
  rc 是返回码,其值可以参照enum ZOO_ERRORS,正常情况下用ZOK表示成功返回,不过在ZCONNECTIONLOSS和ZOPERATIONTIMEOUT这类异常发生的时候,也会触发会话事件的函数被回调。对于非正常情况的错误通常会进行重试操作等响应方式,当然需要在使用的时候甄别对待了。
  而其他的版本,基本都是将当初设置的参数再透传过来,供回调函数里面使用(包括重试之前的操作),常用手法了,此处不表。

1.3 获取节点数据和监听节点

  这里是ZooKeeper工作的重要部分,通过get的方式可以获取特定节点存储的数据信息和子节点信息,同时通过设置watch可以对数据或者节点的变更、异常做出最快的响应。
  获取数据一般有zoo_awget和zoo_awget_children两类,主要是获取节点本身所存储的数据,以及某个路径下所有的子节点信息,两者的回调函数不同:zoo_awget的回调函数类型是data_completion_t,其返回的数据存储在value指向的内存当中;而zoo_awget_children的回调函数类型是strings_completion_t,其返回的数据(子节点信息)存储在一个String_vector的自定义数据结构当中。
  上面的struct String_vector是一个简单的字符串数组的封装,count域表明内容的个数,data域为一些列的指针数组,我们见到传递过来的是一个const指针,如果后面自己额外申请了这个结构,就需要记得在使用完后使用free_vector进行内存的释放。

二、一个小小的例子

  那就看一下ZooKeeper最常用的场景——服务的发布和发现作用。
  在这里,我们模拟一个S端创建并且监听某一个路径;然后任意数目的C端启动后,会自动在前面约定的路径下建立(顺序、临时)节点,同时将自己服务提供IP:Port地址信息保存在数据域里面,形成一个服务发布的效果;服务端读取并设置该路径的watch event,遍历所有的C端,读取他们的数据域并打印出客户端的地址信息。通过这种方式,C端可以启动任意多个实例,而且不用硬编码约定的服务端口,双方唯一需要事先约定的就是zookeeper的znode路径就足够了。

2.1 初始化连接

  初始化连接就是创建zhandle_t的过程,因为这是一个异步过程,同时后续所有的操作都需要基于这个zhandle_t作为参数,所以初始化的过程是在一个while中不断检测进行的。
  下面的回调函数watcher不仅仅在创建的时候使用,整个过程中会话的状态发生改变,这个回调函数都会被自动调用,我们需要根据对应的state做出相应的处理。还有就是,各种不同的时间可以共用同一个回调函数,他们通过type参数进行区分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
int init(){
handle = zookeeper_init("127.0.0.1:2181", watcher, 15000, NULL, 0, 0);
if(!handle) return -1;

while(connected != 1) { ::sleep(1); }
}

void watcher(zhandle_t *zkh, int type, int state, const char *path, void* context) {
if (type == ZOO_SESSION_EVENT) {
if(state == ZOO_CONNECTED_STATE) {
connected = 1;
} else if(state == ZOO_CONNECTING_STATE) {
if(connected == 1)
std::cout << __func__ << ": disconnected..." << std::endl;
connected = 0;
}else if(state == ZOO_EXPIRED_SESSION_STATE) {
expired = 1;
connected = 0;
zookeeper_close(zkh);
} else {
connected = 0;
std::cout << __func__ << ": unknown state:" << state << std::endl;
}
} else if (type == ZOO_CHILD_EVENT) {
if(strcmp(path, srv_path) != 0)
std::cout << __func__ << ": error path info:" << path << std::endl;

std::cout << __func__ << ": children event detected!" << std::endl;
show_info(); // 注意,需要再次安插watch event
}
}

  上面需要额外注意的就是ZOO_CHILD_EVENT,在其事件响应的最后通过调用show_info(),实际是重新安装了节点的watch event。我们知道,ZooKeeper的watch event是one-shot的,如果事件被激活,则需要手动再次设置watch event,才能继续得到后续的事件通知。同时,我们知道在某些情况下ZooKeeper的event是可能被丢失掉的,所以在实际使用的时候,还需要设置一个定时器,周期性的对节点进行扫描并安插watch event才比较稳妥。

2.2 创建服务路径节点

  然后就是服务端创建事先约定的路径节点。这是一个普通(持久、非序列)的znode,回调函数中rc状态码为ZOK表示创建成功,出错则可以不断重新尝试创建操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void create_path(const char* path, const char* value){
zoo_acreate(handle, path, value, strlen(value), &ZOO_OPEN_ACL_UNSAFE, 0, create_path_callback, NULL);
}

void create_path_callback(int rc, const char *value, const void *data) {
switch (rc) {
case ZCONNECTIONLOSS:
create_path(value, (const char *) data);
break;

case ZOK:
std::cout << __func__ << ": created node:" << value << std::endl;
break;

case ZNODEEXISTS:
std::cout << __func__ << ": node already exists" << std::endl;
break;

default:
std::cout << __func__ << ": something went wrong..." << rc << std::endl;
break;
}
}

2.3 发布服务

  下面是重头戏了,我们这里需要实现的效果就是服务的自动发布(而不需要事先约定路径、端口号等各种信息),客户端绑定本地的任意端口号,然后通过getsockname得到实际绑定的地址信息,并后续将其发布到自己创建的临时节点的数据项里面。
  相比于传统硬编码约定端口号的方式,可以避免SO_REUSEADDR类似的问题,而且可以多实例部署,简单方便。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in serv_addr;
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = INADDR_ANY;
serv_addr.sin_port = 0; /* bind() will choose a random port*/

bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr));
//listen(sockfd,5);

socklen_t len = sizeof(serv_addr);
getsockname(sockfd, (struct sockaddr *)&serv_addr, &len);
char buffer[100];
inet_ntop(AF_INET, &serv_addr.sin_addr, buffer, sizeof(buffer));

char msg[1024] = {0,};
sprintf(msg, "%s:%d", buffer, serv_addr.sin_port);

create_eph_seq_path( (std::string(srv_path) + "/srv_provider_").c_str(), msg);

  下面的操作是在指定的目录下面完成建立临时、序列节点,其实跟上面持久节点创建除了参数不同之外,都是一样的操作流程。这里节点的数据项,是上面获取的IP:Port的地址信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void create_eph_seq_path(const char* path, const char* value){
zoo_acreate(handle, path, value, strlen(value), &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL|ZOO_SEQUENCE, create_eph_seq_path_callback, NULL);
}

void create_eph_seq_path_callback(int rc, const char *value, const void *data) {
switch (rc) {
case ZCONNECTIONLOSS:
create_eph_seq_path(value, (const char *) data);
break;

case ZOK:
std::cout << __func__ << ": created eph_sql node:" << value << std::endl;
break;

case ZNODEEXISTS:
std::cout << __func__ << ": node already exists" << std::endl;
break;

default:
std::cout << __func__ << ": something went wrong..." << rc << std::endl;
break;
}
}

2.4 服务获取

  这里是服务发现的部分。其操作就是从指定目录下面获取其所有的子节点,对于每一个存在的节点都是一个活着的service provider,可以从其数据域中提取其地址信息,然后进行实际的服务调用,业务相关的东西就不演示了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void show_info(){
zoo_awget_children(handle, srv_path, watcher, NULL, info_get_callback, NULL);
}

void info_get_callback(int rc, const struct String_vector *strings, const void *data) {
switch(rc) {
case ZCONNECTIONLOSS:
case ZOPERATIONTIMEOUT:
show_info();
break;

case ZOK:
for( int i=0; i<strings->count; i++ ) {
std::cout << "=>" << strings->data[i] << std::endl;
char buff[128] = {0,};
int len = sizeof(buff);
zoo_get(handle, (std::string(srv_path) + "/" + strings->data[i]).c_str(), 0, buff, &len, NULL);
std::cout << buff << std::endl;
}
break;

default:
std::cout << __func__ << ": something went wrong..." << rc << std::endl;
break;
}
}

  在使用的过程中,发现增加服务的时候,ZOO_CHILD_EVENT很快就得到响应了,但是删除服务的时候,这个事件总是会延迟几秒钟才得到响应。究竟是因为ZooKeeper本身就这个德行,还是我的姿势不对?因为事件感知的越迟,对线上的业务影响就会越严重。Anyway,反正按照上面的接口和模式,算是说搭建一个分布式系统的服务还是挺简单的吧!
  例子的代码在server.cppclient.cpp,编译后直接链接zookeeper_mt就可以运行测试了。

  PS:的确,在客户端断线后,ZooKeeper服务器需要一段时间的超时检测才会认为节点不可用,所以客户端下线后到ZooKeeper确认后再行删除临时节点,再到后面的事件通知和回调是需要一定时间差的,因此针对这种非实时性的话业务在此需要作出考量。同时,如果客户端需要主动退出的话,需要优先向ZooKeeper服务注销自己,以避免上述不应期对业务带来的影响。
  对于分布式系统来说,通常有两种实现方式:一种就是像ZooKeeper这样作为独立的分布式服务提供,然后其他组件通过编译链接对应的客户端库,通过分布式服务获取相应的功能,其优点是容易上手,而且独立的分布式服务相对更加成熟稳定,对业务侵入性也小;还有一种形式就是以分布式库的方式提供,然后结合业务开发出高性能的分布式服务,比如分布式日志系统、分布式数据库等基础服务,其一般是要求定制型强,性能和可靠性高的场景,当然也最能装逼了啊!

本文完!

参考