Apache ZooKeeper进一步学习

  最然之前自己搞过一段时间的分布式理论知识,对Paxos、ZAB、Raft一致性协议也或深或浅地有些了解认识,所以对这种最终一致性分布式系统的使用场景也有一定的概念(感觉和别人侃起来也更有谈资了),但是作为一个程序员来说,一旦要把他们落实到代码上来,下起手来还真是有些困难。前几天偶遇一本英文的《Apache ZooKeeper Essentials》,其中对ZooKeeper实现分布式系统常用组件的构建过程讲的相对具体一些,这里不敢独享也和大家分享一下。
  额外的在此感叹一下:一方面觉得Java的程序员好享福,Apache全家桶好多项目都是Java实现的,所以在企业项目开发中Java可选用的成熟组件非常的多,自然使用资料和经验也遍地都是;虽然很多库也提供了C语言访问库的绑定,而且通常这是必须的,因为很多脚本语言(比如Python、PHP)的绑定,受限于语言的操作能力和性能方面的考虑,大多也是基于C语言绑定之上再进行一层特定语言的封装。这个过程中,C++的地位感觉有些尴尬了,除非原生使用C++开发的,绝大多数的组件感觉都没有原生C++绑定,而C++程序员想要用的Happy顺手(比如自动构造中初始化,自动析构释放资源),就必须自己基于C语言绑定再进行进一步的封装,因此C++的世界中,这种简单封装的轮子在我们的项目中非常之普遍,由此可见C++对C语言封装技能将会是C++程序员必备重要技能之一啊!

一、ZooKeeper的启动配置

  为了兼顾于测试和生产环境的需要,ZooKeeper具有单机模式和集群模式的部署形式可供选择。

1.1 standalone模式

  将conf/zoo_sample.cfg拷贝成conf/zoo.cfg,在该配置中有几个必须的参数需要说明:tickTime表示每个tick所代表的时长单元,以ms为单位,后续很多的配置都是基于这个tickTime来计数的,比如心跳间隔;dataDir表示数据存储目录,ZooKeeper服务会有一个in-memory状态数据库,而这个目录就是用于存储数据库的快照内容和修改事务日志信息使用的,这是一个类似的append only的记录文件;clientPort是接收客户端连接请求的侦听端口,默认是2181。
  使用上面的配置信息,使用bin/zkServer.sh start就可以启动ZooKeeper服务端了,通过使用bin/zkServer.sh status命令可以看见,当前服务工作在standalone模式下,因为该模式存在单点故障的风险,所以只可以用于测试使用。
  ZooKeeper提供Java和C两种语言命令行客户端访问版本,分别通过bin/zkClient.sh -server 127.0.0.1:2181cli_mt 127.0.0.1:2181可以连接服务端,他们风格有些差异,自行选择。

1.2 multinode cluster模式

  生产环境的ZooKeeper至少需要3台,通常需要5台甚至更多的实例部署成集群模式,且最好是奇数个的主机数量。这种模式下,zoo.cfg配置文件需要一些额外的信息:

1
2
3
4
5
initLimit=10
syncLimit=5
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888

  initLimit表示fellower初始连接到leader的超时ticket;syncLimit表示fellower和leader同步的超时ticket;剩下部分使用server.id=host:port1:port2的格式列出主机列表,其中id的范围是1-255证书表示的主机ID,指定后需要在每个实体主机的dataDir根目录通过新建myid文件并写入对应相匹配的ID,port1表示集群中peer-to-peer各主机相互交互端口(比如fellower和leader相互交互的端口),而port2主要用来选主操作过程中使用的端口号。
  在集群模式下,客户端连接方式的主机参数需要指明集群中的所有机器,形如:bin/zkCli.sh -server zoo1:2181,zoo2:2181,zoo3:2181

1.3 multiple node模式

  在单个机器下也可以运行一个ZooKeeper集群,各个主机分别用不同的端口(而不是上面的IP)来独立运行,不过这种情况也是建议作为测试目的。配置文件中,各个主机的clientPort端口号以及port1、port2端口就必须各不相同,同时还要为各个主机指定不同的dataDir目录,其余的条目和上面multinode cluster的条目相同。其中的一个节点可能的配置如下:

1
2
3
4
5
6
7
8
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper/zoo1
clientPort=2181
server.1=localhost:2666:3666
server.2=localhost:2667:3667
server.3=localhost:2668:3668

  接着,依次启动这些服务节点。

