溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

42線程4_Semaphore_GIL

發布時間:2020-07-11 23:37:25 來源:網絡 閱讀:561 作者:chaijowin 欄目:編程語言

?

?

目錄

threading.Semaphore類:... 1

數據結構Queue... 5

GIL. 5

?

?

?

threading.Semaphore類:

線程同步技術;

Lock很像,信號量內部維護一個倒計數器,每一次acquire都會減1release1),當acquire方法發現計數為0就阻塞,直至其它線程對信號量release后,計數大于0,恢復阻塞的線程;

?

倒計數器永遠不會<0,是非負數,因為acquire時發現是0都會被阻塞;

linux OS的信號完全不同;

?

s=threading.Semaphore(value=1),構造方法,value<0ValueError異常;

s.acquire(blocking=True,timeout=None),獲取信號量,計數器減1,獲取成功返回True;

s.release(),釋放信號量,計數器加1;

?

應用場景:

連接池;

因為資源有限,每開啟一個,連接成本很高(TCP三次握手四次揮手等),所以使用連接池;

?

threading.BoundedSemaphore類:

有界的信號量,作上界控制;

不允許使用release超出初始值的范圍,否則拋ValueError異常;

?

信號量和鎖:

鎖,只允許同一個時間一個線程獨占資源,它是特殊的信號量,即信號量計數器初值為1;

信號量,允許多個線程訪問共享資源,但這個共享資源數量有限;

鎖,可看作是特殊的信號量;

?

例:

def work(s:threading.Semaphore):

??? logging.info('in sub')

??? s.acquire()

??? logging.info('end sub')

?

s = threading.Semaphore(3)

?

logging.info(s.acquire())

logging.info(s.acquire())

logging.info(s.acquire())

?

threading.Thread(target=work, args=(s,)).start()

?

print('~'*20)

time.sleep(2)

logging.info(s.acquire(False))

logging.info(s.acquire(timeout=3))

s.release()

print('end main')

輸出:

2018-08-07-10:07:23?????? Thread info: 9360 MainThread True

2018-08-07-10:07:23?????? Thread info: 9360 MainThread True

2018-08-07-10:07:23?????? Thread info: 9360 MainThread True

2018-08-07-10:07:23?????? Thread info: 13180 Thread-1 in sub

~~~~~~~~~~~~~~~~~~~~

2018-08-07-10:07:25?????? Thread info: 9360 MainThread False

2018-08-07-10:07:28?????? Thread info: 9360 MainThread False

2018-08-07-10:07:28?????? Thread info: 13180 Thread-1 end sub

end main

?

例:

s = threading.Semaphore(3)

s.release()?? #直接使用release改變了初始預設值

s.release()

s.release()

print(s.__dict__)

輸出:

{'_value': 6, '_cond': <Condition(<unlocked _thread.lock object at 0x00000000012AB5D0>, 0)>}

?

例,解決超出預設值:

s = threading.BoundedSemaphore(3)

s.release()

輸出:

Traceback (most recent call last):

? File "E:/git_practice/cmdb/example_threading2.py", line 359, in <module>

??? s.release()

? File "D:\Python\Python35\lib\threading.py", line 480, in release

??? raise ValueError("Semaphore released too many times")

ValueError: Semaphore released too many times

?

例:

連接池應有容量(總數),有一個工廠方法可獲取連接,能夠把不用的連接返回,供其它調用者使用;

?

class Conn:

??? def __init__(self, name):

??????? self.name = name

?

class Pool:

??? def __init__(self, count):

??????? self.count = count

??????? # self.pool = []

??????? self.pool = [self._connect('conn-{}'.format(i)) for i in range(count)]

?

??? def _connect(self, conn_name):

??????? return Conn(conn_name)

?

??? def get_conn(self):?? #此方法在多線程時有安全問題;如果池中正好有一個連接,有可能多個線程判斷池的長度是>0的,當一個線程拿走了連接對象,其它線程再來pop就拋異常,解決:用LockSemaphore

??????? # return self.pool.pop()

???? ???if len(self.pool) > 0:

??????????? return self.pool.pop()

?

??? def return_conn(self, conn:Conn):

??????? self.pool.append(conn)

?

例,連接池改進:

class Conn:

??? def __init__(self, name):

??????? self.name = name

?

class Pool:

??? def __init__(self, count):

??????? self.count = count

??????? self.pool = [self._connect('conn-{}'.format(i)) for i in range(count)]

??????? # self.sema = threading.Semaphore(count)

??????? self.sema = threading.BoundedSemaphore(count)?? #信號量比計算列表長度要好

?

??? def _connect(self, conn_name):

??????? return Conn(conn_name)

?

??? def get_conn(self):

??????? self.sema.acquire()

??????? data = self.pool.pop()

??????? return data

?

??? def return_conn(self, conn:Conn):

??????? self.pool.append(conn)

??????? self.sema.release()

?

pool = Pool(3)

def work(pool:Pool):

