溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Python中怎么使用Celery并行分布式框架

發布時間:2021-06-16 16:16:07 來源:億速云 閱讀:201 作者:Leah 欄目:開發技術

這篇文章將為大家詳細講解有關Python中怎么使用Celery并行分布式框架,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。

Celery 簡介

除了redis,還可以使用另外一個神器---Celery。Celery是一個異步任務的調度工具。

Celery 是 Distributed Task Queue,分布式任務隊列,分布式決定了可以有多個 worker 的存在,隊列表示其是異步操作,即存在一個產生任務提出需求的工頭,和一群等著被分配工作的碼農。

在 Python 中定義 Celery 的時候,我們要引入 Broker,中文翻譯過來就是“中間人”的意思,在這里 Broker 起到一個中間人的角色。在工頭提出任務的時候,把所有的任務放到 Broker 里面,在 Broker 的另外一頭,一群碼農等著取出一個個任務準備著手做。

這種模式注定了整個系統會是個開環系統,工頭對于碼農們把任務做的怎樣是不知情的。所以我們要引入 Backend 來保存每次任務的結果。這個 Backend 有點像我們的 Broker,也是存儲任務的信息用的,只不過這里存的是那些任務的返回結果。我們可以選擇只讓錯誤執行的任務返回結果到 Backend,這樣我們取回結果,便可以知道有多少任務執行失敗了。

Celery(芹菜)是一個異步任務隊列/基于分布式消息傳遞的作業隊列。它側重于實時操作,但對調度支持也很好。Celery用于生產系統每天處理數以百萬計的任務。Celery是用Python編寫的,但該協議可以在任何語言實現。它也可以與其他語言通過webhooks實現。Celery建議的消息隊列是RabbitMQ,但提供有限支持Redis, Beanstalk, MongoDB, CouchDB, 和數據庫(使用SQLAlchemy的或Django的 ORM) 。

Celery是易于集成Django, Pylons and Flask,使用 django-celery, celery-pylons and Flask-Celery 附加包即可。

在學習Celery之前,我先簡單的去了解了一下什么是生產者消費者模式。

生產者消費者模式

在實際的軟件開發過程中,經常會碰到如下場景:某個模塊負責產生數據,這些數據由另一個模塊來負責處理(此處的模塊是廣義的,可以是類、函數、線程、進程等)。產生數據的模塊,就形象地稱為生產者;而處理數據的模塊,就稱為消費者。

單單抽象出生產者和消費者,還夠不上是生產者消費者模式。該模式還需要有一個緩沖區處于生產者和消費者之間,作為一個中介。生產者把數據放入緩沖區,而消費者從緩沖區取出數據,如下圖所示:

Python中怎么使用Celery并行分布式框架

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過消息隊列(緩沖區)來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給消息隊列,消費者不找生產者要數據,而是直接從消息隊列里取,消息隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力。這個消息隊列就是用來給生產者和消費者解耦的。------------->這里又有一個問題,什么叫做解耦?

解耦

假設生產者和消費者分別是兩個類。如果讓生產者直接調用消費者的某個方法,那么生產者對于消費者就會產生依賴(也就是耦合)。將來如果消費者的代碼發生變化,可能會影響到生產者。而如果兩者都依賴于某個緩沖區,兩者之間不直接依賴,耦合也就相應降低了。生產者直接調用消費者的某個方法,還有另一個弊端。由于函數調用是同步的(或者叫阻塞的),在消費者的方法沒有返回之前,生產者只好一直等在那邊。萬一消費者處理數據很慢,生產者就會白白糟蹋大好時光。緩沖區還有另一個好處。如果制造數據的速度時快時慢,緩沖區的好處就體現出來了。當數據制造快的時候,消費者來不及處理,未處理的數據可以暫時存在緩沖區中。等生產者的制造速度慢下來,消費者再慢慢處理掉。

因為太抽象,看過網上的說明之后,通過我的理解,我舉了個例子:吃包子。

假如你非常喜歡吃包子(吃起來根本停不下來),今天,你媽媽(生產者)在蒸包子,廚房有張桌子(緩沖區),你媽媽將蒸熟的包子盛在盤子(消息)里,然后放到桌子上,你正在看巴西奧運會,看到蒸熟的包子放在廚房桌子上的盤子里,你就把盤子取走,一邊吃包子一邊看奧運。在這個過程中,你和你媽媽使用同一個桌子放置盤子和取走盤子,這里桌子就是一個共享對象。生產者添加食物,消費者取走食物。桌子的好處是,你媽媽不用直接把盤子給你,只是負責把包子裝在盤子里放到桌子上,如果桌子滿了,就不再放了,等待。而且生產者還有其他事情要做,消費者吃包子比較慢,生產者不能一直等消費者吃完包子把盤子放回去再去生產,因為吃包子的人有很多,如果這期間你好朋友來了,和你一起吃包子,生產者不用關注是哪個消費者去桌子上拿盤子,而消費者只去關注桌子上有沒有放盤子,如果有,就端過來吃盤子中的包子,沒有的話就等待。對應關系如下圖:

Python中怎么使用Celery并行分布式框架

考察了一下,原來當初設計這個模式,主要就是用來處理并發問題的,而Celery就是一個用python寫的并行分布式框架。

然后我接著去學習Celery

Celery 是一個強大的 分布式任務隊列 的 異步處理框架,它可以讓任務的執行完全脫離主程序,甚至可以被分配到其他主機上運行。我們通常使用它來實現異步任務(async task)和定時任務(crontab)。我們需要一個消息隊列來下發我們的任務。首先要有一個消息中間件,此處選擇rabbitmq (也可選擇 redis 或 Amazon Simple Queue Service(SQS)消息隊列服務)。推薦 選擇 rabbitmq 。使用RabbitMQ是官方特別推薦的方式,因此我也使用它作為我們的broker。

Celery的定義

Celery(芹菜)是一個簡單、靈活且可靠的,處理大量消息的分布式系統,并且提供維護這樣一個系統的必需工具。

我比較喜歡的一點是:Celery支持使用任務隊列的方式在分布的機器、進程、線程上執行任務調度。然后我接著去理解什么是任務隊列。

任務隊列

任務隊列是一種在線程或機器間分發任務的機制。

消息隊列

消息隊列的輸入是工作的一個單元,稱為任務,獨立的職程(Worker)進程持續監視隊列中是否有需要處理的新任務。

Celery 用消息通信,通常使用中間人(Broker)在客戶端和職程間斡旋。這個過程從客戶端向隊列添加消息開始,之后中間人把消息派送給職程,職程對消息進行處理。如下圖所示:

Python中怎么使用Celery并行分布式框架

Celery 系統可包含多個職程和中間人,以此獲得高可用性和橫向擴展能力。

Celery的架構

Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。

消息中間件

Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成,包括,RabbitMQ,Redis,MongoDB等,這里我先去了解RabbitMQ,Redis。

linux安裝redis參考:https://www.jb51.net/article/146751.htm

docker 安裝redis參考://www.jb51.net/article/148880.htm

docker安裝rabbitmq參考:https://www.jb51.net/article/144748.htm

任務執行單元

Worker是Celery提供的任務執行的單元,worker并發的運行在分布式的系統節點中

任務結果存儲

Task result store用來存儲Worker執行的任務的結果,Celery支持以不同方式存儲任務的結果,包括Redis,MongoDB,Django ORM,AMQP等,這里我先不去看它是如何存儲的,就先選用Redis來存儲任務執行結果。

然后我接著去安裝Celery,在安裝Celery之前,我已經在自己虛擬機上安裝好了Python,版本是3.6,

安裝celery,版本為4.2.1

sudo apt install python-celery-common

因為涉及到消息中間件,所以我先去選擇一個在我工作中要用到的消息中間件(在Celery幫助文檔中稱呼為中間人<broker>),為了更好的去理解文檔中的例子,我安裝了兩個中間件,一個是RabbitMQ,一個redis。

在這里我就先根據Celery的幫助文檔安裝和設置RabbitMQ。要使用 Celery,我們需要創建一個 RabbitMQ 用戶、一個虛擬主機,并且允許這個用戶訪問這個虛擬主機。下面是我在個人pc機Ubuntu16.04上的設置:

$ sudo rabbitmqctl add_user forward password

#創建了一個RabbitMQ用戶,用戶名為forward,密碼是password

$ sudo rabbitmqctl add_vhost ubuntu

#創建了一個虛擬主機,主機名為ubuntu

$ sudo rabbitmqctl set_permissions -p ubuntu forward ".*" ".*" ".*"

#允許用戶forward訪問虛擬主機ubuntu,因為RabbitMQ通過主機名來與節點通信

$ sudo rabbitmq-server

之后我啟用RabbitMQ服務器,結果如下,成功運行:

Python中怎么使用Celery并行分布式框架

之后我安裝Redis,它的安裝比較簡單,如下:

$ sudo pip install redis

然后進行簡單的配置,只需要設置 Redis 數據庫的位置:

BROKER_URL = 'redis://localhost:6379/0'

URL的格式為:

redis://:password**@hostname**:port/db_number

URL Scheme 后的所有字段都是可選的,并且默認為 localhost 的 6379 端口,使用數據庫 0。我的配置是:

redis://:password**@ubuntu**:6379/5

之后安裝Celery,我是用標準的Python工具pip安裝的,如下:

$ sudo pip install celery

開始使用 Celery

使用celery包含三個方面:1. 定義任務函數。2. 運行celery服務。3. 客戶應用程序的調用。

為了測試Celery能否工作,我運行了一個最簡單的任務,編寫tasks.py:

from celery import Celery
# broker設置中間件,backend設置后端存儲
app = Celery('tasks',broker='redis://127.0.0.1:6379/5',backend='redis://127.0.0.1:6379/6')
@app.task
def add(x,y):
  return x+y

編輯保存退出后,我在當前目錄下運行如下命令(記得要先開啟redis):

$ celery -A tasks worker --loglevel=info

啟動一個worker

#查詢文檔,了解到該命令中-A參數表示的是Celery APP的名稱,這個實例中指的就是tasks.py(和文件名一致),后面的tasks就是APP的名稱,worker是一個執行任務角色,后面的loglevel=info記錄日志類型默認是info,這個命令啟動了一個worker,用來執行程序中add這個加法任務(task)。

然后看到界面顯示結果如下:

Python中怎么使用Celery并行分布式框架

我們可以看到Celery正常工作在名稱luanpeng-XPS15R的虛擬主機上,版本為v4.2.1,在下面的[config]中我們可以看到當前APP的名稱tasks,運輸工具transport就是我們在程序中設置的中間人redis://127.0.0.1:6379/5,result我們沒有設置,暫時顯示為disabled,然后我們也可以看到worker缺省使用perfork來執行并發,當前并發數顯示為1,然后可以看到下面的[queues]就是我們說的隊列,當前默認的隊列是celery,然后我們看到下面的[tasks]中有一個任務tasks.add.

如果你有多個不同類型的任務可以放在不同的文件夾下,比如我們在在app1文件夾創建一個tasks.py,在app2文件夾下創建一個tasks.py

我們可以這樣定義任務

celery -A app1.tasks worker --loglevel=info

注意:目錄結構和命令發起的當前目錄決定了任定義時的命令,任務定義的命令決定了任務定義的名稱,任務的名稱決定了任務調用時的名稱。

了解了這些之后,根據文檔在當前目錄,我重新打開一個terminal,然后執行Python,進入Python交互界面,用delay()方法調用任務,執行如下操作:

如果我們只有一個tasks.py文件,我們可以這樣定義任務

celery -A tasks worker --loglevel=info

那我們可以這樣調用任務start_task.py,py文件必須和tasks.py文件在同一個目錄下

from tasks import add
add.delay(6,6)  # 調用delay函數即可執行任務

如果我們在app1文件夾下有tasks.py文件,我們可以這樣定義任務

celery -A app1.tasks worker --loglevel=info

那我們可以這樣調用任務start_task.py

from app1.tasks import add
add.delay(6,6)  # 調用delay函數即可執行任務

所以定義任務和調用任務必須在同一個目錄。

執行調用任務的start_task.py文件

python start_task.py

這個任務已經由之前啟動的Worker異步執行了,然后我打開之前啟動的worker的控制臺,對輸出進行查看驗證,結果如下:

[2018-09-24 20:07:11,496: INFO/MainProcess] Received task: app1.tasks.add[8207c280-0864-4b1e-8792-155de5417406] 
[2018-09-24 20:07:11,501: INFO/ForkPoolWorker-4] Task app1.tasks.add[8207c280-0864-4b1e-8792-155de5417406] succeeded in 0.003930353002942866s: 12

第一行說明worker收到了一個任務:app1.tasks.add,這里我們和之前發送任務返回的AsyncResult對比我們發現,每個task都有一個唯一的ID,第二行說明了這個任務執行succeed,執行結果為12。

