Python实现分布式键值存储系统
标签搜索
侧边栏壁纸
  • 累计撰写 9 篇文章
  • 累计收到 1 条评论

Python实现分布式键值存储系统

AiiCi
2024-05-12 / 0 评论 / 5 阅读 / 正在检测是否收录...

一、简介

  在当今互联网时代,分布式系统已经成为构建高可用性、可扩展性和容错性的关键技术之一。然而,构建一个稳定、可靠的分布式系统并不容易,其中最大的挑战之一是如何实现一致性复制和故障容错。为了解决这个问题,出现了许多分布式共识算法,如Paxos和Raft等。
  Raft算法是一种相对容易理解和实现的分布式一致性算法,它通过选举机制和日志复制来保证系统中各个节点之间的一致性。Raft算法的出现极大地简化了分布式系统的设计和实现,因此被广泛应用于各种分布式系统中,如分布式存储系统、分布式数据库、分布式日志系统等。

二、算法原理

  • Leader选举:在Raft算法中,节点通过选举产生一个领导者(Leader)。在正常情况下,每个节点都处于三种可能的状态之一:Leader、Follower和Candidate。初始状态下,所有节点都是Follower。当一个Follower节点在一段时间内没有收到Leader节点的心跳消息时,它会变成Candidate并开始一轮新的选举。节点会向其他节点发送投票请求,并根据收到的投票决定是否成为新的Leader。
  • 日志复制:一旦选出Leader,它就负责处理客户端的请求,并将操作作为日志条目复制到其他节点。Leader节点会周期性地向其他节点发送心跳消息,以保持其领导地位。其他节点在接收到Leader的日志条目后,会将其追加到自己的日志中,并向Leader发送确认消息。一旦Leader收到大多数节点的确认消息,就会提交该日志条目,并将其应用到状态机中。
  • 一致性保证:Raft算法保证了日志的一致性和强一致性。只有在大多数节点(超过半数)都提交了相同的日志条目时,才能保证日志的一致性。如果Leader节点失效或者网络分区,系统会重新选举新的Leader,以保证系统的可用性和一致性。
  • 安全性保障:Raft算法通过适当的消息通信和状态转换保证了系统的安全性。节点在选举过程中会根据自己的状态和收到的消息进行状态转换,从而避免出现多个Leader或者数据不一致的情况。
  • 容错性:Raft算法具有良好的容错性,能够应对节点失效、网络分区等异常情况。一旦Leader节点失效,其他节点会在一定时间内重新选举新的Leader;如果网络分区发生,Raft算法会保证只有大多数节点中的一部分成为新的Leader,从而避免数据不一致的情况。
  • Raft算法通过Leader选举和日志复制机制保证了分布式系统的一致性和可用性,具有较好的容错性和安全性。你的代码实现了Raft算法的关键部分,包括节点状态转换、Leader选举、日志复制等,从而实现了一个简单的分布式键值存储系统。

三、算法实现

from flask_executor import Executor
import threading
import time
import random
import logging
import requests
from flask import Flask, request, render_template_string

logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)
MAX_RETRIES = 3
ELECTION_TIMEOUT_MIN = 1  # 最小选举超时时间(秒)
ELECTION_TIMEOUT_MAX = 2  # 最大选举超时时间(秒)


