?
?
?
目錄
multiprocessing模塊:... 1
多進程舉例:... 2
multiprocessing.Pool,進程池:... 3
concurrent包:... 3
?
?
?
多進程:
由于py的GIL,多線程未必是cpu密集型程序好的選擇;
?
多進程,可在完全獨立的進程環境中運行程序,可充分利用多處理器;
但進程本身的隔離帶來的數據不共享是個問題;
線程比進程輕量級;
?
Process類:
遵循了Thread類的API,減少了學習難度;
多進程,一定要在__main__()中,否則拋錯;
?
p=multiprocessing.Process()
p.pid,進程ID;
p.exitcode,進程的退出狀態碼;
p.terminate(),終止指定的進程;
?
進程間同步:
提供了和線程同步一樣的類,使用的方法一樣,使用的效果也類似;
不過,進程間代價要高于線程,而且底層實現不同,只不過py屏蔽了這些,讓用戶能簡單使用(py提供的庫抹平了它們之間的差別,為方便使用);
?
multiprocessing還提供了共享內存、服務器進程來共享數據;還提供了Queue、Pipe(上一個進程的標準輸出到下一個進程的標準輸入)用來進程間通信;
Queue.queue是線程級別的鎖;
multiprocessing.Queue可跨進程用,用的少;此時應用第三方工具MQ,單機進程間通信用的也少,一般應用都是跨節點的進程間通信,即RPC;RPC框架很多,如swfit、dubbo;
?
通信方式不同:
多進程就是啟動多個解釋器進程,進程間通信必須序列化、反序列化;
數據的線程安全性問題,由于每個進程中沒有實現多線程,GIL可以說沒用了;
?
進程池:
只允許使用計算機這么多資源(不能搶占計算機其它資源);
很多進程要反復創建的情形下,用進程池;
?
多進程、多線程的選擇:
CPU密集型,cpython中使用GIL,多線程時鎖相互競爭,多核優勢不能發揮,用py多進程(多個解釋器進程)效率更高;
IO密集型,適合用多線程,減少IO序列化開銷,且在IO等待時,切換到其它線程繼續執行,效率不錯;
?
應用場景:
請求-應答模型;
web應用中常見的處理模型,用多進程+多線程;
如,nginx工作模式:
master啟動多個worker工作進程,一般和cpu數目相同;
worker中啟動多線程,提高并發處理能力,worker處理用戶的請求,往往需要等待數據;
?
nginx應就地本地編譯(本地指令集,甚至用指定cpu(如intel)編譯器編譯,這樣可優化指令,性能更高);
?
?
例:
def calc(i):
??? sum = 0
??? for _ in range(100000000):
??????? sum += 1
?
if __name__ == '__main__':?? #使用多進程,必須要在main中執行,否則報錯
??? start = datetime.datetime.now()
?
??? lst = []
??? for i in range(5):
??????? p = multiprocessing.Process(target=calc, args=(i,), name='p-{}'.format(i))
??????? p.start()
??????? lst.append(p)
??? for p in lst:
??????? p.join()
?
??? delta = (datetime.datetime.now()-start).total_seconds()
??? print(delta)?? #win上查看,有5個py進程
輸出:
20.037146
?
?
例,進程池:
def calc(i):
??? sum = 0
??? for _ in range(100000000):
??????? sum += 1
?
if __name__ == '__main__':
??? start = datetime.datetime.now()
?
??? pool = multiprocessing.Pool(5)
??? for i in range(5):
??????? pool.apply_async(calc,args=(i,))
??? pool.close()?? #重要,要有此步
??? pool.join()
?
??? delta = (datetime.datetime.now()-start).total_seconds()
??? print(delta)
?
?
?
目前僅有一個concurrent.futures,3.2引入;
異步并行任務編程模塊,提供一個高級的異步可執行的便利接口;
?
提供了2個池執行器:
ThreadPoolExecutor,異步調用的線程池的Executor;
ProcessPoolExecutor,異步調用的進程池的Executor;
?
from conncurrent import futures
executor=futures.ThreadPoolExecutor(max_workers=1),池中至多創建max_workers個線程來同時異步執行,默認1個,返回Executor實例;
f=executor.submit(fn,*args,**kwargs),提交執行的函數及其參數,返回Futures實例;
executor.shutdown(wait=True),清理池;
?
Future類(submit()的實例):
f=executor.submit(work,2)
f.result(),可查看調用的返回的結果,函數的return結果;
f.done(),如果調用被成功的取消或執行完成,返回True,是標志,注意不是函數的返回值;
f.cancelled(),如果調用被成功的取消,返回True;
f.running(),如果正在運行且不能被取消,返回True;
f.cancel(),嘗試取消調用,如果已經執行且不能取消,返回False,否則返回True;
f.result(timeout=None),取返回的結果,超時為None,一直等待返回;
f.exception(timeout=None),取返回的異常,超時為None,一直等待返回;
?
支持上下文管理:
futures.ThreadPoolExecutor繼承自_base.Executor,父類有__enter__()和__exit__()方法;
例:
with futures.ThreadPoolExecutor(max_workers=1) as executor:
??? future = executor.submit(pow, 2, 3)
??? print(future.result())
輸出:
8
?
總結:
統一了線程池、進程池調用,簡化了編程;
是py簡單的思想哲學的體現;
唯一的缺點,無法設置線程名稱;
?
例,多線程:
def work(n):
??? logging.info('working-{}'.format(n))
??? time.sleep(5)
??? logging.info('end-work-{}'.format(n))
?
executor = futures.ThreadPoolExecutor(3)
?
fs = []
?
for i in range(3):
??? f = executor.submit(work, i)
??? fs.append(f)
?
for i in range(3):
??? f = executor.submit(work, i)
??? fs.append(f)
?
for i in range(3):
??? f = executor.submit(work, i)
??? fs.append(f)
?
while True:
??? time.sleep(2)
??? logging.info(threading.enumerate())
??? flag = True
??? for f in fs:
??????? flag = flag and f.done()
??????? if flag:
??????????? executor.shutdown()
??????????? logging.info(threading.enumerate())
??????????? break
?
例,多進程:
def work(n):
??? logging.info('working-{}'.format(n))
??? time.sleep(5)
??? logging.info('end-work-{}'.format(n))
?
if __name__ == '__main__':
??? executor = futures.ProcessPoolExecutor(3)
??? fs = []
?
??? for i in range(3):
??????? f = executor.submit(work, i)
??????? fs.append(f)
?
??? for i in range(3):
??????? f = executor.submit(work, i)
??????? fs.append(f)
?
??? for i in range(3):
??????? f = executor.submit(work, i)
??????? fs.append(f)
?
??? while True:
??????? time.sleep(2)
??????? logging.info(threading.enumerate())
??????? flag = True
??????? for f in fs:
??????????? flag = flag and f.done()
??????????? if flag:
??????????????? executor.shutdown()
??????????????? # logging.info(threading.enumerate())?? #多進程用不上
???????????? ???break
?
?
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。