本篇內容主要講解“怎么解決windows中python3使用multiprocessing.Pool時出現的問題”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“怎么解決windows中python3使用multiprocessing.Pool時出現的問題”吧!
Python主要應用于:1、Web開發;2、數據科學研究;3、網絡爬蟲;4、嵌入式應用開發;5、游戲開發;6、桌面應用開發。
例如:
from multiprocessing import Pool def f(x): return x*x pool = Pool(processes=4) r=pool.map(f, range(100)) pool.close() pool.join()
在spyder里運行直接沒反應;在shell窗口里,直接報錯,如下:
Process SpawnPoolWorker-15: Traceback (most recent call last): File "C:\Anaconda3\lib\multiprocessing\process.py", line 254, in _bootstr self.run() File "C:\Anaconda3\lib\multiprocessing\process.py", line 93, in run self._target(*self._args, **self._kwargs) File "C:\Anaconda3\lib\multiprocessing\pool.py", line 108, in worker task = get() File "C:\Anaconda3\lib\multiprocessing\queues.py", line 357, in get return ForkingPickler.loads(res) AttributeError: Can't get attribute 'f' on <module '__main__' (built-in)>
解決:
Windows下面的multiprocessing跟Linux下面略有不同,Linux下面基于fork,fork之后所有的本地變量都復制一份,因此可以使用任意的全局變量;在Windows下面,多進程是通過啟動新進程完成的,所有的全局變量都是重新初始化的,在運行過程中動態生成、修改過的全局變量是不能使用的。
multiprocessing內部使用pickling傳遞map的參數到不同的進程,當傳遞一個函數或類時,pickling將函數或者類用所在模塊+函數/類名的方式表示,如果對端的Python進程無法在對應的模塊中找到相應的函數或者類,就會出錯。
當你在Interactive Console當中創建函數的時候,這個函數是動態添加到__main__模塊中的,在重新啟動的新進程當中不存在,所以會出錯。
當不在Console中,而是在獨立Python文件中運行時,你會遇到另一個問題:由于你下面調用multiprocessing的代碼沒有保護,在新進程加載這個模塊的時候會重新執行這段代碼,創建出新的multiprocessing池,無限調用下去。
解決這個問題的方法是永遠把實際執行功能的代碼加入到帶保護的區域中:if __name__ == '__mian__':
補充知識:multiprocessing Pool的異常處理問題
multiprocessing.Pool開發多進程程序時,在某個子進程執行函數使用了mysql-python連接數據庫,
由于程序設計問題,沒有捕獲到所有異常,導致某個異常錯誤直接拋到Pool中,導致整個Pool掛了,其異常錯誤如下所示:
Exception in thread Thread-3: Traceback (most recent call last): File "/usr/lib64/python2.7/threading.py", line 812, in __bootstrap_inner self.run() File "/usr/lib64/python2.7/threading.py", line 765, in run self.__target(*self.__args, **self.__kwargs) File "/usr/lib64/python2.7/multiprocessing/pool.py", line 376, in _handle_results task = get() File "/usr/lib/python2.7/site-packages/mysql/connector/errors.py", line 194, in __init__ 'msg': self.msg.encode('utf8') if PY2 else self.msg AttributeError: ("'int' object has no attribute 'encode'", <class 'mysql.connector.errors.Error'>, (2055, "2055: Lost Connection to MySQL '192.169.36.189:3306', system error: timed out", None))
本文檔基于以上問題對multiprocessing.Pool以及python-mysql-connector的源碼實現進行分析,以定位具體的錯誤原因。解決方法其實很簡單,不要讓異常拋到Pool里就行。
問題產生場景
python 版本centos7.3自帶的2.7.5版本,或者最新的python-2.7.14
mysql-connector庫,版本是2.0及以上,可到官網下載最新版:mysql-connector
問題發生的code其實可以簡化為如下所示:
from multiprocessing import Pool, log_to_stderr import logging import mysql.connector # open multiprocessing lib log log_to_stderr(level=logging.DEBUG) def func(): raise mysql.connector.Error("demo test", 100) if __name__ == "__main__": p = Pool(3) res = p.apply_async(func) res.get()
所以解決問題很簡單,在func里加個try-except就可以了。但是如果你好奇為什么為出現AttributeError的異常,那么可以繼續往下看。
Multiprocessing.Pool的實現
通過查看源碼,大致上multiprocess.Pool的實現如下圖所示:
當我們執行以下語句時,主進程會創建三個子線程:_handle_workers、_handle_results、_handle_tasks;同時會創建Pool(n)個數的worker子進程。主進程與各個worker子進程間的通信使用內部定義的Queue,其實就是Pipe管道通信,如上圖的_taskqueue、_inqueue和_outqueue。
p = Pool(3) res = p.apply_async(func) res.get()
這三個子線程的作用是:
1. handle_workers線程管理worker進程,使進程池維持Pool(n)個worker進程數;
2. handle_tasks線程將用戶的任務(包括job_id, 處理函數func等信息)傳遞到_inqueue中,子進程們競爭獲取任務,然后運行相關函數,將結果放在_outqueue中,然后繼續監聽tasksqueue的任務列表。其實就是典型的生產消費問題。
3. handle_results線程監聽_outQqueue的內容,有就拿到,通過字典_cache找到對應的job,將結果存儲在*Result對象中,釋放該job的信號量,表明job執行完畢。此后,就可以通過*Result.get()函數獲取執行結果。
當我們調用p.apply_async 或者p.map時,其實就是創建了AsyncResult或者MapResult對象,然后將task放到_taskqueue中;調用*Result.get()方法等待task被worker子進程執行完成,獲取執行結果。
在知道了multprocess.Pool的實現邏輯后,現在我們來探索下,當func將異常拋出時,Pool的worker是怎么處理的。下面的代碼是pool.worker工作子進程的核心執行函數的簡化版。
def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): ... while xxx: try: task = get() except: ... job, i, func, args, kwds = task try: result = (True, func(*args, **kwds)) except Exception, e: result = (False, e) ... try: put((job, i, result)) except Exception, e: ...
從代碼中可以看到,在執行func時,如果func拋出異常,那么worker會將異常對象直接放入到_outqueue中,然后等待下一個task。也就是說,worker是可以處理異常的。
那么接下來看看_handle_result線程是怎么處理worker發過來的結果的。如下所示:
@staticmethod def _handle_results(outqueue, get, cache): while 1: try: task = get() except (IOError, EOFError): return ...
上述代碼為_handle_result的主要處理邏輯,可以看到,它只對 IOError, EOFError進行了處理,也就是說,如果在get()時發生了其它異常錯誤,將導致_handle_result這個線程直接退出(而事實上的確如此)。既然_handle_result退出了,那么就沒有動作來觸發_cache中*Result對象釋放信號量,則用戶的執行流程就一直處于wait狀態。這樣,用戶主進程就會一直卡在get()中,導致主流程執行不下去。
我們通過打開multiprocessing庫的日志(log_to_stderr(level=logging.DEBUG)),然后修改multiprocessing.Pool中_handel_result的代碼,加上一個except Exception,然后運行文章一開始的的異常代碼,如下所示:
# multiprocessing : pool.py # class Pool(object): @staticmethod def _handle_results(outqueue, get, cache): while 1: try: task = get() except (IOError, EOFError): return except Exception: debug("handle_result not catch Exceptions.") return ...
控制臺如果輸出"handle_result not catch Exceptions.",表明_handle_results沒有catch到所有的異常。而實際上,真的是由于task = get()這句話拋異常了。
那么,_outqueue.get()方法做了什么。深入查看源碼,發現get()方法其實就是os.pipe的read/write方法,但是做了一些處理吧。其內部實現大致如下:
def Pipe(duplex=True): ... fd1, fd2 = os.pipe() c1 = _multiprocessing.Connection(fd1, writable=False) # get c2 = _multiprocessing.Connection(fd2, readable=False) # put return c1, c2
_multiprocessing.Connection內部使用了C的實現,就不再深入了,否則會就越來越復雜了。它內部應該使用了pickle庫,在put時將對象實例pickle(也就是序列化吧),然后在get時將實例unpikcle,重新生成實例對象。具體可查看python官方文檔關于pickle的介紹(包括object可pickle的條件以及在unpickle時調用的方法等)。不管如何,就是實例在get,即unpickle的過程出錯了。
'msg': self.msg.encode('utf8') if PY2 else self.msg
AttributeError: 'int' object has no attribute 'encode'
從上述錯誤日志中可以看到,表明在重構時msg參數傳入了int類型變量。就是說在unpickle階段,Mysql Error重新實例化時執行了__init__()方法,但是傳參錯誤了。為了驗證這一現象,我將MySql Error的__init__()進行簡化,最終確認到self.args的賦值上,即Exception及其子類在unpickle時會調用__init__()方法,并將self.args作為參數列表傳遞給__init__()。
通過以下代碼可以簡單的驗證問題:
import os from multiprocessing import Pipe class DemoError(Exception): def __init__(msg, errno): print "msg: %s, errno: %s" % (msg, errno) self.args = ("aa", "bb") def func(): raise DemoError("demo test", 100) r, w = Pipe(duplex=False) try: result = (True, func(1)) except Exception, e: result = (False, e) print "send result" w.send(result) print "get result" res = r.recv() print "finished."
日志會在recv調用時打印 msg: aa, errno: bb,表明recv異常類Exception時會將self.args作為參數傳入init()函數中。而Mysql的Error類重寫self.args變量,而且順序不對,導致msg在執行編碼時出錯。MySql Error的實現簡化如下:
class Error(Exception): def __init__(self, msg=None, errno=None, values=None, sqlstate=None): super(Error, self).__init__() ... if self.msg and self.errno != -1: fields = { 'errno': self.errno, 'msg': self.msg.encode('utf-8') if PY2 else self.msg } ... self.args = (self.errno, self._full_msg, self.sqlstate)
可以看到,mysql Error中的self.args與__init__(msg, errno, values, sqlstate)的順序不一,因此self.args第一個參數errno傳給了msg,導致AttributeError。至于self.args是什么,簡單查了下,是Exception類中定義的,一般用__str__或者__repr__方法的輸出,python官方文檔不建議overwrite。
總結
好吧,說了這么多,通過問題的追蹤,我們也基本上了解清楚multiprocessing.Pool庫的實現了。事實上,也很難說是誰的bug,是兩者共同作用下出現的。不管如何,希望在用到multiprocessing庫時,特別與Pipe相關時,謹慎點使用,最好的不要讓異常跑到multiprocess中處理,應該在func中將所有的異常處理掉,如果有自己定于的異常類,請最好保證self.args的順序與__init__()的順序一致。同時,網上好像也聽說使用multprocessing和subprocess庫出現問題,或許也是這個異常拋出的問題,畢竟suprocessError定義與Exception好像有些區別。
到此,相信大家對“怎么解決windows中python3使用multiprocessing.Pool時出現的問題”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。