1
2
3
4
5
nicol@centos6:/tmp/zookeeper|⇒ mkdir zoo1 zoo2 zoo3
nicol@centos6:/tmp/zookeeper|⇒ echo 1 > zoo1/myid && echo 2 > zoo2/myid && echo 3 > zoo3/myid
nicol@centos6:~/dist/zookeeper-3.4.9|⇒ bin/zkServer.sh start conf/zoo1.cfg
nicol@centos6:~/dist/zookeeper-3.4.9|⇒ bin/zkServer.sh start conf/zoo2.cfg
nicol@centos6:~/dist/zookeeper-3.4.9|⇒ bin/zkServer.sh start conf/zoo3.cfg

  然后,相对应的客户端连接集群的方式就变成如下形式:

1
nicol@centos6:~/dist/zookeeper-3.4.9|⇒ bin/zkCli.sh -server localhost:2181,localhost:2182,localhost:2183

此时,集群应该是正常工作了。要想看某个节点的角色(fellower、leader),可以使用status命令即可:

1
2
3
4
nicol@centos6:~/dist/zookeeper-3.4.9|⇒ bin/zkServer.sh status conf/zoo2.cfg
ZooKeeper JMX enabled by default
Using config: conf/zoo2.cfg
Mode: leader

  PS: 发现将数据放到/tmp目录真不是什么好习惯。测试环境不知道为啥原因ZooKeeper集群连接不上,然后我把进程kill掉后,发现/tmp/zookeeper/目录空空如也。接着我重建各个node的数据目录和myid后,重启后发现之前所有的配置数据都没有了,哭晕在厕所……所以上面的数据目录建议配置到持久化设备上去,虽然性能可能有些弱,但是不至于数据丢失的情况发生。

二、ZooKeeper原理再次浅析

2.1 数据模型

  ZooKeeper基本数据模型是znode,以类似文件系统的方式组织成为树状结构,每个znode可以存储一个任意二进制比特串,其长度被限制成1MB的大小,这通常情况下足够使用了,只不过不支持数据类型的存储在实际使用中还是有些不便,znode的路径表示必须是绝对路径形式的。znode分为persistent、ephemeral两种类型,可以外加sequential属性,那么总共可以组织成4种类型的znode节点。
  (1) persistent
  这种模式持久节点除非被显式调用API删除外,否则会一直存在,在使用中通常用于存储高可用的、重要的数据内容,比如程序、服务的配置信息等。
  (2) ephemeral
  这种模式临时节点会在创建它的客户端和服务器建立的session会话断开后自动删除,当然该节点也可以使用delete的API显式删除,当前ZooKeeper的限制是这种临时节点不允许有children。临时节点在分布式系统中通常用于某些组件需要动态知道其他组件、服务或者资源状态和可用性的时候,命令行创建使用-e参数指定。
  (3) sequential
  当节点有这种属性的时候,ZooKeeper在创建该节点的时候会动态为其产生一个单调递增的序列号,并追加成为节点名的一部分。序列号是使用int(4字)存储,以0打头补齐总共用10个数字显示(这样的目的是为了好排序的目的),顺序节点命令行创建需使用-s参数。

2.2 ZooKeeper Watch监听

  服务更新通常有poll和push两种模式,前者在规模大和复杂的系统下伸缩性不强,而ZooKeeper采用了事件通知的方法来实现一种类似push的通知模式,相关事件会被推送给感兴趣的注册客户端,在ZooKeepr中注册的过程就是在znode上面设置watch event的过程。需要注意的是watch是one-time类型的操作,即一次注册只会触发一次通知,如果客户端还想继续侦听znode节点的事件,就必须在接收到通知的时候再次手动注册之。还需要注意的是,因为ZooKeeper的watch event是one-time触发的,所以在服务发送通知和客户端收到通知并重置监听这个时间间隔中,如果这个znode此时再有notification发生,则这个消息就会被丢失掉,应用程序开发的时候需要考虑到会有这个问题。
  在watch event和notification机制中,ZooKeeper保证:所有的事件按照FIFO的顺序处理,会保证notification是按序发送的;对于一个znode,在其notification通知到客户端之前,不会再对这个znode做任何其他修改操作;发送事件的顺序是按照ZooKeeper服务接收到改变顺序为准。此外如果客户端同ZooKeeper服务因为某种原因断开连接后,而后又重新恢复连接(会话),无论新连接上的主机是否是之前会话所在的主机,之前注册的watch event将会被自动恢复注册,而且之前的notification也将会被正常推送,所以可以说客户端连接到集群的任何服务器都是绝对透明的,而唯一的特例是当一个客户端注册一个znode的exist事件侦听,在客户端离线的时候这个znode被创建然后又被删除,而后来客户端恢复连接的时候这个事件是不会得到这个过程的事件通知的。