class RaftNode:
    def __init__(self, node_id, peers):
        self.node_id = node_id
        self.peers = peers
        self.current_term = 0
        self.voted_for = None
        self.state = 'follower'
        self.last_heartbeat = None  # 初始化为None,等待启动后再更新
        self.timeout = random.uniform(ELECTION_TIMEOUT_MIN, ELECTION_TIMEOUT_MAX)
        self.failed_election_attempts = 0
        self.max_failed_attempts = 3
        self.lock = threading.Lock()  # 添加锁对象
        self.retry_timeout = 1  # 初始重试超时时间为1秒
        self.max_retry_timeout = 10  # 最大重试超时时间为10秒
        self.session = requests.Session()  # 使用Session对象管理连接
        self.vote_count = {candidate: 0 for candidate in peers}  # 初始化每个候选人的选票数为0

    def start(self):
        self.last_heartbeat = time.time()  # 初始化 last_heartbeat
        threading.Thread(target=self.run).start()

    def run(self):
        while True:
            if time.time() - self.last_heartbeat > self.timeout:
                with self.lock:
                    if self.state in ['follower', 'candidate']:
                        logging.info("开始选举...")
                        self.start_election()
                    elif self.state == 'leader':
                        logging.info("有leader产生了!")
                        self.send_heartbeat()
            time.sleep(0.1)

    def start_election(self):
        # with self.lock:
        if self.state == 'follower' or self.state == 'candidate':
            self.state = 'candidate'
            self.current_term += 1
            self.voted_for = self.node_id  # 自己先投自己一票
            self.last_heartbeat = time.time()
            self.failed_election_attempts = 0
            votes = 1
            responses = []
            for peer in self.peers:
                response = self.request_vote(peer)
                responses.append(response)
                if response and response['term'] == self.current_term and response['vote_granted']:
                    votes += 1
                    if votes > len(self.peers) / 2:
                        self.state = 'leader'
                        logging.info(f"Node {self.node_id} becomes leader for term {self.current_term}")
                        break
                else:
                    if response and response['term'] > self.current_term:
                        self.current_term = response['term']
                        self.state = 'follower'
                        break
            else:
                max_term = max(response['term'] for response in responses if response)
                if max_term > self.current_term:
                    self.current_term = max_term
                    self.state = 'follower'
                    return
            if self.state != 'leader':
                self.failed_election_attempts += 1
                if self.failed_election_attempts >= self.max_failed_attempts:
                    logging.error("Exceeded maximum failed election attempts. Retrying election with backoff.")
                    self.retry_election_with_backoff()  # 使用指数退避策略重试选举

            # 输出当前的选票情况
            logging.info("当前选票情况:")
            for candidate, votes in self.vote_count.items():
                logging.info(f"候选人 {candidate} 获得 {votes} 票")

    def replicate_data(self, data):
        for peer in self.peers:
            try:
                response = self.session.post(peer + '/replicate_data', json=data)
                if response.status_code != 200:
                    logging.error(f"Error replicating data to {peer}: {response.status_code}")
            except requests.RequestException as e:
                logging.error(f"Error replicating data to {peer}: {e}")

    def retry_election_with_backoff(self):
        # with self.lock:
        retry_count = 0
        while retry_count < MAX_RETRIES:
            time.sleep(self.retry_timeout)
            self.retry_timeout *= 2
            self.start_election()
            retry_count += 1

    def request_vote(self, peer):
        # with self.lock:
        if self.state == 'candidate':
            try:
                response = self.session.post(peer + '/vote_request',
                                             data={'term': self.current_term, 'candidate_id': self.node_id})
                return response.json()
            except requests.RequestException as e:
                logging.error(f"Error sending vote request to {peer}: {e}")
                return None  # 网络请求出错时返回None
        return None

    def handle_vote_request(self):
        # 处理来自其他节点的投票请求
        data = request.form
        term = int(data['term'])
        candidate_id = data['candidate_id']
        # with self.lock:
        if term > self.current_term:
            self.state = 'follower'
            self.current_term = term
            self.voted_for = candidate_id
            return {'term': self.current_term, 'vote_granted': True}
        elif term == self.current_term and (not self.voted_for or self.voted_for == candidate_id):
            self.voted_for = candidate_id
            return {'term': self.current_term, 'vote_granted': True}
        else:
            return {'term': self.current_term, 'vote_granted': False}

    def handle_heartbeat(self):
        data = request.get_json()
        term = data.get('term')

        if term > self.current_term:
            self.current_term = term
            self.state = 'follower'
            self.voted_for = None

        return {'success': True}

    def send_heartbeat(self):
        logging.info(f"Leader {self.node_id} sending heartbeat for term {self.current_term}")
        self.last_heartbeat = time.time()
        for peer in self.peers:
            retries = 0
            while retries < MAX_RETRIES:
                try:
                    response = self.session.post(peer + '/heartbeat', data={'term': self.current_term})
                    if response.status_code == 200:
                        logging.info(f"Heartbeat sent successfully to {peer}")
                        break
                    else:
                        logging.error(f"Error sending heartbeat to {peer}: {response.status_code}")
                        retries += 1
                        time.sleep(1)  # 等待一段时间后重试发送心跳
                except requests.RequestException as e:
                    logging.error(f"Error sending heartbeat to {peer}: {e}")
                    retries += 1
                    time.sleep(1)  # 等待一段时间后重试发送心跳


