ZooKeeper client library internals

3 minute read

This post aims to covering how ZooKeeper client library works internally. The version of ZooKeeper used is 3.6.1.

The topmost class

ZooKeeper is the public interface that a client can use to interact with a ZooKeeper server (sending requests and getting responses). The most notable features are as follows:

  • Session establishment is asynchronous
  • Read-only mode (useful in case of partitioning)
  • Sync/Async requests

Session establishment

When instantiating a ZooKeeper, it creates a ClientCnxn object, starts a sender thread and an event thread (these two threads will be described in the following section) and then returns immediately. So the session may or may not have been established at that moment.

Sync/Async requests

ZooKeeper provides both synchronous and asynchronous methods to send requests. The synchronous methods are implemented on top of the asynchronous mechanism. They are backed by below submitRequest method. In this method, after adding a new packet to the queue, the calling thread waits on the packet for its finishing.

zookeeper-server/.../org/apache/zookeeper/ClientCnxn.java
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
public ReplyHeader submitRequest(
    RequestHeader h,
    Record request,
    Record response,
    WatchRegistration watchRegistration,
    WatchDeregistration watchDeregistration) throws InterruptedException {
    ReplyHeader r = new ReplyHeader();
    Packet packet = queuePacket(
        h,
        r,
        request,
        response,
        null,
        null,
        null,
        null,
        watchRegistration,
        watchDeregistration);
    synchronized (packet) {
        if (requestTimeout > 0) {
            // Wait for request completion with timeout
            waitForPacketFinish(r, packet);
        } else {
            // Wait for request completion infinitely
            while (!packet.finished) {
                packet.wait();
            }
        }
    }
    if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
        sendThread.cleanAndNotifyState();
    }
    return r;
}

When using the asynchronous methods, a callback must be passed as an argument. All the supported types of callbacks are defined in AsyncCallback. Because the callback is executed in ZooKeeper IO thread, we shouldn’t perform expensive operations in the callback, otherwise, the ZooKeeper client won’t process other events in time.

The core of IO handling

The class ClientCnxn is the core of client IO handling. All operations on ZooKeeper are finally performed by it. Two threads are created and managed inside ClientCnxn, one is SenderThread, the other is EventThread.

Packets queue

Internally there are two queues for packets. One is outgoingQueue which stores packets ready to be sent. The other is pendingQueue which stores packets that have have been sent and are waiting for a response.

SenderThread

The SenderThread repeatedly does the following:

  • Connect to a ZooKeeper server unless connected
  • Authenticate to the server if connected and required
  • Send heart beats if connected
  • Call the lower level socket implementation to proceed (poll and process IO events)

Lower level socket implementations

The SenderThread owns a ClientCnxnSocket which abstracts out the lower level socket implementation. Two implementations are provided: ClientCnxnSocketNIO and ClientCnxnSocketNetty.

Let’s take ClientCnxnSocketNIO for an example. The main logic is doIO method. Inside doIO, when SocketChannel is readable, available data will be read into incomingBuffer which is a preallocated ByteBuffer (with fixed buffer size). When incomingBuffer is full, if it is lenBuffer (which is also a ByteBuffer but only accepts 4-bytes data which is the length of the incoming message), it’ll be used to to allocate a new incomingBuffer with that amount of free space.

zookeeper-server/.../org/apache/zookeeper/ClientCnxnSocketNIO.java
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
    ...
    if (sockKey.isReadable()) {
        int rc = sock.read(incomingBuffer);
        if (rc < 0) {
            throw new EndOfStreamException("Unable to read additional data from server sessionid 0x"
                                           + Long.toHexString(sessionId)
                                           + ", likely server has closed socket");
        }
        if (!incomingBuffer.hasRemaining()) {
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                recvCount.getAndIncrement();
                readLength();
            } else if (!initialized) {
                readConnectResult();
                enableRead();
                if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
                    // Since SASL authentication has completed (if client is configured to do so),
                    // outgoing packets waiting in the outgoingQueue can now be sent.
                    enableWrite();
                }
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
                initialized = true;
            } else {
                sendThread.readResponse(incomingBuffer);
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
            }
        }
    }
    ...
}

Updated: