這篇文章主要講解了“python Paramiko的SSH怎么使用”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“python Paramiko的SSH怎么使用”吧!
說明
1、將所有設備信息寫入文本文檔。
簡單地使用txt,將登錄信息構建成字典。
2、初始化SSH連接和執行命令。
3、分析此需求指定的命令和輸出結果。
將結果存儲在文件中。
4、增加多線程執行。
提高效率。
5、添加Linux的crontab。
每小時收集一次信息(服務器配置)
實例
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import re
import time
from concurrent.futures import ThreadPoolExecutor
import paramiko
def get_device_list(filename):
"""從文本文件讀取設備列表,返回由字典組成的列表。
文本內容格式為:ip,用戶名,密碼,別名,例如:
1.1.1.1 admin admin sw1
1.1.1.2 admin admin sw2
......
Args:
filename ([str]): 文件名稱
"""
with open(filename, 'r') as f:
device_list = []
for line in f.readlines():
ip, username, password, name = line.strip().split()
device_list.append(
{
"ip": ip,
"username": username,
"password": password,
"name": name,
}
)
return device_list
class NetworkDevice(object):
def __init__(self, ip="", username="", password="'", name="", port=22,):
self.conn = None
if ip:
self.ip = ip.strip()
elif name:
self.name = name.strip()
else:
raise ValueError("需要設備連接地址(ip 或 別名)")
self.port = int(port)
self.username = username
self.password = password
self._open_ssh()
def _open_ssh(self):
"""初始化 SSH 連接,調起一個模擬終端,會話結束前可以一直執行命令。
Raises:
e: 拋出 paramiko 連接失敗的任何異常
"""
ssh_connect_params = {
"hostname": self.ip,
"port": self.port,
"username": self.username,
"password": self.password,
"look_for_keys": False,
"allow_agent": False,
"timeout": 5, # TCP 連接超時時間
}
conn = paramiko.SSHClient()
conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
conn.connect(**ssh_connect_params)
except Exception as e:
raise e
self.conn = conn.invoke_shell(term="vt100", width=500, height=1000)
return ""
def exec_cmd(self, cmd, recv_time=3):
"""登錄設備,執行命令
Args:
cmd ([type]): 命令字符串
recv_time (int, optional): 讀取回顯信息的超時時間. Defaults to 3.
Raises:
EOFError: 沒有任何信息輸出,說明連接失敗。
Returns:
output:
"""
cmd = cmd.strip() + "\n"
self.conn.sendall("screen disable\n")
self.conn.sendall(cmd)
time.sleep(int(recv_time))
output = self.conn.recv(1024*1024)
if len(output) == 0:
raise EOFError("連接可能被關閉,沒有任何信息輸出")
return output.decode('utf-8', 'ignore')
dev = {
"ip":"192.168.56.21",
"username":"netdevops",
"password":"Admin@h3c.com",
"name": "sw1"
}
# sw1 = NetworkDevice(**dev)
# ret = sw1.exec_cmd("dis version")
# print(ret)
def parse_interface_drop(output):
"""把設備的輸出隊列丟包信息解析成累加值
命令及輸出示例如下:
# [H3C]dis qos queue-statistics interface outbound | in "^ Drop"
# Dropped: 0 packets, 0 bytes
"""
ptn = re.compile(r"\s(\S+):\s+(\d+)\s+(\S+),\s+(\d+)\s+(\S+)")
count = 0
for i in ptn.findall(output):
count += int(i[1])
return count
def run(cmd, **conn_parms):
"""登錄單臺設備,執行指定命令,解析丟包統計
"""
sw = NetworkDevice(**conn_parms)
output = sw.exec_cmd(cmd)
drop_count = parse_interface_drop(output)
return "%s %s %s"%(
conn_parms.get("name"),
conn_parms.get("ip"),
drop_count)
# cmd = r'dis qos queue-statistics interface outbound | in "^ Drop"'
# ret = run(cmd,**dev)
# print(ret)
if __name__== "__main__":
"""獲取設備列表,使用多線程登錄設備獲取信息并返回
"""
with ThreadPoolExecutor(10) as pool:
futures = []
cmd = r'dis qos queue-statistics interface outbound | in "^ Drop"'
dev_info = get_device_list("./iplist.txt")
for d in dev_info:
future = pool.submit(run, cmd, **d)
futures.append(future)
# for f in futures:
# print(f.result())
# 根據執行時間把結果寫入文件,精確到小時
with open("./drops/%s.log"%time.strftime("%Y%m%d_%H"),'w') as f:
for line in futures:
f.write(line.result() + "\n")感謝各位的閱讀,以上就是“python Paramiko的SSH怎么使用”的內容了,經過本文的學習后,相信大家對python Paramiko的SSH怎么使用這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。