查看資料說調用任務后會返回一個AsyncResult實例,可用于檢查任務的狀態,等待任務完成或獲取返回值(如果任務失敗,則為異常和回溯)。但這個功能默認是不開啟的,需要設置一個 Celery 的結果后端(backend),也就是tasks.py設置的使用redis進行結果存儲。

通過這個例子后我對Celery有了初步的了解,然后我在這個例子的基礎上去進一步的學習。

因為Celery是用Python編寫的,所以為了讓代碼結構化一些,就像一個應用

Python中怎么使用Celery并行分布式框架

app1/app1_app.py

from celery import Celery
import os,io
# 在app1目錄同級目錄執行celery -A app1.app1_app worker -l info
app = Celery(main='app1.app1_app',include=['app1.tasks1','app1.tasks2']) # 創建app,并引入任務定義。main、include參數的值為模塊名,所以都是指定命令的相對目錄
app.config_from_object('app1.app1_config')  # 通過配置文件進行配置,而著這里是相對目錄
# broker設置中間件,backend設置后端存儲
# app = Celery('app1.app1_app',broker='redis://127.0.0.1:6379/5',backend='redis://127.0.0.1:6379/6',include=['app1.tasks1','app1.task2'])
if __name__ == "__main__":
  log_path = os.getcwd()+'/log/celery.log'
  if(not os.path.exists(log_path)):
    f = open(log_path, 'w')
    f.close()
  # 在app1目錄同級目錄執行celery -A app1.app1_app worker -l info
  app = Celery(main='app1_app',include=['tasks1', 'tasks2']) # 創建app,并引入任務定義。main、include參數的值為模塊名,所以都是指定命令的相對目錄
  app.config_from_object('app1_config') # 通過配置文件進行配置,而著這里是相對目錄
  # 使用下面的命令也可以啟動celery,不過要該模塊的名稱,是的相對目錄正確
  app.start(argv=['celery', 'worker', '-l', 'info', '-f', 'log/celery.log', "-c", "40"])

一定要注意模塊的相對目錄,和你想要執行命令的目錄

#首先創建了一個celery實例app,實例化的過程中,制定了任務名(也就是包名.模塊名),Celery的第一個參數是當前模塊的名稱,我們可以調用config_from_object()來讓Celery實例加載配置模塊,我的例子中的配置文件起名為app1_config.py,配置文件如下:

BROKER_URL = 'redis://127.0.0.1:6379/5'  # 配置broker 中間件
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/6'  # 配置backend結果存儲
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

在配置文件中我們可以對任務的執行等進行管理,比如說我們可能有很多的任務,但是我希望有些優先級比較高的任務先被執行,而不希望先進先出的等待。那么需要引入一個隊列的問題. 也就是說在我的broker的消息存儲里面有一些隊列,他們并行運行,但是worker只從對應 的隊列里面取任務。在這里我們希望tasks.py中的某些任務先被執行。task中我設置了兩個任務:

所以我通過from celery import group引入group,用來創建并行執行的一組任務。然后這塊現需要理解的就是這個@app.task,@符號在python中用作函數修飾符,到這塊我又回頭去看python的裝飾器(在代碼運行期間動態增加功能的方式)到底是如何實現的,在這里的作用就是通過task()裝飾器在可調用的對象(app)上創建一個任務。

tasks1.py

from app1.app1_app import app
@app.task
def deal1(text):
  print(text)
  return text+"======="

tasks2.py

from app1.app1_app import app
@app.task
def deal2(text):
  print(text)
  return text+"+++++++++"

隊列

了解完裝飾器后,我回過頭去整理配置的問題,前面提到任務的優先級問題,在這個例子中如果我們想讓deal1這個任務優先于deal2任務被執行,我們可以將兩個任務放到不同的隊列中,由我們決定先執行哪個任務,我們可以在配置文件app1_config.py中增加這樣配置:

from kombu import Exchange,Queue
BROKER_URL = 'redis://127.0.0.1:6379/5'  # 配置broker 中間件
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/6'  # 配置backend結果存儲
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
# (當使用Redis作為broker時,Exchange的名字必須和Queue的名字一樣)
CELERY_QUEUES = (
          Queue("default", Exchange("default"), routing_key = "default"),
          Queue("for_task1", Exchange("for_task1"), routing_key="task_a"),
          Queue("for_task2", Exchange("for_task2"), routing_key="task_b")
)
# 定義任務的走向,不同的任務發送 進入不同的隊列,并為不同的任務設定不同的routing_key
# 若沒有指定這個任務route到那個Queue中去執行,此時執行此任務的時候,會route到Celery默認的名字叫做celery的隊列中去。
CELERY_ROUTES = {
  'app1.tasks1.deal1': {"queue": "for_task1", "routing_key": "task_a"},
  'app1.tasks2.deal2':{"queue":"for_task2","routing_key":"task_b"}
}