class KeyValueStoreNode(RaftNode):
    def __init__(self, node_id, peers):
        super().__init__(node_id, peers)
        self.shared_data = {}  # 存储共享数据

        self.app = Flask(__name__)
        self.executor = Executor(self.app)
        self.app.config['EXECUTOR'] = self.executor

        # 设置路由规则
        self.app.add_url_rule('/kv', 'handle_request', self.handle_request, methods=['GET', 'POST', 'DELETE'])
        self.app.add_url_rule('/vote_request', 'handle_vote_request', self.handle_vote_request, methods=['POST'])
        self.app.add_url_rule('/replicate_data', 'replicate_data', self.replicate_data_handler, methods=['POST'])
        self.app.add_url_rule('/heartbeat', 'handle_heartbeat', self.handle_heartbeat, methods=['POST'])
        self.app.add_url_rule('/', 'index', self.index)

    def index(self):
        return render_template_string('''
               <h1>节点信息</h1>
               <p>当前节点: {{ node_id }}</p>
               <p>已存储的键值对: {{ shared_data }}</p>
               <p>当前状态: {{ state }}</p>
           ''', node_id=self.node_id, shared_data=self.shared_data, state=self.state)

    def handle_request(self):
        action = request.args.get('action')
        key = request.args.get('key')
        value = request.args.get('value')
        if self.state == 'leader':
            if action == 'get':
                return self.get(key)
            elif action == 'set':
                self.set(key, value)
                return "OK"
            elif action == 'delete':
                self.delete(key)
                return "OK"
            else:
                return "ERROR: Invalid action"
        else:
            leader_url = f"{self.peers[0]}/kv"
            try:
                response = self.session.request(method=request.method, url=leader_url, params=request.args)
                return response.text
            except requests.RequestException as e:
                return f"ERROR: {e}"

    def get(self, key):
        return str(self.shared_data.get(key, "Key not found"))

    def set(self, key, value):
        self.shared_data[key] = value
        # 当节点是领导者时,将数据复制到其他节点
        if self.state == 'leader':
            self.replicate_data({'key': key, 'value': value})

    def delete(self, key):
        if key in self.shared_data:
            del self.shared_data[key]
            # 当节点是领导者时,将删除操作复制到其他节点
            if self.state == 'leader':
                self.replicate_data({'key': key, 'value': None})

    def replicate_data_handler(self):
        data = request.json
        key = data.get('key')
        value = data.get('value')

        if value is not None:
            self.shared_data[key] = value
        else:
            if key in self.shared_data:
                del self.shared_data[key]

        return "OK"

    def start(self):
        super().start()
        threading.Thread(target=self.app.run,
                         kwargs={'host': '0.0.0.0', 'port': int(self.node_id.split(":")[-1])}).start()


if __name__ == "__main__":
    node1 = KeyValueStoreNode("http://127.0.0.1:5001", ["http://127.0.0.1:5002", "http://127.0.0.1:5003"])
    node2 = KeyValueStoreNode("http://127.0.0.1:5002", ["http://127.0.0.1:5001", "http://127.0.0.1:5003"])
    node3 = KeyValueStoreNode("http://127.0.0.1:5003", ["http://127.0.0.1:5001", "http://127.0.0.1:5002"])
    node1.start()
    node2.start()
    node3.start()
    while True:
        time.sleep(1)

四、总结

  选择使用Python实现Raft算法的分布式键值存储系统项目的原因在于,一方面通过实现一个基于Raft算法的分布式系统可以深入理解分布式一致性算法的原理和实现细节,提升对分布式系统的理解和掌握程度;另一方面,实践是学习的最佳方式,通过动手实现一个分布式系统可以将理论知识转化为实际能力,提升解决实际问题的能力。此外,分布式系统涉及到诸多复杂的技术挑战,如一致性算法、节点管理、故障处理等,通过挑战这些技术问题可以提升解决问题的能力和技术水平。

0

海报

正在生成.....

评论 (0)

取消