??? conn = pool.get_conn()

??? logging.info(conn)

??? threading.Event().wait(random.randint(1,4))

??? pool.return_conn(conn)

?

for i in range(6):

??? threading.Thread(target=work, args=(pool,), name='worker-{}'.format(i)).start()

輸出:

2018-08-07-14:09:11?????? Thread info: 3264 worker-0 <__main__.Conn object at 0x00000000014E6780>

2018-08-07-14:09:11?????? Thread info: 12452 worker-1 <__main__.Conn object at 0x00000000014E6630>

2018-08-07-14:09:11?????? Thread info: 11468 worker-2 <__main__.Conn object at 0x00000000014E6748>

2018-08-07-14:09:13?????? Thread info: 13228 worker-3 <__main__.Conn object at 0x00000000014E6748>

2018-08-07-14:09:14?????? Thread info: 12692 worker-4 <__main__.Conn object at 0x00000000014E6780>

2018-08-07-14:09:15?????? Thread info: 6748 worker-5 <__main__.Conn object at 0x00000000014E6630>

?

注:

使用信號量解決資源有限問題;

如果池中有資源,請求者獲取資源時信號量減1,拿走資源;

當請求超過資源數,請求者只能等待;

當使用者用完歸還資源后信號量加1,等到線程拿到就可喚醒拿走資源;

?

以上,從程序邏輯上分析:

1、如果還沒有使用信號量就release,會怎么樣?

超出預設值,解決用threading.BoundedSemaphore;

2、如果使用了信號量,但還沒用完?

??????? self.pool.append(conn)

??????? self.sema.release()

如果一種極端情況,計數器還差1個就滿了,有三個線程A、B、C都執行了第一句,都沒來得及release,這時輪到線程A release,然后輪到C release,這時一定出問題,超界了ValueError,因此有界信號量能保證,一定不能多歸還;

3、很多線程用完了信號量?

沒有獲得信號量的線程都阻塞,沒有線程和歸還的線程爭搶,當append后才release,這時才能等待的線程被喚醒,才能pop,即沒有獲取信號量就不能pop,這是安全的;

?

?

?

數據結構Queue

標準庫;

提供FIFOLIFOQueue,優先隊列;

Queue類是線程安全的,適用于多線程間安全的交換數據,內部使用了LockCondition;

?

魔術方法中,說實現容器的大小不準確?

如果不加鎖,是不可能獲得準確的大小的,因為當前線程剛讀取了一個大小,還沒取走,就有可能被其它線程改掉了;

?

Queue類的size雖加了鎖,但依然不能保證立即get、put就能成功,因為讀取大小和get、put方法是分開的;

?

?

?

GIL

gobal intepreter lock,全局解釋器鎖;

GIL保證cpyton進程中,只有一個線程執行字節碼,甚至在多核cpu情況下也是如此;ruby中也有GIL;另,其它解釋器沒有這種情況,如pypy、jython等;

?

cpython中沒有真正的多線程;

GIL的本質:理解cpython多線程;

?

cpython中:

IO密集型,由于線程阻塞,就會調度其它線程;

cpu密集型,當前線程可能會連續的獲得GIL,導致其它線程幾乎無法使用cpu;

?

IO密集型,使用多線程;

cpu密集型,使用多進程,繞開GIL;

?

新版cpython正努力優化GIL問題,但不是移除;

如果非要使用多線程且要有效率,請繞行,選擇其它語言go、erlang等;

?

py中絕大多數內置數據結構都是原子操作,如lst.append()、lst.pop();

由于GIL的存在,py的內置數據類型在多線程編程時就變為安全的了,但實際上它們本身不是線程安全的;

?

項目一開始就要固定使用某一解釋器的XX版本;

?

保留GIL的原因:

guido堅持的簡單哲學,對于初學者門檻低,不需要高深的系統知識也能安全、簡單的使用py;

若移除GIL,會降低cpython單線程的執行效率;

?

1

def calc():

??? sum = 0

??? for _ in range(100000000):

??????? sum += 1

?

start = datetime.datetime.now()

?

calc()

calc()

calc()

calc()

calc()

?

delta = (datetime.datetime.now() - start).total_seconds()

print(delta)

輸出:

32.942885

?

2

def calc():

??? sum = 0

??? for _ in range(100000000):

??????? sum += 1

?

start = datetime.datetime.now()

?

lst = []

for _ in range(5):?? #CPU密集型,用多進程解決

??? t = threading.Thread(target=calc)

??? t.start()

??? # t.join()?? #t.join()不能放在此處,若放在此處是串行(第1個線程執行完才能循環執行到下一個線程),不是并行

??? lst.append(t)

for t in lst:

??? t.join()

?

delta = (datetime.datetime.now() - start).total_seconds()

print(delta)

輸出:

32.487859

?

對比例1和例2

從結果上看,單線程和多線程一樣,cpython的多線程沒有優勢和一個線程執行時間相當,因為GIL的存在;

?

?


向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女