Appearance
分布式系统
Dustin是一个开源的软件开发者,同时也是Mozilla的一名发布工程师。他参与的项目包括在Puppet中配置主机系统,一个基于Flask的Web框架,为防火墙配置做单元测试,还有一个在Twisted Python下开发的持续集成系统框架。你可以通过GitHub或者dustin@mozillar.com联系他。
介绍
在这一章,我们将会一起探索如何实现一个网络协议用于可靠的分布式计算。正确实现一个网络协议并不简单,因此我们会采用一些技巧来尽可能的减少、查找和修复漏洞。要建立一个可靠地软件,同样需要一些特别的开发和调试技巧。
情景思考
这一章的重点在于网络协议的实现,但是首先让我们以简单的银行账户管理服务为例做一个思考。在这个服务中,每一个账户都有一个当前余额,同时每个账户都有自己的账号。用户可以通过"存款"、"转账"、"查询当前余额"等操作来连接账户。"转账"操作同时涉及了两个账户——转出账户和转入账户——并且如果账户余额不足,转账操作必须被驳回。
如果这个服务仅仅在一个服务器上部署,很容易就能够实现:使用一个操作锁来确保"转账"操作不会同时进行,同时对转出账户的进行校验。然而,银行不可能仅仅依赖于一个服务器来储存账户余额这样的关键信息,通常,这些服务都是被分布在多个服务器上的,每一个服务器各自运行着相同代码的实例。用户可以通过任何一个服务器来操作账户。
在一个简单的分布式处理系统的实现中,每个服务器都会保存一份账户余额的副本。它会处理任何收到的操作,并且将账户余额的更新发送给其他的服务器。但是这种方法有一个严重的问题:如果两个服务器同时对一个账户进行操作,哪一个新的账户余额是正确的?即使服务器不共享余额而是共享操作,对一个账户同时进行转账操作也可能造成透支。
从根本上来说,这些错误的发生都是由于服务器使用它们本地状态来响应操作,而不是首先确保本地状态与其他服务器相匹配。比如,想象服务器A接到了从账号101向账号202转账的操作指令,而此时服务器B已经处理了另一个把账号101的钱都转到账号202的请求,却没有通知服务器A。这样,服务器A的本地状态与服务器B不一样,即使会造成账户101透支,服务器A依然允许从账号101进行转账操作。
分布式状态机
为了防止上述情况发生我们采用了一种叫做"分布式状态机(Distributed State Machine)"的工具。它的思路是对每个同样的输入,每个服务器都运行同样的对应的状态机。由于状态机的特性,对于同样的输入每个服务器的输出都是一样的。对于像"转账"、"查询当前余额"等操作,账号和余额也都是状态机的输入。
这个应用的状态机比较简单:
python
def execute_operation(state, operation):
if operation.name == 'deposit':
if not verify_signature(operation.deposit_signature):
return state, False
state.accounts[operation.destination_account] += operation.amount
return state, True
elif operation.name == 'transfer':
if state.accounts[operation.source_account] < operation.amount:
return state, False
state.accounts[operation.source_account] -= operation.amount
state.accounts[operation.destination_account] += operation.amount
return state, True
elif operation.name == 'get-balance':
return state, state.accounts[operation.account]值得注意的是,运行"查询当前余额"操作时虽然并不会改变当前状态,但是我们依然把它当做一个状态变化操作来实现。这确保了返回的余额是分布式系统中的最新信息,并且不是基于一个服务器上的本地状态来进行返回的。
这可能跟你在计算机课程中学习到的典型的状态机不太一样。传统的状态机是一系列有限个状态的集合,每个状态都与一个标记的转移行为相对应,而在本文中,状态机的状态是账户余额的集合,因此存在无穷多个可能的状态。但是,状态机的基本规则同样适用于本文的状态机:对于同样的初始状态,同样的输入总是有同样的输出。
因此,分布式状态机确保了对于同样的操作,每个主机都会有同样的响应。但是,为了确保每个服务器都认同状态机的输入,前文中提到的问题依然存在。这是一个_一致性_(Consensus)问题,为了解决它我们采用了一种派生的Paxos算法。
通过Paxos达成一致性
Leslie Lamport在1990年的一篇富有趣味的论文中描述了Paxos算法,这篇论文最终于1998年以"The Part-Time Parliament"为题发表。Lamport的论文比本文提供了更多的细节,并且非常值得阅读。本章参考文献中描述了在本实现中所采用的算法扩展。
简单Paxos(Simple Paxos)使一组服务器能够就单个值永久地达成一致。多Paxos(Multi-Paxos)在此基础上构建,通过逐个对编号的事实序列达成一致。实现分布式状态机则需要使用Multi-Paxos来对每一个状态机输入达成一致,并按顺序执行它们。
简单Paxos
"简单Paxos",也称为Synod协议,使得对单个不变值达成一致成为可能。Paxos这个名字来源于"The Part-Time Parliament"中虚构的Paxos岛,那里的立法者通过Lamport的Synod协议对立法进行投票。
这个算法是更复杂算法的基础构件。在我们的例子中,达成一致的这个单一值代表着假想银行所处理的第一笔交易。虽然银行每天都在处理交易,但第一笔交易只发生一次并且永远不会改变,这使得简单Paxos可以适用。
协议通过一系列投票(ballot)进行,每次投票由称为提议者(proposer)的单个集群成员主导。每次投票都有唯一的投票编号,由整数和提议者身份组成。提议者的目标是让集群成员中的多数派(这些成员充当接受者/acceptor的角色)接受某个值——但前提是另一个值尚未被决定。
一次投票始于提议者向接受者发送带有投票编号 N 的 Prepare 消息,然后等待多数派回应。
Prepare 消息请求获取投票编号低于 N 的已接受值(如果有的话)。接受者用 Promise 消息回应,其中包含之前已接受的值,并承诺将来不再接受编号低于 N 的投票。如果接受者已经承诺了更大的投票编号,它们会在 Promise 消息中包含该编号,表明提议者已被抢占(preempted)。当被抢占时,该次投票结束,但提议者仍然可以在后续投票中使用更大的编号重新尝试。
在收到多数派的回应后,提议者向所有接受者发送包含投票编号和值的 Accept 消息。如果提议者没有收到任何已有值,则发送它所期望的值。否则,它发送编号最大的承诺值。
除非违反了承诺,接受者将 Accept 消息中的值记录为已接受,并以 Accepted 消息回复。当提议者收到来自多数派的投票编号时,投票完成,值就被决定了。
回到我们的例子:初始时没有其他值被接受,所以接受者回送不包含任何值的 Promise 消息,提议者发送包含如下值的 Accept 消息:
operation(name='deposit', amount=100.00, destination_account='Mike DiBernardo')如果后来的提议者用更小的投票编号和不同的操作(比如转账到账户 'Dustin J. Mitchell')发起投票,接受者会直接拒绝它们。使用更大投票编号的投票中,接受者的 Promise 消息会告知提议者关于Michael的100美元存款,因此提议者在 Accept 消息中发送该值而不是转账操作。新投票会被接受,但选中的值与第一次投票相同。
该协议保证永远不会出现两个不同的值被决定的情况,即使在投票重叠、消息延迟或少数接受者失败的情况下也是如此。
当多个提议者同时发起投票时,通常两个投票都不会被接受。两个提议者重新提议,希望其中一个能胜出,但在特定的时序下,死锁可能无限期地持续下去。
考虑以下序列:
- 提议者A对投票编号1执行
Prepare/Promise。 - 在提议者A的提案被接受之前,提议者B对投票编号2执行
Prepare/Promise。 - 当提议者A最终发送投票编号1的
Accept时,接受者拒绝它,因为它们已经承诺了投票编号2。 - 提议者A在提议者B发送
Accept消息之前,发送了更高投票编号(3)的Prepare。 - 提议者B随后的
Accept被拒绝,过程不断重复。
在不幸的时序下——在长距离连接中更为常见,因为消息响应间隔更长——这种死锁可能持续很多轮。
Multi-Paxos
就单个静态值达成一致本身用途有限。像银行账户服务这样的集群系统需要对不断变化的状态(账户余额)达成一致。Paxos对每个操作达成一致,将其视为状态机转换。
Multi-Paxos代表一连串顺序的简单Paxos实例(称为槽/slot),每个按顺序编号。每个状态转换都被分配一个"槽号",集群成员严格按照数字顺序执行转换。改变集群状态(处理转账)意味着在下一个槽中对操作达成一致。具体来说,这需要在每条消息中添加槽号,并按槽跟踪所有协议状态。
为每个槽运行Paxos(至少需要两轮往返)太慢了。Multi-Paxos通过在所有槽中使用相同的投票编号集来优化,同时对所有槽执行 Prepare/Promise 阶段。
Paxos实现之难
在实际软件中实现Multi-Paxos是出了名的困难,催生了许多论文以戏谑Lamport的"Paxos Made Simple"为标题,如"Paxos Made Practical"等。
首先,前面描述的多提议者问题在繁忙的环境中变得很棘手,因为每个集群成员都试图在每个槽中获得状态机操作的决定。解决方案是选举一个"领导者(leader)"负责为每个槽提交投票。所有其他集群节点将新操作发送给领导者执行。这样,在只有一个领导者的正常操作中,投票冲突就不会发生。
Prepare/Promise 阶段在某种程度上起到了领导者选举的作用:拥有最近被承诺的投票编号的集群成员成为领导者。然后领导者可以直接执行 Accept/Accepted 阶段,而不需要重复第一阶段。如下文所示,领导者选举涉及相当大的复杂性。
虽然简单Paxos保证集群永远不会达成冲突的决定,但无法保证一定能达成任何决定。例如,如果初始的 Prepare 消息丢失而未到达接受者,提议者会一直等待永远不会到来的 Promise 消息。修复这个问题需要精心安排的重传:足够确保最终能取得进展,但又不能太多以至于集群被数据包风暴淹没。
另一个问题涉及决定的传播。简单的 Decision 消息广播可以处理正常情况。但如果消息丢失,节点将永远不知道决定,也无法为后续槽应用状态机转换。实现需要在节点间共享已决定提案信息的机制。
分布式状态机还面临另一个有趣的挑战:启动。新节点必须追赶现有集群状态。虽然通过追赶从初始槽以来的所有决定可以做到,但成熟的集群可能涉及数百万个槽。此外,还必须存在初始化新集群的方法。
理论和算法讲得够多了——让我们来看看代码。
Cluster简介
本章中的 Cluster 库实现了简单形式的Multi-Paxos。它被设计为一个为更大的应用程序提供一致性服务的库。
库的使用者依赖于其正确性,因此以能够展示与规范对应关系的方式组织代码非常重要。复杂的协议会产生复杂的故障,因此需要构建支持复现和调试罕见故障的能力。
本章的实现是概念验证代码:足以展示核心概念的实用性,但没有包含日常生产环境所需的设施。代码的结构允许以后以最小的核心实现变动来添加这些设施。
让我们开始吧。
类型和常量
Cluster的协议定义了十五种不同的消息类型,每种都是Python的 namedtuple:
python
Accepted = namedtuple('Accepted', ['slot', 'ballot_num'])
Accept = namedtuple('Accept', ['slot', 'ballot_num', 'proposal'])
Decision = namedtuple('Decision', ['slot', 'proposal'])
Invoked = namedtuple('Invoked', ['client_id', 'output'])
Invoke = namedtuple('Invoke', ['caller', 'client_id', 'input_value'])
Join = namedtuple('Join', [])
Active = namedtuple('Active', [])
Prepare = namedtuple('Prepare', ['ballot_num'])
Promise = namedtuple('Promise', ['ballot_num', 'accepted_proposals'])
Propose = namedtuple('Propose', ['slot', 'proposal'])
Welcome = namedtuple('Welcome', ['state', 'slot', 'decisions'])
Decided = namedtuple('Decided', ['slot'])
Preempted = namedtuple('Preempted', ['slot', 'preempted_by'])
Adopted = namedtuple('Adopted', ['ballot_num', 'accepted_proposals'])
Accepting = namedtuple('Accepting', ['leader'])使用命名元组来描述消息类型使代码更加简洁,并能防止简单的错误。如果属性不匹配,命名元组的构造函数会抛出异常,使拼写错误变得显而易见。元组在日志消息中格式化良好,而且比字典使用更少的内存。
创建消息的语法非常自然:
python
msg = Accepted(slot=10, ballot_num=30)消息字段的访问也只需要很少的额外输入:
python
got_ballot_num = msg.ballot_num这些消息的含义将在以下章节中介绍。代码还引入了几个常量,主要定义了各种消息的超时时间:
python
JOIN_RETRANSMIT = 0.7
CATCHUP_INTERVAL = 0.6
ACCEPT_RETRANSMIT = 1.0
PREPARE_RETRANSMIT = 1.0
INVOKE_RETRANSMIT = 0.5
LEADER_TIMEOUT = 1.0
NULL_BALLOT = Ballot(-1, -1) # 排在所有真实投票之前
NOOP_PROPOSAL = Proposal(None, None, None) # 用于填充空槽的空操作最后,Cluster使用了两种数据类型,命名与协议描述中的相对应:
python
Proposal = namedtuple('Proposal', ['caller', 'client_id', 'input'])
Ballot = namedtuple('Ballot', ['n', 'leader'])组件模型
人的主动记忆能力有限,无法同时推理整个Cluster实现——太多的复杂性使得细节很容易被忽略。类似地,大型单体代码库也很难测试:测试用例需要操作很多活动部件,而且很脆弱,几乎任何代码变动都会导致失败。
为了鼓励可测试性并保持代码的可读性,需要将Cluster拆分为少量的类,每个类对应于协议中描述的角色。每个类都继承自 Role。
python
class Role(object):
def __init__(self, node):
self.node = node
self.node.register(self)
self.running = True
self.logger = node.logger.getChild(type(self).__name__)
def set_timer(self, seconds, callback):
return self.node.network.set_timer(self.node.address, seconds,
lambda: self.running and callback())
def stop(self):
self.running = False
self.node.unregister(self)集群节点上的角色通过 Node 类粘合在一起,该类代表单个网络节点。角色在执行过程中被添加到节点和从节点中移除。到达的消息被转发给所有活跃的角色,调用以消息类型命名并加上 do_ 前缀的方法。这些 do_ 方法接收消息属性作为关键字参数以便于访问。Node 类提供 send 方法作为便利方法,使用 functools.partial 为 Network 类方法提供参数。
python
class Node(object):
unique_ids = itertools.count()
def __init__(self, network, address):
self.network = network
self.address = address or 'N%d' % self.unique_ids.next()
self.logger = SimTimeLogger(
logging.getLogger(self.address), {'network': self.network})
self.logger.info('starting')
self.roles = []
self.send = functools.partial(self.network.send, self)
def register(self, roles):
self.roles.append(roles)
def unregister(self, roles):
self.roles.remove(roles)
def receive(self, sender, message):
handler_name = 'do_%s' % type(message).__name__
for comp in self.roles[:]:
if not hasattr(comp, handler_name):
continue
comp.logger.debug("received %s from %s", message, sender)
fn = getattr(comp, handler_name)
fn(sender=sender, **message._asdict())应用程序接口
应用程序在每个集群成员上创建并启动 Member 对象,提供特定于应用的状态机和对等节点列表。Member对象在节点加入现有集群时添加引导(bootstrap)角色,或者在创建新集群时添加种子(seed)角色。它们通过 Network.run 在单独的线程中运行协议。
应用程序通过 invoke 方法与集群交互,发起状态转换提案。一旦提案被决定并且状态机运行完成,invoke 返回机器输出。该方法使用简单的同步 Queue 等待协议线程的结果。
python
class Member(object):
def __init__(self, state_machine, network, peers, seed=None,
seed_cls=Seed, bootstrap_cls=Bootstrap):
self.network = network
self.node = network.new_node()
if seed is not None:
self.startup_role = seed_cls(self.node, initial_state=seed, peers=peers,
execute_fn=state_machine)
else:
self.startup_role = bootstrap_cls(self.node,
execute_fn=state_machine, peers=peers)
self.requester = None
def start(self):
self.startup_role.start()
self.thread = threading.Thread(target=self.network.run)
self.thread.start()
def invoke(self, input_value, request_cls=Requester):
assert self.requester is None
q = Queue.Queue()
self.requester = request_cls(self.node, input_value, q.put)
self.requester.start()
output = q.get()
self.requester = None
return output角色类
让我们逐一查看库中的每个角色类。
接受者(Acceptor)
Acceptor 实现了接受者的协议角色,存储代表最近承诺的投票编号,以及每个槽已接受的提案集合。它按照协议响应 Prepare 和 Accept 消息。最终的类很短,很容易与协议进行比对。
对于接受者来说,Multi-Paxos看起来与简单Paxos类似,只是在消息中添加了槽号。
python
class Acceptor(Role):
def __init__(self, node):
super(Acceptor, self).__init__(node)
self.ballot_num = NULL_BALLOT
self.accepted_proposals = {} # {slot: (ballot_num, proposal)}
def do_Prepare(self, sender, ballot_num):
if ballot_num > self.ballot_num:
self.ballot_num = ballot_num
# 我们收到了scout的消息,所以它可能是下一个领导者
self.node.send([self.node.address], Accepting(leader=sender))
self.node.send([sender], Promise(
ballot_num=self.ballot_num,
accepted_proposals=self.accepted_proposals
))
def do_Accept(self, sender, ballot_num, slot, proposal):
if ballot_num >= self.ballot_num:
self.ballot_num = ballot_num
acc = self.accepted_proposals
if slot not in acc or acc[slot][0] < ballot_num:
acc[slot] = (ballot_num, proposal)
self.node.send([sender], Accepted(
slot=slot, ballot_num=self.ballot_num))副本(Replica)
Replica 类是最复杂的角色类,承担着几个紧密相关的职责:
- 提出新提案;
- 当提案被决定时调用本地状态机;
- 跟踪当前的领导者;以及
- 将新启动的节点添加到集群中。
副本响应来自客户端的 Invoke 消息来创建新提案,选择认为未被使用的槽,并向当前领导者发送 Propose 消息。此外,当所选槽的共识结果不同时,副本必须用新的槽重新提出提案。
Decision 消息代表集群已达成共识的槽。副本存储新的决定,然后运行状态机直到遇到未决定的槽。副本区分_已决定_(decided)的槽——集群已达成一致,和_已提交_(committed)的槽——本地状态机已处理完毕。当槽乱序决定时,已提交的提案会滞后,等待下一个槽的决定。当槽被提交时,副本向请求者发送包含操作结果的 Invoked 消息。
有时候某些槽没有活跃的提案也没有决定。状态机逐个执行槽,所以集群必须对填充槽的内容达成共识。为了防止这种情况,副本在追赶槽时会提出"空操作(no-op)"提案。如果这样的提案最终被决定,状态机对该槽不做任何操作。
同样,相同的提案可能被决定两次。副本会跳过对任何重复提案的状态机调用,对这些槽不执行转换。
副本需要知道哪些节点是活跃的领导者,以便发送 Propose 消息。正确实现这一点涉及令人惊讶的微妙之处,如后文所示。每个副本通过三个信息来源跟踪活跃的领导者。
当领导者角色变为活跃状态时,它们向同一节点上的副本发送 Adopted 消息。
当接受者角色向新领导者发送 Promise 消息时,它们向本地副本发送 Accepting 消息。
活跃的领导者发送 Active 消息作为心跳。如果在 LEADER_TIMEOUT 到期之前没有收到这样的消息,副本会认为领导者已失效并切换到下一个领导者。重要的是,所有副本必须选择_相同的_新领导者,这通过对成员排序并选择列表中的下一个来实现。
最后,当节点加入网络时,引导角色发送 Join 消息。副本用包含最新状态的 Welcome 消息回应,使新节点能够快速追赶。
python
class Replica(Role):
def __init__(self, node, execute_fn, state, slot, decisions, peers):
super(Replica, self).__init__(node)
self.execute_fn = execute_fn
self.state = state
self.slot = slot
self.decisions = decisions
self.peers = peers
self.proposals = {}
# 下一个提案的槽号(可能领先于slot)
self.next_slot = slot
self.latest_leader = None
self.latest_leader_timeout = None
# 提出提案
def do_Invoke(self, sender, caller, client_id, input_value):
proposal = Proposal(caller, client_id, input_value)
slot = next((s for s, p in self.proposals.iteritems() if p == proposal), None)
# 提出提案,或者如果这个提案已经有槽了则重新提出
self.propose(proposal, slot)
def propose(self, proposal, slot=None):
"""向领导者发送(或重新发送,如果指定了槽)提案"""
if not slot:
slot, self.next_slot = self.next_slot, self.next_slot + 1
self.proposals[slot] = proposal
# 找到一个我们认为正在工作的领导者——要么是我们知道的最新领导者,
# 要么是我们自己(这可能触发一个scout来使我们成为领导者)
leader = self.latest_leader or self.node.address
self.logger.info(
"proposing %s at slot %d to leader %s" % (proposal, slot, leader))
self.node.send([leader], Propose(slot=slot, proposal=proposal))
# 处理已决定的提案
def do_Decision(self, sender, slot, proposal):
assert not self.decisions.get(self.slot, None), \
"next slot to commit is already decided"
if slot in self.decisions:
assert self.decisions[slot] == proposal, \
"slot %d already decided with %r!" % (slot, self.decisions[slot])
return
self.decisions[slot] = proposal
self.next_slot = max(self.next_slot, slot + 1)
# 如果我们的提案失去了它的槽并且不是空操作,则在新槽中重新提出
our_proposal = self.proposals.get(slot)
if (our_proposal is not None and
our_proposal != proposal and our_proposal.caller):
self.propose(our_proposal)
# 执行所有待处理的已决定提案
while True:
commit_proposal = self.decisions.get(self.slot)
if not commit_proposal:
break # 尚未决定
commit_slot, self.slot = self.slot, self.slot + 1
self.commit(commit_slot, commit_proposal)
def commit(self, slot, proposal):
"""实际提交一个已决定且按顺序的提案"""
decided_proposals = [p for s, p in self.decisions.iteritems() if s < slot]
if proposal in decided_proposals:
self.logger.info(
"not committing duplicate proposal %r, slot %d", proposal, slot)
return # 重复提案
self.logger.info("committing %r at slot %d" % (proposal, slot))
if proposal.caller is not None:
# 执行客户端操作
self.state, output = self.execute_fn(self.state, proposal.input)
self.node.send([proposal.caller],
Invoked(client_id=proposal.client_id, output=output))
# 跟踪领导者
def do_Adopted(self, sender, ballot_num, accepted_proposals):
self.latest_leader = self.node.address
self.leader_alive()
def do_Accepting(self, sender, leader):
self.latest_leader = leader
self.leader_alive()
def do_Active(self, sender):
if sender != self.latest_leader:
return
self.leader_alive()
def leader_alive(self):
if self.latest_leader_timeout:
self.latest_leader_timeout.cancel()
def reset_leader():
idx = self.peers.index(self.latest_leader)
self.latest_leader = self.peers[(idx + 1) % len(self.peers)]
self.logger.debug("leader timed out; tring the next one, %s",
self.latest_leader)
self.latest_leader_timeout = self.set_timer(LEADER_TIMEOUT, reset_leader)
# 添加新的集群成员
def do_Join(self, sender):
if sender in self.peers:
self.node.send([sender], Welcome(
state=self.state, slot=self.slot, decisions=self.decisions))领导者(Leader)、侦察兵(Scout)和指挥官(Commander)
领导者的主要任务是接受请求新投票的 Propose 消息并产生决定。领导者在成功执行了协议的 Prepare/Promise 部分后变为"活跃"状态。活跃的领导者会立即响应 Propose 消息发送 Accept 消息。
遵循每个角色一个类的模型,领导者将协议的各个部分委托给侦察兵(scout)和指挥官(commander)角色来执行。
python
class Leader(Role):
def __init__(self, node, peers, commander_cls=Commander, scout_cls=Scout):
super(Leader, self).__init__(node)
self.ballot_num = Ballot(0, node.address)
self.active = False
self.proposals = {}
self.commander_cls = commander_cls
self.scout_cls = scout_cls
self.scouting = False
self.peers = peers
def start(self):
# 在LEADER_TIMEOUT到期之前提醒其他节点我们是活跃的
def active():
if self.active:
self.node.send(self.peers, Active())
self.set_timer(LEADER_TIMEOUT / 2.0, active)
active()
def spawn_scout(self):
assert not self.scouting
self.scouting = True
self.scout_cls(self.node, self.ballot_num, self.peers).start()
def do_Adopted(self, sender, ballot_num, accepted_proposals):
self.scouting = False
self.proposals.update(accepted_proposals)
# 注意我们在这里不重新生成commander;如果有未决定的
# 提案,副本会重新提出
self.logger.info("leader becoming active")
self.active = True
def spawn_commander(self, ballot_num, slot):
proposal = self.proposals[slot]
self.commander_cls(self.node, ballot_num, slot, proposal, self.peers).start()
def do_Preempted(self, sender, slot, preempted_by):
if not slot: # 来自scout
self.scouting = False
self.logger.info("leader preempted by %s", preempted_by.leader)
self.active = False
self.ballot_num = Ballot((preempted_by or self.ballot_num).n + 1,
self.ballot_num.leader)
def do_Propose(self, sender, slot, proposal):
if slot not in self.proposals:
if self.active:
self.proposals[slot] = proposal
self.logger.info("spawning commander for slot %d" % (slot,))
self.spawn_commander(self.ballot_num, slot)
else:
if not self.scouting:
self.logger.info("got PROPOSE when not active - scouting")
self.spawn_scout()
else:
self.logger.info("got PROPOSE while scouting; ignored")
else:
self.logger.info("got PROPOSE for a slot already being proposed")领导者在想要变为活跃状态时创建侦察兵角色,这是对在非活跃状态下收到 Propose 消息的响应。侦察兵发送(并在必要时重新发送)Prepare 消息,收集 Promise 回应,直到收到来自大多数对等节点的回应或被抢占。它们分别通过 Adopted 或 Preempted 向领导者通报结果。
python
class Scout(Role):
def __init__(self, node, ballot_num, peers):
super(Scout, self).__init__(node)
self.ballot_num = ballot_num
self.accepted_proposals = {}
self.acceptors = set([])
self.peers = peers
self.quorum = len(peers) / 2 + 1
self.retransmit_timer = None
def start(self):
self.logger.info("scout starting")
self.send_prepare()
def send_prepare(self):
self.node.send(self.peers, Prepare(ballot_num=self.ballot_num))
self.retransmit_timer = self.set_timer(PREPARE_RETRANSMIT, self.send_prepare)
def update_accepted(self, accepted_proposals):
acc = self.accepted_proposals
for slot, (ballot_num, proposal) in accepted_proposals.iteritems():
if slot not in acc or acc[slot][0] < ballot_num:
acc[slot] = (ballot_num, proposal)
def do_Promise(self, sender, ballot_num, accepted_proposals):
if ballot_num == self.ballot_num:
self.logger.info("got matching promise; need %d" % self.quorum)
self.update_accepted(accepted_proposals)
self.acceptors.add(sender)
if len(self.acceptors) >= self.quorum:
# 从self.accepted_proposals中去掉投票编号,因为它
# 现在代表了多数派的结果
accepted_proposals = \
dict((s, p) for s, (b, p) in self.accepted_proposals.iteritems())
# 我们被采纳了;注意这并*不*意味着没有其他
# 领导者是活跃的。任何这样的冲突将由
# commander来处理。
self.node.send([self.node.address],
Adopted(ballot_num=ballot_num,
accepted_proposals=accepted_proposals))
self.stop()
else:
# 这个接受者已经向另一个领导者承诺了更高的投票编号,
# 所以我们失败了
self.node.send([self.node.address],
Preempted(slot=None, preempted_by=ballot_num))
self.stop()领导者为每个有活跃提案的槽创建指挥官角色。与侦察兵类似,指挥官发送和重新发送 Accept 消息,等待多数接受者的 Accepted 回复,或者被抢占的消息。当提案被接受时,指挥官向所有节点广播 Decision 消息。它们通过 Decided 或 Preempted 向领导者报告结果。
python
class Commander(Role):
def __init__(self, node, ballot_num, slot, proposal, peers):
super(Commander, self).__init__(node)
self.ballot_num = ballot_num
self.slot = slot
self.proposal = proposal
self.acceptors = set([])
self.peers = peers
self.quorum = len(peers) / 2 + 1
def start(self):
self.node.send(set(self.peers) - self.acceptors, Accept(
slot=self.slot, ballot_num=self.ballot_num, proposal=self.proposal))
self.set_timer(ACCEPT_RETRANSMIT, self.start)
def finished(self, ballot_num, preempted):
if preempted:
self.node.send([self.node.address],
Preempted(slot=self.slot, preempted_by=ballot_num))
else:
self.node.send([self.node.address],
Decided(slot=self.slot))
self.stop()
def do_Accepted(self, sender, slot, ballot_num):
if slot != self.slot:
return
if ballot_num == self.ballot_num:
self.acceptors.add(sender)
if len(self.acceptors) < self.quorum:
return
self.node.send(self.peers, Decision(
slot=self.slot, proposal=self.proposal))
self.finished(ballot_num, False)
else:
self.finished(ballot_num, True)在开发过程中,这里出现了一些令人惊讶的微妙bug。网络模拟器在节点内部消息上也引入了丢包。当_所有_ Decision 消息都丢失时,协议无法继续。副本继续重传 Propose 消息,但领导者因为已有该槽的提案而忽略它们。副本的追赶过程也找不到结果,因为没有副本收到过决定。解决方案是确保本地消息始终被送达,这与真实网络栈的行为一致。
引导(Bootstrap)
当节点加入集群时,它们必须在参与之前确定当前的集群状态。引导角色通过逐个向每个对等节点发送 Join 消息来处理这个问题,直到收到 Welcome 消息。
早期的实现版本让节点以完整的角色集(副本、领导者和接受者)启动,每个角色都从"启动"阶段开始,等待 Welcome 消息中的信息。这将初始化逻辑分散到了每个角色中,需要分别测试。最终的设计使用引导角色,在启动完成后将其他角色添加到节点,并将初始状态传递给构造函数。
python
class Bootstrap(Role):
def __init__(self, node, peers, execute_fn,
replica_cls=Replica, acceptor_cls=Acceptor, leader_cls=Leader,
commander_cls=Commander, scout_cls=Scout):
super(Bootstrap, self).__init__(node)
self.execute_fn = execute_fn
self.peers = peers
self.peers_cycle = itertools.cycle(peers)
self.replica_cls = replica_cls
self.acceptor_cls = acceptor_cls
self.leader_cls = leader_cls
self.commander_cls = commander_cls
self.scout_cls = scout_cls
def start(self):
self.join()
def join(self):
self.node.send([next(self.peers_cycle)], Join())
self.set_timer(JOIN_RETRANSMIT, self.join)
def do_Welcome(self, sender, state, slot, decisions):
self.acceptor_cls(self.node)
self.replica_cls(self.node, execute_fn=self.execute_fn, peers=self.peers,
state=state, slot=slot, decisions=decisions)
self.leader_cls(self.node, peers=self.peers, commander_cls=self.commander_cls,
scout_cls=self.scout_cls).start()
self.stop()种子(Seed)
在正常操作中,当节点加入集群时,它们期望集群已经在运行,并且至少有一个节点愿意响应 Join 消息。但是集群最初是如何启动的呢?一种选择是:引导角色在联系完每一个其他节点后,确定自己是集群中的第一个。但这有两个问题。首先,大型集群意味着在每个 Join 超时时需要漫长等待。更重要的是,网络分区可能阻止新节点联系任何其他节点并启动新集群。
网络分区是集群应用面临的最具挑战性的故障情况。在网络分区中,所有集群成员都保持活跃,但某些成员之间的通信失败。例如,连接柏林和台北集群的网络链路故障会造成网络分区。如果分区期间集群的两部分都继续运行,那么在链路恢复后重新合并将变得很困难。在Multi-Paxos的情况下,恢复后的网络上会存在两个集群,它们对相同的槽号有不同的决定。
避免这种结果要求将创建新集群作为用户指定的操作。恰好有一个集群节点运行种子角色,其他节点正常运行引导角色。种子等待收到来自大多数对等节点的 Join 消息,然后发送包含初始状态机状态和空决定集的 Welcome 消息。然后种子角色停止自身并启动引导角色,加入新播种的集群。
种子模拟了引导/副本交互中的 Join/Welcome 部分,因此通信图与副本角色匹配。
python
class Seed(Role):
def __init__(self, node, initial_state, execute_fn, peers,
bootstrap_cls=Bootstrap):
super(Seed, self).__init__(node)
self.initial_state = initial_state
self.execute_fn = execute_fn
self.peers = peers
self.bootstrap_cls = bootstrap_cls
self.seen_peers = set([])
self.exit_timer = None
def do_Join(self, sender):
self.seen_peers.add(sender)
if len(self.seen_peers) <= len(self.peers) / 2:
return
# 集群已准备好——欢迎所有人
self.node.send(list(self.seen_peers), Welcome(
state=self.initial_state, slot=1, decisions={}))
# 停留足够长的时间,这样我们就不会从
# 新组建的集群中收到任何新的JOIN
if self.exit_timer:
self.exit_timer.cancel()
self.exit_timer = self.set_timer(JOIN_RETRANSMIT * 2, self.finish)
def finish(self):
# 将此节点引导到我们刚刚播种的集群中
bs = self.bootstrap_cls(self.node,
peers=self.peers, execute_fn=self.execute_fn)
bs.start()
self.stop()请求者(Requester)
请求者角色管理分布式状态机请求。该角色类简单地向本地副本发送 Invoke 消息,直到收到相应的 Invoked 消息。通信图请参见上面的"副本"章节。
python
class Requester(Role):
client_ids = itertools.count(start=100000)
def __init__(self, node, n, callback):
super(Requester, self).__init__(node)
self.client_id = self.client_ids.next()
self.n = n
self.output = None
self.callback = callback
def start(self):
self.node.send([self.node.address],
Invoke(caller=self.node.address,
client_id=self.client_id, input_value=self.n))
self.invoke_timer = self.set_timer(INVOKE_RETRANSMIT, self.start)
def do_Invoked(self, sender, client_id, output):
if client_id != self.client_id:
return
self.logger.debug("received output %r" % (output,))
self.invoke_timer.cancel()
self.callback(output)
self.stop()总结
回顾一下,集群中的角色有:
- 接受者(Acceptor)—— 做出承诺并接受提案
- 副本(Replica)—— 管理分布式状态机:提交提案、提交决定、响应请求者
- 领导者(Leader)—— 领导Multi-Paxos算法的各轮
- 侦察兵(Scout)—— 为领导者执行Multi-Paxos算法的
Prepare/Promise部分 - 指挥官(Commander)—— 为领导者执行Multi-Paxos算法的
Accept/Accepted部分 - 引导(Bootstrap)—— 将新节点引入现有集群
- 种子(Seed)—— 创建新集群
- 请求者(Requester)—— 请求分布式状态机操作
只需要再加一个组件就能让Cluster工作:所有节点通过其通信的网络。
网络
任何网络协议都需要消息的发送和接收能力以及在未来某个时间调用函数的能力。
Network 类提供了一个具有这些功能的简单模拟网络,并模拟了丢包和消息传播延迟。
定时器使用Python的 heapq 模块,允许高效地选择下一个事件。设置定时器涉及将 Timer 对象推入堆中。由于从堆中移除项目效率低下,被取消的定时器保留在原位但标记为已取消。
消息传输使用定时器功能,在每个节点上安排稍后的消息送达,使用随机的模拟延迟。同样,functools.partial 用于设置对目标节点 receive 方法的未来调用,并传入适当的参数。
运行模拟涉及从堆中弹出定时器,如果未被取消且目标节点仍然活跃,则执行它们。
python
class Timer(object):
def __init__(self, expires, address, callback):
self.expires = expires
self.address = address
self.callback = callback
self.cancelled = False
def __cmp__(self, other):
return cmp(self.expires, other.expires)
def cancel(self):
self.cancelled = True
class Network(object):
PROP_DELAY = 0.03
PROP_JITTER = 0.02
DROP_PROB = 0.05
def __init__(self, seed):
self.nodes = {}
self.rnd = random.Random(seed)
self.timers = []
self.now = 1000.0
def new_node(self, address=None):
node = Node(self, address=address)
self.nodes[node.address] = node
return node
def run(self):
while self.timers:
next_timer = self.timers[0]
if next_timer.expires > self.now:
self.now = next_timer.expires
heapq.heappop(self.timers)
if next_timer.cancelled:
continue
if not next_timer.address or next_timer.address in self.nodes:
next_timer.callback()
def stop(self):
self.timers = []
def set_timer(self, address, seconds, callback):
timer = Timer(self.now + seconds, address, callback)
heapq.heappush(self.timers, timer)
return timer
def send(self, sender, destinations, message):
sender.logger.debug("sending %s to %s", message, destinations)
# 通过为每个目标创建包含消息的不同深拷贝的闭包来避免别名问题
def sendto(dest, message):
if dest == sender.address:
# 可靠地送达本地消息,无延迟
self.set_timer(sender.address, 0,
lambda: sender.receive(sender.address, message))
elif self.rnd.uniform(0, 1.0) > self.DROP_PROB:
delay = self.PROP_DELAY + self.rnd.uniform(-self.PROP_JITTER,
self.PROP_JITTER)
self.set_timer(dest, delay,
functools.partial(self.nodes[dest].receive,
sender.address, message))
for dest in (d for d in destinations if d in self.nodes):
sendto(dest, copy.deepcopy(message))虽然没有包含在本实现中,组件模型允许换入真实的网络实现,在实际服务器上通过实际网络进行通信,而无需更改其他组件。测试和调试使用模拟网络,而库的生产使用则通过实际网络硬件运行。
调试支持
在开发像这样的复杂系统时,bug很快就会从简单的 NameError 之类的错误过渡到只在协议运行数分钟后才显现的晦涩故障。追踪这类bug需要从错误明显的地方倒推。交互式调试器没什么用,因为它们只能向前单步执行。
Cluster最重要的调试特性涉及_确定性的_模拟器。与真实网络不同,给定相同的随机数生成器种子,模拟器在每次运行时表现完全相同。这允许在代码中添加额外的调试检查或输出,然后重新运行模拟以更详细地看到相同的故障。
大量细节存在于集群节点的消息交换中,因此这些信息被完整地自动记录。日志包括发送或接收消息的角色类,以及通过 SimTimeLogger 类注入的模拟时间戳。
python
class SimTimeLogger(logging.LoggerAdapter):
def process(self, msg, kwargs):
return "T=%.3f %s" % (self.extra['network'].now, msg), kwargs
def getChild(self, name):
return self.__class__(self.logger.getChild(name),
{'network': self.extra['network']})弹性协议经常在bug触发之后还能运行很长时间。例如,在开发过程中,数据别名错误导致所有副本共享相同的 decisions 字典。这意味着一旦一个节点处理了决定,所有其他节点都会认为它们已经被决定了。尽管是严重的bug,集群在死锁之前仍然对几笔交易产生了正确的结果。
断言成为提前捕获此类错误的重要工具。断言应该包含算法设计的不变量,但当代码的行为不符合预期时,对预期进行断言可以显示问题出在哪里。
python
assert not self.decisions.get(self.slot, None), \
"next slot to commit is already decided"
if slot in self.decisions:
assert self.decisions[slot] == proposal, \
"slot %d already decided with %r!" % (slot, self.decisions[slot])识别正确的代码阅读假设是调试的艺术所在。在这段 Replica.do_Decision 代码中,问题在于下一个待提交槽的 Decision 因为已经在 self.decisions 中而被忽略。被违反的底层假设是下一个待提交的槽不应该已经被决定。在 do_Decision 开始时对此进行断言识别了缺陷并迅速导向了修复。类似地,其他bug导致相同的槽被决定为不同的提案——这是严重的错误。
在协议开发过程中出现了许多其他断言,但篇幅有限,只保留了其中一些。
测试
在过去的十年里,不写测试的编码已经变得和不系安全带开车一样疯狂。没有测试的代码可能是不正确的,在不改变行为的前提下修改代码也是有风险的。
当代码为可测试性而组织时,测试最为有效。这里存在几种不同的思路,但本文采用的方法是将代码分成小的、最小耦合的单元,可以独立测试。这与角色模型很好地吻合,每个角色都有特定的目的并且独立于其他角色运行,形成了紧凑、自包含的类。
Cluster的编写最大化了这种隔离:所有角色间的通信都通过消息进行,除了创建新角色。因此,可以通过发送消息并观察回应来测试角色。
单元测试
Cluster的单元测试简单且短小:
python
class Tests(utils.ComponentTestCase):
def test_propose_active(self):
"""A PROPOSE received while active spawns a commander."""
self.activate_leader()
self.node.fake_message(Propose(slot=10, proposal=PROPOSAL1))
self.assertCommanderStarted(Ballot(0, 'F999'), 10, PROPOSAL1)方法测试单个行为(commander的生成)和单个单元(Leader 类)。它们遵循著名的"安排、执行、断言"(arrange, act, assert)模式:设置一个活跃的领导者,发送消息,然后检查结果。
依赖注入
"依赖注入"技术处理新角色的创建。每个添加其他角色的角色类都将类对象列表作为构造函数参数,默认为实际的类。例如,Leader 的构造函数如下:
python
class Leader(Role):
def __init__(self, node, peers, commander_cls=Commander, scout_cls=Scout):
super(Leader, self).__init__(node)
self.ballot_num = Ballot(0, node.address)
self.active = False
self.proposals = {}
self.commander_cls = commander_cls
self.scout_cls = scout_cls
self.scouting = False
self.peers = peersspawn_scout 方法(以及类似的 spawn_commander)使用 self.scout_cls 创建新的角色对象:
python
class Leader(Role):
def spawn_scout(self):
assert not self.scouting
self.scouting = True
self.scout_cls(self.node, self.ballot_num, self.peers).start()这种技术的巧妙之处在于测试时可以用假的类来测试 Leader,从而与 Scout 和 Commander 分开测试。
接口正确性
单元测试的一个陷阱是不测试单元间的接口关系。例如,接受者角色的单元测试验证 Promise 消息中 accepted 属性的格式,而侦察兵角色的单元测试提供格式良好的值。但两个测试都不检查格式是否匹配。
解决方法是使接口自我约束。在Cluster中,使用命名元组和关键字参数避免了消息属性的不一致。因为角色类之间的所有交互都通过消息进行,这覆盖了接口的大部分。
对于像 accepted_proposals 格式这样的问题,真实数据和测试数据都使用相同的函数进行验证,例如 verifyPromiseAccepted。接受者测试验证每个返回的 Promise 使用该方法,侦察兵测试验证每个伪造的 Promise 也使用该方法。
集成测试
防止接口问题和设计错误的最后堡垒是集成测试。集成测试将多个单元组装在一起并测试组合效果。在本例中,这意味着构建多节点网络、注入一些请求并验证结果。如果单元测试遗漏了接口问题,集成测试应该能快速失败。
因为协议能够优雅地处理节点故障,测试也覆盖了一些故障场景,包括活跃领导者的不合时宜的故障。
集成测试比单元测试更困难,因为它们不太隔离。对于Cluster来说,这意味着测试失败的领导者,因为任何节点都可能是活跃的。即使使用确定性网络,消息的变化也会改变随机数生成器的状态,不可预测地改变后续事件。测试代码不是硬编码预期的领导者,而是深入内部领导者状态,找到认为自己是活跃的那些节点。
模糊测试
测试弹性代码很困难:它可能对自己的bug也很有弹性,所以集成测试可能检测不到严重的bug。同时也很难想象并构造每种可能的故障模式进行测试。
常见的方法是"模糊测试"(fuzz testing):以随机变化的输入反复运行代码,直到出现问题。当确实出现问题时,所有的调试支持变得至关重要:如果故障无法复现,日志信息又不足以定位bug,那就无法修复它!
Cluster在开发过程中进行了手动的模糊测试,但完整的模糊测试基础设施超出了本项目的范围。
权力争夺
拥有多个活跃领导者的集群会变得非常嘈杂,侦察兵向接受者发送不断增大的投票编号,而没有投票能够达成决定。没有活跃领导者的集群则很安静,但同样无法运行。平衡实现使得集群几乎总是恰好有一个领导者被证明是相当困难的。
避免领导者争斗很容易:被抢占的领导者接受新的非活跃状态。然而,这很容易导致没有活跃领导者的情况,所以非活跃的领导者在每次收到 Propose 消息时都尝试变为活跃。
如果整个集群对谁是活跃成员没有达成一致,就会出现麻烦:不同的副本向不同的领导者发送 Propose 消息,导致侦察兵之间的争斗。因此领导者选举决定必须快速做出,所有集群成员必须尽快得知结果。
Cluster通过尽快检测领导者变化来处理这个问题:当接受者发送 Promise 消息时,被承诺的成员很可能成为下一个领导者。故障通过心跳协议来检测。
进一步扩展
有很多方式可以扩展和改进本实现。
追赶
在"纯粹的"Multi-Paxos中,未能收到消息的节点会比集群其他成员落后很多槽。只要分布式状态机的状态除了通过状态机转换之外从不被访问,设计就保持功能正常。读取状态需要客户端请求实际上不改变状态但返回所需值的状态机转换。转换在集群范围内执行,确保返回值与提案槽处的状态匹配。
即使在最优情况下,这也很慢,仅仅读取值就需要几次往返。分布式对象存储如果每次访问都要发出这样的请求,性能会非常糟糕。落后的节点承受更大的请求延迟,因为它们必须在成功提案之前先追赶上来。
简单的解决方案是实现类似gossip的协议,副本定期联系其他副本,共享已知的最高槽号并请求未知槽的信息。这样即使 Decision 消息丢失,副本也能快速从对等节点那里获知决定。
一致的内存使用
集群管理库尽管组件不可靠,仍要提供可靠性。它们不应该自身引入不可靠性。不幸的是,由于内存使用和消息大小不断增长,Cluster无法长期运行而不出现故障。
在协议定义中,接受者和副本构成了协议的"记忆",因此它们记住一切。这些类永远不知道何时会收到旧槽的请求,也许来自落后的副本或领导者。为了保持正确性,它们保留自集群启动以来的所有决定列表。更糟糕的是,在 Welcome 消息中在副本之间传输决定使得这些消息在长期运行的集群中变得巨大。
解决这个问题的技术涉及定期"快照"(checkpoint)节点状态,只保留有限数量的决定。过于落后而无法提交快照的节点必须通过离开并重新加入集群来"重置"自己。
持久化存储
少数集群成员的失败是可以接受的,但接受者"忘记"已接受的值或做出的承诺则不可以。
不幸的是,当集群成员失败并重新启动时,恰好发生了这种情况:新初始化的接受者实例缺少前任的承诺和已接受值的记录。
有两种解决方法。较简单的方案是将接受者状态写入磁盘,并在启动时重新读取。较复杂的方案是移除失败的集群成员并要求添加新成员。动态的集群成员变更称为"视图变更"(view changes)。
视图变更
运维工程师必须调整集群大小以满足负载和可用性要求。简单的测试项目从最少三个节点的集群开始,任何一个节点的故障都不会产生影响。项目"上线"带来额外负载时需要更大的集群。
按现在的写法,Cluster无法在不重启集群的情况下更改对等节点集合。理想情况下,集群像状态机转换一样对成员关系保持一致性。这意味着集群成员集合(即_视图/view_)通过特殊的视图变更提案来更改。但Paxos算法依赖于对成员集合的普遍一致,因此必须为每个槽定义视图。
Lamport在"Paxos Made Simple"的最后几段中讨论了这个挑战:
"我们可以让一个领导者提前获取 \(\alpha\) 个命令,方法是让执行一致性算法第 \(i+\alpha\) 个实例的服务器集合由第 \(i\) 个状态机命令执行后的状态来指定。"(Lamport, 2001)
这个思路是每个Paxos实例(槽)使用 \(\alpha\) 个槽之前的视图。这允许集群同时工作在最多 \(\alpha\) 个槽上,因此较小的 \(\alpha\) 值限制了并发性,而较大的 \(\alpha\) 值则减慢了视图变更的生效速度。
在早期的实现草案中(忠实地保存在git历史中!),我实现了视图变更支持(使用 \(\alpha\) 为3)。这个看似简单的变更引入了相当大的复杂性:
- 跟踪最近 \(\alpha\) 个已提交槽的视图,并正确地与新节点共享,
- 忽略没有可用槽的提案,
- 检测失败的节点,
- 正确序列化多个竞争的视图变更,以及
- 在领导者和副本之间传递视图信息。
结果变得对于一本书来说太大了!
参考文献
除了原始的Paxos论文和Lamport的后续论文"Paxos Made Simple"之外,实现中所添加的扩展参考了多个资源。角色名称来自"Paxos Made Moderately Complex"。"Paxos Made Live"在快照方面特别有帮助,而"Paxos Made Practical"描述了视图变更(虽然不是本文所描述的类型)。Liskov的"From Viewstamped Replication to Byzantine Fault Tolerance"提供了视图变更的另一种视角。最后,Stack Overflow上的讨论帮助了解了成员的添加和移除。
L. Lamport, "The Part-Time Parliament," ACM Transactions on Computer Systems, 16(2):133–169, May 1998.
L. Lamport, "Paxos Made Simple," ACM SIGACT News (Distributed Computing Column) 32, 4 (Whole Number 121, December 2001) 51-58.
R. Van Renesse and D. Altinbuken, "Paxos Made Moderately Complex," ACM Comp. Survey 47, 3, Article 42 (Feb. 2015)
T. Chandra, R. Griesemer, and R. Redstone, "Paxos Made Live - An Engineering Perspective," Proceedings of the twenty-sixth annual ACM symposium on Principles of distributed computing (PODC '07). ACM, New York, NY, USA, 398-407.
B. Liskov, "From Viewstamped Replication to Byzantine Fault Tolerance," In Replication, Springer-Verlag, Berlin, Heidelberg 121-149 (2010)