2.3 ZooKeeper数据模型的操作API

  ZooKeeper数据模型支持的操作接口有:create、delete、exists、getChildren、getData、setData、getACL、setACL、sync,除了上面的普通接口外ZooKeeper还支持对znode的批次更新multi,通过批次操作可以将对znode的多个原语操作合并成一个单元,且这个操作整体是一个原子操作。客户端对读操作可以直接由客户端连接的服务器处理(很可能是fellower),这个过程还可以使用sync让客户端同步得到最新的状态或结果,而对于写操作会自动转发到leader节点上,并且在事务持久化到绝大多数节点之后,才会为这个请求生成响应信息。
  在对znode进行读操作的时候(比如exists、getChildren、getData),允许watch event设置在对应节点上,而写操作(比如create、delete、setData)则会触发watch event并发生notification,常见的watch event事件类型有:NodeChildrenChanged、NodeCreated、NodeDateChanged、NodeDeleted等。

2.4 客户端会话

  开始的时候,客户端会随机连接主机列表中的服务器,如果失败后会继续尝试下一台服务器,这个过程将一直尝试直到成功建立连接或者尝试了所有的服务器。当客户端和服务器连接成功后,一个session就被创建了,并且使用一个64-bit的数字进行标识,session在ZooKeeper中扮演着重要的角色,比如ephemeral节点的生命周期就同客户端和服务器的连接相关联的,一旦session结束后其临时节点将会被自动删除。session具有超时周期属性,这在客户端和服务器创建连接的时候由客户端作为参数制定并由ZooKeeper集群管理,当前实现要求timeout的值最小为2 x tickTIme,最大为20 x tickTime。
  session是通过客户端向ZooKeeper发送ping request心跳维持的,而且是由客户端库自动发送(应用程序无须关心),当心跳超时之后ZooKeeper和客户端的连接会被删除,且此时客户端库会自动透明实现和ZooKeeper的重连操作,一旦重连成功(无论是否是之前那台主机),现存的session、相关的ephemeral节点会继续存在生效,且客户端断线这段时间中所有pending notification将会被按照顺序重新发送给客户端。

三、ZooKeeper构建常用分布式组件

3.1 Barrier

  Barrier屏障是分布式系统中常用的用于阻塞所有执行在某个执行点目的,直到某个条件被满足后再让所有的节点继续执行,ZooKeeper实现屏障的步骤是:
  a. 创建/zk_barrier节点,当这个znode存在的时候表示barrier生效;
  b. 所有客户端调用exist()接口向/zk_barrier注册watch event;
  c. 如果exists()返回true则表示barrier生效,此时客户端阻塞等待notification;
  d. 负责/zk_barrier的节点检查直到某个条件满足后,删除/zk_barrier;
  e. 上面的删除操作会触发watch event,此时所有阻塞在此的客户端都接收到notification,他们再次调用exists()都返回false,表明barrier已经被消除,则所有客户端开始执行。
  根据上面的原理,还可以轻松的实现double-barrier,可以用于控制计算的开始、退出时间点,原理是当指定数目的线程加入到barrier中时允许计算开始执行,而当所有的计算节点计算完成退出后,整个计算过程被标示为结束状态。其过程可以描述为:
  Phase 1
  a. 假设/barrier节点为目的点,每一个客户端处理进程都会在/barrier目录下创建ephemeral节点,而通常客户端会用自己的主机名标识自己这个临时节点;同时,客户端还需要设置/ready节点的watch event监听;
  b. N是预先知晓的客户端处理进程数目最小阈值,每当一个客户端进程加入之后,都调用M = getChildren(/barrier, watch=false)检查子节点的数目;
  c. 当M >= N的时候,该加入的客户端处理进程负责创建/ready节点;
  d. 上面的创建操作会触发所有其他客户端开始计算操作;
  Phase 2
  a. 当客户端进程计算结束之后,都删除当时自己创建的临时节点;
  b. 然后客户端进程调用M = getChildren(/barrier, watch=true),当M !=0 时候继续等待,而当M == 0时候,则大家就可以全部退出barrier了。
  上面实现的不理想之处是存在惊群(herd effect)效应,改进的方式是创建临时节点时候采用sequential ephemeral节点,而在最后退出的时候,每个客户端进程只对临近于自己较小的顺序节点添加watch event即可。

