threading模塊
局域網IP掃描實例
# 單線程:
import subprocess,time,threading a = time.clock() with open("check_ping.txt",'w') as f: for i in range(1,20): my_ip = ".".join(["192.163.37",str(i)]) try: subprocess.check_call('ping -n 1 -w 1 %s'%(my_ip),shell=True) except subprocess.CalledProcessError: pass else: f.write("%s 可以ping通\n"%my_ip) b = time.clock() print(b) # 總共花費8s多
# 多線程(一)創建 Thread 的實例,傳給它一個函數
a = time.clock() def check_ping(IP,obj): try: subprocess.check_call('ping -n 1 -w 1 %s' % IP, shell=True) except subprocess.CalledProcessError: pass else: obj.write("%s 可以ping通\n" % IP) def main(): threads = [] with open("check_ping_1.txt", 'w') as f: for i in range(1, 20): my_ip = ".".join(["192.163.37", str(i)]) t = threading.Thread(target=check_ping,args=(my_ip,f)) #Thread方法:實例化一個線程對象,把函數(target)和參數(args)傳進去,然后返回Thread實例,這里并沒有執行。 threads.append(t) num = range(len(threads)) for i in num: threads[i].start() #執行線程的start方法,線程開始執行 for i in num: threads[i].join() #這行線程的join方法,等待線程結束,如果主進程不需要等待線程結束,可以不需要調用join方法。 if __name__ == '__main__': main() b = time.clock() print(b) # 總共花費0.9s
# 多線程(二)創建 Thread 的實例,傳給它一個可調用的類實例
a = time.clock() class Thread_func: def __init__(self,func,args): self.func = func self.args = args def __call__(self): #__call__方法:將類模擬成函數,實例化后的類再次實例化相當于執行了__call__方法。 self.func(*self.args) def check_ping(IP,obj): try: subprocess.check_call('ping -n 1 -w 1 %s' % IP, shell=True) except subprocess.CalledProcessError: pass else: obj.write("%s 可以ping通\n" % IP) def main(): threads = [] with open("check_ping_2.txt", 'w') as f: for i in range(1, 20): my_ip = ".".join(["192.163.37", str(i)]) t = threading.Thread(target=Thread_func(check_ping,(my_ip,f))) #Thread_func實例化時已經傳入了參數,所以Thread方法中就不用args來傳參了。 threads.append(t) num = range(len(threads)) for i in num: threads[i].start() for i in num: threads[i].join() if __name__ == '__main__': main() b = time.clock() print(b) # 總共花費1
# 多線程(三)派生 Thread 的子類,并創建子類的實例
a = time.clock() class Thread_func(threading.Thread): #繼承Thread,調用更靈活。 def __init__(self,func,args): threading.Thread.__init__(self) self.func = func self.args = args def run(self): #這里必須使用run方法,當線程開啟后執行。 self.func(*self.args) def check_ping(IP,obj): try: subprocess.check_call('ping -n 1 -w 1 %s' % IP, shell=True) except subprocess.CalledProcessError: pass else: obj.write("%s 可以ping通\n" % IP) def main(): threads = [] with open("check_ping_3.txt", 'w') as f: for i in range(1, 20): my_ip = ".".join(["192.163.37", str(i)]) t = Thread_func(check_ping,(my_ip,f)) threads.append(t) num = range(len(threads)) for i in num: threads[i].start() for i in num: threads[i].join() if __name__ == '__main__': main() b = time.clock() print(b) # 花費0.7
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。