遇到 grpclib 服务在高并发场景下的性能瓶颈问题，查阅相关资料后，今天找到了解决方案，如下：
采用 SO_REUSEPORT 让多个进程绑定同一个端口（多进程启动），并使用 uvloop 提升 asyncio 性能。
# grpc_service.py
import asyncio
import logging
import os
import signal
import socket
import sys
import time
from multiprocessing import Process, current_process
from typing import List

import uvloop
from grpclib.health.v1 import health_pb2
from grpclib.server import Server
from grpclib.utils import graceful_exit
from prometheus_client import start_http_server, Counter, Gauge
# Install uvloop as the asyncio event loop implementation
# (the original author's note claims roughly a 3x performance gain).
uvloop.install()

# Logging setup: INFO level, records go to stdout, and each line carries
# the process id, module and line number of the emitter.
logging.basicConfig(
    handlers=[logging.StreamHandler(sys.stdout)],
    format='%(asctime)s [%(process)d] %(levelname)s %(module)s:%(lineno)d - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Monitoring metrics.
REQUEST_COUNTER = Counter(
    'grpc_requests',
    'Total requests',
    labelnames=['method', 'status'],
)
PROCESS_HEALTH = Gauge(
    'process_health',
    'Process health status',
    labelnames=['pid'],
)
class HealthCheckServicer:
    """Minimal health-check handler that always reports ``SERVING``.

    NOTE(review): nothing in this file adds this class to the handler list
    given to ``Server``, and it does not define the ``__mapping__`` hook
    grpclib uses to register handlers -- confirm how it is meant to be
    wired in.
    """

    async def Check(self, stream):
        """Answer a unary health-check call with a ``SERVING`` status.

        grpclib handlers talk to the client through ``stream``: the reply
        has to be written with ``stream.send_message``. A value returned
        from the coroutine is not delivered to the caller.

        Args:
            stream: The grpclib stream object for the current call.
        """
        # Read the request first so the call is fully consumed before the
        # reply is sent; its content does not influence the answer.
        await stream.recv_message()
        await stream.send_message(
            health_pb2.HealthCheckResponse(
                status=health_pb2.HealthCheckResponse.SERVING,
            )
        )
class GRPCService:
    """One gRPC worker: a grpclib server plus a Prometheus metrics endpoint.

    Each worker binds the same TCP port with ``SO_REUSEPORT`` so that
    several processes can listen on it side by side.
    """

    def __init__(self, port: int, metrics_port: int):
        """Store the ports; the server itself is created in :meth:`run`.

        Args:
            port: TCP port the gRPC server listens on.
            metrics_port: TCP port for this worker's metrics HTTP endpoint.
        """
        self.port = port
        self.metrics_port = metrics_port
        self.server = None
        # Task running graceful_shutdown(); also prevents a second shutdown
        # from being started when several signals arrive.
        self._shutdown_task = None

    async def _start_metrics(self):
        """Start the Prometheus HTTP endpoint on ``metrics_port``."""
        start_http_server(self.metrics_port)
        logger.info(f"Metrics server started on :{self.metrics_port}")

    async def _configure_socket(self) -> socket.socket:
        """Create the listening socket used by this worker.

        Returns:
            A TCP socket bound to ``0.0.0.0:<port>`` with ``SO_REUSEADDR``
            and ``SO_REUSEPORT`` enabled.

        Raises:
            OSError: If an option cannot be set or the bind fails. The
                socket is closed before the error propagates.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # Key setting: allows every worker process to bind the same port.
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            sock.bind(('0.0.0.0', self.port))
        except Exception:
            # Do not leak the descriptor when configuration fails.
            sock.close()
            raise
        return sock

    async def run(self):
        """Start metrics and the gRPC server, then serve until shut down.

        Returns once the server has been closed (normally after SIGINT or
        SIGTERM triggered :meth:`graceful_shutdown`).
        """
        await self._start_metrics()
        sock = await self._configure_socket()
        loop = asyncio.get_running_loop()
        try:
            # NOTE(review): YourServiceServicer is not defined in this file;
            # it has to be replaced with (or imported as) the real handler.
            self.server = Server([YourServiceServicer()], loop=loop)
            await self.server.start(sock=sock)
        except Exception:
            sock.close()
            raise

        # add_signal_handler() only accepts plain callables. Passing the
        # coroutine function directly would create a coroutine that is
        # never awaited, so a synchronous wrapper schedules it instead.
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, self._request_shutdown)

        logger.info(f"gRPC server started on :{self.port} (PID: {os.getpid()})")
        PROCESS_HEALTH.labels(os.getpid()).set(1)
        await self.server.wait_closed()

        # Let the shutdown coroutine finish before returning; otherwise
        # asyncio.run() would cancel it while it is still completing.
        if self._shutdown_task is not None:
            await self._shutdown_task

    def _request_shutdown(self):
        """Signal callback: schedule :meth:`graceful_shutdown` exactly once."""
        if self._shutdown_task is None:
            self._shutdown_task = asyncio.ensure_future(self.graceful_shutdown())

    async def graceful_shutdown(self):
        """Mark the worker unhealthy, close the server and wait for it."""
        logger.info("Initiating graceful shutdown...")
        PROCESS_HEALTH.labels(os.getpid()).set(0)
        if self.server is None:
            # run() never got as far as creating the server.
            return
        self.server.close()
        await self.server.wait_closed()
        logger.info("Server shutdown completed")
class ProcessManager:
    """Start the worker processes and restart any that exit."""

    def __init__(self, worker_num: int = None):
        """Create a manager with no workers started yet.

        Args:
            worker_num: Number of worker processes. When omitted (or 0) the
                CPU count is used, falling back to 1 if it is unknown.
        """
        self.workers: List[Process] = []
        self.worker_num = worker_num or os.cpu_count() or 1
        # Recorded by start_workers() so that monitor() can respawn a
        # worker with the same ports it had originally.
        self.port = None
        self.metrics_base = 9100

    def start_workers(self, port: int, metrics_base: int = 9100):
        """Start ``worker_num`` worker processes.

        Args:
            port: gRPC port handed to every worker.
            metrics_base: Worker ``i`` gets metrics port ``metrics_base + i``.
        """
        self.port = port
        self.metrics_base = metrics_base
        for i in range(self.worker_num):
            p = self._spawn_worker(i)
            self.workers.append(p)
            logger.info(f"Started worker process {p.pid}")

    def _spawn_worker(self, index: int) -> Process:
        """Create and start the daemon worker process for slot ``index``."""
        p = Process(
            target=self._worker_entry,
            args=(self.port, self.metrics_base + index),
            name=f"grpc-worker-{index}",
            daemon=True,
        )
        p.start()
        return p

    @staticmethod
    def _worker_entry(port: int, metrics_port: int):
        """Worker process entry point: run one ``GRPCService`` until it stops.

        Static so the process target does not hold a reference to the
        manager (and its list of ``Process`` objects).

        Args:
            port: gRPC port to listen on.
            metrics_port: Metrics port for this worker.
        """
        try:
            service = GRPCService(port, metrics_port)
            asyncio.run(service.run())
        except Exception as e:
            # exc_info keeps the traceback; the message alone rarely
            # explains why a worker died.
            logger.critical(
                f"Process {os.getpid()} crashed: {str(e)}", exc_info=True
            )
            sys.exit(1)

    def monitor(self, interval: float = 5):
        """Watch the workers forever, restarting any that have exited.

        Args:
            interval: Seconds to sleep between two health checks.
        """
        while True:
            self._restart_dead_workers()
            time.sleep(interval)

    def _restart_dead_workers(self) -> int:
        """Replace every dead worker in place; return how many were restarted."""
        restarted = 0
        for i, p in enumerate(self.workers):
            if p.is_alive():
                continue
            logger.warning(f"Process {p.pid} exited, restarting...")
            self.workers[i] = self._spawn_worker(i)
            restarted += 1
        return restarted
if __name__ == "__main__":
    # Settings come from the environment, with defaults for local runs.
    PORT = int(os.getenv("GRPC_PORT", "50051"))
    # os.cpu_count() may return None; fall back to a single worker then.
    WORKER_NUM = int(os.getenv("WORKER_NUM", str(os.cpu_count() or 1)))
    METRICS_BASE = 9100

    # SIGCHLD is deliberately left at its default disposition. If it were
    # ignored, exited children would be reaped automatically and
    # multiprocessing could no longer collect their exit status, so
    # Process.is_alive() would keep reporting dead workers as alive and
    # they would never be restarted. Polling is_alive() in monitor()
    # already reaps exited children, so no zombies accumulate.

    # Start the service.
    manager = ProcessManager(WORKER_NUM)
    manager.start_workers(PORT, METRICS_BASE)
    try:
        # Main-process monitoring loop. monitor() loops on its own; the
        # outer loop only matters if it ever returns.
        while True:
            manager.monitor()
    except KeyboardInterrupt:
        logger.info("Shutting down all workers...")
        for p in manager.workers:
            p.terminate()
        # Wait (bounded) for the workers to exit so they are reaped
        # before the parent goes away.
        for p in manager.workers:
            p.join(timeout=10)
以上方案适合用于生产环境。