
ZooKeeper in Depth, Part 5: Leader Election

2013-01-26 

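The ensemble configuration file, zoo.cfg:

```properties
tickTime=2000
dataDir=/home/admin/zk-data
clientPort=2181
# Timeout for a Learner's initial connection to the Leader, in ticks
initLimit=10
# Timeout for messages and responses between a Learner and the Leader, in ticks
syncLimit=5
# Ensemble of 3 machines: 2888 is the port the Leader serves on, 3888 is the port used for election
server.1=master:2888:3888
server.2=slave1:2888:3888
server.3=slave2:2888:3888
```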
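To make the `server.N` lines concrete, here is a minimal sketch that splits each entry into id, host, quorum port (2888) and election port (3888), and computes how many votes a 3-server ensemble needs. `ServerLineSketch`, `Member`, `parse` and `majority` are hypothetical names for illustration only; ZooKeeper's own parsing is done in `QuorumPeerConfig` and accepts more forms than this regex does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not ZooKeeper's parser): reads lines of the form
// "server.<id>=<host>:<quorumPort>:<electionPort>" from a zoo.cfg.
final class ServerLineSketch {
    // One ensemble member as described by a server.N line.
    record Member(long id, String host, int quorumPort, int electionPort) {}

    private static final Pattern SERVER_LINE =
            Pattern.compile("server\\.(\\d+)=([^:]+):(\\d+):(\\d+)");

    // Keeps only the server.N lines; other keys and comments are skipped.
    static List<Member> parse(List<String> lines) {
        List<Member> members = new ArrayList<>();
        for (String raw : lines) {
            Matcher m = SERVER_LINE.matcher(raw.trim());
            if (m.matches()) {
                members.add(new Member(
                        Long.parseLong(m.group(1)),
                        m.group(2),
                        Integer.parseInt(m.group(3)),
                        Integer.parseInt(m.group(4))));
            }
        }
        return members;
    }

    // Smallest vote count that is "more than half" of the ensemble.
    static int majority(int ensembleSize) {
        return ensembleSize / 2 + 1;
    }
}
```

With the three `server.N` lines above, `parse` yields three members and `majority(3)` is 2, which is why two running servers are enough to elect a leader.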

On server.1, create the myid file under $dataDir:

```shell
echo '1' > myid
```

Start server.1:

```shell
./zkServer.sh start
```

Before the analysis, here is the class diagram of the election-related classes:

[Figure: class diagram of the leader-election classes]

The entry point: QuorumPeerMain starts the main thread

```java
public void runFromConfig(QuorumPeerConfig config) throws IOException {
    // ......

    LOG.info("Starting quorum peer");
    try {
        // Server that handles client reads and writes, usually on port 2181
        ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
        cnxnFactory.configure(config.getClientPortAddress(),
                              config.getMaxClientCnxns());

        // The main ZooKeeper logic thread, responsible for election, voting, etc.
        quorumPeer = new QuorumPeer();
        quorumPeer.setClientPortAddress(config.getClientPortAddress());
        quorumPeer.setTxnFactory(new FileTxnSnapLog(
                    new File(config.getDataLogDir()),
                    new File(config.getDataDir())));
        // Addresses of the ensemble members
        quorumPeer.setQuorumPeers(config.getServers());
        quorumPeer.setElectionType(config.getElectionAlg());
        // This server's id in the ensemble
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        // How a vote is decided; by default it passes with more than half of the servers
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setLearnerType(config.getPeerType());

        // Start the main thread
        quorumPeer.start();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    }
}
```
QuorumPeer overrides Thread.start() to start up:

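```java
@Override
public synchronized void start() {
    // Restore the database and recover the epoch from the zxid
    // (the epoch numbers leader terms; it is not the election round)
    loadDataBase();
    // Start the IO threads that serve clients
    cnxnFactory.start();
    // Initialize leader election, mainly picking the election type from the config
    startLeaderElection();
    // Start the QuorumPeer thread itself
    super.start();
}
```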
loadDataBase: restoring the database and the epoch:

```java
private void loadDataBase() {
    try {
        // Restore the database from the local snapshot and transaction log files
        zkDb.loadDataBase();

        // load the epochs
        // Recover the epoch from the latest zxid. A zxid is 64 bits: the high
        // 32 bits are the epoch, the low 32 bits are a counter within that epoch
        long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
        long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
        try {
            currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
        } catch (FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            currentEpoch = epochOfZxid;
            LOG.info(CURRENT_EPOCH_FILENAME
                    + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                    currentEpoch);
            writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
        }
        if (epochOfZxid > currentEpoch) {
            throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
        }
        // .......
}
```
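The zxid layout in the comments above can be checked with a few lines of bit arithmetic. This is a minimal sketch assuming the 32/32 split described there; `ZxidSketch` and its methods are hypothetical, modeled on what `ZxidUtils` does, and `checkEpoch` mirrors the final check in `loadDataBase()` but throws an unchecked exception instead of `IOException`.

```java
// Hypothetical helper: high 32 bits = epoch, low 32 bits = counter.
final class ZxidSketch {
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    // Same rule as loadDataBase(): the epoch stored in the last processed
    // zxid must not be newer than the persisted currentEpoch.
    static void checkEpoch(long lastProcessedZxid, long currentEpoch) {
        long epochOfZxid = epochOf(lastProcessedZxid);
        if (epochOfZxid > currentEpoch) {
            throw new IllegalStateException("current epoch " + currentEpoch
                    + " is older than the epoch of the last zxid " + epochOfZxid);
        }
    }
}
```

For example, `makeZxid(5, 3)` is `0x500000003L`: the fifth leader term, third transaction in it.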
Election initialization:

```java
synchronized public void startLeaderElection() {
    try {
        // Vote for myself first
        currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    // Look up my own quorum address in the configuration
    for (QuorumServer p : getView().values()) {
        if (p.id == myid) {
            myQuorumAddr = p.addr;
            break;
        }
    }
    // ......
    // Create the election algorithm selected in the configuration
    this.electionAlg = createElectionAlgorithm(electionType);
}
```
Creating the election algorithm; the default is FastLeaderElection (electionAlg=3):

```java
protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le=null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        // QuorumCnxManager handles the network IO for leader election
        qcm = new QuorumCnxManager(this);
        QuorumCnxManager.Listener listener = qcm.listener;
        // Start the listener thread bound to the election port (3888 here),
        // which waits for the other servers in the ensemble to connect
        if(listener != null){
            listener.start();
            // TCP-based fast leader election
            le = new FastLeaderElection(this, qcm);
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}
```
FastLeaderElection initialization:

```java
private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;

    // Business-layer send queue; its elements are ToSend objects
    sendqueue = new LinkedBlockingQueue<ToSend>();
    // Business-layer receive queue; its elements are Notification objects
    recvqueue = new LinkedBlockingQueue<Notification>();
    this.messenger = new Messenger(manager);
}

Messenger(QuorumCnxManager manager) {
    // Start the business-layer send thread, which hands messages to the IO class QuorumCnxManager
    this.ws = new WorkerSender(manager);

    Thread t = new Thread(this.ws,
            "WorkerSender[myid=" + self.getId() + "]");
    t.setDaemon(true);
    t.start();

    // Start the business-layer receive thread, which takes messages from QuorumCnxManager
    this.wr = new WorkerReceiver(manager);

    t = new Thread(this.wr,
            "WorkerReceiver[myid=" + self.getId() + "]");
    t.setDaemon(true);
    t.start();
}
```
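Both Messenger workers follow the same pattern: a daemon thread polls a blocking queue with a timeout, so it can notice its `stop` flag, and passes each item on to the next layer. Below is a minimal generic sketch of that pattern; `QueueWorkerSketch`, `halt` and `awaitItem` are hypothetical names, and a `Consumer` stands in for the call into `QuorumCnxManager`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical worker: drains one queue and forwards each item to the next layer.
final class QueueWorkerSketch<T> implements Runnable {
    private final BlockingQueue<T> in;
    private final Consumer<T> next;
    private volatile boolean stop = false;

    QueueWorkerSketch(BlockingQueue<T> in, Consumer<T> next) {
        this.in = in;
        this.next = next;
    }

    @Override
    public void run() {
        while (!stop) {
            try {
                // Wait up to 3 s (as WorkerSender does), then re-check the stop flag.
                T item = in.poll(3000, TimeUnit.MILLISECONDS);
                if (item == null) continue;
                next.accept(item);
            } catch (InterruptedException e) {
                break;
            }
        }
    }

    void halt() {
        stop = true;
    }

    // Runs the worker on a daemon thread, as Messenger does for both workers.
    static <T> Thread startDaemon(QueueWorkerSketch<T> worker, String name) {
        Thread t = new Thread(worker, name);
        t.setDaemon(true);
        t.start();
        return t;
    }

    // Usage helper: waits for one item, returning null on timeout or interrupt.
    static <T> T awaitItem(BlockingQueue<T> q, long millis) {
        try {
            return q.poll(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

For example, a worker built over a `sendqueue` with `wire::offer` as its consumer moves every offered vote onto `wire`; the timeout-based poll is what lets `halt()` take effect without an interrupt.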
The QuorumPeer thread runs:

```java
public void run() {
    // .......
    try {
        /*
         * Main loop
         */
        while (running) {
            switch (getPeerState()) {
            // In the LOOKING state, enter the election
            case LOOKING:
                LOG.info("LOOKING");
                // ......
                    try {
                        // Run the election algorithm; the main thread may spend a long time here
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    }
                }
                break;
            // Handling for the other states
            case OBSERVING:
                try {
                    LOG.info("OBSERVING");
                    setObserver(makeObserver(logFactory));
                    observer.observeLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e );
                } finally {
                    observer.shutdown();
                    setObserver(null);
                    setPeerState(ServerState.LOOKING);
                }
                break;
            case FOLLOWING:
                try {
                    LOG.info("FOLLOWING");
                    setFollower(makeFollower(logFactory));
                    follower.followLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    follower.shutdown();
                    setFollower(null);
                    setPeerState(ServerState.LOOKING);
                }
                break;
            case LEADING:
                LOG.info("LEADING");
                try {
                    setLeader(makeLeader(logFactory));
                    leader.lead();
                    setLeader(null);
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    if (leader != null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    setPeerState(ServerState.LOOKING);
                }
                break;
            }
    // .......
}
```
Entering the election:

```java
public Vote lookForLeader() throws InterruptedException {
    // ......
    try {
        // Votes received in this election, keyed by sender id
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

        int notTimeout = finalizeWait;

        synchronized(this){
            logicalclock++;
            // Vote for myself first
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id =  " + self.getId() +
                ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        // Send my vote to every voting member, including myself
        sendNotifications();

        /*
         * Loop in which we exchange notifications until we find a leader
         */
        // Main loop: runs until a leader is elected
        while ((self.getPeerState() == ServerState.LOOKING) &&
                (!stop)){
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */
            // Take the next vote handed over by the IO layer; my own vote also arrives here
            Notification n = recvqueue.poll(notTimeout,
                    TimeUnit.MILLISECONDS);

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            // Nothing arrived before the timeout
            if(n == null){
                // Everything queued was delivered: send the votes again, until a leader is elected
                if(manager.haveDelivered()){
                    sendNotifications();
                } else {
                    // Messages are still queued, perhaps because other servers are not up yet: try to connect
                    manager.connectAll();
                }

                /*
                 * Exponential backoff
                 */
                // Double the timeout, up to maxNotificationInterval
                int tmpTimeOut = notTimeout*2;
                notTimeout = (tmpTimeOut < maxNotificationInterval?
                        tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            }
            // A vote arrived
            else if(self.getVotingView().containsKey(n.sid)) {
                /*
                 * Only proceed if the vote comes from a replica in the
                 * voting view.
                 */
                switch (n.state) {
                // The sender is also LOOKING
                case LOOKING:
                    // ......
                    // Does the received vote beat mine? Compare, in order,
                    // the epoch (peerEpoch), the zxid, and the server id
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        // It wins: adopt it as my vote and broadcast again
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }
                    // ......
                    // Record the vote; recvset decides whether the election is over
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    // Is the election over? By default more than half of the servers must agree
                    if (termPredicate(recvset,
                            new Vote(proposedLeader, proposedZxid,
                                    logicalclock, proposedEpoch))) {
                        // ......
                            // Switch state to LEADING or FOLLOWING
                            self.setPeerState((proposedLeader == self.getId()) ?
                                    ServerState.LEADING: learningState());
                            // Return the final vote
                            Vote endVote = new Vote(proposedLeader,
                                    proposedZxid, proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                // The sender is not LOOKING, e.g. when this server joins an ensemble that is already serving.
                // OBSERVING servers do not take part in the election
                case OBSERVING:
                    LOG.debug("Notification from observer: " + n.sid);
                    break;
                // These two states do count
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    if(n.electionEpoch == logicalclock){
                        // Also record the vote locally
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        // If the election is over and the leader is confirmed valid,
                        // update my state and return the result
                        if(termPredicate(recvset, new Vote(n.leader,
                                        n.zxid, n.electionEpoch, n.peerEpoch, n.state))
                                        && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());

                            Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    /**
                     * Before joining an established ensemble, verify that
                     * a majority are following the same leader.
                     */
                    outofelection.put(n.sid, new Vote(n.leader, n.zxid,
                            n.electionEpoch, n.peerEpoch, n.state));
                    // ......
                    break;
                default:
```
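The three rules this loop relies on (which vote wins, when a quorum is reached, and how the idle timeout grows) can be isolated as plain functions. This is a sketch under simplifying assumptions: `ElectionRulesSketch` and its methods are hypothetical, `Vote` here carries only leader, zxid and epoch (no election round), and server weights and observers are ignored.

```java
import java.util.Map;

// Hypothetical, simplified versions of the decisions made in lookForLeader().
final class ElectionRulesSketch {
    record Vote(long leader, long zxid, long peerEpoch) {}

    // Like totalOrderPredicate: does the received vote beat my proposal?
    // Epoch first, then zxid, then server id.
    static boolean beats(Vote received, Vote mine) {
        if (received.peerEpoch() != mine.peerEpoch()) {
            return received.peerEpoch() > mine.peerEpoch();
        }
        if (received.zxid() != mine.zxid()) {
            return received.zxid() > mine.zxid();
        }
        return received.leader() > mine.leader();
    }

    // Like termPredicate with a simple majority: more than half of all
    // voting members have sent a vote equal to my proposal.
    static boolean hasQuorum(Map<Long, Vote> recvset, Vote proposal, int votingMembers) {
        long agreeing = recvset.values().stream().filter(proposal::equals).count();
        return agreeing > votingMembers / 2;
    }

    // Exponential backoff when no notification arrives: double the wait,
    // capped at maxNotificationInterval.
    static int nextTimeout(int current, int maxNotificationInterval) {
        int doubled = current * 2;
        return doubled < maxNotificationInterval ? doubled : maxNotificationInterval;
    }
}
```

With equal epoch and zxid, the vote for server 2 beats the vote for server 1, and in a 3-member ensemble two matching votes are enough for `hasQuorum`. The timeout values used when trying `nextTimeout` (for example 200 and 60000 ms) are illustrative only.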
Sending election notifications:

```java
private void sendNotifications() {
    // One notification per voting member
    for (QuorumServer server : self.getVotingView().values()) {
        long sid = server.id;

        // The message itself
        ToSend notmsg = new ToSend(ToSend.mType.notification,
                proposedLeader,
                proposedZxid,
                logicalclock,
                QuorumPeer.ServerState.LOOKING,
                sid,
                proposedEpoch);
        // ......
        // Put it on the business-layer send queue, which WorkerSender consumes
        sendqueue.offer(notmsg);
    }
}
```
WorkerSender consumes the send queue:

```java
public void run() {
    while (!stop) {
        try {
            ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
            if(m == null) continue;

            process(m);
        } catch (InterruptedException e) {
            break;
        }
    }
    LOG.info("WorkerSender is down");
}

private void process(ToSend m) {
    // The notification has a fixed layout of 36 bytes
    byte requestBytes[] = new byte[36];
    ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);

    /*
     * Building notification packet to send
     */
    requestBuffer.clear();
    requestBuffer.putInt(m.state.ordinal());
    requestBuffer.putLong(m.leader);
    requestBuffer.putLong(m.zxid);
    requestBuffer.putLong(m.electionEpoch);
    requestBuffer.putLong(m.peerEpoch);

    // Send it through the IO class QuorumCnxManager
    manager.toSend(m.sid, requestBuffer);
}
```
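The 36 bytes are one `int` for the sender's state plus four `long`s (4 + 4 × 8). The sketch below encodes and decodes that layout with `java.nio.ByteBuffer`; the class and record names are hypothetical, and the enum order is assumed to match the ordinal mapping used by WorkerReceiver (0 LOOKING, 1 FOLLOWING, 2 LEADING, 3 OBSERVING).

```java
import java.nio.ByteBuffer;

// Hypothetical codec for the notification layout written by process().
final class NotificationCodecSketch {
    enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

    record Notification(ServerState state, long leader, long zxid,
                        long electionEpoch, long peerEpoch) {}

    static final int SIZE = Integer.BYTES + 4 * Long.BYTES; // 4 + 32 = 36

    static ByteBuffer encode(Notification n) {
        ByteBuffer buf = ByteBuffer.allocate(SIZE);
        buf.putInt(n.state().ordinal());
        buf.putLong(n.leader());
        buf.putLong(n.zxid());
        buf.putLong(n.electionEpoch());
        buf.putLong(n.peerEpoch());
        buf.flip(); // switch from writing to reading
        return buf;
    }

    // Reads the fields back in the same order; an unknown state ordinal
    // falls back to LOOKING, like the switch in WorkerReceiver.
    static Notification decode(ByteBuffer buf) {
        int ord = buf.getInt();
        ServerState[] all = ServerState.values();
        ServerState state = (ord >= 0 && ord < all.length) ? all[ord] : ServerState.LOOKING;
        return new Notification(state, buf.getLong(), buf.getLong(),
                buf.getLong(), buf.getLong());
    }
}
```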
QuorumCnxManager does the actual sending:

```java
public void toSend(Long sid, ByteBuffer b) {
    /*
     * If sending message to myself, then simply enqueue it (loopback).
     */
    // A message to myself skips the network and goes straight to the local receive queue
    if (self.getId() == sid) {
         b.position(0);
         addToRecvQueue(new Message(b.duplicate(), sid));
        /*
         * Otherwise send to the corresponding thread to send.
         */
    // Otherwise, queue it for that server and then try to connect; once connected,
    // dedicated send and receive threads are started for that server
    } else {
         /*
          * Start a new connection if doesn't have one already.
          */
         if (!queueSendMap.containsKey(sid)) {
             ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
                     SEND_CAPACITY);
             queueSendMap.put(sid, bq);
             addToSendQueue(bq, b);
         } else {
             ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
             if(bq != null){
                 addToSendQueue(bq, b);
             } else {
                 LOG.error("No queue for server " + sid);
             }
         }
         connectOne(sid);
    }
}
```
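The routing in `toSend()` reduces to two cases: loop back a message addressed to myself, and put everything else in a bounded per-peer queue. In this sketch the capacity is a constructor parameter and a full queue drops its oldest message so that the newest vote wins; both are assumptions of the sketch rather than details shown in the excerpt, `SendRouterSketch` is a hypothetical name, and `connectOne()` is omitted.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical router mirroring the two branches of toSend().
final class SendRouterSketch {
    private final long myId;
    private final int capacity;
    final BlockingQueue<ByteBuffer> recvQueue = new LinkedBlockingQueue<>();
    final Map<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap = new ConcurrentHashMap<>();

    SendRouterSketch(long myId, int capacity) {
        this.myId = myId;
        this.capacity = capacity;
    }

    void toSend(long sid, ByteBuffer b) {
        if (sid == myId) {
            // Loopback: my own vote never touches the network.
            b.position(0);
            recvQueue.offer(b.duplicate());
            return;
        }
        ArrayBlockingQueue<ByteBuffer> bq =
                queueSendMap.computeIfAbsent(sid, k -> new ArrayBlockingQueue<>(capacity));
        synchronized (bq) {
            // Assumed policy: evict the oldest queued message when full.
            if (bq.remainingCapacity() == 0) {
                bq.poll();
            }
            bq.offer(b);
        }
    }
}
```

Keeping only the newest message per peer fits the election use case: an older vote from the same sender is superseded by a newer one anyway.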
The connection attempt:

```java
synchronized void connectOne(long sid){
    if (senderWorkerMap.get(sid) == null){
        // .......
        // The peer's election address (port 3888 here)
        electionAddr = self.quorumPeers.get(sid).electionAddr;
        // .......
        // Blocking connect
        Socket sock = new Socket();
        setSockOpts(sock);
        sock.connect(self.getView().get(sid).electionAddr, cnxTO);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Connected to server " + sid);
        }
        // Connected: set up the IO threads
        initiateConnection(sock, sid);
        // ......
}
```
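Because only server.1 is running at this point, its attempts to connect to the other servers fail and the election thread keeps retrying; so far server.1 has received only its own vote. Next we start server.2, which also actively connects to server.1. Server.1 and server.2 now each initiate a connection to the other, but only one of the two connections survives, as explained below.

The Listener election thread of the server being connected to then receives the new connection.

The Listener's main loop, which accepts connections: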

```java
while (!shutdown) {
    Socket client = ss.accept();
    setSockOpts(client);
    LOG.info("Received connection request "
            + client.getRemoteSocketAddress());
    receiveConnection(client);
    numRetries = 0;
}
```
Handling the new connection:

```java
public boolean receiveConnection(Socket sock) {
    Long sid = null;

    try {
        // Read server id
        DataInputStream din = new DataInputStream(sock.getInputStream());
        sid = din.readLong();
        // ......

    //If wins the challenge, then close the new connection.
    // If the remote id is smaller than mine, close this connection and dial back:
    // a connection is kept only when the larger id connects to the smaller id
    if (sid < self.getId()) {
        /*
         * This replica might still believe that the connection to sid is
         * up, so we have to shut down the workers before trying to open a
         * new connection.
         */
        SendWorker sw = senderWorkerMap.get(sid);
        if (sw != null) {
            sw.finish();
        }

        /*
         * Now we start a new connection
         */
        LOG.debug("Create new connection to server: " + sid);
        closeSocket(sock);
        connectOne(sid);

        // Otherwise start worker threads to receive data.
    }
    // If the remote id is larger than mine, accept the connection and start dedicated IO threads
    else {
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, sid, sw);
        sw.setRecv(rw);

        SendWorker vsw = senderWorkerMap.get(sid);

        if(vsw != null)
            vsw.finish();

        senderWorkerMap.put(sid, sw);

        if (!queueSendMap.containsKey(sid)) {
            queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
                    SEND_CAPACITY));
        }

        sw.start();
        rw.start();

        return true;
    }
    return false;
}
```
Once connected, the initiating server sets up its own IO threads:

```java
public boolean initiateConnection(Socket sock, Long sid) {
    DataOutputStream dout = null;
    try {
        // Sending id and challenge
        // First send my own server id
        dout = new DataOutputStream(sock.getOutputStream());
        dout.writeLong(self.getId());
        dout.flush();
    }
    // ......

    // If lost the challenge, then drop the new connection
    // If the remote id is larger than mine, close the connection. As a result only the
    // larger-id server connects to the smaller-id one, which avoids redundant connections
    if (sid > self.getId()) {
        LOG.info("Have smaller server identifier, so dropping the " +
                 "connection: (" + sid + ", " + self.getId() + ")");
        closeSocket(sock);
        // Otherwise proceed with the connection
    }
    // If the remote id is smaller than mine, keep the connection and start dedicated send and receive threads
    else {
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, sid, sw);
        sw.setRecv(rw);

        SendWorker vsw = senderWorkerMap.get(sid);

        if(vsw != null)
            vsw.finish();

        senderWorkerMap.put(sid, sw);
        if (!queueSendMap.containsKey(sid)) {
            queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
                    SEND_CAPACITY));
        }

        sw.start();
        rw.start();

        return true;
    }
    return false;
}
```
With this handling, only one election connection is kept between any two servers.
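Both sides apply the same rule from opposite ends, which is why exactly one connection survives per pair. The small simulation below has every server dial every other server and keeps only the links that both `initiateConnection()` and `receiveConnection()` would accept; `TieBreakSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the connection tie-break between two election peers.
final class TieBreakSketch {
    // Initiator side (initiateConnection): keep the socket only if the
    // remote sid is smaller than my own id.
    static boolean initiatorKeeps(long myId, long remoteSid) {
        return remoteSid < myId;
    }

    // Acceptor side (receiveConnection): keep the socket only if the
    // connecting sid is larger than my own id; otherwise close and dial back.
    static boolean acceptorKeeps(long myId, long remoteSid) {
        return remoteSid > myId;
    }

    // Every server dials every other one; returns surviving {initiator, acceptor} links.
    static List<long[]> survivingLinks(long[] ids) {
        List<long[]> links = new ArrayList<>();
        for (long from : ids) {
            for (long to : ids) {
                if (from == to) continue;
                if (initiatorKeeps(from, to) && acceptorKeeps(to, from)) {
                    links.add(new long[] {from, to});
                }
            }
        }
        return links;
    }
}
```

For ids 1, 2 and 3 this leaves three links (2→1, 3→1, 3→2): one per pair, always from the larger id to the smaller.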

The IO send thread SendWorker starts and sends the election messages:

```java
try {
    while (running && !shutdown && sock != null) {

        ByteBuffer b = null;
        try {
            // Each peer server has its own send queue
            ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
                    .get(sid);
            if (bq != null) {
                // Take the next message
                b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
            } else {
                LOG.error("No queue of incoming messages for " +
                          "server " + sid);
                break;
            }

            if(b != null){
                // Send it
                lastMessageSent.put(sid, b);
                send(b);
            }
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting for message on queue",
                    e);
        }
    }
} catch (Exception e) {
    LOG.warn("Exception when using channel: for id " + sid + " my id = " +
             self.getId() + " error = " + e);
}
this.finish();
// ......
```
The other server then receives the message in its IO thread, RecvWorker:

```java
public void run() {
    threadCnt.incrementAndGet();
    try {
        while (running && !shutdown && sock != null) {
            /**
             * Reads the first int to determine the length of the
             * message
             */
            // Packet length
            int length = din.readInt();
            if (length <= 0 || length > PACKETMAXSIZE) {
                throw new IOException(
                        "Received packet with invalid packet: "
                                + length);
            }
            /**
             * Allocates a new ByteBuffer to receive the message
             */
            // Read the payload into memory
            byte[] msgArray = new byte[length];
            din.readFully(msgArray, 0, length);
            ByteBuffer message = ByteBuffer.wrap(msgArray);
            // Put it on the receive queue; the business-layer thread WorkerReceiver picks it up later
            addToRecvQueue(new Message(message.duplicate(), sid));
        }
    // ......
}
```
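RecvWorker expects a length-prefixed frame: a 4-byte length, then that many payload bytes. The sketch below writes and reads such a frame with `DataOutputStream` and `DataInputStream` over in-memory streams instead of a socket. `MAX_PACKET` is an assumed stand-in for `PACKETMAXSIZE`, and `FramingSketch` with its methods are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

// Hypothetical framing helpers matching the read loop in RecvWorker.
final class FramingSketch {
    // Assumed upper bound, standing in for PACKETMAXSIZE.
    static final int MAX_PACKET = 512 * 1024;

    // Sender side: 4-byte big-endian length, then the payload.
    static byte[] frame(byte[] payload) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(payload.length);
            out.write(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Receiver side: read the length, validate it, then read exactly that many bytes.
    static ByteBuffer readFrame(DataInputStream din) throws IOException {
        int length = din.readInt();
        if (length <= 0 || length > MAX_PACKET) {
            throw new IOException("Received packet with invalid length: " + length);
        }
        byte[] msg = new byte[length];
        din.readFully(msg, 0, length);
        return ByteBuffer.wrap(msg);
    }

    // Convenience wrapper for in-memory data.
    static ByteBuffer parse(byte[] framed) {
        try {
            return readFrame(new DataInputStream(new ByteArrayInputStream(framed)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A 36-byte notification therefore travels as 40 bytes on the wire, and a zero or oversized length is rejected before any buffer is allocated.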
The business-layer receive thread, WorkerReceiver, takes the message:

```java
public void run() {

    Message response;
    while (!stop) {
        // Sleeps on receive
        try{
            // Take data from the IO layer
            response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
            if(response == null) continue;

            /*
             * If it is from an observer, respond right away.
             * Note that the following predicate assumes that
             * if a server is not a follower, then it must be
             * an observer. If we ever have any other type of
             * learner in the future, we'll have to change the
             * way we check for observers.
             */
            // The sender is not a voter (an observer): reply with my current vote
            if(!self.getVotingView().containsKey(response.sid)){
                Vote current = self.getCurrentVote();
                ToSend notmsg = new ToSend(ToSend.mType.notification,
                        current.getId(),
                        current.getZxid(),
                        logicalclock,
                        self.getPeerState(),
                        response.sid,
                        current.getPeerEpoch());

                sendqueue.offer(notmsg);
            } else {
                // Receive new message
                // .......

                // State of peer that sent this message
                QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
                switch (response.buffer.getInt()) {
                case 0:
                    ackstate = QuorumPeer.ServerState.LOOKING;
                    break;
                case 1:
                    ackstate = QuorumPeer.ServerState.FOLLOWING;
                    break;
                case 2:
                    ackstate = QuorumPeer.ServerState.LEADING;
                    break;
                case 3:
                    ackstate = QuorumPeer.ServerState.OBSERVING;
                    break;
                }

                // Instantiate Notification and set its attributes
                Notification n = new Notification();
                n.leader = response.buffer.getLong();
                n.zxid = response.buffer.getLong();
                n.electionEpoch = response.buffer.getLong();
                n.state = ackstate;
                n.sid = response.sid;
                // ......

                /*
                 * If this server is looking, then send proposed leader
                 */
                // If I am LOOKING too, put it on the business-layer receive queue for the election thread
                if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
                    recvqueue.offer(n);
                    // ......
                }
                // If I am not electing but the sender is LOOKING, send it my current vote.
                // This matters when a server joins an ensemble that already has a leader
                else {
                    /*
                     * If this server is not looking, but the one that sent the ack
                     * is looking, then send back what it believes to be the leader.
                     */
                    Vote current = self.getCurrentVote();
                    if(ackstate == QuorumPeer.ServerState.LOOKING){
                        if(LOG.isDebugEnabled()){
                            LOG.debug("Sending new notification. My id =  " +
                                    self.getId() + " recipient=" +
                                    response.sid + " zxid=0x" +
                                    Long.toHexString(current.getZxid()) +
                                    " leader=" + current.getId());
                        }
                        ToSend notmsg = new ToSend(
                                ToSend.mType.notification,
                                current.getId(),
                                current.getZxid(),
                                logicalclock,
                                self.getPeerState(),
                                response.sid,
                                current.getPeerEpoch());
                        sendqueue.offer(notmsg);
                    }
                }
    // .......
}
```
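Stripped of message construction, WorkerReceiver makes one of three decisions for each incoming message. The sketch below captures only the branches visible in the excerpt above; `ReceiverRoutingSketch`, `Action` and `route` are hypothetical names, and the elided parts of the real method may do more.

```java
// Hypothetical reduction of WorkerReceiver's branching to an enum result.
final class ReceiverRoutingSketch {
    enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

    enum Action { REPLY_WITH_CURRENT_VOTE, ENQUEUE_FOR_ELECTION, DROP }

    static Action route(boolean senderIsVoter, ServerState myState, ServerState senderState) {
        if (!senderIsVoter) {
            // Not in the voting view (an observer): answer with my current vote.
            return Action.REPLY_WITH_CURRENT_VOTE;
        }
        if (myState == ServerState.LOOKING) {
            // I am electing too: hand the notification to lookForLeader().
            return Action.ENQUEUE_FOR_ELECTION;
        }
        if (senderState == ServerState.LOOKING) {
            // I already know the leader: tell the newcomer who it is.
            return Action.REPLY_WITH_CURRENT_VOTE;
        }
        // Neither side is electing: nothing to do.
        return Action.DROP;
    }
}
```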
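Since the ensemble has only 3 machines, a Leader can be elected as soon as server.1 and server.2 are up, because 2 of 3 votes is a majority. After that, the Leader and the Followers start exchanging data; see the next article.

Leader election summary

1. On startup, a server votes for itself and broadcasts that vote to the whole ensemble.
2. On receiving a vote, it decides whether to accept the sender's candidate by comparing three things in order: epoch, zxid, and server id. If it accepts, it changes its own vote and broadcasts it to the ensemble.
3. QuorumCnxManager handles the IO. Each pair of servers keeps one connection, and only a connection from the larger id to the smaller id is kept. Each peer connection gets its own send and receive threads, using blocking IO.
4. By default the election succeeds once more than half of the servers agree; each server then sets its own state to LEADING or FOLLOWING.
5. Observers do not take part in the election.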
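The walkthrough (three voting members, only server.1 and server.2 running) can be replayed with a toy, single-threaded model of points 1, 2 and 4 above. Messages are delivered in FIFO order, and there are no timeouts, no logical clock and no final wait for late votes, so `ElectionSimSketch` is a hypothetical illustration, not FastLeaderElection itself.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy model: synchronous vote exchange among the live servers.
final class ElectionSimSketch {
    record Vote(long leader, long zxid, long epoch) {}
    record Msg(long from, long to, Vote vote) {}

    static final class Peer {
        final long id;
        Vote proposal;
        final Map<Long, Vote> recvset = new HashMap<>();
        String state = "LOOKING";

        Peer(long id, long zxid, long epoch) {
            this.id = id;
            this.proposal = new Vote(id, zxid, epoch); // point 1: vote for myself
        }
    }

    // Point 2: epoch first, then zxid, then server id.
    static boolean beats(Vote n, Vote cur) {
        if (n.epoch() != cur.epoch()) return n.epoch() > cur.epoch();
        if (n.zxid() != cur.zxid()) return n.zxid() > cur.zxid();
        return n.leader() > cur.leader();
    }

    // Runs until no message is in flight; returns each live server's final state.
    static Map<Long, String> run(Map<Long, Peer> live, int votingMembers) {
        Deque<Msg> wire = new ArrayDeque<>();
        for (Peer p : live.values()) {
            broadcast(p, live, wire);
        }
        while (!wire.isEmpty()) {
            Msg m = wire.poll();
            Peer p = live.get(m.to());
            if (!p.state.equals("LOOKING")) {
                continue; // already decided
            }
            if (beats(m.vote(), p.proposal)) {
                p.proposal = m.vote();    // adopt the better vote...
                broadcast(p, live, wire); // ...and broadcast it
            }
            p.recvset.put(m.from(), m.vote());
            // Point 4: more than half of all voting members (not only live ones) must agree.
            long agree = p.recvset.values().stream().filter(p.proposal::equals).count();
            if (agree > votingMembers / 2) {
                p.state = p.proposal.leader() == p.id ? "LEADING" : "FOLLOWING";
            }
        }
        Map<Long, String> states = new HashMap<>();
        live.forEach((id, peer) -> states.put(id, peer.state));
        return states;
    }

    // Send my proposal to every live server, including myself (loopback).
    private static void broadcast(Peer p, Map<Long, Peer> live, Deque<Msg> wire) {
        for (long to : live.keySet()) {
            wire.add(new Msg(p.id, to, p.proposal));
        }
    }
}
```

With equal zxid and epoch, server.2 ends up LEADING and server.1 FOLLOWING. If server.1 instead starts with a larger zxid (say 5 against 0), the same model elects server.1, since zxid is compared before server id.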
