MENU

ZooKeeper

May 11, 2020 • Read: 465 • Java

ZooKeeper

zookeeper产生背景:

项目从单体到分布式转变之后,将会产生多个节点之间协同的问题。如:

  1. 每天的定时任务由谁哪个节点来执行?
  2. RPC调用时的服务发现?
  3. 如何保证并发请求的幂等
  4. ....

这些问题可以统一归纳为多节点协调问题,如果靠节点自身进行协调这是非常不可靠的,性能上也不可取。必须由一个独立的服务做协调工作,它必须可靠,而且保证性能。

ZooKeeper 基础知识基本分为三大模块:

  • 数据模型
  • ACL 权限控制
  • Watch 监控

数据模型

请输入图片描述

znode 节点类型与特性

  1. 持久节点

删除持久节点,就要显式调用 delete 函数进行删除操作

  1. 临时节点

当创建该临时节点的客户端会话因超时或发生异常而关闭时,该节点也相应在 ZooKeeper 服务器上被删除。利用临时节点的这一特性来做服务器集群内机器运行情况的统计,将集群设置为“/servers”节点,并为集群下的每台服务器创建一个临时节点“/servers/host”,当服务器下线时该节点自动被删除,最后统计临时节点个数就可以知道集群中的运行情况。

  1. 有序节点

节点的状态结构

节点状态信息。

Ciqc1F6yL-yAKn9QAABsJSpQkFI688

每一个节点都有一个自己的状态属性,记录了节点本身的一些信息,这些属性包括的内容我列在了

Ciqc1F6zbwWAVkt5AAC_yMQVCFo712

使用 ZooKeeper 实现锁

  • 悲观锁

悲观锁认为进程对临界资源的竞争总是会出现,为了保证进程在操作数据时,该数据不被其他进程修改,数据会一致醋鱼被锁定的状态。

我们假设一个具有 n 个进程的应用,同时访问临界区资源,我们通过进程创建 ZooKeeper 节点 /locks 的方式获取锁。

线程 a 通过成功创建 ZooKeeper 节点“/locks”的方式获取锁后继续执行,如下图所示:

CgqCHl6yL_WAAnymAAB32xbrhxQ973

这时进程 b 也要访问临界区资源,于是进程 b 也尝试创建“/locks”节点来获取锁,因为之前进程 a 已经创建该节点,所以进程 b 创建节点失败无法获得锁。

CgqCHl6yL_6AOIONAAB3daUjikw147

这样就实现了一个简单的悲观锁,但是会产生死锁的问题,针对死锁我们可以通过将节点设置为临时节点的方式避免,并通过在服务器端添加监听事件来通知其他进程重新获取锁。

  • 乐观锁

乐观锁认为,进程对临界区资源的竞争不会总是出现,所以相对悲观锁而言。加锁方式没有那么激烈,不会全程的锁定资源,而是在数据进行提交更新的时候,对数据的冲突与否进行检测,如果发现冲突了,则拒绝操作。我理解的乐观锁就是CAS操作,

在 ZooKeeper 中的 version 属性就是用来实现乐观锁机制中的“校验”的,ZooKeeper 每个节点都有数据版本的概念,在调用更新操作的时候,假如有一个客户端试图进行更新操作,它会携带上次获取到的 version 值进行更新。而如果在这段时间内,ZooKeeper 服务器上该节点的数值恰好已经被其他客户端更新了,那么其数据版本一定也会发生变化,因此肯定与客户端携带的 version 无法匹配,便无法成功更新,因此可以有效地避免一些分布式更新的并发问题。

ZooKeeper-Watch 机制实现

Watch 机制是如何实现的

Ciqc1F61ILaAb7sQAAC6T3wMHDU651

上图中列出了客户端在不同会话状态下,相应的在服务器节点所能支持的事件类型。例如在客户端连接服务端的时候,可以对数据节点的创建、删除、数据变更、子节点的更新等操作进行监控。

现在我们已经从应用层的角度了解了 ZooKeeper 中的 Watch 机制,而学习 ZooKeeper 过程中一个大问题就是入门容易精通难,像上边我们通过几个简单的 API 调用就可以对服务器的节点状态变更进行监控,但是在实际生产环境中我们会遇到很多意想不到的问题,要想解决好这些问题就要深入理解 Watch 的底层实现机制。

Watch 机制的底层原理

Watch 机制,我们另辟蹊径,从设计模式角度出发来分析其底层实现:

Ciqc1F61IL-AEQuUAABdpaAsy2k628

以观察者模式的角度点来看看 ZooKeeper 底层 Watch 是如何实现的。

Ciqc1F61IMWAbWW9AABzXk9xuOs953

最核心或者说关键的代码就是创建一个列表来存放观察者。
ZooKeeper 中则是在客户端和服务器端分别实现两个存放观察者列表,即:ZKWatchManager 和 WatchManager

客户端 Watch 注册实现过程

我们先看一下客户端的实现过程,在发送一个 Watch 监控事件的会话请求时,ZooKeeper 客户端主要做了两个工作:

  • 标记该会话是一个带有 Watch 事件的请求
  • 将 Watch 事件存储到 ZKWatchManager

我们以 getData 接口为例。当发送一个带有 Watch 事件的请求时,客户端首先会把该会话标记为带有 Watch 监控的事件请求,之后通过 DataWatchRegistration 类来保存 watcher 事件和节点的对应关系:

public byte[] getData(final String path, Watcher watcher, Stat stat){
    ...
        WatchRegistration wcb = null;
                if (watcher != null) {
                        wcb = new DataWatchRegistration(watcher,clientPath);
        }
        RequestHeader h = new RequestHeader();
        request.setWatch(watcher != null);
        ...
        GetDataResponse response = new GetDataResponse();
        ReplyHeader r = cnxn.submitRequest(h, request,response,wcb);
}

之后客户端向服务器发送请求时,是将请求封装成一个 Packet 对象,并添加到一个等待发送队列 outgoingQueue 中:

public Packet queuePacket(RequestHeader h, ReplyHeader r,...){
    Packet packet = null;
    ...
    packet = new Packet(h, r, request, response,watchRegistration);
    ...
    outgoingQueue.add(packet); 
    ...
    return packet;
}

最后,ZooKeeper 客户端就会向服务器端发送这个请求,完成请求发送后。调用负责处理服务器响应的 SendThread 线程类中的 readResponse 方法接收服务端的回调,并在最后执行 finishPacket()方法将 Watch 注册到 ZKWatchManager 中:

private void finishPacket(Packet p) {
    int err = p.replyHeader.getErr();
    if (p.watchRegistration != null) {
        p.watchRegistration.register(err);
    }
    ...
}

服务端 Watch 注册实现过程

Zookeeper 服务端处理 Watch 事件基本有 2 个过程:

解析收到的请求是否带有 Watch 注册事件

  • 将对应的 Watch 事件存储到 WatchManager
  • 下面我们分别对这 2 个步骤进行分析:

当 ZooKeeper 服务器接收到一个客户端请求后,首先会对请求进行解析,判断该请求是否包含 Watch 事件。这在 ZooKeeper 底层是通过 FinalRequestProcessor 类中的 processRequest 函数实现的。当 getDataRequest.getWatch() 值为 True 时,表明该请求需要进行 Watch 监控注册。并通过 zks.getZKDatabase().getData 函数将 Watch 事件注册到服务端的 WatchManager 中。

public void processRequest(Request request) {
        ...
        byte b[]=zks.getZKDatabase().getData(getDataRequest.getPath(),stat,
                               getDataRequest.getWatch() ? cnxn : null);
        rsp = new GetDataResponse(b, stat);
        ..
}

服务端 Watch 事件的触发过程

在客户端和服务端都对 watch 注册完成后,我们接下来看一下在 ZooKeeper 中触发一个 Watch 事件的底层实现过程:

我们以 setData 接口即“节点数据内容发生变更”事件为例。在 setData 方法内部执行完对节点数据的变更后,会调用 WatchManager.triggerWatch 方法触发数据变更事件。

public Stat setData(String path, byte data[], ...){
    Stat s = new Stat();
    DataNode n = nodes.get(path);
    ...
    dataWatches.triggerWatch(path, EventType.NodeDataChanged);
    return s;
}

下面我们进入 triggerWatch 函数内部来看看他究竟做了哪些工作:

  • 封装了一个具有会话状态、事件类型、数据节点 3 种属性的 WatchedEvent 对象。
  • 查询该节点注册的 Watch 事件,如果为空说明该节点没有注册过 Watch 事件。

如果存在 Watch 事件则添加到定义的 Wathcers 集合中,并在 WatchManager 管理中删除。

  • 通过调用 process 方法向客户端发送通知。
 Set<Watcher> triggerWatch(String path, EventType type...) {
    WatchedEvent e = new WatchedEvent(type,
                                 KeeperState.SyncConnected, path);
    Set<Watcher> watchers;
    synchronized (this) {
        watchers = watchTable.remove(path);
        ...
        for (Watcher w : watchers) {
            Set<String> paths = watch2Paths.get(w);
            if (paths != null) {
                paths.remove(path);
            }
        }
    }
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
           continue;
        }
        w.process(e);
    }
    return watchers;
}

客户端回调的处理过程

客户端使用 SendThread.readResponse() 方法来统一处理服务端的相应。

  • 首先反序列化服务器发送请求头信息 replyHdr.deserialize(bbia, "header"),并判断相属性字段 xid 的值为 -1,表示该请求响应为通知类型。
  • 在处理通知类型时,首先将己收到的字节流反序列化转换成 WatcherEvent 对象。接着判断客户端是否配置了 chrootPath 属性,如果为 True 说明客户端配置了 chrootPath 属性。需要对接收到的节点路径进行 chrootPath 处理。最后调用 eventThread.queueEvent( )方法将接收到的事件交给 EventThread 线程进行处理
if (replyHdr.getXid() == -1) {
    ...
    WatcherEvent event = new WatcherEvent();
    event.deserialize(bbia, "response");
    ...
    if (chrootPath != null) {
        String serverPath = event.getPath();
        if(serverPath.compareTo(chrootPath)==0)
            event.setPath("/");
            ...
            event.setPath(serverPath.substring(chrootPath.length()));
            ...
    }
    WatchedEvent we = new WatchedEvent(event);
    ...
    eventThread.queueEvent( we );
}

接下来我们来看一下 EventThread.queueEvent() 方法内部的执行逻辑。其主要工作分为以下:

  • 按照通知的事件类型,从 ZKWatchManager 中查询注册过的客户端 Watch 信息。客户端在查询到对应的 Watch 信息后,会将其从 ZKWatchManager 的管理中删除。因此这里也请你多注意,客户端的 Watcher 机制是一次性的,触发后就会被删除。
  • 获取到对应的 Watcher 信息后,将查询到的 Watcher 存储到 waitingEvents 队列中,调用 EventThread 类中的 run 方法会循环取出在 waitingEvents 队列中等待的 Watcher 事件进行处理
  • 调用processEvent(event) 方法来最终执行实现了 Watcher 接口的 process()方法。
public Set<Watcher> materialize(...){
        Set<Watcher> result = new HashSet<Watcher>();
        ...
        switch (type) {
    ...
        case NodeDataChanged:
        case NodeCreated:
            synchronized (dataWatches) {
                addTo(dataWatches.remove(clientPath), result);
            }
            synchronized (existWatches) {
                addTo(existWatches.remove(clientPath), result);
            }
            break;
        ....
        }
        return result;
}

public void run() {
        try {
          isRunning = true;
          while (true) {
             Object event = waitingEvents.take();
             if (event == eventOfDeath) {
                wasKilled = true;
             } else {
                processEvent(event);
             }
             if (wasKilled)
                synchronized (waitingEvents) {
                   if (waitingEvents.isEmpty()) {
                      isRunning = false;
                      break;
                   }
            }
      }
    ...
}

private void processEvent(Object event) {
      ...
      if (event instanceof WatcherSetEventPair) {
          WatcherSetEventPair pair = (WatcherSetEventPair) event;
          for (Watcher watcher : pair.watchers) {
              try {
                  watcher.process(pair.event);
              } catch (Throwable t) {
                  LOG.error("Error while calling watcher ", t);
              }
          }
      }
}