# PHP如何創建簡單RPC服務
## 1. RPC基礎概念
### 1.1 什麼是RPC
RPC(Remote Procedure Call,遠程過程調用)是一種計算機通信協議,允許程序像調用本地服務一樣調用遠程服務。它抽象了網絡通信細節,使開發者能夠專注於業務邏輯而非底層通信機制。
### 1.2 RPC的核心組成
- **客戶端(Client)**:服務調用方
- **服務端(Server)**:服務提供方
- **存根(Stub)**:客戶端和服務端的代理
- **序列化協議**:數據編碼/解碼方式
- **傳輸協議**:網絡通信協議
### 1.3 常見RPC框架對比
| 框架 | 語言 | 特點 |
|------------|---------|--------------------------|
| gRPC | 多語言 | HTTP/2, Protocol Buffers |
| Thrift | 多語言 | Facebook開發, 跨語言 |
| JSON-RPC | 多語言 | 簡單, JSON格式 |
| XML-RPC | 多語言 | 早期標準, XML格式 |
## 2. PHP實現RPC的技術選型
### 2.1 原生PHP實現方案
```php
// Basic example: open a listening TCP socket with stream_socket_server().
$socket = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr);
if ($socket === false) {
    // Binding failed: print the reason reported by the stream layer and stop.
    exit("$errstr ($errno)");
}
```

- `json_encode()` / `json_decode()`
- `msgpack_pack()` / `msgpack_unpack()`
- `serialize()` / `unserialize()`

