ZooKeeper (ZK) sits at the core of service registration and discovery in microservice solutions; it is a cornerstone of microservice architecture. It is not the only product in this space: Eureka and Consul are also widely recognized. Here we focus on ZK alone. The tool itself is tiny (the zip package is only a few megabytes), installation is trivial, and it supports clustered deployment.

Background
This article looks at the leader & follower concepts of ZK in a cluster environment, the problems ZK faces when nodes fail, and how it solves them. ZK itself is written in Java and open-sourced on GitHub, but the official documentation says little about the internals; there are plenty of scattered blog posts, some of them quite good.
ZK node states and roles
Each node in a ZK cluster has exactly one of the following states, and by design the cluster must have exactly one node in the leading state. (The sketch after this list mirrors the corresponding enum in the source.)
- looking: searching for a leader; the cluster currently has no leader and the node enters the election process.
- following: follower; the node accepts synchronization and instructions from the leading node.
- leading: leader.
- observing: observer; the node is an observer and does not take part in voting.
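For reference, these states map one-to-one onto the ServerState enum nested in QuorumPeer; a minimal sketch of it (constant names match the source, the comments are mine):

// Sketch of org.apache.zookeeper.server.quorum.QuorumPeer.ServerState
public enum ServerState {
    LOOKING,    // no leader known; the peer is running an election
    FOLLOWING,  // the peer is a follower, synced with and driven by the leader
    LEADING,    // the peer is the leader
    OBSERVING   // the peer is an observer: it syncs data but never votes
}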
The ZAB protocol (atomic broadcast)
ZooKeeper implements a purpose-built, crash-recoverable consistency protocol called ZooKeeper Atomic Broadcast (ZAB). ZK uses a primary-backup architecture to keep the replicas in the cluster consistent: all writes must go through the Leader, which appends them to its local log and then replicates them to all Follower nodes. If the Leader fails, ZAB automatically elects a suitable replacement from the Followers; this process is leader election.
Transactions in a ZK cluster are handled by the leader; a follower that receives a write forwards it to the leader for unified processing. Put simply, ZK routes writes through the leader while reads can be served by any follower (in CAP terms ZK favors consistency), which is why it best suits read-heavy, write-light services.
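A minimal client sketch of this split, assuming a local three-node ensemble on ports 2181-2183 (connection string and znode path are placeholders): the API looks the same either way, but the create is internally forwarded to the leader, while the getData is answered directly by whichever server the client happens to be connected to.

import org.apache.zookeeper.*;

public class ReadWriteDemo {
    public static void main(String[] args) throws Exception {
        // connect to any member; the client may land on a follower
        ZooKeeper zk = new ZooKeeper(
                "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183",
                3000, event -> {});

        // write: forwarded to the leader, committed once a quorum acks
        zk.create("/demo", "v1".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // read: served locally by the connected server, no leader round-trip
        byte[] data = zk.getData("/demo", false, null);
        System.out.println(new String(data));

        zk.close();
    }
}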
Majority election algorithm
ZK vote-processing rules
A vote carries: the proposed leader's ServerId, its Zxid, and the election epoch (electionEpoch). Votes are compared as follows (a toy comparison follows the list):
- Epoch first: the incoming vote's electionEpoch is compared with the node's own logical clock, branching on greater, less than, or equal.
- With equal epochs, check the ZXID next: the server with the larger ZXID is preferred as leader.
- If the ZXIDs are also equal, compare myid: the server with the larger myid becomes the leader.
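To get a concrete feel for this ordering, a toy comparison of three made-up votes (plain Java, not ZooKeeper code; all numbers invented):

import java.util.*;

public class VoteOrderDemo {
    record Vote(long id, long zxid, long epoch) {}

    public static void main(String[] args) {
        List<Vote> votes = List.of(
                new Vote(1, 0x200, 2),
                new Vote(3, 0x1ff, 2),   // largest id but stale zxid: loses
                new Vote(2, 0x200, 2));

        // election ordering: epoch, then zxid, then myid
        Vote best = votes.stream()
                .max(Comparator.comparingLong(Vote::epoch)
                        .thenComparingLong(Vote::zxid)
                        .thenComparingLong(Vote::id))
                .orElseThrow();

        System.out.println("winner: server " + best.id());  // server 2
    }
}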
ZK has shipped three election algorithms: LeaderElection, FastLeaderElection, and AuthFastLeaderElection. FastLeaderElection and AuthFastLeaderElection are similar; the only difference is that the latter adds authentication. FastLeaderElection is more efficient than LeaderElection, and later versions keep only FastLeaderElection.
Understanding:
When multiple nodes start in a cluster, ZK must first elect one of them as leader and put it in the Leading state, which raises the question of what the election rules are. Under the "majority election algorithm", the node that gathers more than half of the votes wins, and its state flips from looking to leading. Requiring only a majority, rather than unanimity, keeps the election efficient.
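The "more than half" test itself is tiny. A sketch of the core check in ZK's default majority quorum verifier (the real class is QuorumMaj; this is a simplified stand-alone version):

import java.util.Set;

// Sketch of the majority rule behind ZK's default QuorumMaj verifier:
// a candidate wins once the servers that voted for it outnumber half
// of the configured voting ensemble.
public class MajoritySketch {
    static boolean containsQuorum(Set<Long> ackSet, int ensembleSize) {
        return ackSet.size() > ensembleSize / 2;  // 3 of 5, 2 of 3, ...
    }

    public static void main(String[] args) {
        System.out.println(containsQuorum(Set.of(1L, 2L), 5));      // false: 2 of 5
        System.out.println(containsQuorum(Set.of(1L, 2L, 3L), 5));  // true: 3 of 5
    }
}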
The official documentation describes this setup under "Clustered (Multi-Server) Setup".

Walking through the idea with a 5-server ensemble (a toy replay follows the list):
- Server 1 starts. It is the only server up, so the votes it sends out get no response and it stays in the LOOKING state.
- Server 2 starts. It communicates with server 1 and they exchange votes. Neither has any history (their zxids are equal), so server 2 wins on its larger id; but fewer than half of the servers have agreed on it (the majority here is 3), so servers 1 and 2 both remain LOOKING.
- Server 3 starts. By the same rule it now holds three votes (its own plus those of servers 1 and 2), reaching the majority, so server 3 becomes the leader of this election.
- Server 4 starts. On id alone it would beat servers 1 through 3, but a majority has already elected server 3, so server 4 has to settle for being a follower.
- Server 5 starts and, like server 4, joins as a follower.
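A toy replay of this startup sequence (plain Java, not ZooKeeper code): fresh servers all carry equal zxids, so the larger myid wins until one candidate reaches a quorum, and anyone starting later simply joins as a follower.

import java.util.*;

public class StartupElectionDemo {
    public static void main(String[] args) {
        final int ensemble = 5;
        Map<Long, Long> votes = new HashMap<>(); // voter id -> proposed leader
        long leader = -1, best = 0;
        for (long id = 1; id <= ensemble; id++) {
            if (leader < 0) {
                best = Math.max(best, id);        // equal zxids: larger myid wins
                for (long v = 1; v <= id; v++) votes.put(v, best);
                final long candidate = best;
                long count = votes.values().stream()
                        .filter(x -> x == candidate).count();
                System.out.printf("server %d up: candidate=%d votes=%d quorum=%b%n",
                        id, candidate, count, count > ensemble / 2);
                if (count > ensemble / 2) leader = candidate;  // server 3 wins here
            } else {
                System.out.printf("server %d up: leader %d already elected, joining as follower%n",
                        id, leader);
            }
        }
    }
}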
Now suppose 2 of the 5 servers crash (3 and 4), the dead server 3 being the leader:
Leader and followers check each other with heartbeats and must keep data in sync. Once the leader is gone, the whole ZooKeeper cluster suspends external service and enters a new round of leader election (a toy replay follows the steps):
1) Servers 1, 2 and 5 detect that they have lost the leader, switch to the looking state, and start a new vote.
2) Servers 1, 2 and 5 each increment their election epoch, cast a vote (initially for themselves), and broadcast it.
3) Servers 1, 2 and 5 each process the incoming votes, work out the best candidate, and broadcast again.
4) The vote-processing rules converge on a single server; note that a quorum is still 3 of the 5 configured votes, which the 3 survivors can just barely supply.
5) Each server switches to its new leader or follower state.
6) The cluster resumes service.
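To make steps 2) through 4) concrete, a toy replay with invented zxids: everyone bumps to the same new epoch, so the survivor with the newest zxid (here server 5) wins, and myid only breaks ties.

public class FailoverDemo {
    public static void main(String[] args) {
        // (myid, zxid) of survivors 1, 2, 5; zxids are invented, with
        // server 5 having logged the most recent transaction
        long[][] survivors = { {1, 0x1a}, {2, 0x1a}, {5, 0x1b} };
        long bestId = -1, bestZxid = -1;
        for (long[] s : survivors) {
            // same epoch for everyone, so compare zxid, then myid
            if (s[1] > bestZxid || (s[1] == bestZxid && s[0] > bestId)) {
                bestId = s[0];
                bestZxid = s[1];
            }
        }
        System.out.println("new leader: server " + bestId);  // server 5
    }
}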
Source-code walkthrough (FastLeaderElection, reformatted from the ZooKeeper source):
/*
 * Starts a new round of leader election. Whenever our QuorumPeer
 * changes its state to LOOKING, this method is invoked, and it
 * sends notifications to all other peers.
 */
public Vote lookForLeader() throws InterruptedException {
    try {
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }

    self.start_fle = Time.currentElapsedTime();
    try {
        Map<Long, Vote> recvset = new HashMap<Long, Vote>();
        Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

        int notTimeout = minNotificationInterval;

        synchronized (this) {
            logicalclock.incrementAndGet();
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id = " + self.getId()
                + ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        sendNotifications();

        SyncedLearnerTracker voteSet;

        /*
         * Loop in which we exchange notifications until we find a leader
         */
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            if (n == null) {
                if (manager.haveDelivered()) {
                    sendNotifications();
                } else {
                    manager.connectAll();
                }

                /*
                 * Exponential backoff
                 */
                int tmpTimeOut = notTimeout * 2;
                notTimeout = (tmpTimeOut < maxNotificationInterval ? tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            } else if (validVoter(n.sid) && validVoter(n.leader)) {
                /*
                 * Only proceed if the vote comes from a replica in the current or next
                 * voting view for a replica in the current or next voting view.
                 */
                switch (n.state) {
                case LOOKING:
                    if (getInitLastLoggedZxid() == -1) {
                        LOG.debug("Ignoring notification as our zxid is -1");
                        break;
                    }
                    if (n.zxid == -1) {
                        LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                        break;
                    }
                    // If notification > current, replace and send messages out
                    if (n.electionEpoch > logicalclock.get()) {
                        logicalclock.set(n.electionEpoch);
                        recvset.clear();
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        }
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                    + Long.toHexString(n.electionEpoch)
                                    + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                        }
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Adding vote: from=" + n.sid
                                + ", proposed leader=" + n.leader
                                + ", proposed zxid=0x" + Long.toHexString(n.zxid)
                                + ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }

                    // don't care about the version if it's in LOOKING state
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    voteSet = getVoteTracker(recvset,
                            new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

                    if (voteSet.hasAllQuorums()) {
                        // Verify if there is any change in the proposed leader
                        while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }

                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        if (n == null) {
                            setPeerState(proposedLeader, voteSet);
                            Vote endVote = new Vote(proposedLeader, proposedZxid,
                                    logicalclock.get(), proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    LOG.debug("Notification from observer: {}", n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    if (n.electionEpoch == logicalclock.get()) {
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        voteSet = getVoteTracker(recvset,
                                new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            setPeerState(n.leader, voteSet);
                            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    /*
                     * Before joining an established ensemble, verify that
                     * a majority are following the same leader.
                     */
                    outofelection.put(n.sid,
                            new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                    voteSet = getVoteTracker(outofelection,
                            new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

                    if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                        synchronized (this) {
                            logicalclock.set(n.electionEpoch);
                            setPeerState(n.leader, voteSet);
                        }
                        Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecoginized: " + n.state + " (n.state), " + n.sid + " (n.sid)");
                    break;
                }
            } else {
                if (!validVoter(n.leader)) {
                    LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                }
                if (!validVoter(n.sid)) {
                    LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                }
            }
        }
        return null;
    } finally {
        try {
            if (self.jmxLeaderElectionBean != null) {
                MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
        LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
    }
}

// The heart of totalOrderPredicate(newId, newZxid, newEpoch, curId, curZxid, curEpoch),
// the ordering used above:
/*
 * We return true if one of the following three cases hold:
 * 1- New epoch is higher
 * 2- New epoch is the same as current epoch, but new zxid is higher
 * 3- New epoch is the same as current epoch, new zxid is the same
 *    as current zxid, but server id is higher.
 */
return ((newEpoch > curEpoch)
        || ((newEpoch == curEpoch)
            && ((newZxid > curZxid)
                || ((newZxid == curZxid) && (newId > curId)))));
The split-brain problem
Split brain occurs when the leader dies, the followers elect a new leader, and then the old leader comes back. Because ZK's majority mechanism lets the cluster lose a certain number of machines and still serve, nodes can disagree about whether the leader is really dead, and that is when multiple leaders can appear.
Mitigation:
ZK's majority mechanism itself limits split brain: at most one leader can hold a quorum at any time, so two leaders can never both make progress. On top of that, ZK's epoch mechanism (a logical clock) increments the epoch by 1 on every election; whenever peers communicate they compare epochs: a vote with an epoch smaller than one's own is discarded, a larger one makes the node reset its own clock and re-evaluate its proposal, and an equal one goes into the normal vote comparison. This is exactly the epoch-handling branch from the election loop:
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {
    logicalclock.set(n.electionEpoch);
    recvset.clear();
    if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
            getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
        updateProposal(n.leader, n.zxid, n.peerEpoch);
    } else {
        updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
    }
    sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                + Long.toHexString(n.electionEpoch)
                + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
    }
    break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
        proposedLeader, proposedZxid, proposedEpoch)) {
    updateProposal(n.leader, n.zxid, n.peerEpoch);
    sendNotifications();
}
Summary
Day-to-day ZK operations need to account for the scenarios above under extreme conditions, especially split brain. Under the majority election strategy, follow these deployment principles (the quorum arithmetic below shows why):
- Deploy an odd number of servers, e.g. 3, 5, 7, ...; an odd count is the cheapest configuration that can elect a leader.
- Size the cluster by the maximum number of nodes it may lose: the principle is "keep a majority alive so elections still work"; any machines beyond that are wasted.
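The quorum arithmetic behind both rules, as a quick sketch: n voters need floor(n/2)+1 agreeing servers, so they tolerate the loss of n minus that quorum.

public class QuorumMath {
    public static void main(String[] args) {
        for (int n = 3; n <= 8; n++) {
            int quorum = n / 2 + 1;              // floor(n/2) + 1
            System.out.printf("n=%d  quorum=%d  tolerated failures=%d%n",
                    n, quorum, n - quorum);
        }
    }
}

Note that 5 and 6 servers both tolerate exactly 2 failures: the sixth machine buys no extra tolerance, which is why odd counts are recommended.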
The full algorithm logic is complex and has to handle many corner cases. One recurring concept is the epoch (monotonically increasing), which appears both as the election epoch (the logical clock) and the peer epoch; every incoming vote is checked against the current voting round, and so on. The two scenarios above (startup election and leader failover) come up again and again when reasoning about ZK strategy; this write-up organizes the train of thought, both as an aid to understanding and for later review.
Author: owen_jia (OSChina blog)
WeChat official account: 互聯(lián)網(wǎng)技術(shù)到家
Toutiao account: 互聯(lián)網(wǎng)技術(shù)到家