安裝Python
CentOS 7默認安裝Python 2.7,建議升級至Python 3:
sudo yum install python3 # 安裝Python 3
python3 --version # 驗證版本
安裝依賴庫
sudo yum groupinstall "Development Tools"
sudo yum install openssl-devel
使用pip3安裝第三方庫(如requests):
pip3 install requests
服務器端
import socket
# Minimal blocking TCP server: accepts one client at a time, prints what the
# client sent, and answers with a fixed greeting.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
    # Allow the server to be restarted right away instead of failing with
    # "Address already in use" while old connections sit in TIME_WAIT.
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind(('0.0.0.0', 12345))  # bind to all interfaces, port 12345
    server_socket.listen(5)  # start listening; backlog of 5 pending connections
    print("Server listening on port 12345...")
    while True:
        client_socket, client_address = server_socket.accept()
        # The with-block closes the client socket even when recv(), decode()
        # or sendall() raises, so a bad client cannot leak a descriptor.
        with client_socket:
            print(f"Connected by {client_address}")
            # Single read of at most 1024 bytes; longer messages are truncated.
            data = client_socket.recv(1024).decode('utf-8')
            print(f"Received: {data}")
            client_socket.sendall("Hello from server!".encode('utf-8'))
客戶端
import socket
# Minimal TCP client: connects, sends one greeting, prints the reply.
# The with-block closes the socket even if connect(), sendall() or recv()
# raises.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
    client_socket.connect(('服務器IP', 12345))  # replace the placeholder with the real server IP
    client_socket.sendall("Hello from client!".encode('utf-8'))
    # Single read of at most 1024 bytes.
    data = client_socket.recv(1024).decode('utf-8')
    print(f"Received: {data}")
GET請求
import requests
# Send a GET request and print the response body.
# requests applies no timeout by default, so an unresponsive server would
# block this call forever; give up after 10 seconds instead.
response = requests.get('http://httpbin.org/get', timeout=10)
print(response.text)
POST請求
import requests
# Send a form-encoded POST request and print the response body.
# requests applies no timeout by default, so an unresponsive server would
# block this call forever; give up after 10 seconds instead.
response = requests.post('http://httpbin.org/post', data={'key': 'value'}, timeout=10)
print(response.text)
import asyncio
async def handle_client(reader, writer):
    """Serve one connection: read a message, print it, reply with a greeting.

    Used as the ``client_connected_cb`` of ``asyncio.start_server``, which
    passes the stream pair for each accepted connection.

    Args:
        reader: Stream to read the client's message from.
        writer: Stream to send the reply on.

    The writer is always closed before returning, including when the peer
    disconnects without sending any data or when decoding fails.
    """
    try:
        data = await reader.read(100)  # single read of at most 100 bytes
        if not data:
            # Empty read means the peer closed the connection (EOF).
            return
        message = data.decode()
        print(f"Received: {message}")
        writer.write("Hello from server!".encode())
        await writer.drain()
    finally:
        # Close on every path and wait until the transport is really closed.
        writer.close()
        await writer.wait_closed()
# Requires Python 3.7+ (asyncio.run and Server.serve_forever).
async def main():
    """Listen on all interfaces, port 12345, and serve clients until cancelled."""
    listener = await asyncio.start_server(handle_client, host='0.0.0.0', port=12345)
    # Leaving the async-with block closes the listening server.
    async with listener:
        await listener.serve_forever()


asyncio.run(main())
import socket
import threading
def handle_client(client_socket, address):
    """Serve one connection: print the client's message and reply with a greeting.

    Args:
        client_socket: Connected socket for this client; closed on return.
        address: Peer address, used only for the log line.

    Raises:
        UnicodeDecodeError: If the received bytes are not valid UTF-8.
        OSError: If receiving or sending fails.
    """
    # The with-block closes the socket even when recv(), decode() or sendall()
    # raises, so a misbehaving client cannot leak a descriptor.
    with client_socket:
        print(f"Connected by {address}")
        # Single read of at most 1024 bytes.
        data = client_socket.recv(1024).decode('utf-8')
        print(f"Received: {data}")
        client_socket.sendall("Hello from server!".encode('utf-8'))
# Thread-per-connection TCP server: every accepted client is handed to
# handle_client() on its own thread so slow clients do not block new ones.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
    # Allow the server to be restarted right away instead of failing with
    # "Address already in use" while old connections sit in TIME_WAIT.
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind(('0.0.0.0', 12345))  # all interfaces, port 12345
    server_socket.listen(5)  # backlog of 5 pending connections
    print("Multi-threaded server started...")
    # The enclosing with-block closes the listening socket if accept() raises
    # or the loop is interrupted.
    while True:
        client_socket, client_address = server_socket.accept()
        client_thread = threading.Thread(
            target=handle_client,
            args=(client_socket, client_address),
        )
        client_thread.start()
使用nc(netcat)測試TCP服務:
nc localhost 12345 # 連接服務器並發送消息
netstat -tulnp | grep 12345 # 檢查端口是否被監聽