溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么用python完成一個分布式事務TCC

發布時間:2021-10-25 10:11:41 來源:億速云 閱讀:169 作者:iii 欄目:開發技術
# 怎么用Python完成一個分布式事務TCC

## 1. 分布式事務概述

### 1.1 什么是分布式事務

分布式事務是指事務的參與者、支持事務的服務器、資源服務器以及事務管理器分別位于不同的分布式系統的不同節點之上。在微服務架構中,一個業務操作往往需要跨多個服務完成數據一致性保證,這就產生了分布式事務的需求。

### 1.2 分布式事務的挑戰

與傳統單機事務相比,分布式事務面臨以下挑戰:
- **網絡不確定性**:網絡延遲、分區、超時等問題
- **服務可用性**:部分服務可能不可用
- **數據一致性**:跨多個數據源的一致性保證
- **性能影響**:協調成本帶來的性能下降

### 1.3 常見解決方案

常見的分布式事務解決方案包括:
- 2PC(兩階段提交)
- TCC(Try-Confirm-Cancel)
- 本地消息表
- Saga模式
- 最大努力通知

## 2. TCC模式詳解

### 2.1 TCC基本概念

TCC(Try-Confirm-Cancel)是一種補償型分布式事務解決方案,它將一個完整的業務邏輯拆分為三個階段:

1. **Try階段**:嘗試執行業務,完成所有業務檢查,預留必須的業務資源
2. **Confirm階段**:確認執行業務,真正提交事務,通常不會失敗
3. **Cancel階段**:取消執行業務,釋放Try階段預留的資源

### 2.2 TCC的優勢

- **最終一致性**:通過補償機制保證最終一致性
- **高性能**:相比2PC減少了鎖等待時間
- **靈活性**:業務可定制化程度高
- **適用場景廣**:特別適合長事務和高并發場景

### 2.3 TCC的不足

- **開發成本高**:需要為每個操作實現三個接口
- **業務侵入性強**:需要改造現有業務邏輯
- **冪等性要求**:每個階段操作必須支持冪等
- **空回滾問題**:需要處理Try未執行但收到Cancel的情況

## 3. Python實現TCC事務

### 3.1 系統架構設計

我們設計一個簡單的電商下單場景,涉及三個服務:
- 訂單服務(Order)
- 庫存服務(Inventory)
- 支付服務(Payment)

架構圖如下:

[Client] ↓ [API Gateway] ↓ [Order Service] → [Inventory Service] ↓ [Payment Service]


### 3.2 數據庫設計

每個服務有自己的數據庫,關鍵表設計:

