FinalRequestProcessor是請求處理鏈中最后的一個處理器。
public class FinalRequestProcessor implements RequestProcessor {
ZooKeeperServer zks;
}
FinalRequestProcessor只實現了RequestProcessor接口,需要實現process Request方法和shutdown方法。
核心屬性為zks,表示Zookeeper服務器,可以通過zks訪問到Zookeeper內存數據庫。
我們看一下核心方法process Request代碼:
synchronized (zks.outstandingChanges) {
// Need to process local session requests
// 當前節點,處理請求,若為事務性請求,則提交到ZooKeeper內存數據庫中。
// 對于processTxn函數而言,其最終會調用DataTree的processTxn
rc = zks.processTxn(request);
// request.hdr is set for write requests, which are the only ones
// that add to outstandingChanges.
//只有寫請求才會有消息頭
if (request.getHdr() != null) {
TxnHeader hdr = request.getHdr();
Record txn = request.getTxn();
long zxid = hdr.getZxid();
//當outstandingChanges不為空且其首元素的zxid小于等于請求的zxid時,
// 就會一直從outstandingChanges中取出首元素,并且對outstandingChangesForPath做相應的操作
while (!zks.outstandingChanges.isEmpty()
&& zks.outstandingChanges.peek().zxid <= zxid) {
ChangeRecord cr = zks.outstandingChanges.remove();
if (cr.zxid < zxid) {
LOG.warn("Zxid outstanding " + cr.zxid
+ " is less than current " + zxid);
}
if (zks.outstandingChangesForPath.get(cr.path) == cr) {
zks.outstandingChangesForPath.remove(cr.path);
}
}
}
// do not add non quorum packets to the queue.
//判斷是否為事務性請求則是通過調用isQuorum函數
//只將quorum包(事務性請求)添加進隊列
//addCommittedProposal函數將請求添加至ZKDatabase的committedLog結構中
if (request.isQuorum()) {
zks.getZKDatabase().addCommittedProposal(request);
}
}
根據請求的創建時間來更新Zookeeper服務器的延遲,updateLatency函數中會記錄最大延遲、最小延遲、總的延遲和延遲次數。
然后更新響應中的狀態,如請求創建到響應該請求總共花費的時間、最后的操作類型等。然后設置響應后返回
case OpCode.ping: {
//更新延遲
zks.serverStats().updateLatency(request.createTime);
lastOp = "PING";
// 更新響應的狀態
cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
request.createTime, Time.currentElapsedTime());
// 設置響應
cnxn.sendResponse(new ReplyHeader(-2,
zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response");
return;
}
其他請求與此類似,
最后會根據其他請求再次更新服務器的延遲,設置響應的狀態等
// 獲取最后處理的zxid
long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
// 響應頭
ReplyHeader hdr =
new ReplyHeader(request.cxid, lastZxid, err.intValue());
// 更新服務器延遲
zks.serverStats().updateLatency(request.createTime);
// 更新狀態
cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp,
request.createTime, Time.currentElapsedTime());
最后使用sendResponse函數將響應發送給請求方。
try {
//返回相應
cnxn.sendResponse(hdr, rsp, "response");
if (request.type == OpCode.closeSession) {
//關閉會話
cnxn.sendCloseSession();
}
} catch (IOException e) {
LOG.error("FIXMSG",e);
}
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。