3.2 Queue

  Queue作为联系生产者和消费最常用纽带,在分布式系统中也经常会被使用。通常而言生产者在某个节点下创建子节点以表示为“生产”行为,而消费者删除子节点表示“消费”过程,当然常常任务队列还需要FIFO的顺序特性,此时创建的子节点可以则可以是sequential顺序节点。
  a. 创建/QUEUE表示任务队列;
  b. 生产者通过创建顺序子节点表示产生消息,其调用为create(“queue-“, SEQUENTIAL_EPHEMERAL),其产生的消息形如queue-N;
  c. 消费者通过调用 M = getChildren(/QUEUE, watch=true),该调用会返回一个子节点列表,通过排序字节点列表并从中选出序列号最小的子节点出来,消费者可以删除掉这个字节点表示已经消费之;
  d. 消费者一直等到M中的子节点消费完后,再调用一次getChildren()调用,以查看是否有新的子节点被添加进来。
  上面的操作在删除子节点的时候可能会有问题,就是在其他客户端获取该自己点访问的时候,此时删除之会返回失败,客户端则需要重新尝试删除之(这么看来无法保证节点只被一个消费者消费啊)。queue实现的例子可以从recipes中查看官方样例。基于上面的例子,实现PriorityQueue也很简单,需要创建的队列名字是”queue-YY”即可,其中YY用于标示队列的优先级。

3.3 Lock

  分布式锁是分布式系统中用于同步访问共享资源的重要原语,分布式锁要求多个客户端不会同时持有该锁资源,以实现某一时刻只有一个客户端可以对指定的资源进行访问操作,比如写共享的文件、数据库。
  为了创建分布式锁,首先需要创建一个持久节点作为锁节点,客户端如果需要这个锁就必须在其下创建临时序列ephemeral-sequential节点,然后我们约定拥有最小序列号的节点拥有该锁,而当客户端释放锁的时候,只需要删除自己这个临时节点就可以了,其请求锁过程可以表述如下:
  a. 创建锁节点/locknode,客户端获取锁的时候创建顺序临时节点create(“/locknode/lock-“, CreateMode=EPHEMERAL_SEQUENTIAL)
  b. 检查锁节点的所有子节点getChildren(“/locknode/lock-“, watch=false),这里设置watch event为false是为了避免惊群效应;
  c. 如果在步骤a中创建的节点拥有最小序列号,则该节点获取到该锁,退出获取锁的算法过程;
  d. 否则就调用exists(“/locknode/znode具有相邻较小序列号, watch=true),如果返回false则进入步骤b;
  e. 如果上面步骤返回true,则客户单安心等待释放锁的通知既可以;
  释放锁的过程如下:
  a. 当持有锁的客户端直接删除相应的临时节点,这个删除操作也会让等待其释放所的其他某个客户端会收到notification;
  b. 那个收到通知的客户端应当在此时具有最小序列号的客户端,其得到通知的过程就应当是获取锁的过程。