```php
// server.php
// Registry of remotely callable procedures, keyed by their public method name.
$methods = [
    'add' => function ($a, $b) {
        return $a + $b;
    },
    'getUser' => function ($id) {
        return ['id' => $id, 'name' => 'User' . $id];
    },
];

// Decode the request from the raw HTTP body; json_decode() returns null for malformed JSON.
$request = json_decode(file_get_contents('php://input'), true);

$response = [
    'jsonrpc' => '2.0',
    // Echo the caller's id when there is one so the reply can be correlated.
    'id' => is_array($request) ? ($request['id'] ?? null) : null,
];

// Validate the envelope step by step and report the matching JSON-RPC 2.0 error code
// instead of labelling every failure as "method not found".
if ($request === null) {
    $response['error'] = ['code' => -32700, 'message' => 'Parse error'];
} elseif (!is_array($request) || !isset($request['method']) || !is_string($request['method'])) {
    $response['error'] = ['code' => -32600, 'message' => 'Invalid Request'];
} elseif (!isset($methods[$request['method']])) {
    $response['error'] = ['code' => -32601, 'message' => 'Method not found'];
} elseif (isset($request['params']) && !is_array($request['params'])) {
    $response['error'] = ['code' => -32602, 'message' => 'Invalid params'];
} else {
    try {
        $response['result'] = $methods[$request['method']](...($request['params'] ?? []));
    } catch (TypeError $e) {
        // Wrong number or type of arguments surfaces as TypeError, which is not an Exception.
        $response['error'] = ['code' => -32602, 'message' => $e->getMessage()];
    } catch (Throwable $e) {
        // Anything else thrown by the procedure is reported as an implementation-defined server error.
        $response['error'] = ['code' => -32000, 'message' => $e->getMessage()];
    }
}

header('Content-Type: application/json');
echo json_encode($response);
// client.php
/**
 * Minimal JSON-RPC 2.0 client that sends each call as an HTTP POST.
 *
 * Any method invoked on an instance is forwarded to the remote endpoint by __call().
 */
class JsonRpcClient
{
    /** @var string URL of the endpoint that receives the requests. */
    private $url;

    /**
     * @param string $url URL of the JSON-RPC endpoint.
     */
    public function __construct($url)
    {
        $this->url = $url;
    }

    /**
     * Forwards an arbitrary method call to the remote endpoint.
     *
     * @param string $method Remote procedure name.
     * @param array  $params Positional arguments passed to the procedure.
     *
     * @return mixed The "result" member of the reply, or null when it is absent.
     *
     * @throws Exception When the transfer fails, the reply is not a JSON object/array,
     *                   or the reply carries an "error" member.
     */
    public function __call($method, $params)
    {
        $request = [
            'jsonrpc' => '2.0',
            'method' => $method,
            'params' => $params,
            'id' => uniqid(),
        ];

        $context = stream_context_create([
            'http' => [
                'method' => 'POST',
                'header' => 'Content-Type: application/json',
                'content' => json_encode($request),
                // Still read the body on 4xx/5xx so an error object sent with that status is not lost.
                'ignore_errors' => true,
            ],
        ]);

        $response = file_get_contents($this->url, false, $context);
        if ($response === false) {
            throw new Exception("RPC request to {$this->url} failed");
        }

        $data = json_decode($response, true);
        if (!is_array($data)) {
            // Covers malformed JSON as well as scalar payloads, which previously returned null silently.
            throw new Exception('Invalid JSON-RPC response');
        }

        if (isset($data['error'])) {
            $error = $data['error'];
            $message = is_array($error) && isset($error['message']) ? (string) $error['message'] : 'Unknown RPC error';
            $code = is_array($error) && isset($error['code']) ? (int) $error['code'] : 0;
            throw new Exception($message, $code);
        }

        return $data['result'] ?? null;
    }
}
// Usage example: the remote "add" procedure is invoked like a local method.
$client = new JsonRpcClient('http://localhost/server.php');
print $client->add(2, 3); // prints 5
// tcp_server.php
$socket = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr);
if (!$socket) {
    die("$errstr ($errno)");
}

// Registry of remotely callable procedures, keyed by their public method name.
$methods = [
    'add' => function ($a, $b) { return $a + $b; },
    'multiply' => function ($a, $b) { return $a * $b; },
];

// Serve exactly one request per accepted connection, then close it.
while ($conn = stream_socket_accept($socket, -1)) {
    $response = [
        'id' => null,
        'result' => null,
        'error' => null,
    ];

    try {
        // NOTE: a single read caps the request at 1024 bytes; larger payloads need real framing.
        $request = fread($conn, 1024);
        if ($request === false || $request === '') {
            throw new Exception('Empty request');
        }

        // SECURITY: unserialize() on bytes from the network is an object-injection risk.
        // Class instantiation is disabled here; prefer JSON for anything beyond a demo.
        $data = unserialize($request, ['allowed_classes' => false]);
        if (!is_array($data) || !isset($data['method']) || !is_string($data['method'])) {
            throw new Exception('Invalid request');
        }

        $response['id'] = $data['id'] ?? null;

        if (!isset($methods[$data['method']])) {
            throw new Exception('Method not found');
        }

        $params = isset($data['params']) && is_array($data['params']) ? $data['params'] : [];
        $response['result'] = $methods[$data['method']](...$params);
    } catch (Throwable $e) {
        // Throwable (not just Exception) so a bad argument list cannot take the whole server down.
        $response['error'] = $e->getMessage();
    }

    fwrite($conn, serialize($response));
    fclose($conn);
}

fclose($socket);
// tcp_client.php
/**
 * RPC client that opens a fresh TCP connection per call and exchanges
 * PHP-serialized request/response arrays with the server.
 */
class TcpRpcClient
{
    /** @var string Server host name or address. */
    private $host;

    /** @var int Server TCP port. */
    private $port;

    /**
     * @param string $host Server host name or address.
     * @param int    $port Server TCP port.
     */
    public function __construct($host, $port)
    {
        $this->host = $host;
        $this->port = $port;
    }

    /**
     * Forwards an arbitrary method call to the remote server.
     *
     * @param string $method Remote procedure name.
     * @param array  $params Positional arguments passed to the procedure.
     *
     * @return mixed The "result" entry of the reply.
     *
     * @throws Exception When connecting, sending or receiving fails, the reply cannot be
     *                   decoded, or the reply carries a non-empty "error" entry.
     */
    public function __call($method, $params)
    {
        $socket = stream_socket_client("tcp://{$this->host}:{$this->port}", $errno, $errstr, 30);
        if (!$socket) {
            throw new Exception("$errstr ($errno)");
        }

        $request = [
            'id' => uniqid(),
            'method' => $method,
            'params' => $params,
        ];

        try {
            if (fwrite($socket, serialize($request)) === false) {
                throw new Exception('Failed to send RPC request');
            }
            // NOTE: a single read caps the reply at 1024 bytes.
            $raw = fread($socket, 1024);
        } finally {
            // Always release the socket, including when sending fails.
            fclose($socket);
        }

        if ($raw === false || $raw === '') {
            throw new Exception('Empty response from RPC server');
        }

        // SECURITY: never let data from the wire instantiate classes; object results
        // therefore arrive as __PHP_Incomplete_Class rather than live objects.
        $response = unserialize($raw, ['allowed_classes' => false]);
        if (!is_array($response)) {
            throw new Exception('Malformed response from RPC server');
        }

        if (!empty($response['error'])) {
            throw new Exception((string) $response['error']);
        }

        return $response['result'] ?? null;
    }
}
// Usage example: each call opens its own connection to the TCP server.
$client = new TcpRpcClient('localhost', 8000);
print $client->add(5, 3); // prints 8
print $client->multiply(4, 6); // prints 24
// swoole_server.php
$server = new Swoole\Server('0.0.0.0', 9501, SWOOLE_PROCESS, SWOOLE_SOCK_TCP);

// Registry of remotely callable procedures, keyed by their public method name.
$methods = [
    'add' => function ($a, $b) { return $a + $b; },
    'getUserInfo' => function ($id) {
        return ['id' => $id, 'name' => 'User' . $id, 'score' => rand(60, 100)];
    },
];

// Each received payload is treated as one JSON-encoded request and answered with one JSON reply.
$server->on('receive', function ($server, $fd, $reactor_id, $data) use ($methods) {
    $response = [
        'id' => null,
        'result' => null,
        'error' => null,
    ];

    try {
        $request = json_decode($data, true);
        if (!is_array($request) || !isset($request['method']) || !is_string($request['method'])) {
            // Malformed JSON or a payload without a usable method name.
            throw new Exception('Invalid request');
        }

        $response['id'] = $request['id'] ?? null;

        if (!isset($methods[$request['method']])) {
            throw new Exception('Method not found');
        }

        $params = isset($request['params']) && is_array($request['params']) ? $request['params'] : [];
        $response['result'] = $methods[$request['method']](...$params);
    } catch (Throwable $e) {
        // Throwable also covers argument-count/type errors, which are not Exceptions.
        $response['error'] = $e->getMessage();
    }

    $server->send($fd, json_encode($response));
});

$server->start();
// swoole_client.php
/**
 * Synchronous RPC client built on Swoole\Client that keeps one TCP connection
 * open for the lifetime of the object and exchanges JSON messages.
 */
class SwooleRpcClient
{
    /** @var Swoole\Client Connected TCP client. */
    private $client;

    /**
     * @param string $host Server address.
     * @param int    $port Server TCP port.
     *
     * @throws Exception When the connection cannot be established.
     */
    public function __construct($host, $port)
    {
        $this->client = new Swoole\Client(SWOOLE_SOCK_TCP);
        if (!$this->client->connect($host, $port, 0.5)) {
            throw new Exception("connect failed. Error: {$this->client->errCode}");
        }
    }

    /**
     * Forwards an arbitrary method call to the remote server.
     *
     * @param string $method Remote procedure name.
     * @param array  $params Positional arguments passed to the procedure.
     *
     * @return mixed The "result" entry of the reply.
     *
     * @throws Exception When sending or receiving fails, the reply is not valid JSON,
     *                   or the reply carries a non-empty "error" entry.
     */
    public function __call($method, $params)
    {
        $request = [
            'id' => uniqid(),
            'method' => $method,
            'params' => $params,
        ];

        if (!$this->client->send(json_encode($request))) {
            throw new Exception("send failed. Error: {$this->client->errCode}");
        }

        $response = $this->client->recv();
        if ($response === false || $response === '') {
            // A failed or empty read must not be decoded and indexed.
            throw new Exception("recv failed. Error: {$this->client->errCode}");
        }

        $data = json_decode($response, true);
        if (!is_array($data)) {
            throw new Exception('Invalid response from RPC server');
        }

        if (!empty($data['error'])) {
            throw new Exception((string) $data['error']);
        }

        return $data['result'] ?? null;
    }

    /**
     * Closes the underlying connection when the client object is destroyed.
     */
    public function __destruct()
    {
        $this->client->close();
    }
}
// Usage example: both calls reuse the single connection held by the client.
$client = new SwooleRpcClient('127.0.0.1', 9501);
print $client->add(10, 20); // prints 30
print_r($client->getUserInfo(1001));
// Simple service registry implementation
/**
 * Process-local registry that maps a service name to its host, port and
 * the Unix timestamp of its most recent heartbeat.
 */
class ServiceRegistry
{
    /** @var array Registered services, keyed by service name. */
    private static $services = [];

    /**
     * Adds a service, or replaces the entry already stored under the same name.
     *
     * @param string $name Service name used as the lookup key.
     * @param string $host Host the service listens on.
     * @param int    $port Port the service listens on.
     */
    public static function register($name, $host, $port)
    {
        $entry = [
            'host' => $host,
            'port' => $port,
            'last_heartbeat' => time(),
        ];
        self::$services[$name] = $entry;
    }

    /**
     * Looks up a registered service.
     *
     * @param string $name Service name.
     *
     * @return array The stored entry: host, port and last_heartbeat.
     *
     * @throws Exception When no service is registered under $name.
     */
    public static function discover($name)
    {
        if (isset(self::$services[$name])) {
            return self::$services[$name];
        }

        throw new Exception("Service {$name} not found");
    }

    /**
     * Refreshes the heartbeat timestamp of a registered service; unknown names are ignored.
     *
     * @param string $name Service name.
     */
    public static function heartbeat($name)
    {
        if (!isset(self::$services[$name])) {
            return;
        }

        self::$services[$name]['last_heartbeat'] = time();
    }
}
/**
 * Strategy for choosing one service out of a set of candidates.
 */
interface LoadBalancer
{
    /**
     * @param array $services Candidate services; keys are irrelevant to the choice.
     *
     * @return mixed One element of $services.
     */
    public function select(array $services);
}

/**
 * Picks a candidate uniformly at random.
 */
class RandomLoadBalancer implements LoadBalancer
{
    /**
     * @param array $services Candidate services.
     *
     * @return mixed A randomly chosen element of $services.
     *
     * @throws InvalidArgumentException When $services is empty.
     */
    public function select(array $services)
    {
        if (count($services) === 0) {
            // array_rand() cannot pick from an empty array.
            throw new InvalidArgumentException('No services available');
        }

        return $services[array_rand($services)];
    }
}

/**
 * Cycles through the candidates in order, one per call.
 */
class RoundRobinLoadBalancer implements LoadBalancer
{
    /** @var int Number of selections made so far. */
    private $index = 0;

    /**
     * @param array $services Candidate services.
     *
     * @return mixed The next element of $services in rotation.
     *
     * @throws InvalidArgumentException When $services is empty.
     */
    public function select(array $services)
    {
        $count = count($services);
        if ($count === 0) {
            // Guard first: the modulo below would otherwise divide by zero.
            throw new InvalidArgumentException('No services available');
        }

        // Re-index so string-keyed or gapped arrays rotate by position, not by key.
        $candidates = array_values($services);
        $service = $candidates[$this->index % $count];
        $this->index++;

        return $service;
    }
}
/**
 * Circuit breaker with three states: CLOSED (calls pass through), OPEN (calls are
 * rejected) and HALF_OPEN (a trial call is allowed once the recovery timeout has elapsed).
 */
class CircuitBreaker
{
    /** @var int Failure count at which the breaker opens. */
    private $failureThreshold;

    /** @var int Seconds that must pass after the last failure before a trial call is allowed. */
    private $recoveryTimeout;

    /** @var int Failures recorded since the last successful call. */
    private $failureCount = 0;

    /** @var int Unix timestamp of the most recent failure. */
    private $lastFailureTime = 0;

    /** @var string One of 'CLOSED', 'OPEN', 'HALF_OPEN'. */
    private $state = 'CLOSED';

    /**
     * @param int $failureThreshold Failures needed to open the breaker.
     * @param int $recoveryTimeout  Seconds to wait before allowing a trial call.
     */
    public function __construct($failureThreshold = 3, $recoveryTimeout = 30)
    {
        $this->failureThreshold = $failureThreshold;
        $this->recoveryTimeout = $recoveryTimeout;
    }

    /**
     * Runs the operation under breaker protection.
     *
     * @param callable $operation Operation to run; called without arguments.
     *
     * @return mixed Whatever the operation returns.
     *
     * @throws Exception When the breaker is open, or when the operation itself throws.
     */
    public function execute(callable $operation)
    {
        if ($this->state === 'OPEN') {
            $elapsed = time() - $this->lastFailureTime;
            if ($elapsed <= $this->recoveryTimeout) {
                throw new Exception("Circuit breaker is OPEN");
            }
            // The timeout has passed: let this call through as a trial.
            $this->state = 'HALF_OPEN';
        }

        try {
            $outcome = $operation();
        } catch (Exception $failure) {
            $this->recordFailure();
            throw $failure;
        }

        $this->reset();

        return $outcome;
    }

    /**
     * Counts one failure and opens the breaker once the threshold is reached.
     */
    private function recordFailure()
    {
        $this->lastFailureTime = time();
        $this->failureCount += 1;

        if ($this->failureCount < $this->failureThreshold) {
            return;
        }

        $this->state = 'OPEN';
    }

    /**
     * Clears the failure count and closes the breaker after a successful call.
     */
    private function reset()
    {
        $this->state = 'CLOSED';
        $this->failureCount = 0;
    }
}
/**
 * Bounded pool of Swoole TCP client connections.
 *
 * Idle connections are handed out first; new ones are created on demand
 * until $maxSize connections have been created.
 */
class ConnectionPool
{
    /** @var SplQueue Idle connections waiting to be reused. */
    private $pool;

    /** @var array Connection settings; the 'host' and 'port' keys are read. */
    private $config;

    /** @var int Maximum number of connections this pool may create. */
    private $maxSize;

    /** @var int Number of connections successfully created so far. */
    private $currentSize = 0;

    /**
     * @param array $config  Connection settings with 'host' and 'port'.
     * @param int   $maxSize Maximum number of connections to create.
     */
    public function __construct($config, $maxSize = 10)
    {
        $this->config = $config;
        $this->maxSize = $maxSize;
        $this->pool = new SplQueue();
    }

    /**
     * Returns an idle connection, or creates a new one while capacity remains.
     *
     * @return mixed A connection previously released, or a new Swoole\Client.
     *
     * @throws Exception When the pool is exhausted or a new connection cannot be opened.
     */
    public function getConnection()
    {
        if (!$this->pool->isEmpty()) {
            return $this->pool->dequeue();
        }

        if ($this->currentSize >= $this->maxSize) {
            throw new Exception("Connection pool exhausted");
        }

        // Count the slot only after the connection exists; otherwise a failed
        // connect would permanently shrink the pool.
        $connection = $this->createConnection();
        $this->currentSize++;

        return $connection;
    }

    /**
     * Puts a connection back into the idle queue for reuse.
     *
     * @param mixed $connection Connection obtained from getConnection().
     */
    public function releaseConnection($connection)
    {
        $this->pool->enqueue($connection);
    }

    /**
     * Opens a new TCP connection to the configured host and port.
     *
     * @return Swoole\Client
     *
     * @throws Exception When the connection attempt fails.
     */
    private function createConnection()
    {
        $client = new Swoole\Client(SWOOLE_SOCK_TCP);
        if (!$client->connect($this->config['host'], $this->config['port'], 0.5)) {
            throw new Exception("connect failed. Error: {$client->errCode}");
        }

        return $client;
    }
}
/**
 * Callback-based RPC client built on the asynchronous Swoole TCP client.
 * Every call opens its own connection and closes it after the first reply.
 */
class AsyncRpcClient
{
    /** @var string Server address. */
    private $host;

    /** @var int Server TCP port. */
    private $port;

    /**
     * @param string $host Server address.
     * @param int    $port Server TCP port.
     */
    public function __construct($host, $port)
    {
        $this->host = $host;
        $this->port = $port;
    }

    /**
     * Sends a request and reports the outcome through $callback.
     *
     * @param string   $method   Remote procedure name.
     * @param array    $params   Positional arguments passed to the procedure.
     * @param callable $callback Invoked as $callback($result, $error); $error is null on success.
     */
    public function callAsync($method, $params, callable $callback)
    {
        $client = new Swoole\Client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);

        $client->on("connect", function ($cli) use ($method, $params) {
            $request = [
                'id' => uniqid(),
                'method' => $method,
                'params' => $params,
            ];
            $cli->send(json_encode($request));
        });

        $client->on("receive", function ($cli, $data) use ($callback) {
            $response = json_decode($data, true);
            try {
                if (!is_array($response)) {
                    // Undecodable reply: report it instead of indexing a non-array.
                    $callback(null, "Invalid response");
                } else {
                    $callback($response['result'] ?? null, $response['error'] ?? null);
                }
            } finally {
                // Release the connection even when the callback throws.
                $cli->close();
            }
        });

        $client->on("error", function ($cli) use ($callback) {
            $callback(null, "Connection failed");
        });

        // NOTE(review): older Swoole async clients appear to require a close handler to be
        // registered before connect(); a no-op satisfies that - confirm for the target version.
        $client->on("close", function ($cli) {
        });

        $client->connect($this->host, $this->port, 0.5);
    }
}
// Usage example: the callback receives either a result or an error description.
$client = new AsyncRpcClient('127.0.0.1', 9501);
$client->callAsync('add', [5, 7], function ($result, $error) {
    if (!$error) {
        echo "Result: $result\n";
        return;
    }

    echo "Error: $error\n";
});
```

