深入浅出Zookeeper之五 Leader选举
tickTime=2000dataDir=/home/admin/zk-dataclientPort=2181#Learner初始化连接到Leader的超时时间initLimit=10#Learner和Leader之间消息发送,响应的超时时间syncLimit=5#集群配置,3台机器,2888为Leader服务端口,3888为选举时所用的端口server.1=master:2888:3888server.2=slave1:2888:3888server.3=slave2:2888:3888
?在server.1的$dataDir下
echo '1'>myid
启动server.1
./zkServer.sh start
?分析之前先看看选举相关的类图
?
入口函数QuorumPeerMain主线程启动
?
public void runFromConfig(QuorumPeerConfig config) throws IOException { ...... LOG.info("Starting quorum peer"); try {//对client提供读写的server,一般是2181端口 ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory(); cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns()); //zk的逻辑主线程,负责选举,投票等 quorumPeer = new QuorumPeer(); quorumPeer.setClientPortAddress(config.getClientPortAddress()); quorumPeer.setTxnFactory(new FileTxnSnapLog( new File(config.getDataLogDir()), new File(config.getDataDir())));//集群机器地址 quorumPeer.setQuorumPeers(config.getServers()); quorumPeer.setElectionType(config.getElectionAlg());//本机的集群编号 quorumPeer.setMyid(config.getServerId()); quorumPeer.setTickTime(config.getTickTime()); quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout()); quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout()); quorumPeer.setInitLimit(config.getInitLimit()); quorumPeer.setSyncLimit(config.getSyncLimit());//投票决定方式,默认超过半数就通过 quorumPeer.setQuorumVerifier(config.getQuorumVerifier()); quorumPeer.setCnxnFactory(cnxnFactory); quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory())); quorumPeer.setLearnerType(config.getPeerType()); //启动主线程 quorumPeer.start(); quorumPeer.join(); } catch (InterruptedException e) { // warn, but generally this is ok LOG.warn("Quorum Peer interrupted", e); } }?QuorumPeer复写Thread.start方法,启动
?
?
@Override public synchronized void start() {//恢复DB,从zxid中回复epoch变量,代表投票轮数 loadDataBase();//启动针对client的IO线程 cnxnFactory.start();//选举初始化,主要是从配置获取选举类型 startLeaderElection();//启动 super.start(); }?loadDataBase过程,恢复epoch数
?
?
private void loadDataBase() {try {//从本地文件恢复db zkDb.loadDataBase(); // load the epochs//从最新的zxid恢复epoch变量,zxid64位,前32位是epoch值,后32位是zxid long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid; long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid); try { currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME); } catch(FileNotFoundException e) { // pick a reasonable epoch number // this should only happen once when moving to a // new code version currentEpoch = epochOfZxid; LOG.info(CURRENT_EPOCH_FILENAME + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation", currentEpoch); writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch); } if (epochOfZxid > currentEpoch) { throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid); } .......}?选举初始化
?
?
synchronized public void startLeaderElection() { try {//先投自己 currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch()); } catch(IOException e) { RuntimeException re = new RuntimeException(e.getMessage()); re.setStackTrace(e.getStackTrace()); throw re; }//从配置中拿自己的选举地址 for (QuorumServer p : getView().values()) { if (p.id == myid) { myQuorumAddr = p.addr; break; } } ......//根据配置,获取选举算法 this.electionAlg = createElectionAlgorithm(electionType); }?获取选举算法,默认为FastLeaderElection算法
?
?
protected Election createElectionAlgorithm(int electionAlgorithm){ Election le=null; //TODO: use a factory rather than a switch switch (electionAlgorithm) { case 0: le = new LeaderElection(this); break; case 1: le = new AuthFastLeaderElection(this); break; case 2: le = new AuthFastLeaderElection(this, true); break; case 3://leader选举IO负责类 qcm = new QuorumCnxManager(this); QuorumCnxManager.Listener listener = qcm.listener; //启动已绑定3888端口的选举线程,等待集群其他机器连接 if(listener != null){ listener.start();//基于TCP的选举算法 le = new FastLeaderElection(this, qcm); } else { LOG.error("Null listener when initializing cnx manager"); } break; default: assert false; } return le; }?FastLeaderElection初始化
?
?
private void starter(QuorumPeer self, QuorumCnxManager manager) { this.self = self; proposedLeader = -1; proposedZxid = -1;//业务层发送队列,业务对象ToSend sendqueue = new LinkedBlockingQueue<ToSend>();//业务层接受队列,业务对象Notificataion recvqueue = new LinkedBlockingQueue<Notification>();// this.messenger = new Messenger(manager); }Messenger(QuorumCnxManager manager) {//启动业务层发送线程,将消息发给IO负责类QuorumCnxManager this.ws = new WorkerSender(manager); Thread t = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]"); t.setDaemon(true); t.start();//启动业务层接受线程,从IO负责类QuorumCnxManager接受消息 this.wr = new WorkerReceiver(manager); t = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]"); t.setDaemon(true); t.start(); }?QuorumPeer线程启动
?
?
run(){.......try { /* * Main loop */ while (running) { switch (getPeerState()) {//如果状态是LOOKING,则进入选举流程 case LOOKING: LOG.info("LOOKING"); ...... try {//选举算法开始选举,主线程可能在这里耗比较长时间 setCurrentVote(makeLEStrategy().lookForLeader()); } catch (Exception e) { LOG.warn("Unexpected exception", e); setPeerState(ServerState.LOOKING); } } break;//其他流程处理 case OBSERVING: try { LOG.info("OBSERVING"); setObserver(makeObserver(logFactory)); observer.observeLeader(); } catch (Exception e) { LOG.warn("Unexpected exception",e ); } finally { observer.shutdown(); setObserver(null); setPeerState(ServerState.LOOKING); } break; case FOLLOWING: try { LOG.info("FOLLOWING"); setFollower(makeFollower(logFactory)); follower.followLeader(); } catch (Exception e) { LOG.warn("Unexpected exception",e); } finally { follower.shutdown(); setFollower(null); setPeerState(ServerState.LOOKING); } break; case LEADING: LOG.info("LEADING"); try { setLeader(makeLeader(logFactory)); leader.lead(); setLeader(null); } catch (Exception e) { LOG.warn("Unexpected exception",e); } finally { if (leader != null) { leader.shutdown("Forcing shutdown"); setLeader(null); } setPeerState(ServerState.LOOKING); } break; }.......}?进入选举流程
?
?
public Vote lookForLeader() throws InterruptedException {...... try {//收到的投票 HashMap<Long, Vote> recvset = new HashMap<Long, Vote>(); HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>(); int notTimeout = finalizeWait; synchronized(this){ logicalclock++;//先投给自己 updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } LOG.info("New election. My id = " + self.getId() + ", proposed zxid=0x" + Long.toHexString(proposedZxid));//发送投票,包括发给自己 sendNotifications(); /* * Loop in which we exchange notifications until we find a leader *///主循环,直到选出leader while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){ /* * Remove next notification from queue, times out after 2 times * the termination time *///从IO线程里拿到投票消息,自己的投票也在这里处理 Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS); /* * Sends more notifications if haven't received enough. * Otherwise processes new notification. *///如果空闲 if(n == null){//消息发完了,继续发送,一直到选出leader为止 if(manager.haveDelivered()){ sendNotifications(); } else {//消息还在,可能其他server还没启动,尝试连接 manager.connectAll(); } /* * Exponential backoff *///延长超时时间 int tmpTimeOut = notTimeout*2; notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval); LOG.info("Notification time out: " + notTimeout); }//收到了投票消息 else if(self.getVotingView().containsKey(n.sid)) { /* * Only proceed if the vote comes from a replica in the * voting view. */ switch (n.state) {//LOOKING消息,则 case LOOKING:......//检查下收到的这张选票是否可以胜出,依次比较选举轮数epoch,事务zxid,服务器编号server id } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {//胜出了,就把自己的投票修改为对方的,然后广播消息 updateProposal(n.leader, n.zxid, n.peerEpoch); sendNotifications(); } ......//添加到本机投票集合,用来做选举终结判断 recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));//选举是否结束,默认算法是超过半数server同意 if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch))) { ......//修改状态,LEADING or FOLLOWING self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState());//返回最终的选票结果 Vote endVote = new Vote(proposedLeader, proposedZxid, proposedEpoch); leaveInstance(endVote); return endVote; } } break;//如果收到的选票状态不是LOOKING,比如这台机器刚加入一个已经服务的zk集群时//OBSERVING机器不参数选举 case OBSERVING: LOG.debug("Notification from observer: " + n.sid); break;//这2种需要参与选举 case FOLLOWING: case LEADING: /* * Consider all notifications from the same epoch * together. */ if(n.electionEpoch == logicalclock){//同样需要加入到本机的投票集合 recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));//投票是否结束,如果结束,再确认LEADER是否有效//如果结束,修改自己的状态并返回投票结果 if(termPredicate(recvset, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)) && checkLeader(outofelection, n.leader, n.electionEpoch)) { self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState()); Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch); leaveInstance(endVote); return endVote; } } /** * Before joining an established ensemble, verify that * a majority are following the same leader. */ outofelection.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)); ...... break; default:?选举消息发送
?
?
private void sendNotifications() {//循环发送 for (QuorumServer server : self.getVotingView().values()) { long sid = server.id;//消息实体 ToSend notmsg = new ToSend(ToSend.mType.notification, proposedLeader, proposedZxid, logicalclock, QuorumPeer.ServerState.LOOKING, sid, proposedEpoch);......//添加到业务的发送队列,该队列会被WorkerSender消费 sendqueue.offer(notmsg); } }?WorkerSender消费
?
?
public void run() { while (!stop) { try { ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS); if(m == null) continue; process(m); } catch (InterruptedException e) { break; } } LOG.info("WorkerSender is down"); } private void process(ToSend m) {//选票协议是固定的 byte requestBytes[] = new byte[36]; ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes); /* * Building notification packet to send */ requestBuffer.clear(); requestBuffer.putInt(m.state.ordinal()); requestBuffer.putLong(m.leader); requestBuffer.putLong(m.zxid); requestBuffer.putLong(m.electionEpoch); requestBuffer.putLong(m.peerEpoch);//通过QuorumCnxManager这个IO负责类发送消息 manager.toSend(m.sid, requestBuffer); }?QuorumCnxManager具体发送
?
?
public void toSend(Long sid, ByteBuffer b) { /* * If sending message to myself, then simply enqueue it (loopback). *///如果是自己,不走网络,直接添加到本地接受队列 if (self.getId() == sid) { b.position(0); addToRecvQueue(new Message(b.duplicate(), sid)); /* * Otherwise send to the corresponding thread to send. *///否则,先添加到发送队列,然后尝试连接,连接成功则给每台server启动发送和接受线程 } else { /* * Start a new connection if doesn't have one already. */ if (!queueSendMap.containsKey(sid)) { ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>( SEND_CAPACITY); queueSendMap.put(sid, bq); addToSendQueue(bq, b); } else { ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid); if(bq != null){ addToSendQueue(bq, b); } else { LOG.error("No queue for server " + sid); } } connectOne(sid); } }?尝试连接过程
?
?
synchronized void connectOne(long sid){ if (senderWorkerMap.get(sid) == null){.......//对方的选举地址,3888端口 electionAddr = self.quorumPeers.get(sid).electionAddr; .......//同步IO连接 Socket sock = new Socket(); setSockOpts(sock); sock.connect(self.getView().get(sid).electionAddr, cnxTO); if (LOG.isDebugEnabled()) { LOG.debug("Connected to server " + sid); }//连上了,初始化IO线程 initiateConnection(sock, sid); ...... }?由于这个时候只有server.1启动,当它尝试去连接其他server时,会报错,选举线程会一直重试。此时,server.1只收到了自己的选票。然后我们启动server.2,server.2也会主动去连接server.1,这个时候server.1h和server.2会相互发起连接,但最终只有有一个连接成功,请看下问。
?
这个时候被连接的server的Listener选举线程会收到新连接
Listener主循环,接受连接
?
while (!shutdown) { Socket client = ss.accept(); setSockOpts(client); LOG.info("Received connection request " + client.getRemoteSocketAddress()); receiveConnection(client); numRetries = 0; }?新连接处理
?
?
public boolean receiveConnection(Socket sock) { Long sid = null; try { // Read server id//读server id DataInputStream din = new DataInputStream(sock.getInputStream()); sid = din.readLong(); ...... //If wins the challenge, then close the new connection.//如果对方id比我小,则关闭连接,只允许大id的server连接小id的server if (sid < self.getId()) { /* * This replica might still believe that the connection to sid is * up, so we have to shut down the workers before trying to open a * new connection. */ SendWorker sw = senderWorkerMap.get(sid); if (sw != null) { sw.finish(); } /* * Now we start a new connection */ LOG.debug("Create new connection to server: " + sid); closeSocket(sock); connectOne(sid); // Otherwise start worker threads to receive data. } //如果对方id比我大,允许连接,并初始化单独的IO线程else { SendWorker sw = new SendWorker(sock, sid); RecvWorker rw = new RecvWorker(sock, sid, sw); sw.setRecv(rw); SendWorker vsw = senderWorkerMap.get(sid); if(vsw != null) vsw.finish(); senderWorkerMap.put(sid, sw); if (!queueSendMap.containsKey(sid)) { queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>( SEND_CAPACITY)); } sw.start(); rw.start(); return true; } return false; }?连上后,自己server的IO线程初始化
?
?
public boolean initiateConnection(Socket sock, Long sid) { DataOutputStream dout = null; try { // Sending id and challenge//先发一个server id dout = new DataOutputStream(sock.getOutputStream()); dout.writeLong(self.getId()); dout.flush(); } ...... // If lost the challenge, then drop the new connection//如果对方id比自己大,则关闭连接,这样导致的结果就是大id的server才会去连接小id的server,避免连接浪费 if (sid > self.getId()) { LOG.info("Have smaller server identifier, so dropping the " + "connection: (" + sid + ", " + self.getId() + ")"); closeSocket(sock); // Otherwise proceed with the connection } //如果对方id比自己小,则保持连接,并初始化单独的发送和接受线程else { SendWorker sw = new SendWorker(sock, sid); RecvWorker rw = new RecvWorker(sock, sid, sw); sw.setRecv(rw); SendWorker vsw = senderWorkerMap.get(sid); if(vsw != null) vsw.finish(); senderWorkerMap.put(sid, sw); if (!queueSendMap.containsKey(sid)) { queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>( SEND_CAPACITY)); } sw.start(); rw.start(); return true; } return false; }?通过以上的连接处理,每2台选举机器之间只会建立一个选举连接。
?
IO发送线程SendWorker启动,开始发送选举消息
?
try { while (running && !shutdown && sock != null) { ByteBuffer b = null; try {//每个server一个发送队列 ArrayBlockingQueue<ByteBuffer> bq = queueSendMap .get(sid); if (bq != null) {//拿消息 b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS); } else { LOG.error("No queue of incoming messages for " + "server " + sid); break; } if(b != null){//发消息 lastMessageSent.put(sid, b); send(b); } } catch (InterruptedException e) { LOG.warn("Interrupted while waiting for message on queue", e); } } } catch (Exception e) { LOG.warn("Exception when using channel: for id " + sid + " my id = " + self.getId() + " error = " + e); } this.finish();......?这个时候,其他机器通过IO线程RecvWorker收到消息
?
?
public void run() { threadCnt.incrementAndGet(); try { while (running && !shutdown && sock != null) { /** * Reads the first int to determine the length of the * message *///包的长度 int length = din.readInt(); if (length <= 0 || length > PACKETMAXSIZE) { throw new IOException( "Received packet with invalid packet: " + length); } /** * Allocates a new ByteBuffer to receive the message *///读到内存 byte[] msgArray = new byte[length]; din.readFully(msgArray, 0, length); ByteBuffer message = ByteBuffer.wrap(msgArray);//添加到接收队列,后续业务层的接收线程WorkerReceiver会来拿消息 addToRecvQueue(new Message(message.duplicate(), sid)); } ...... }?业务层的接受线程WorkerReceiver拿消息
?
?
public void run() { Message response; while (!stop) { // Sleeps on receive try{//从IO线程拿数据 response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS); if(response == null) continue; /* * If it is from an observer, respond right away. * Note that the following predicate assumes that * if a server is not a follower, then it must be * an observer. If we ever have any other type of * learner in the future, we'll have to change the * way we check for observers. *///如果是Observer,则返回当前选举结果 if(!self.getVotingView().containsKey(response.sid)){ Vote current = self.getCurrentVote(); ToSend notmsg = new ToSend(ToSend.mType.notification, current.getId(), current.getZxid(), logicalclock, self.getPeerState(), response.sid, current.getPeerEpoch()); sendqueue.offer(notmsg); }else { // Receive new message ....... // State of peer that sent this message//对方节点状态 QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING; switch (response.buffer.getInt()) { case 0: ackstate = QuorumPeer.ServerState.LOOKING; break; case 1: ackstate = QuorumPeer.ServerState.FOLLOWING; break; case 2: ackstate = QuorumPeer.ServerState.LEADING; break; case 3: ackstate = QuorumPeer.ServerState.OBSERVING; break; } // Instantiate Notification and set its attributes//初始化Notification对象 Notification n = new Notification(); n.leader = response.buffer.getLong(); n.zxid = response.buffer.getLong(); n.electionEpoch = response.buffer.getLong(); n.state = ackstate; n.sid = response.sid; ...... /* * If this server is looking, then send proposed leader *///如果自己也在LOOKING,则放入业务接收队列,选举主线程会消费该消息 if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){ recvqueue.offer(n); ...... } //如果自己不在选举中,而对方server在LOOKING中,则向其发送当前的选举结果,当有server加入一个essemble时有用 else { /* * If this server is not looking, but the one that sent the ack * is looking, then send back what it believes to be the leader. */ Vote current = self.getCurrentVote(); if(ackstate == QuorumPeer.ServerState.LOOKING){ if(LOG.isDebugEnabled()){ LOG.debug("Sending new notification. My id = " + self.getId() + " recipient=" + response.sid + " zxid=0x" + Long.toHexString(current.getZxid()) + " leader=" + current.getId()); } ToSend notmsg = new ToSend( ToSend.mType.notification, current.getId(), current.getZxid(), logicalclock, self.getPeerState(), response.sid, current.getPeerEpoch()); sendqueue.offer(notmsg); } } ....... }由于整个集群只有3台机器,所以server.1和server.2启动后,即可选举出Leader。后续Leader和Follower开始数据交互,请看后文。Leader选举小结?1.server启动时默认选举自己,并向整个集群广播
?
2.收到消息时,通过3层判断:选举轮数,zxid,server id大小判断是否同意对方,如果同意,则修改自己的选票,并向集群广播3.QuorumCnxManager负责IO处理,每2个server建立一个连接,只允许id大的server连id小的server,每个server启动单独的读写线程处理,使用阻塞IO4.默认超过半数机器同意时,则选举成功,修改自身状态为LEADING或FOLLOWING5.Obserer机器不参与选举