3.4 Leader Election

  选主就是要求分布式系统中只有一个客户端进程作为组织、协调者的情况,这样可以简化多个客户端进程的同步、分配、管理等工作,选主操作的重要作用就是消除Leader带来的单点问题,能够在Leader挂掉的情况下快速选出新Leader继续服务。通常,选主操作必须满足的条件是在任何时候,至多只有一个Leader存在。
  在下面的选主操作中,会使用临时序列节点,我们同样假定具有最小序列号的临时节点代表的主机为Leader。最简单的方式就是当作为Leader最小序列号的节点消失后,所有节点收到通知并检查自己是否是最小节点,然后具有最小序列号的那个节点行驶Leader操作,这必然是惊群的实现。
  a. 创建/election持久节点作为所有待参选客户端根节点,参选的客户端创建临时序列节点/election/candidate-sessionID_路径,使用sessionID的识别码主要是用于帮助异常情况的名字识别,在创建临时顺序节点的时候可能create()已经成功但是server此时crash掉了,那么客户端就无法得到当前创建的名字,而此时client的会话仍然是有效的,此时客户端通过扫描sessionID可以得到相关节点,这里所涉及到的ZooKeeper的特性,是序列节点的序列号是基于目录递增的,而不是基于特定前缀递增的。下面假设create()调用成功后返回N序列号;
  b. 客户端可以获取当前参选者信息L = getChildren(“/election“, watch=false),不设置watch event是为了防止竞选过程中出现惊群情况;
  c. 对/election/candidate-sessionID_M设置watch event,其中M是小于N相邻序列号L = getChildren(“/election/candidate-sessionID_M”, watch=true)
  d. 当得到上面节点的删除notification的时候,调用getChildren(“/election/“, watch=false)获取所有的参选节点;
  e. 当得到最新的参选列表L时候,此时:如果本节点candidate-sessionID_N是L中的最小节点,则声明自己为Leader;否则像上面一样监听/election/candidate-sessionID_M,其中M是小于N相邻序列号;
  f. 如果当前的Leader已经crash掉,则拥有次最小序列号的客户端收到通知,通过上面的d步骤检查将会作为leader继任;
  可选的,Leader可以将自己的识别信息保留在固定节点上,这时候其他节点将可以方便的查询那个固定节点的信息,得知当前系统的Leader是谁了。

3.5 Group membership

  群组管理功能是允许其他进程能够相互感知进程加入、退出集群的功能,以便得到当前集群的最新状态。在ZooKeeper中通过ephemeral临时节点的特性,任何客户端加入集群都可以在一个预设的路径作为父节点创建一个ephemeral临时节点,而通过在这个父节点上增加watch event就可以感知加入、离开集群的操作导致集群成员的变化。
  a. 创建/membership永久节点,任何客户端加入群组都需要在这个路径下创建ephemeral临时节点;
  b. 群组的所有成员注册/membership的watch event,通过调用L = getChildren(“/membership”, watch=true)方法,以感知群组成员的变化事件;
  c. 当有客户端加入群组的时候就会创建ephemeral临时节点,客户端离开或者crash等情况,ZooKeeper服务会自动删除其对应的临时节点,所有的其他成员将得到通知;
  d. 通过查看L,成员可以得知加入或者离开群组的成员;
  当然,上面的实现会有herd effect惊群问题。

3.6 Two-Phase commit

  2PC是事务系统中最常见的一致性手段,用于保证某个事务在所有客户端都是原子性commit或rollback。2PC的两个阶段是:协调者询问所有参与者,让所有参与者投票是提交还是放弃某个事务;协调者收集所有选票,如果所有参与者都赞成commit那么就提交事务,否则就回滚事务,协调者最终将结果通知给所有参与者。
  a. 创建/2PC_Transactions代表2PC事务节点,协调者(可以通过之前的选主方式选出Leader作为协调者)在其下创建事务,然后在其上设置watch event;
  b. 协调者在事务节点下创建另外一个持久子节点/2PC_Transactions/TX/tx_result,用于发布commit、abort或者其他的协议相关信息;
  c. 所有的事件参与者都在/2PC_Transactions/和/2PC_Transactions/TX/tx_result下创建watch event;
  d. 当协调者创建事务节点的时候,所有的参与客户端都会收到notification,然后客户端针对事务信息发起自己的投票;投票的过程就是在/2PC_Transactions/TX下创建带有自己主机标示的ephemeral临时节点,并表明自己的投票立场;
  e. 协调者发现应当参与的所有参与者都建立了临时节点后,就清点各个客户端的选票,根据投票结果将最终的事务结果写入到tx_result节点中;
  f. 当tx_result节点更新的时候,所有的客户端都会收到NodeDataChanged事件通知,然后根据最终的决议结果发起提交还是回滚操作。

3.7 Service Discovery

  服务发现是分布式系统的核心功能和SOA架构的核心构件,最简单的服务发现是可以让客户端发现服务提供者的IP:Port地址信息。服务发现的核心特性包括:其允许服务进行注册以表明自己的可用性;其通过某种方式可以定位到一个现存可用的服务;服务的变更可以通知传播出去。
  a. 约定/services是一个固定节点用于服务发布;
  b. 服务注册发布的过程中,服务会在/services下面注册一个ephemeral临时节点;
  c. 服务发现的过程,客户端加入集群后可以对某个服务的路径添加watch侦听,新服务实例的加入和退出都可以被通知到。

哈哈,是不是感觉离实践更进一步了,加油!
本文完!

参考