先了解了幾個常用的參數的含義:

Exchange:交換機,決定了消息路由規則;

Queue:消息隊列;

Channel:進行消息讀寫的通道;

Bind:綁定了Queue和Exchange,意即為符合什么樣路由規則的消息,將會放置入哪一個消息隊列;

我將deal1這個函數任務放在了一個叫做for_task1的隊列里面,將deal2這個函數任務放在了一個叫做for_task2的隊列里面,然后我在當前應用目錄下執行命令:

celery -A app1.app1_app worker -l info -Q for_task1

這個worker就只負責處理for_task1這個隊列的任務,這是通過在啟動worker是使用-Q Queue_Name參數指定的。

我們定義任務調用文件start_task.py

from __future__ import print_function
from app1.app1_app import app
if __name__=="__main__":
  for i in range(10):
    text = 'text'+str(i)
    app.send_task('app1.tasks1.deal1',args=[text])  # 任務的名稱必須和Celery注冊的任務名稱相同
    app.send_task('app1.tasks2.deal2',args=[text]) # 任務的名稱必須和Celery注冊的任務名稱相同
    print('push over %d'%i)

執行上述代碼文件

python start_task.py

任務已經被執行,我在worker控制臺查看結果(只有app1.appa_app.deal1任務被這個worker執行了):

[2018-09-24 22:26:38,928: INFO/ForkPoolWorker-8] Task app1.tasks1.deal1[b3007993-9bfb-4161-b5b2-4f0f022f2f8b] succeeded in 0.0008255800021288451s: 'text4======='
[2018-09-24 22:26:38,928: INFO/ForkPoolWorker-6] Task app1.tasks1.deal1[df24b991-88fc-4253-86bf-540754c62da9] succeeded in 0.004320767002354842s: 'text3======='
[2018-09-24 22:26:38,929: INFO/MainProcess] Received task: app1.tasks1.deal1[dbdf9ac0-ea27-4455-90d2-e4fe8f3e895e] 
[2018-09-24 22:26:38,930: WARNING/ForkPoolWorker-4] text5
[2018-09-24 22:26:38,931: INFO/ForkPoolWorker-4] Task app1.tasks1.deal1[dbdf9ac0-ea27-4455-90d2-e4fe8f3e895e] succeeded in 0.0006721289973938838s: 'text5======='

可以看到worker收到任務,并且執行了任務。

Scheduler ( 定時任務,周期性任務 )

在這里我們還是在交互模式下手動去執行,我們想要crontab的定時生成和執行,我們可以用celery的beat去周期的生成任務和執行任務,在這個例子中我希望每10秒鐘產生一個任務,然后去執行這個任務,我可以這樣配置(在app1_config.py文件中添加如下內容):

# 設計周期任務
CELERY_TIMEZONE = 'Asia/Shanghai'
from celery.schedules import crontab  # 設置定時任務
from datetime import timedelta
# 每隔30秒執行app1.tasks1.deal函數
CELERYBEAT_SCHEDULE = {
  'deal-every-30-seconds': {
     'task': 'app1.tasks1.deal1',
     'schedule': timedelta(seconds=30),
     'args': ['hello']
  },
  'deal-every-10-seconds': {
     'task': 'app1.tasks2.deal2',
     'schedule': timedelta(seconds=10),
     'args': ['hello']
  },
   # Executes every Monday morning at 7:30 A.M
  'deal-every-monday-morning': {
     'task': 'app1.tasks2.deal2',
     'schedule': crontab(hour=7, minute=30, day_of_week=1),
     'args': ['hello']
  },
}

使用了scheduler,要制定時區:CELERY_TIMEZONE = ‘Asia/Shanghai',啟動celery加上-B的參數。

celery -A app1.app1_app worker -l info -B

前兩個任務為周期任務,第三個任務為定時任務,指定時間點開始執行分發任務,讓worker取走執行,可以這樣配置:

看完這些基礎的東西,我回過頭對celery在回顧了一下,用圖把它的框架大致畫出來,如下圖:

Python中怎么使用Celery并行分布式框架

關于Python中怎么使用Celery并行分布式框架就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女