# 怎么用Python完成一個分布式事務TCC
## 1. 分布式事務概述
### 1.1 什么是分布式事務
分布式事務是指事務的參與者、支持事務的服務器、資源服務器以及事務管理器分別位于不同的分布式系統的不同節點之上。在微服務架構中,一個業務操作往往需要跨多個服務完成數據一致性保證,這就產生了分布式事務的需求。
### 1.2 分布式事務的挑戰
與傳統單機事務相比,分布式事務面臨以下挑戰:
- **網絡不確定性**:網絡延遲、分區、超時等問題
- **服務可用性**:部分服務可能不可用
- **數據一致性**:跨多個數據源的一致性保證
- **性能影響**:協調成本帶來的性能下降
### 1.3 常見解決方案
常見的分布式事務解決方案包括:
- 2PC(兩階段提交)
- TCC(Try-Confirm-Cancel)
- 本地消息表
- Saga模式
- 最大努力通知
## 2. TCC模式詳解
### 2.1 TCC基本概念
TCC(Try-Confirm-Cancel)是一種補償型分布式事務解決方案,它將一個完整的業務邏輯拆分為三個階段:
1. **Try階段**:嘗試執行業務,完成所有業務檢查,預留必須的業務資源
2. **Confirm階段**:確認執行業務,僅使用Try階段預留的資源真正提交事務;只要Try成功,Confirm在業務上就不應失敗,若因網絡等原因失敗則需重試,因此必須冪等
3. **Cancel階段**:取消執行業務,釋放Try階段預留的資源
### 2.2 TCC的優勢
- **最終一致性**:通過補償機制保證最終一致性
- **高性能**:相比2PC減少了鎖等待時間
- **靈活性**:業務可定制化程度高
- **適用場景廣**:特別適合對一致性和性能要求較高、執行時間較短的高并發場景(Try階段預留的資源在Confirm/Cancel前一直被占用,長事務通常更適合Saga模式)
### 2.3 TCC的不足
- **開發成本高**:需要為每個操作實現三個接口
- **業務侵入性強**:需要改造現有業務邏輯
- **冪等性要求**:每個階段操作必須支持冪等
- **空回滾與懸掛問題**:需要處理Try未執行但收到Cancel的情況(空回滾),並防止在Cancel之後才到達的Try再次預留資源(懸掛)
## 3. Python實現TCC事務
### 3.1 系統架構設計
我們設計一個簡單的電商下單場景,涉及三個服務:
- 訂單服務(Order)
- 庫存服務(Inventory)
- 支付服務(Payment)
架構圖如下:
```text
[Client]
    ↓
[API Gateway]
    ↓
[Order Service] → [Inventory Service]
    ↓
[Payment Service]
```
### 3.2 數據庫設計
每個服務有自己的數據庫,關鍵表設計:
**訂單服務**:
```python
class Order(models.Model):
    """Formal order record of the order service.

    ``confirm_order`` creates one with status 'confirmed'; ``cancel_order``
    may create one with status 'canceled' and amount 0.
    """
    order_id = models.UUIDField(primary_key=True)
    user_id = models.IntegerField()
    status = models.CharField(max_length=20) # 'created', 'confirmed', 'canceled'
    amount = models.DecimalField(max_digits=10, decimal_places=2)
    created_at = models.DateTimeField(auto_now_add=True)


class OrderTCC(models.Model):
    """Order-side log of one TCC transaction, keyed by the global ``tcc_id``.

    NOTE(review): only ``order_id`` is stored here, not ``user_id`` or
    ``amount``, yet the Confirm/Cancel code needs them to create the
    ``Order`` row and has to obtain them elsewhere — consider persisting
    them on this model.
    """
    tcc_id = models.UUIDField(primary_key=True)
    # Id the formal Order row will be created with in the Confirm/Cancel phase.
    order_id = models.UUIDField()
    status = models.CharField(max_length=20) # 'trying', 'confirmed', 'canceled'
    # Used by the hanging-transaction sweep to find stale 'trying' records.
    created_at = models.DateTimeField(auto_now_add=True)
```

**庫存服務**:

```python
class Inventory(models.Model):
    """Real (committed) stock per product; reduced by the Confirm phase."""
    product_id = models.IntegerField(primary_key=True)
    stock = models.IntegerField()


class InventoryTCC(models.Model):
    """Inventory branch record of one TCC transaction, keyed by ``tcc_id``."""
    tcc_id = models.UUIDField(primary_key=True)
    product_id = models.IntegerField()
    frozen_stock = models.IntegerField() # stock frozen (reserved) during the Try phase
    # Branch state; the Confirm/Cancel endpoints set 'confirmed' / 'canceled'.
    status = models.CharField(max_length=20)
# order_service/views.py
@transaction.atomic
def create_order_try(request):
    """Try phase of the order TCC transaction (HTTP view).

    Records a 'trying' ``OrderTCC`` row, then calls the inventory and payment
    Try endpoints in that order. A Try counts as failed on a non-200 answer
    or on a transport error (connection error, timeout, ...). On failure every
    branch whose Try was attempted receives a best-effort Cancel, because a
    Try that timed out may still have reserved resources remotely; branches
    must therefore tolerate an empty rollback. The exception is then raised,
    so ``transaction.atomic`` rolls back the local ``OrderTCC`` row.

    Request body (JSON): ``user_id``, ``product_id``, ``quantity``, ``amount``.

    Returns:
        JsonResponse with ``tcc_id`` and ``order_id``.

    Raises:
        Exception: "Inventory try failed" or "Payment try failed".
    """
    data = json.loads(request.body)
    order_id = uuid.uuid4()
    tcc_id = uuid.uuid4()
    tcc_payload = {'tcc_id': str(tcc_id)}

    # Local transaction: create the order-side TCC record.
    OrderTCC.objects.create(
        tcc_id=tcc_id,
        order_id=order_id,
        status='trying'
    )

    def _try_branch(url, payload):
        """Call one branch Try endpoint; True only on HTTP 200."""
        try:
            response = requests.post(url, json=payload, timeout=5)
        except requests.exceptions.RequestException as exc:
            logger.warning(f"Try call to {url} failed: {exc}")
            return False
        return response.status_code == 200

    def _cancel_branch(url):
        """Best-effort compensation; errors are logged so they do not mask the Try failure."""
        try:
            requests.post(url, json=tcc_payload, timeout=5)
        except requests.exceptions.RequestException as exc:
            logger.error(f"Compensating cancel to {url} failed for TCC {tcc_id}: {exc}")

    # Inventory Try.
    inventory_ok = _try_branch(
        'http://inventory-service/api/inventory/try',
        {
            'tcc_id': str(tcc_id),
            'product_id': data['product_id'],
            'quantity': data['quantity']
        }
    )
    if not inventory_ok:
        _cancel_branch('http://inventory-service/api/inventory/cancel')
        raise Exception("Inventory try failed")

    # Payment Try.
    payment_ok = _try_branch(
        'http://payment-service/api/payment/try',
        {
            'tcc_id': str(tcc_id),
            'user_id': data['user_id'],
            'amount': data['amount']
        }
    )
    if not payment_ok:
        # Compensate both branches: the payment Try itself may have partially
        # succeeded, and the inventory Try definitely did.
        _cancel_branch('http://payment-service/api/payment/cancel')
        _cancel_branch('http://inventory-service/api/inventory/cancel')
        raise Exception("Payment try failed")

    return JsonResponse({'tcc_id': tcc_id, 'order_id': order_id})
def confirm_order(tcc_id, user_id=None, amount=None):
    """Confirm phase of the order TCC transaction.

    In one local DB transaction, this creates the formal ``Order`` and marks
    the ``OrderTCC`` row 'confirmed'. It then asks the inventory and payment
    services to confirm their branches.

    The call is safe to retry, e.g. from a task queue. If the row is already
    'confirmed', the local step is skipped and only the branch confirmations
    are re-sent. This relies on the branch Confirm endpoints being idempotent.

    Args:
        tcc_id: Global transaction id returned by the Try phase.
        user_id: Owner of the order. ``OrderTCC`` does not store it, so it
            must be supplied when the record is still 'trying'.
        amount: Order amount; same requirement as ``user_id``.

    Raises:
        OrderTCC.DoesNotExist: no TCC record exists for ``tcc_id``.
        ValueError: ``user_id``/``amount`` missing for a 'trying' record.
        Exception: the record is in a non-confirmable state, or a branch
            confirmation did not answer HTTP 200.
        requests.exceptions.RequestException: a branch call failed in transport.
    """
    try:
        with transaction.atomic():
            # Lock the row so concurrent confirm/cancel calls are serialized.
            tcc = OrderTCC.objects.select_for_update().get(tcc_id=tcc_id)
            if tcc.status == 'trying':
                if user_id is None or amount is None:
                    raise ValueError(
                        f"user_id and amount are required to confirm TCC {tcc_id}"
                    )
                # Create the formal order.
                Order.objects.create(
                    order_id=tcc.order_id,
                    user_id=user_id,
                    status='confirmed',
                    amount=amount
                )
                tcc.status = 'confirmed'
                tcc.save()
            elif tcc.status != 'confirmed':
                raise Exception(
                    f"Cannot confirm TCC {tcc_id} in status {tcc.status!r}"
                )

        # Confirm the remote branches (inventory, then payment); a non-200
        # answer raises so the caller can retry.
        for url in (
            'http://inventory-service/api/inventory/confirm',
            'http://payment-service/api/payment/confirm',
        ):
            response = requests.post(url, json={'tcc_id': str(tcc_id)}, timeout=5)
            if response.status_code != 200:
                raise Exception(
                    f"Branch confirm {url} returned {response.status_code}"
                )
    except Exception as e:
        logger.error(f"Confirm failed: {str(e)}")
        raise
def cancel_order(tcc_id, user_id=None):
    """Cancel (compensation) phase of the order TCC transaction.

    Behaviour depends on the local ``OrderTCC`` record:

    * no record: empty rollback, so nothing is done locally or remotely;
    * 'confirmed': a committed transaction must not be compensated, so a
      warning is logged and nothing else happens;
    * any other status: the record is marked 'canceled'. For a 'trying'
      record, a canceled ``Order`` row is also written when ``user_id`` is
      known. The inventory and payment Cancel endpoints are then called.

    An already 'canceled' record skips the local step but re-sends the branch
    cancels. This makes the function safe to retry after a branch failure.
    It relies on the branch Cancel endpoints being idempotent.

    Args:
        tcc_id: Global transaction id.
        user_id: Owner used for the canceled ``Order`` row. ``OrderTCC`` does
            not store it; when omitted, the row is skipped so that
            compensation can still proceed.

    Raises:
        Exception: a branch cancel did not answer HTTP 200.
        requests.exceptions.RequestException: a branch call failed in transport.
    """
    try:
        with transaction.atomic():
            # Lock the row so the status check cannot race a concurrent confirm.
            tcc = OrderTCC.objects.select_for_update().filter(tcc_id=tcc_id).first()
            if not tcc:
                # Empty rollback: Try never created a local record.
                return
            if tcc.status == 'confirmed':
                logger.warning(f"Cancel ignored: TCC {tcc_id} is already confirmed")
                return
            if tcc.status != 'canceled':
                if tcc.status == 'trying' and user_id is not None:
                    # Only a transaction whose Try ran gets a canceled-order record.
                    Order.objects.create(
                        order_id=tcc.order_id,
                        user_id=user_id,
                        status='canceled',
                        amount=0
                    )
                tcc.status = 'canceled'
                tcc.save()

        # Cancel the remote branches (inventory, then payment).
        for url in (
            'http://inventory-service/api/inventory/cancel',
            'http://payment-service/api/payment/cancel',
        ):
            response = requests.post(url, json={'tcc_id': str(tcc_id)}, timeout=5)
            if response.status_code != 200:
                raise Exception(
                    f"Branch cancel {url} returned {response.status_code}"
                )
    except Exception as e:
        logger.error(f"Cancel failed: {str(e)}")
        raise
# Service calls implemented with the requests library
def call_service(url, data, retry=3, delay=1):
    """POST ``data`` as JSON to ``url``, retrying on failure.

    An attempt fails when ``requests`` raises (connection error, timeout,
    ...) or the response status is not 200. The function waits ``delay``
    seconds between failed attempts, but not after the last one.

    Args:
        url: Endpoint to call.
        data: JSON-serializable request body.
        retry: Maximum number of attempts; 0 or less makes no request.
        delay: Seconds to sleep between attempts.

    Returns:
        True as soon as an attempt gets HTTP 200, otherwise False.
    """
    for attempt in range(1, retry + 1):
        try:
            response = requests.post(url, json=data, timeout=5)
        except requests.exceptions.RequestException as e:
            # RequestException already covers Timeout.
            logger.warning(f"Retry {attempt} for {url}: {str(e)}")
        else:
            if response.status_code == 200:
                return True
            logger.warning(f"Retry {attempt} for {url}: HTTP {response.status_code}")
        if attempt < retry:
            time.sleep(delay)
    return False
```

為提高可靠性,可以引入RabbitMQ實現異步通信:

```python
# MQ communication implemented with the pika library
import pika


def setup_mq():
    """Connect to RabbitMQ and declare the TCC messaging topology.

    Declares the topic exchange ``tcc_transaction`` and the queue
    ``order_service``. It then binds the queue to the exchange for every
    routing key matching ``order.#``.

    Returns:
        The open channel; the connection object is not returned separately.
    """
    exchange_name = 'tcc_transaction'
    queue_name = 'order_service'

    params = pika.ConnectionParameters('mq_host')
    mq_channel = pika.BlockingConnection(params).channel()

    # Declare the TCC transaction exchange and queue, then bind them.
    mq_channel.exchange_declare(exchange=exchange_name, exchange_type='topic')
    mq_channel.queue_declare(queue=queue_name)
    mq_channel.queue_bind(
        queue=queue_name,
        exchange=exchange_name,
        routing_key='order.#',
    )
    return mq_channel
def publish_tcc_event(channel, event_type, tcc_id):
    """Publish a TCC event for ``tcc_id`` on the ``tcc_transaction`` exchange.

    The routing key is ``order.<event_type>``, which the ``order.#`` binding
    created by ``setup_mq`` routes to the ``order_service`` queue. The body
    is the JSON object ``{"tcc_id": "<id>"}``.

    Args:
        channel: An open channel exposing ``basic_publish``.
        event_type: Event name appended to the routing key (e.g. 'confirm').
        tcc_id: Global transaction id; converted with ``str``.
    """
    channel.basic_publish(
        exchange='tcc_transaction',
        routing_key=f'order.{event_type}',
        body=json.dumps({'tcc_id': str(tcc_id)}),
    )
```

每個TCC接口必須實現冪等:

```python
# Inventory service: example Confirm endpoint
def inventory_confirm(request):
    """Confirm the inventory branch of a TCC transaction (HTTP view).

    Deducts the stock frozen during Try from the real stock and marks the
    branch 'confirmed'. Repeated calls are idempotent.

    Responses:
        404 ``not_found``: no branch record for ``tcc_id``.
        200 ``already_confirmed``: confirmed earlier; nothing changes.
        409 ``already_canceled``: the branch was canceled (including an
            empty-rollback record), so its reservation must not be deducted.
        200 ``success``: stock deducted and branch confirmed.
    """
    data = json.loads(request.body)
    tcc_id = data['tcc_id']

    with transaction.atomic():
        # Lock the branch row: the status checks below are only reliable
        # while concurrent confirm/cancel calls are serialized.
        tcc = InventoryTCC.objects.select_for_update().filter(tcc_id=tcc_id).first()
        if not tcc:
            return JsonResponse({'status': 'not_found'}, status=404)
        if tcc.status == 'confirmed':
            return JsonResponse({'status': 'already_confirmed'})
        if tcc.status == 'canceled':
            return JsonResponse({'status': 'already_canceled'}, status=409)

        # Deduct the real stock. Lock the product row too, because a plain
        # read-modify-write loses updates when several transactions on the
        # same product confirm concurrently.
        inventory = Inventory.objects.select_for_update().get(product_id=tcc.product_id)
        inventory.stock -= tcc.frozen_stock
        inventory.save()

        tcc.status = 'confirmed'
        tcc.save()

    return JsonResponse({'status': 'success'})
def inventory_cancel(request):
    """Cancel the inventory branch of a TCC transaction (HTTP view).

    Responses:
        200 ``empty_cancel``: no branch record existed (Try never arrived).
            A 'canceled' record is written so that a late Try can detect it
            (suspension guard).
        200 ``already_canceled``: canceled earlier; nothing changes.
        409 ``already_confirmed``: a confirmed branch cannot be compensated.
        200 ``success``: the reservation is released (branch marked canceled).
    """
    data = json.loads(request.body)
    tcc_id = data['tcc_id']

    with transaction.atomic():
        # Lock the row so cancel cannot interleave with a concurrent confirm.
        tcc = InventoryTCC.objects.select_for_update().filter(tcc_id=tcc_id).first()
        if not tcc:
            # Empty rollback: record it so that a Try arriving after this
            # Cancel can refuse to reserve stock. Presumably the Try endpoint
            # (not shown here) checks for this record; verify that it does.
            InventoryTCC.objects.create(
                tcc_id=tcc_id,
                product_id=data.get('product_id', 0),
                frozen_stock=0,
                status='canceled'
            )
            return JsonResponse({'status': 'empty_cancel'})
        if tcc.status == 'canceled':
            return JsonResponse({'status': 'already_canceled'})
        if tcc.status == 'confirmed':
            return JsonResponse({'status': 'already_confirmed'}, status=409)

        # Normal cancel. Confirm deducts frozen_stock from the real stock, so
        # Try presumably only recorded the freeze on this row. Releasing it
        # therefore means marking the branch canceled.
        # TODO confirm against the Try implementation.
        tcc.status = 'canceled'
        tcc.save()

    return JsonResponse({'status': 'success'})
from apscheduler.schedulers.background import BackgroundScheduler


def check_hanging_transactions(timeout_minutes=30):
    """Cancel order TCC transactions that have been 'trying' for too long.

    A record still 'trying' ``timeout_minutes`` after creation is treated as
    abandoned (neither Confirm nor Cancel arrived) and is passed to
    ``cancel_order``. Failures are logged per record, so one bad record does
    not stop the sweep.

    Args:
        timeout_minutes: Age after which a 'trying' record counts as hanging.
    """
    from datetime import timedelta

    from django.utils import timezone

    # timezone.now() is aware when USE_TZ=True (naive otherwise), matching how
    # Django stores created_at. datetime.now() is naive local time and gives a
    # wrong cutoff whenever that differs from the configured time zone.
    cutoff = timezone.now() - timedelta(minutes=timeout_minutes)
    hanging_ids = list(
        OrderTCC.objects.filter(
            status='trying',
            created_at__lt=cutoff
        ).values_list('tcc_id', flat=True)
    )
    for tcc_id in hanging_ids:
        try:
            cancel_order(tcc_id)
        except Exception as e:
            logger.error(f"Failed to cancel hanging TCC {tcc_id}: {str(e)}")


# Start the periodic sweep at import time.
# NOTE(review): every process that imports this module starts its own
# scheduler; with several workers the sweep runs concurrently.
scheduler = BackgroundScheduler()
scheduler.add_job(check_hanging_transactions, 'interval', minutes=5)
scheduler.start()
```

使用Celery實現異步任務:

```python
from celery import Celery

app = Celery('tcc_transaction', broker='pyamqp://guest@localhost//')


@app.task(bind=True, max_retries=3)
def confirm_order_task(self, tcc_id):
    """Run ``confirm_order`` asynchronously, retrying with exponential backoff.

    On failure the task is re-queued after ``2 ** retries`` seconds, where
    ``retries`` is the number of retries so far (1s, 2s, 4s). Once
    ``max_retries`` is exhausted, ``self.retry`` re-raises ``exc``.
    """
    try:
        confirm_order(tcc_id)
    except Exception as exc:
        # The task context exposes the retry count as ``request.retries``;
        # there is no ``request.retry`` attribute.
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
def batch_confirm(tcc_ids):
    """Lock a batch of still-pending order TCC records for confirmation.

    Selects the ``OrderTCC`` rows whose ``tcc_id`` is in ``tcc_ids`` and whose
    status is 'trying', row-locked (SELECT ... FOR UPDATE) for one database
    transaction. The per-record confirmation step is still a placeholder.
    Apart from taking and releasing those locks, the function has no effect.
    """
    criteria = {'tcc_id__in': tcc_ids, 'status': 'trying'}
    with transaction.atomic():
        locked_pending = OrderTCC.objects.select_for_update().filter(**criteria)
        for _pending_tcc in locked_pending:
            # TODO: batch confirmation logic for each locked record.
            pass
```

使用Redis實現分布式鎖:

```python
import redis
from contextlib import contextmanager

redis_client = redis.StrictRedis(host='localhost', port=6379)

# Delete the lock key only if it still holds our token, atomically on the
# Redis server (Lua scripts run without interleaving other commands).
_RELEASE_LOCK_SCRIPT = """
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
end
return 0
"""


@contextmanager
def redis_lock(lock_key, timeout=10):
    """Hold a Redis-based lock on ``lock_key`` for the duration of a ``with`` block.

    Tries to acquire the lock for up to ``timeout`` seconds, polling every
    millisecond. The key also expires after ``timeout`` seconds, so a crashed
    holder cannot block others forever. A body that runs longer than that may
    lose the lock to another client.

    Args:
        lock_key: Redis key used as the lock.
        timeout: Acquire deadline and key TTL, in whole seconds.

    Raises:
        Exception: "Could not acquire lock" if not acquired within ``timeout``.
    """
    identifier = str(uuid.uuid4())
    end = time.time() + timeout
    while time.time() < end:
        # SET NX EX sets the value and its TTL in one command; a separate
        # SETNX + EXPIRE could leave a key without TTL if the process died
        # between the two calls.
        if redis_client.set(lock_key, identifier, nx=True, ex=timeout):
            try:
                yield
            finally:
                # Compare-and-delete server-side. A client-side GET returns
                # bytes (never equal to the str token) and is racy anyway.
                redis_client.eval(_RELEASE_LOCK_SCRIPT, 1, lock_key, identifier)
            return
        time.sleep(0.001)
    raise Exception("Could not acquire lock")
import pytest
from unittest.mock import patch


@pytest.mark.django_db
def test_order_try_success(client):
    """The Try phase succeeds when both branch Try calls answer HTTP 200.

    ``client`` is the Django test client fixture.
    """
    with patch('requests.post') as mock_post:
        mock_post.return_value.status_code = 200
        # The view parses request.body with json.loads, so send a JSON body;
        # the client's default form encoding would not be valid JSON.
        response = client.post(
            '/api/order/try',
            data={
                'user_id': 1,
                'product_id': 101,
                'quantity': 2,
                'amount': 100.00
            },
            content_type='application/json',
        )
    assert response.status_code == 200
    assert OrderTCC.objects.count() == 1
    # One Try call per branch (inventory, payment) and no compensating Cancel.
    assert mock_post.call_count == 2
@pytest.mark.integration
def test_full_tcc_flow():
    """Integration sketch: run Try then Confirm, and check that all services agree.

    NOTE(review): this is pseudo-code as written and cannot run yet:
    - ``test_data``, ``order``, ``inventory``, ``payment``, ``original_stock``,
      ``original_balance``, ``quantity`` and ``amount`` are not defined here;
    - ``create_order_try`` is a view that reads ``request.body`` and returns a
      JsonResponse with ``tcc_id``/``order_id``, not an object with ``status``;
    - ``confirm_order`` has no return value.
    TODO: wire up request/fixture objects before enabling.
    """
    # 1. Run the Try phase
    try_response = create_order_try(test_data)
    # 2. Check the Try result
    assert try_response.status == 'success'
    # 3. Run the Confirm phase
    confirm_response = confirm_order(try_response.tcc_id)
    # 4. Verify eventual consistency across order, inventory and payment
    assert order.status == 'confirmed'
    assert inventory.stock == original_stock - quantity
    assert payment.balance == original_balance - amount
def test_network_partition():
    """Chaos sketch: an order/inventory partition must end in a canceled inventory branch.

    NOTE(review): pseudo-code. ``ChaosMonkey`` and ``test_data`` do not appear
    to be defined anywhere in this article. ``create_order_try`` raises when a
    Try fails rather than returning a 500 response carrying ``tcc_id``.
    TODO: confirm the chaos harness and the request fixture before enabling.
    """
    with ChaosMonkey() as cm:
        cm.network_partition('order-service', 'inventory-service')
        response = create_order_try(test_data)
        assert response.status_code == 500
        cm.heal_network()
        # Verify that compensation succeeded
        assert InventoryTCC.objects.get(tcc_id=response.tcc_id).status == 'canceled'
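```

本文詳細介紹了如何使用Python實現TCC模式的分布式事務,包括:

- TCC模式的基本概念與優缺點
- Try、Confirm、Cancel三個階段的實現
- 冪等、空回滾與懸掛事務的處理
- 基於消息隊列、Celery和Redis分布式鎖的可靠性增強
- 單元測試、集成測試與故障注入測試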
在實際應用中,還可以考慮以下方向進行擴展,例如:將事務協調邏輯抽取為獨立的協調器服務、為長時間未完成的事務增加監控與告警,以及持久化事務日志以支持故障恢復。
TCC模式雖然實現複雜度較高,但在對數據一致性要求較高的分布式系統中仍然是非常有價值的解決方案(它提供的是最終一致性)。通過合理的架構設計和代碼實現,可以在Python生態中構建可靠的分布式事務處理能力。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。