Python：提升 grpclib 服务的并发能力（Python 高并发框架与 HTTP 请求）

在 grpclib 服务处理高并发请求时遇到了性能瓶颈。查阅相关资料后，找到了如下解决方案：

采用 SO_REUSEPORT 让多个进程绑定同一个端口、并行处理请求，并使用 uvloop 提升 asyncio 事件循环的性能。（注意：SO_REUSEPORT 依赖操作系统支持，例如 Linux 3.9 及以上内核；Windows 上不可用。）


# grpc_service.py

import os

import sys

import signal

import logging

import socket

import asyncio

import uvloop

from grpclib.server import Server

from grpclib.utils import graceful_exit

from multiprocessing import Process, current_process

from prometheus_client import start_http_server, Counter, Gauge

from typing import List

# Swap asyncio's default event loop policy for uvloop's. This is done at import
# time so the worker processes run on uvloop as well.
# NOTE(review): the speed-up depends on the workload — measure it rather than
# assuming a fixed factor.
uvloop.install()

# One log line per record: timestamp, PID (to tell workers apart), level,
# module:line and the message, written to stdout.
logging.basicConfig(
    format="%(asctime)s [%(process)d] %(levelname)s %(module)s:%(lineno)d - %(message)s",
    level=logging.INFO,
    handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger(__name__)

# Prometheus metrics, exported by every worker process on its own metrics port.
# NOTE(review): REQUEST_COUNTER is never incremented in the code shown here.
REQUEST_COUNTER = Counter("grpc_requests", "Total requests", ["method", "status"])
PROCESS_HEALTH = Gauge("process_health", "Process health status", ["pid"])

class HealthCheckServicer:
    """Minimal gRPC health-check handler that always reports SERVING.

    NOTE(review): this class defines no ``__mapping__``, so grpclib cannot
    register it with ``Server`` as-is. grpclib's ready-made
    ``grpclib.health.service.Health`` servicer is probably the better choice.
    """

    async def Check(self, stream):
        """Answer a Health/Check call with status SERVING.

        grpclib handlers do not return their response. They read the request
        from *stream* and send the reply back through it. Returning a message
        would leave the client without an answer.
        """
        # Imported here because the module never imported health_pb2.
        from grpclib.health.v1 import health_pb2

        await stream.recv_message()
        await stream.send_message(
            health_pb2.HealthCheckResponse(
                status=health_pb2.HealthCheckResponse.SERVING
            )
        )

class GRPCService:
    """One grpclib server listening on a socket bound with SO_REUSEPORT.

    Each worker process builds its own instance. Every instance binds the same
    port with ``SO_REUSEPORT``, so several processes can accept connections
    on that port at the same time.
    """

    def __init__(self, port: int, metrics_port: int, handlers=None, host: str = "0.0.0.0"):
        """Store configuration; nothing is bound or started here.

        Args:
            port: TCP port of the gRPC server (shared by all workers).
            metrics_port: port of this process's Prometheus HTTP endpoint.
            handlers: grpclib servicer instances to serve. When None,
                ``[YourServiceServicer()]`` is used. That name is a placeholder
                that must be defined elsewhere before ``run()`` is called.
            host: IPv4 address to bind; defaults to all interfaces.
        """
        self.port = port
        self.metrics_port = metrics_port
        self.handlers = handlers
        self.host = host
        self.server = None
        # Task created when SIGINT/SIGTERM triggers graceful_shutdown().
        self._shutdown_task = None

    async def _start_metrics(self):
        """Expose Prometheus metrics on ``self.metrics_port``."""
        start_http_server(self.metrics_port)
        logger.info(f"Metrics server started on :{self.metrics_port}")

    async def _configure_socket(self) -> socket.socket:
        """Create and bind the TCP socket that all workers share.

        ``SO_REUSEPORT`` is what allows every worker to bind the same port. The
        constant does not exist on every platform (e.g. Windows), and then this
        raises. The socket is closed on any failure so no descriptor leaks.

        Returns:
            The bound (not yet listening) socket.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            sock.bind((self.host, self.port))
        except Exception:
            sock.close()
            raise
        return sock

    async def run(self):
        """Start metrics and the gRPC server, then block until it is closed."""
        await self._start_metrics()

        # Build the handlers before binding, so a failure here cannot leak the socket.
        handlers = self.handlers
        if handlers is None:
            handlers = [YourServiceServicer()]

        sock = await self._configure_socket()
        # grpclib uses the running loop by default; the explicit ``loop=``
        # argument is deprecated in recent grpclib releases.
        self.server = Server(list(handlers))
        try:
            await self.server.start(sock=sock)
        except Exception:
            sock.close()
            raise

        self._install_signal_handlers()

        logger.info(f"gRPC server started on :{self.port} (PID: {os.getpid()})")
        PROCESS_HEALTH.labels(os.getpid()).set(1)

        await self.server.wait_closed()
        if self._shutdown_task is not None:
            # Let a signal-triggered shutdown finish its logging and metrics
            # update before asyncio.run() cancels leftover tasks.
            await self._shutdown_task

    def _install_signal_handlers(self):
        """Route SIGINT and SIGTERM to graceful_shutdown() on the running loop."""
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, self._on_shutdown_signal)

    def _on_shutdown_signal(self):
        """Signal callback: schedule graceful_shutdown() exactly once.

        ``add_signal_handler`` calls a plain callable. Registering the
        coroutine function itself would only create a coroutine that is never
        awaited, and the server would never close.
        """
        if self._shutdown_task is None:
            self._shutdown_task = asyncio.get_running_loop().create_task(
                self.graceful_shutdown()
            )

    async def graceful_shutdown(self):
        """Mark this process unhealthy, close the server and wait for it.

        Does nothing if the server was never started.
        """
        if self.server is None:
            return
        logger.info("Initiating graceful shutdown...")
        PROCESS_HEALTH.labels(os.getpid()).set(0)
        self.server.close()
        await self.server.wait_closed()
        logger.info("Server shutdown completed")

class ProcessManager:
    """Launch gRPC worker processes and restart any that exit.

    Every worker runs a ``GRPCService`` on the same gRPC port and on its own
    metrics port ``metrics_base + index``.
    """

    def __init__(self, worker_num: int = None):
        """
        Args:
            worker_num: number of worker processes. When falsy it falls back
                to ``os.cpu_count()``, or to 1 if the CPU count is unknown.
        """
        self.workers: List[Process] = []
        # os.cpu_count() may return None.
        self.worker_num = worker_num or os.cpu_count() or 1
        # Set by start_workers() so monitor() can respawn a worker with the
        # same arguments it was first started with.
        self.port = None
        self.metrics_base = 9100

    def _spawn(self, index: int) -> Process:
        """Start worker number *index* and return its Process handle."""
        p = Process(
            target=self._worker_entry,
            args=(self.port, self.metrics_base + index),
            name=f"grpc-worker-{index}",
            daemon=True,
        )
        p.start()
        return p

    def start_workers(self, port: int, metrics_base: int = 9100):
        """Start ``worker_num`` worker processes.

        Args:
            port: gRPC port shared by all workers.
            metrics_base: worker ``i`` serves metrics on ``metrics_base + i``.
        """
        self.port = port
        self.metrics_base = metrics_base
        for i in range(self.worker_num):
            p = self._spawn(i)
            self.workers.append(p)
            logger.info(f"Started worker process {p.pid}")

    @staticmethod
    def _worker_entry(port: int, metrics_port: int):
        """Entry point of a worker process: run one GRPCService until it stops.

        A staticmethod, so starting a process does not have to pickle the
        manager and its Process handles when a non-fork start method is used.
        """
        try:
            service = GRPCService(port, metrics_port)
            asyncio.run(service.run())
        except Exception as e:
            # exc_info keeps the traceback, which the bare message loses.
            logger.critical(f"Process {os.getpid()} crashed: {str(e)}", exc_info=True)
            sys.exit(1)

    def monitor(self, interval: float = 5.0):
        """Restart dead workers forever, checking every *interval* seconds.

        Call start_workers() first; it records the port and metrics base used
        for respawned workers. ``is_alive()`` also reaps exited children.
        """
        import time

        while True:
            for i, p in enumerate(self.workers):
                if not p.is_alive():
                    logger.warning(f"Process {p.pid} exited, restarting...")
                    self.workers[i] = self._spawn(i)
            time.sleep(interval)

if __name__ == "__main__":
    # Production settings, overridable through environment variables.
    PORT = int(os.getenv("GRPC_PORT", "50051"))
    # os.cpu_count() may return None; fall back to a single worker then.
    WORKER_NUM = int(os.getenv("WORKER_NUM") or os.cpu_count() or 1)
    METRICS_BASE = int(os.getenv("METRICS_BASE", "9100"))

    # SIGCHLD is deliberately left at its default disposition. Setting it to
    # SIG_IGN makes the kernel reap exited workers on its own. multiprocessing's
    # waitpid() then fails with ECHILD, and Process.is_alive() keeps returning
    # True for dead workers, so the monitor never restarts them. The monitor's
    # is_alive() polling already reaps exited children, so no zombies pile up.

    def _sigterm_to_interrupt(signum, frame):
        # Send `kill <pid>` through the same cleanup path as Ctrl-C.
        raise KeyboardInterrupt

    signal.signal(signal.SIGTERM, _sigterm_to_interrupt)

    manager = ProcessManager(WORKER_NUM)
    try:
        manager.start_workers(PORT, METRICS_BASE)
        # monitor() loops forever, restarting dead workers.
        manager.monitor()
    except KeyboardInterrupt:
        logger.info("Shutting down all workers...")
        for p in manager.workers:
            p.terminate()
        # Wait for the workers so none is left behind as a zombie/orphan.
        for p in manager.workers:
            p.join(timeout=10)



以上代码可作为生产环境部署的参考模板；其中 YourServiceServicer 等为占位名称，使用前需替换为实际的服务实现。

原文链接：（原文未提供链接）。转载请注明来源！