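This article explains how to enable multithreading in Flask.
In an earlier post I explained that Flask's support for multithreading rests mainly on two classes, LocalStack and Local. In the Flask/Werkzeug version examined here, Local has two attributes, __storage__ and __ident_func__; the latter returns the current thread id, which is how requests handled by different threads are kept apart.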
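To make the idea of thread-keyed storage concrete, here is a minimal sketch. It is not Werkzeug's Local; the class name ThreadKeyedStorage and its private attributes are hypothetical, and the only real API assumed is threading.get_ident() from the standard library.

```python
import threading


class ThreadKeyedStorage:
    """Hypothetical, simplified stand-in for a Local-like object.

    Values are kept in a dict keyed by thread id, so each thread only
    sees the attributes it set itself.
    """

    def __init__(self):
        # __setattr__ is overridden below, so bypass it for internals.
        object.__setattr__(self, "_storage", {})
        object.__setattr__(self, "_ident_func", threading.get_ident)
        object.__setattr__(self, "_lock", threading.Lock())

    def __setattr__(self, name, value):
        ident = self._ident_func()
        with self._lock:
            self._storage.setdefault(ident, {})[name] = value

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails.
        try:
            return self._storage[self._ident_func()][name]
        except KeyError:
            raise AttributeError(name) from None


def demo():
    local = ThreadKeyedStorage()
    local.user = "main"
    seen = {}

    def worker(name):
        local.user = name          # stored under this worker's thread id
        seen[name] = local.user    # reads back this worker's own value

    threads = [
        threading.Thread(target=worker, args=("worker-%d" % i,))
        for i in range(3)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # The main thread's value is untouched by the workers.
    return local.user, seen
```

Calling demo() returns the main thread's value together with what each worker read back, which shows that writes from one thread do not leak into another.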
This time the topic is how Flask turns multithreading on.
Let's start with the app.run() method:
def run(self, host=None, port=None, debug=None, **options):
    from werkzeug.serving import run_simple
    if host is None:
        host = '127.0.0.1'
    if port is None:
        server_name = self.config['SERVER_NAME']
        if server_name and ':' in server_name:
            port = int(server_name.rsplit(':', 1)[1])
        else:
            port = 5000
    if debug is not None:
        self.debug = bool(debug)
    options.setdefault('use_reloader', self.debug)
    options.setdefault('use_debugger', self.debug)
    try:
        run_simple(host, port, self, **options)  # execution continues in this function
    finally:
        # reset the first request information if the development server
        # reset normally.  This makes it possible to restart the server
        # without reloader and that stuff from an interactive shell.
        self._got_first_request = False
After a few checks and defaults, run() calls run_simple(). Here is its source:
def run_simple(hostname, port, application, use_reloader=False,
               use_debugger=False, use_evalex=True,
               extra_files=None, reloader_interval=1,
               reloader_type='auto', threaded=False,
               processes=1, request_handler=None, static_files=None,
               passthrough_errors=False, ssl_context=None):
    """Start a WSGI application. Optional features include a reloader,
    multithreading and fork support.

    This function has a command-line interface too::

        python -m werkzeug.serving --help

    .. versionadded:: 0.5
       `static_files` was added to simplify serving of static files as well
       as `passthrough_errors`.

    .. versionadded:: 0.6
       support for SSL was added.

    .. versionadded:: 0.8
       Added support for automatically loading a SSL context from certificate
       file and private key.

    .. versionadded:: 0.9
       Added command-line interface.

    .. versionadded:: 0.10
       Improved the reloader and added support for changing the backend
       through the `reloader_type` parameter.  See :ref:`reloader`
       for more information.

    :param hostname: The host for the application.  eg: ``'localhost'``
    :param port: The port for the server.  eg: ``8080``
    :param application: the WSGI application to execute
    :param use_reloader: should the server automatically restart the python
                         process if modules were changed?
    :param use_debugger: should the werkzeug debugging system be used?
    :param use_evalex: should the exception evaluation feature be enabled?
    :param extra_files: a list of files the reloader should watch
                        additionally to the modules.  For example configuration
                        files.
    :param reloader_interval: the interval for the reloader in seconds.
    :param reloader_type: the type of reloader to use.  The default is
                          auto detection.  Valid values are ``'stat'`` and
                          ``'watchdog'``. See :ref:`reloader` for more
                          information.
    :param threaded: should the process handle each request in a separate
                     thread?
    :param processes: if greater than 1 then handle each request in a new process
                      up to this maximum number of concurrent processes.
    :param request_handler: optional parameter that can be used to replace
                            the default one.  You can use this to replace it
                            with a different
                            :class:`~BaseHTTPServer.BaseHTTPRequestHandler`
                            subclass.
    :param static_files: a list or dict of paths for static files.  This works
                         exactly like :class:`SharedDataMiddleware`, it's actually
                         just wrapping the application in that middleware before
                         serving.
    :param passthrough_errors: set this to `True` to disable the error catching.
                               This means that the server will die on errors but
                               it can be useful to hook debuggers in (pdb etc.)
    :param ssl_context: an SSL context for the connection. Either an
                        :class:`ssl.SSLContext`, a tuple in the form
                        ``(cert_file, pkey_file)``, the string ``'adhoc'`` if
                        the server should automatically create one, or ``None``
                        to disable SSL (which is the default).
    """
    if not isinstance(port, int):
        raise TypeError('port must be an integer')
    if use_debugger:
        from werkzeug.debug import DebuggedApplication
        application = DebuggedApplication(application, use_evalex)
    if static_files:
        from werkzeug.wsgi import SharedDataMiddleware
        application = SharedDataMiddleware(application, static_files)

    def log_startup(sock):
        display_hostname = hostname not in ('', '*') and hostname or 'localhost'
        if ':' in display_hostname:
            display_hostname = '[%s]' % display_hostname
        quit_msg = '(Press CTRL+C to quit)'
        port = sock.getsockname()[1]
        _log('info', ' * Running on %s://%s:%d/ %s',
             ssl_context is None and 'http' or 'https',
             display_hostname, port, quit_msg)

    def inner():
        try:
            fd = int(os.environ['WERKZEUG_SERVER_FD'])
        except (LookupError, ValueError):
            fd = None
        srv = make_server(hostname, port, application, threaded,
                          processes, request_handler,
                          passthrough_errors, ssl_context,
                          fd=fd)
        if fd is None:
            log_startup(srv.socket)
        srv.serve_forever()

    if use_reloader:
        # If we're not running already in the subprocess that is the
        # reloader we want to open up a socket early to make sure the
        # port is actually available.
        if os.environ.get('WERKZEUG_RUN_MAIN') != 'true':
            if port == 0 and not can_open_by_fd:
                raise ValueError('Cannot bind to a random port with enabled '
                                 'reloader if the Python interpreter does '
                                 'not support socket opening by fd.')

            # Create and destroy a socket so that any exceptions are
            # raised before we spawn a separate Python interpreter and
            # lose this ability.
            address_family = select_ip_version(hostname, port)
            s = socket.socket(address_family, socket.SOCK_STREAM)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind(get_sockaddr(hostname, port, address_family))
            if hasattr(s, 'set_inheritable'):
                s.set_inheritable(True)

            # If we can open the socket by file descriptor, then we can just
            # reuse this one and our socket will survive the restarts.
            if can_open_by_fd:
                os.environ['WERKZEUG_SERVER_FD'] = str(s.fileno())
                s.listen(LISTEN_QUEUE)
                log_startup(s)
            else:
                s.close()

        # Do not use relative imports, otherwise "python -m werkzeug.serving"
        # breaks.
        from werkzeug._reloader import run_with_reloader
        run_with_reloader(inner, extra_files, reloader_interval,
                          reloader_type)
    else:
        inner()  # this branch runs by default
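Again after a series of checks, the default path ends in inner(). This function is defined inside run_simple(), so it is a closure over run_simple()'s arguments. inner() in turn calls make_server(); here is its source: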
def make_server(host=None, port=None, app=None, threaded=False, processes=1,
                request_handler=None, passthrough_errors=False,
                ssl_context=None, fd=None):
    """Create a new server instance that is either threaded, or forks
    or just processes one request after another.
    """
    if threaded and processes > 1:
        raise ValueError("cannot have a multithreaded and "
                         "multi process server.")
    elif threaded:
        return ThreadedWSGIServer(host, port, app, request_handler,
                                  passthrough_errors, ssl_context, fd=fd)
    elif processes > 1:
        return ForkingWSGIServer(host, port, app, processes, request_handler,
                                 passthrough_errors, ssl_context, fd=fd)
    else:
        return BaseWSGIServer(host, port, app, request_handler,
                              passthrough_errors, ssl_context, fd=fd)
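At this point the picture is clear: to get multiple threads or multiple processes you set the threaded or processes parameter, and both of them are passed down from app.run():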
app.run(**options) ---> run_simple(threaded,processes) ---> make_server(threaded,processes)
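The forwarding chain above can be sketched with three stand-in functions. These are hypothetical simplifications (the names end in _sketch and nothing here starts a server); they only illustrate how a keyword passed to run() travels through **options down to the function that chooses the server.

```python
def make_server_sketch(threaded=False, processes=1):
    # Stand-in for make_server(): just report what it received.
    return {"threaded": threaded, "processes": processes}


def run_simple_sketch(hostname, port, application, use_reloader=False,
                      use_debugger=False, threaded=False, processes=1):
    # Stand-in for run_simple(): forwards the two concurrency settings.
    return make_server_sketch(threaded, processes)


def run_sketch(host=None, port=None, debug=None, **options):
    # Stand-in for Flask.run(): anything it does not name itself
    # (such as threaded or processes) stays inside **options.
    host = host or "127.0.0.1"
    port = port or 5000
    options.setdefault("use_reloader", bool(debug))
    options.setdefault("use_debugger", bool(debug))
    return run_simple_sketch(host, port, None, **options)
```

For example, run_sketch(threaded=True) reaches make_server_sketch() with threaded=True, while run_sketch() leaves both settings at the defaults of the sketch.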
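In the version whose source is shown here, Flask's development server is single-threaded and single-process by default; to change that, pass the corresponding argument to run, for example app.run(threaded=True). (From Flask 1.0 onward the development server is threaded by default.)
make_server shows that three server classes are available, all provided by Werkzeug: ThreadedWSGIServer, ForkingWSGIServer and BaseWSGIServer. With the defaults above, BaseWSGIServer is used.
Taking threads as the example, look at the ThreadedWSGIServer class: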
class ThreadedWSGIServer(ThreadingMixIn, BaseWSGIServer):  # inherits from ThreadingMixIn and BaseWSGIServer
    """A WSGI server that does threading."""
    multithread = True
    daemon_threads = True

ThreadingMixIn = socketserver.ThreadingMixIn

class ThreadingMixIn:
    """Mix-in class to handle each request in a new thread."""

    # Decides how threads will act upon termination of the
    # main process
    daemon_threads = False

    def process_request_thread(self, request, client_address):
        """Same as in BaseServer but as a thread.

        In addition, exception handling is done here.

        """
        try:
            self.finish_request(request, client_address)
            self.shutdown_request(request)
        except:
            self.handle_error(request, client_address)
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        """Start a new thread to process the request."""
        t = threading.Thread(target = self.process_request_thread,
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        t.start()
process_request simply starts a new thread for every incoming request and lets that thread handle it.
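The same mix-in can be tried out with the standard library alone. The sketch below is an assumption-laden illustration, not Werkzeug's server: it combines socketserver.ThreadingMixIn with wsgiref's WSGIServer, and the names ThreadedServer, QuietHandler, app and demo are made up for this example. The WSGI app answers with the id of the thread that handled the request, so it can be compared with the thread running serve_forever().

```python
import threading
import urllib.request
from socketserver import ThreadingMixIn
from wsgiref.simple_server import WSGIRequestHandler, WSGIServer, make_server


class ThreadedServer(ThreadingMixIn, WSGIServer):
    """wsgiref server that handles each request in a new thread."""
    daemon_threads = True


class QuietHandler(WSGIRequestHandler):
    def log_message(self, format, *args):
        pass  # silence per-request logging for the demo


def app(environ, start_response):
    # Reply with the id of the thread that is handling this request.
    body = str(threading.get_ident()).encode()
    start_response("200 OK", [("Content-Type", "text/plain"),
                              ("Content-Length", str(len(body)))])
    return [body]


def demo():
    # Port 0 asks the OS for any free port.
    server = make_server("127.0.0.1", 0, app,
                         server_class=ThreadedServer,
                         handler_class=QuietHandler)
    serve_thread = threading.Thread(target=server.serve_forever, daemon=True)
    serve_thread.start()
    try:
        url = "http://127.0.0.1:%d/" % server.server_port
        # Bypass any proxy configured in the environment.
        opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
        with opener.open(url, timeout=5) as resp:
            handler_ident = int(resp.read())
        return handler_ident, serve_thread.ident
    finally:
        server.shutdown()
        server.server_close()
```

demo() returns two thread ids: the one that ran the WSGI app and the one that ran the accept loop. With the mix-in in place they differ, because process_request hands every request to a freshly started thread.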
Finally, a very small application to verify all of the above:
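from flask import Flask
from flask import _request_ctx_stack

app = Flask(__name__)

@app.route('/')
def index():
    # Print the id of the thread that is handling this request.
    print(_request_ctx_stack._local.__ident_func__())
    # Block forever so the handling thread never exits.
    while True:
        pass
    return '<h2>hello</h2>'  # never reached

app.run()  # to enable multithreading, use app.run(threaded=True)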
_request_ctx_stack._local.__ident_func__() resolves to get_ident(), which returns the id of the current thread. Why follow it with while True? Look at the documentation of get_ident():
Return a non-zero integer that uniquely identifies the current thread amongst other threads that exist simultaneously. This may be used to identify per-thread resources. Even though on some platforms threads identities may appear to be allocated consecutive numbers starting at 1, this behavior should not be relied upon, and the number should be seen purely as a magic cookie. A thread's identity may be reused for another thread after it exits.
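The key sentence is the last one: a thread's id may be reused once the thread has exited. That is why I put an infinite loop in the view function. It blocks the request so that the handling thread stays alive and distinct ids can be observed. Two cases follow:
1. Without multithreading, the first request blocks the server, and every later request blocks as well.
2. With multithreading, every request prints a different thread id.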
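Results:
Case 1 (multithreading not enabled):
 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
139623180527360
Case 2 (multithreading enabled):
 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
140315469436672
140315477829376
140315486222080
140315316901632
140315105163008
140315096770304
140315088377600
The difference is plain: a single id in the first case, a new id for every request in the second.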
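The reasoning behind keeping the threads alive can be checked without Flask. The following is a small sketch using only the standard library; the function name idents_of_live_threads is hypothetical. A barrier holds every worker until all of them have recorded their id, which plays the role of the infinite loop in the view function: while the threads coexist, their ids must be distinct.

```python
import threading


def idents_of_live_threads(count):
    """Collect get_ident() from `count` threads that are alive together."""
    barrier = threading.Barrier(count)
    lock = threading.Lock()
    idents = []

    def worker():
        with lock:
            idents.append(threading.get_ident())
        # No thread may exit before all of them have recorded their id,
        # so no id can be reused during the measurement.
        barrier.wait()

    threads = [threading.Thread(target=worker) for _ in range(count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return idents
```

If the barrier were removed, a worker could finish before the next one starts, and two entries in the list might then be equal, which is exactly the reuse that the get_ident() documentation warns about.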
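To sum up: Flask supports multithreading, but in the version examined here it is not enabled by default. Also, app.run() is only meant for development; in production use a web server such as uWSGI or Gunicorn.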
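Further notes:
Flask: multiple threads or multiple processes?
With the defaults described above, Flask runs as a single process with a single thread and handles requests one at a time. When a project goes live, it can be deployed with nginx + gunicorn.
But how do you test concurrent requests during development, for example with artificially delayed responses? It is quite simple:
app.run() accepts two parameters, threaded and processes, which enable thread support and process support respectively.
1. threaded: multithreading support; defaults to False in the source shown earlier, i.e. disabled.
2. processes: number of processes; defaults to 1.
How to enable:
if __name__ == '__main__':
    app.run(threaded=True)
    # app.run(processes=4)
Note: choose either multiple processes or multiple threads. They cannot be enabled at the same time.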
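The note above follows directly from the branch structure of make_server(). As a rough sketch, the selection logic can be isolated into a pure function; choose_server_kind is a hypothetical name, and it returns the class name as a string instead of constructing a server.

```python
def choose_server_kind(threaded=False, processes=1):
    """Mirror the branches of make_server() shown earlier (sketch only)."""
    if threaded and processes > 1:
        # Same condition under which make_server() raises.
        raise ValueError("cannot have a multithreaded and "
                         "multi process server.")
    if threaded:
        return "ThreadedWSGIServer"
    if processes > 1:
        return "ForkingWSGIServer"
    return "BaseWSGIServer"
```

So choose_server_kind(threaded=True) and choose_server_kind(processes=4) each pick one server type, while passing both raises ValueError.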