**訂單服務**:
```python
class Order(models.Model):
    order_id = models.UUIDField(primary_key=True)
    user_id = models.IntegerField()
    status = models.CharField(max_length=20)  # 'created', 'confirmed', 'canceled'
    amount = models.DecimalField(max_digits=10, decimal_places=2)
    created_at = models.DateTimeField(auto_now_add=True)
    
class OrderTCC(models.Model):
    tcc_id = models.UUIDField(primary_key=True)
    order_id = models.UUIDField()
    status = models.CharField(max_length=20)  # 'trying', 'confirmed', 'canceled'
    created_at = models.DateTimeField(auto_now_add=True)

庫存服務

class Inventory(models.Model):
    product_id = models.IntegerField(primary_key=True)
    stock = models.IntegerField()
    
class InventoryTCC(models.Model):
    tcc_id = models.UUIDField(primary_key=True)
    product_id = models.IntegerField()
    frozen_stock = models.IntegerField()  # Try階段凍結的庫存
    status = models.CharField(max_length=20)

3.3 核心代碼實現

3.3.1 Try階段實現

# order_service/views.py
@transaction.atomic
def create_order_try(request):
    data = json.loads(request.body)
    order_id = uuid.uuid4()
    tcc_id = uuid.uuid4()
    
    # 本地事務:創建訂單TCC記錄
    OrderTCC.objects.create(
        tcc_id=tcc_id,
        order_id=order_id,
        status='trying'
    )
    
    # 調用庫存服務Try
    inventory_response = requests.post(
        'http://inventory-service/api/inventory/try',
        json={
            'tcc_id': str(tcc_id),
            'product_id': data['product_id'],
            'quantity': data['quantity']
        }
    )
    
    if inventory_response.status_code != 200:
        raise Exception("Inventory try failed")
    
    # 調用支付服務Try
    payment_response = requests.post(
        'http://payment-service/api/payment/try',
        json={
            'tcc_id': str(tcc_id),
            'user_id': data['user_id'],
            'amount': data['amount']
        }
    )
    
    if payment_response.status_code != 200:
        # 補償已成功的Try操作
        requests.post('http://inventory-service/api/inventory/cancel', 
                     json={'tcc_id': str(tcc_id)})
        raise Exception("Payment try failed")
    
    return JsonResponse({'tcc_id': tcc_id, 'order_id': order_id})

3.3.2 Confirm階段實現

def confirm_order(tcc_id):
    try:
        # 獲取TCC記錄
        tcc = OrderTCC.objects.get(tcc_id=tcc_id, status='trying')
        
        with transaction.atomic():
            # 創建正式訂單
            Order.objects.create(
                order_id=tcc.order_id,
                user_id=user_id,  # 從上下文中獲取
                status='confirmed',
                amount=amount
            )
            
            # 更新TCC狀態
            tcc.status = 'confirmed'
            tcc.save()
            
        # 調用庫存服務Confirm
        requests.post('http://inventory-service/api/inventory/confirm',
                    json={'tcc_id': str(tcc_id)})
                    
        # 調用支付服務Confirm
        requests.post('http://payment-service/api/payment/confirm',
                    json={'tcc_id': str(tcc_id)})
                    
    except Exception as e:
        logger.error(f"Confirm failed: {str(e)}")
        raise

3.3.3 Cancel階段實現

def cancel_order(tcc_id):
    try:
        tcc = OrderTCC.objects.filter(tcc_id=tcc_id).first()
        
        if not tcc:
            # 處理空回滾情況
            return
            
        if tcc.status == 'canceled':
            # 已處理過,冪等返回
            return
            
        with transaction.atomic():
            if tcc.status == 'trying':
                # 只有Try成功才需要創建取消記錄
                Order.objects.create(
                    order_id=tcc.order_id,
                    user_id=user_id,
                    status='canceled',
                    amount=0
                )
            
            tcc.status = 'canceled'
            tcc.save()
            
        # 調用庫存服務Cancel
        requests.post('http://inventory-service/api/inventory/cancel',
                    json={'tcc_id': str(tcc_id)})
                    
        # 調用支付服務Cancel
        requests.post('http://payment-service/api/payment/cancel',
                    json={'tcc_id': str(tcc_id)})
                    
    except Exception as e:
        logger.error(f"Cancel failed: {str(e)}")
        raise

3.4 服務間通信設計

3.4.1 基于HTTP的實現

# 使用requests庫實現服務調用
def call_service(url, data, retry=3):
    for i in range(retry):
        try:
            response = requests.post(url, json=data, timeout=5)
            if response.status_code == 200:
                return True
        except (requests.exceptions.RequestException, 
                requests.exceptions.Timeout) as e:
            logger.warning(f"Retry {i+1} for {url}: {str(e)}")
            time.sleep(1)
    return False

3.4.2 引入消息隊列

為提高可靠性,可以引入RabbitMQ實現異步通信:

# 使用pika庫實現MQ通信
import pika

def setup_mq():
    connection = pika.BlockingConnection(pika.ConnectionParameters('mq_host'))
    channel = connection.channel()
    
    # 聲明TCC事務交換機和隊列
    channel.exchange_declare(exchange='tcc_transaction', exchange_type='topic')
    channel.queue_declare(queue='order_service')
    channel.queue_bind(queue='order_service', exchange='tcc_transaction', routing_key='order.#')
    
    return channel

def publish_tcc_event(channel, event_type, tcc_id):
    channel.basic_publish(
        exchange='tcc_transaction',
        routing_key=f'order.{event_type}',
        body=json.dumps({'tcc_id': str(tcc_id)})

4. 異常處理與可靠性保障

4.1 冪等性設計

每個TCC接口必須實現冪等:

# 庫存服務Confirm接口示例
def inventory_confirm(request):
    data = json.loads(request.body)
    tcc_id = data['tcc_id']
    
    tcc = InventoryTCC.objects.filter(tcc_id=tcc_id).first()
    if not tcc:
        return JsonResponse({'status': 'not_found'}, status=404)
        
    if tcc.status == 'confirmed':
        return JsonResponse({'status': 'already_confirmed'})
        
    with transaction.atomic():
        # 扣減真實庫存
        inventory = Inventory.objects.get(product_id=tcc.product_id)
        inventory.stock -= tcc.frozen_stock
        inventory.save()
        
        # 更新TCC狀態
        tcc.status = 'confirmed'
        tcc.save()
        
    return JsonResponse({'status': 'success'})

4.2 空回滾處理

def inventory_cancel(request):
    data = json.loads(request.body)
    tcc_id = data['tcc_id']
    
    tcc = InventoryTCC.objects.filter(tcc_id=tcc_id).first()
    if not tcc:
        # 記錄空回滾,防止Try階段成功后再執行Cancel
        InventoryTCC.objects.create(
            tcc_id=tcc_id,
            product_id=data.get('product_id', 0),
            frozen_stock=0,
            status='canceled'
        )
        return JsonResponse({'status': 'empty_cancel'})
        
    # 正常取消邏輯...

4.3 定時任務補償

from apscheduler.schedulers.background import BackgroundScheduler

def check_hanging_transactions():
    # 查找超過一定時間未完成的TCC記錄
    timeout = datetime.now() - timedelta(minutes=30)
    hanging_tccs = OrderTCC.objects.filter(
        status='trying',
        created_at__lt=timeout
    )
    
    for tcc in hanging_tccs:
        try:
            cancel_order(tcc.tcc_id)
        except Exception as e:
            logger.error(f"Failed to cancel hanging TCC {tcc.tcc_id}: {str(e)}")

# 啟動定時任務
scheduler = BackgroundScheduler()
scheduler.add_job(check_hanging_transactions, 'interval', minutes=5)
scheduler.start()

5. 性能優化與擴展

5.1 異步化改造

使用Celery實現異步任務:

from celery import Celery

app = Celery('tcc_transaction', broker='pyamqp://guest@localhost//')

@app.task(bind=True, max_retries=3)
def confirm_order_task(self, tcc_id):
    try:
        confirm_order(tcc_id)
    except Exception as exc:
        raise self.retry(exc=exc, countdown=2**self.request.retry)

5.2 批量處理

def batch_confirm(tcc_ids):
    with transaction.atomic():
        tcc_list = OrderTCC.objects.filter(
            tcc_id__in=tcc_ids,
            status='trying'
        ).select_for_update()
        
        for tcc in tcc_list:
            # 批量確認邏輯
            pass

5.3 引入分布式鎖

使用Redis實現分布式鎖:

import redis
from contextlib import contextmanager

redis_client = redis.StrictRedis(host='localhost', port=6379)

@contextmanager
def redis_lock(lock_key, timeout=10):
    identifier = str(uuid.uuid4())
    end = time.time() + timeout
    
    while time.time() < end:
        if redis_client.setnx(lock_key, identifier):
            redis_client.expire(lock_key, timeout)
            try:
                yield
            finally:
                if redis_client.get(lock_key) == identifier:
                    redis_client.delete(lock_key)
            return
        time.sleep(0.001)
    raise Exception("Could not acquire lock")

6. 測試策略

6.1 單元測試

import pytest
from unittest.mock import patch

@pytest.mark.django_db
def test_order_try_success():
    with patch('requests.post') as mock_post:
        mock_post.return_value.status_code = 200
        
        response = client.post('/api/order/try', data={
            'user_id': 1,
            'product_id': 101,
            'quantity': 2,
            'amount': 100.00
        })
        
        assert response.status_code == 200
        assert OrderTCC.objects.count() == 1

6.2 集成測試

@pytest.mark.integration
def test_full_tcc_flow():
    # 1. 執行Try階段
    try_response = create_order_try(test_data)
    
    # 2. 驗證Try結果
    assert try_response.status == 'success'
    
    # 3. 執行Confirm階段
    confirm_response = confirm_order(try_response.tcc_id)
    
    # 4. 驗證最終一致性
    assert order.status == 'confirmed'
    assert inventory.stock == original_stock - quantity
    assert payment.balance == original_balance - amount

6.3 混沌測試

def test_network_partition():
    with ChaosMonkey() as cm:
        cm.network_partition('order-service', 'inventory-service')
        
        response = create_order_try(test_data)
        assert response.status_code == 500
        
        cm.heal_network()
        
        # 驗證補償是否成功
        assert InventoryTCC.objects.get(tcc_id=response.tcc_id).status == 'canceled'

7. 總結與展望

本文詳細介紹了如何使用Python實現TCC模式的分布式事務,包括:

  1. TCC模式的基本原理和三個階段實現
  2. Python下的具體代碼實現方案
  3. 異常處理和可靠性保障機制
  4. 性能優化和擴展方案
  5. 全面的測試策略

在實際應用中,還可以考慮以下方向進行擴展:

  • 與Saga模式結合,處理更復雜的業務場景
  • 引入分布式事務框架如Seata的Python版本
  • 增加監控和告警機制,提高系統可觀測性
  • 優化事務日志存儲,提高恢復效率

TCC模式雖然實現復雜度較高,但在需要強一致性的分布式系統中仍然是非常有價值的解決方案。通過合理的架構設計和代碼實現,可以在Python生態中構建可靠的分布式事務處理能力。 “`

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女