溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

43進程_multiprocessing_concurrent

發布時間:2020-07-11 23:21:08 來源:網絡 閱讀:353 作者:chaijowin 欄目:編程語言

?

?

?

目錄

multiprocessing模塊:... 1

多進程舉例:... 2

multiprocessing.Pool,進程池:... 3

concurrent包:... 3

?

?

?

多進程:

由于pyGIL,多線程未必是cpu密集型程序好的選擇;

?

multiprocessing模塊:

多進程,可在完全獨立的進程環境中運行程序,可充分利用多處理器;

但進程本身的隔離帶來的數據不共享是個問題;

線程比進程輕量級;

?

Process類:

遵循了Thread類的API,減少了學習難度;

多進程,一定要在__main__()中,否則拋錯;

?

p=multiprocessing.Process()

p.pid,進程ID;

p.exitcode,進程的退出狀態碼;

p.terminate(),終止指定的進程;

?

進程間同步:

提供了和線程同步一樣的類,使用的方法一樣,使用的效果也類似;

不過,進程間代價要高于線程,而且底層實現不同,只不過py屏蔽了這些,讓用戶能簡單使用(py提供的庫抹平了它們之間的差別,為方便使用);

?

multiprocessing還提供了共享內存、服務器進程來共享數據;還提供了Queue、Pipe(上一個進程的標準輸出到下一個進程的標準輸入)用來進程間通信;

Queue.queue是線程級別的鎖;

multiprocessing.Queue可跨進程用,用的少;此時應用第三方工具MQ,單機進程間通信用的也少,一般應用都是跨節點的進程間通信,即RPC;RPC框架很多,如swfit、dubbo;

?

通信方式不同:

多進程就是啟動多個解釋器進程,進程間通信必須序列化、反序列化;

數據的線程安全性問題,由于每個進程中沒有實現多線程,GIL可以說沒用了;

?

進程池:

只允許使用計算機這么多資源(不能搶占計算機其它資源);

很多進程要反復創建的情形下,用進程池;

?

多進程、多線程的選擇:

CPU密集型,cpython中使用GIL,多線程時鎖相互競爭,多核優勢不能發揮,用py多進程(多個解釋器進程)效率更高;

IO密集型,適合用多線程,減少IO序列化開銷,且在IO等待時,切換到其它線程繼續執行,效率不錯;

?

應用場景:

請求-應答模型;

web應用中常見的處理模型,用多進程+多線程;

如,nginx工作模式:

master啟動多個worker工作進程,一般和cpu數目相同;

worker中啟動多線程,提高并發處理能力,worker處理用戶的請求,往往需要等待數據;

?

nginx應就地本地編譯(本地指令集,甚至用指定cpu(如intel)編譯器編譯,這樣可優化指令,性能更高);

?

?

多進程舉例:

例:

def calc(i):

??? sum = 0

??? for _ in range(100000000):

??????? sum += 1

?

if __name__ == '__main__':?? #使用多進程,必須要在main中執行,否則報錯

??? start = datetime.datetime.now()

?

??? lst = []

??? for i in range(5):

??????? p = multiprocessing.Process(target=calc, args=(i,), name='p-{}'.format(i))

??????? p.start()

??????? lst.append(p)

??? for p in lst:

??????? p.join()

?

??? delta = (datetime.datetime.now()-start).total_seconds()

??? print(delta)?? #win上查看,有5py進程

輸出:

20.037146

?

?

multiprocessing.Pool,進程池:

例,進程池:

def calc(i):

??? sum = 0

??? for _ in range(100000000):

??????? sum += 1

?

if __name__ == '__main__':

??? start = datetime.datetime.now()

?

??? pool = multiprocessing.Pool(5)

??? for i in range(5):

??????? pool.apply_async(calc,args=(i,))

??? pool.close()?? #重要,要有此步

??? pool.join()

?

??? delta = (datetime.datetime.now()-start).total_seconds()

??? print(delta)

?

?

?

concurrent包:

目前僅有一個concurrent.futures,3.2引入;

異步并行任務編程模塊,提供一個高級的異步可執行的便利接口;

?

提供了2個池執行器:

ThreadPoolExecutor,異步調用的線程池的Executor;

ProcessPoolExecutor,異步調用的進程池的Executor;

?

from conncurrent import futures

executor=futures.ThreadPoolExecutor(max_workers=1),池中至多創建max_workers個線程來同時異步執行,默認1個,返回Executor實例;

f=executor.submit(fn,*args,**kwargs),提交執行的函數及其參數,返回Futures實例;

executor.shutdown(wait=True),清理池;

?

Future類(submit()的實例):

f=executor.submit(work,2)

f.result(),可查看調用的返回的結果,函數的return結果;

f.done(),如果調用被成功的取消或執行完成,返回True,是標志,注意不是函數的返回值;

f.cancelled(),如果調用被成功的取消,返回True;

f.running(),如果正在運行且不能被取消,返回True;

f.cancel(),嘗試取消調用,如果已經執行且不能取消,返回False,否則返回True;

f.result(timeout=None),取返回的結果,超時為None,一直等待返回;

f.exception(timeout=None),取返回的異常,超時為None,一直等待返回;

?

支持上下文管理:

futures.ThreadPoolExecutor繼承自_base.Executor,父類有__enter__()__exit__()方法;

例:

with futures.ThreadPoolExecutor(max_workers=1) as executor:

??? future = executor.submit(pow, 2, 3)

??? print(future.result())

輸出:

8

?

總結:

統一了線程池、進程池調用,簡化了編程;

py簡單的思想哲學的體現;

唯一的缺點,無法設置線程名稱;

?

例,多線程:

def work(n):

??? logging.info('working-{}'.format(n))

??? time.sleep(5)

??? logging.info('end-work-{}'.format(n))

?

executor = futures.ThreadPoolExecutor(3)

?

fs = []

?

for i in range(3):

??? f = executor.submit(work, i)

??? fs.append(f)

?

for i in range(3):

??? f = executor.submit(work, i)

??? fs.append(f)

?

for i in range(3):

??? f = executor.submit(work, i)

??? fs.append(f)

?

while True:

??? time.sleep(2)

??? logging.info(threading.enumerate())

??? flag = True

??? for f in fs:

??????? flag = flag and f.done()

??????? if flag:

??????????? executor.shutdown()

??????????? logging.info(threading.enumerate())

??????????? break

?

例,多進程:

def work(n):

??? logging.info('working-{}'.format(n))

??? time.sleep(5)

??? logging.info('end-work-{}'.format(n))

?

if __name__ == '__main__':

??? executor = futures.ProcessPoolExecutor(3)

??? fs = []

?

??? for i in range(3):

??????? f = executor.submit(work, i)

??????? fs.append(f)

?

??? for i in range(3):

??????? f = executor.submit(work, i)

??????? fs.append(f)

?

??? for i in range(3):

??????? f = executor.submit(work, i)

??????? fs.append(f)

?

??? while True:

??????? time.sleep(2)

??????? logging.info(threading.enumerate())

??????? flag = True

??????? for f in fs:

??????????? flag = flag and f.done()

??????????? if flag:

??????????????? executor.shutdown()

??????????????? # logging.info(threading.enumerate())?? #多進程用不上

???????????? ???break

?

?


向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女