溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

python多線程--threading簡單實現

發布時間:2020-07-16 12:41:14 來源:網絡 閱讀:1204 作者:FJCA 欄目:編程語言

threading模塊                                                                                                                               

局域網IP掃描實例

# 單線程:

import subprocess,time,threading

a = time.clock()
with open("check_ping.txt",'w') as f:
    for i in range(1,20):
        my_ip = ".".join(["192.163.37",str(i)])
        try:
            subprocess.check_call('ping -n 1 -w 1 %s'%(my_ip),shell=True)
        except subprocess.CalledProcessError:
            pass
        else:
            f.write("%s 可以ping通\n"%my_ip)
b = time.clock()
print(b)
# 總共花費8s多

# 多線程(一)創建 Thread 的實例,傳給它一個函數

a = time.clock()
def check_ping(IP,obj):
    try:
        subprocess.check_call('ping -n 1 -w 1 %s' % IP, shell=True)
    except subprocess.CalledProcessError:
        pass
    else:
        obj.write("%s 可以ping通\n" % IP)
def main():
    threads = []
    with open("check_ping_1.txt", 'w') as f:
        for i in range(1, 20):
            my_ip = ".".join(["192.163.37", str(i)])
            t = threading.Thread(target=check_ping,args=(my_ip,f))  #Thread方法:實例化一個線程對象,把函數(target)和參數(args)傳進去,然后返回Thread實例,這里并沒有執行。
            threads.append(t)
        num = range(len(threads))
        for i in num:
            threads[i].start()  #執行線程的start方法,線程開始執行
        for i in num:
            threads[i].join()   #這行線程的join方法,等待線程結束,如果主進程不需要等待線程結束,可以不需要調用join方法。

if __name__ == '__main__':
    main()
    b = time.clock()
    print(b)
# 總共花費0.9s

# 多線程(二)創建 Thread 的實例,傳給它一個可調用的類實例

a = time.clock()
class Thread_func:
    def __init__(self,func,args):
        self.func = func
        self.args = args
    def __call__(self):     #__call__方法:將類模擬成函數,實例化后的類再次實例化相當于執行了__call__方法。
        self.func(*self.args)
def check_ping(IP,obj):
    try:
        subprocess.check_call('ping -n 1 -w 1 %s' % IP, shell=True)
    except subprocess.CalledProcessError:
        pass
    else:
        obj.write("%s 可以ping通\n" % IP)
def main():
    threads = []
    with open("check_ping_2.txt", 'w') as f:
        for i in range(1, 20):
            my_ip = ".".join(["192.163.37", str(i)])
            t = threading.Thread(target=Thread_func(check_ping,(my_ip,f)))  #Thread_func實例化時已經傳入了參數,所以Thread方法中就不用args來傳參了。
            threads.append(t)
        num = range(len(threads))
        for i in num:
            threads[i].start()
        for i in num:
            threads[i].join()
if __name__ == '__main__':
    main()
    b = time.clock()
    print(b)
# 總共花費1

# 多線程(三)派生 Thread 的子類,并創建子類的實例

a = time.clock()
class Thread_func(threading.Thread):    #繼承Thread,調用更靈活。
    def __init__(self,func,args):
        threading.Thread.__init__(self)
        self.func = func
        self.args = args
    def run(self):  #這里必須使用run方法,當線程開啟后執行。
        self.func(*self.args)
def check_ping(IP,obj):
    try:
        subprocess.check_call('ping -n 1 -w 1 %s' % IP, shell=True)
    except subprocess.CalledProcessError:
        pass
    else:
        obj.write("%s 可以ping通\n" % IP)
def main():
    threads = []
    with open("check_ping_3.txt", 'w') as f:
        for i in range(1, 20):
            my_ip = ".".join(["192.163.37", str(i)])
            t = Thread_func(check_ping,(my_ip,f))
            threads.append(t)
        num = range(len(threads))
        for i in num:
            threads[i].start()
        for i in num:
            threads[i].join()
if __name__ == '__main__':
    main()
    b = time.clock()
    print(b)
# 花費0.7


向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女