- 認證與授權
- 數據驗證

```php
// Parameter validation example.
// NOTE(review): neither the Validator class nor $params is defined in the visible source;
// the rule strings look like a pipe-separated "rule:argument" syntax - confirm against the
// validator implementation actually used.
$validator = new Validator();
$validator->validate(
    $params,
    [
        'username' => 'required|string|min:3|max:20',
        'password' => 'required|string|min:8',
    ]
);
```

- 傳輸安全

```php
// Logging middleware
/**
 * Middleware that appends one JSON line per handled request to a log file,
 * recording the method, parameters, client address, duration and outcome.
 */
class LoggingMiddleware
{
    /** @var string Path of the file that log lines are appended to. */
    private $logFile;

    /**
     * @param string $logFile Log file path; defaults to "rpc.log" in the working directory.
     */
    public function __construct($logFile = 'rpc.log')
    {
        $this->logFile = $logFile;
    }

    /**
     * Passes the request to the next handler and logs how the call ended.
     *
     * @param array    $request Request data; the 'method' and 'params' entries are logged when present.
     * @param callable $next    Next handler, called with $request.
     *
     * @return mixed Whatever $next returns.
     *
     * @throws Throwable Anything thrown by $next is logged and then rethrown unchanged.
     */
    public function handle($request, $next)
    {
        $start = microtime(true);
        $logData = [
            'time' => date('Y-m-d H:i:s'),
            // Tolerate incomplete requests so logging never fails before the handler runs.
            'method' => $request['method'] ?? null,
            'params' => $request['params'] ?? null,
            'ip' => $_SERVER['REMOTE_ADDR'] ?? '',
        ];

        try {
            $response = $next($request);
        } catch (Throwable $e) {
            // Throwable rather than Exception so fatal-style Errors are logged too.
            $logData['duration'] = microtime(true) - $start;
            $logData['status'] = 'error';
            $logData['error'] = $e->getMessage();
            $this->writeLog($logData);
            throw $e;
        }

        $logData['duration'] = microtime(true) - $start;
        $logData['status'] = 'success';
        $this->writeLog($logData);

        return $response;
    }

    /**
     * Appends one JSON-encoded entry to the log file.
     *
     * @param array $data Entry to write.
     */
    private function writeLog($data)
    {
        file_put_contents(
            $this->logFile,
            // Partial output keeps the line usable when a value cannot be encoded.
            json_encode($data, JSON_PARTIAL_OUTPUT_ON_ERROR) . PHP_EOL,
            // The exclusive lock stops concurrent requests from interleaving lines.
            FILE_APPEND | LOCK_EX
        );
    }
}
/**
 * Integration tests for the JSON-RPC endpoint; they expect server.php
 * to be reachable at http://localhost/server.php.
 */
class RpcServiceTest extends PHPUnit\Framework\TestCase
{
    /**
     * The remote "add" procedure returns the sum of its two arguments.
     */
    public function testAddMethod()
    {
        $rpc = $this->newClient();

        $this->assertEquals(5, $rpc->add(2, 3));
        $this->assertEquals(0, $rpc->add(-1, 1));
    }

    /**
     * Calling a procedure the server does not expose raises an exception.
     */
    public function testNonexistentMethod()
    {
        $this->expectException(Exception::class);

        $rpc = $this->newClient();
        $rpc->nonexistentMethod();
    }

    /**
     * Builds a client pointed at the local test endpoint.
     */
    private function newClient()
    {
        return new JsonRpcClient('http://localhost/server.php');
    }
}
```

```php
// 壓力測試腳本
$start = microtime(true);
$concurrency = 50;
$requests = 1000;
$client = new SwooleRpcClient('127.0.0.1', 9501);
$success = 0;
$failures = 0;
for ($i = 0; $i
```

(原文的壓力測試腳本在此處被截斷。)

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。