
一. 前言
在局域网内部署了一个 web/api
服务, 基于性能和websocket
使用的考量, 选择了fastapi
, 借此探讨一下fastapi
为何fast
的原因.
FastAPI is a modern, fast (high-performance), web framework for building APIs with Python based on standard Python type hints.
Round 23 results - TechEmpower Framework Benchmarks
Web Framework Benchmark - Sharkbench
A 2024 benchmark of main Web API frameworks - Okami101 Blog
在各类 web framework
benchmark
中, 作为python
届的杰出代表, fastapi
见于各榜单稍相对靠前的位置, 作为一个以慢著称语言写的 web
框架, fastapi
算是为python
扬眉吐气一把.
高性能一词, 几乎见于各类宣传和介绍fastapi
的文章或者视频中, 尽管无处不在强调这个fast
, 但是各种翻找试图找到fastapi
在一些具体的场景为什么会比诸如flask
等 web
框架性能faster
的原因, 但是除了寥寥无几的benchmark
做的简单测试之外, 找不到相关的项目工程实践, 来具体说明到底如何才能发挥出fastapi
这最大的卖点特性, 很遗憾几乎没有人提及这些内容.
在各类书籍或博客, 几乎都是老套的内容, 即面向属性编程(这个属性如何如何, 这个方法如何如何...always say how, never say why), 对于不管是异步还是其他的底层原理都是一笔带过或者干脆不谈.
弄来弄去, 举的例子
永远的asyncio.sleep()
, 然后如何如何巴拉巴拉, 总之一句话: 异步 ==
高性能.
甚至看不到上机实际测试一下跑出来的, 写的代码异步和同步之间的性能差异到底有多大, 或者说说其中有哪些坑.
(到底是真的强还是宣传的很强?)
python - FastAPI性能碾压Flask? - 个人文章 - SegmentFault 思否
做这个测试的本意是反驳前文提到的原因, 只是想说, 对比测试时应该使用第三方WSGI服务器启动Flask.
最终测试结果我也有点迷糊了, 只能保证测试数据和代码绝对真实, 看到本文的朋友最好自己测试一遍.
另外, 性能测试肯定要加上基本功能, 起码要有数据接收, 处理, 返回整个流程吧, 只测试HelloWorld没什么代表性.
被吹爆的性能强者FastAPI, 实际性能不到Flask一半_别再用fastapi-CSDN博客
(可以看到这种无厘头的测试....😂)
Balthazar - Blog – How to profile a FastAPI asynchronous request
为什么要了解其中的执行机制呢?
以这帖子为例, Python 的 async 协程当前真的能跑在生产环境吗? - V2EX, 很多人(考虑到使用python的庞大用户群)对于异步的了解非常有限(python对于异步的支持还是比较"新"的东西), 在一些标榜高性能的项目中, 也许用 fastapi
和 flask
或者其他的web框架没有任何的差异, 并未真正发挥出异步的性能.
(还有更离谱的....😂这也是异步..., Asynchronous I/O for File Operations in Python 3 with asyncio - DNMTechs - Sharing and Storing Technology Knowledge)
二. 回顾
在谈fastapi
之前, 简单回顾一下进程(Process
), 线程(Thread
), 协程(Coroutine
)之间的差异和联系.
2.1 基本概念
概念 | 定义 | 层级关系 |
---|---|---|
进程 | 操作系统资源分配的最小单位, 拥有独立的内存空间( 代码, 数据, 堆栈) , 是程序的一次独立执行实例. | CPU → 核心 → 进程( 每个核心可运行一个进程) |
线程 | 进程内的执行单元, 共享进程的内存和资源, 但拥有独立的栈和寄存器上下文. | 进程 → 线程( 一个进程可包含多个线程) |
协程 | 用户态的轻量级线程, 由程序员控制切换, 共享线程的内存空间, 通过协作式调度实现并发. | 线程 → 协程( 一个线程可包含多个协程) |
简单理解: 每个程序必包含一个进程, 每个进程必包含一个线程, 一个进程可以存在多个线程, 一个线程可以存在多个协程.
2.1.1 资源管理
概念 | 内存/资源 | 切换开销 | 隔离性 |
---|---|---|---|
进程 | 独立内存空间, 资源隔离性强. | 最大( 需切换页表, 内存映射, 缓存等, 微秒级) . | 高( 一个进程崩溃不影响其他进程) . |
线程 | 共享进程内存, 资源开销小. | 次之( 需保存寄存器和栈信息, 微秒级) . | 低( 线程间共享数据, 需同步机制) . |
协程 | 共享线程内存, 切换仅需保存寄存器上下文. | 最小( 用户态切换, 纳秒级) . | 无( 协程间无并行, 仅协作式切换) . |
小心在使用多线程/多进程时不受控制导致的程序崩溃.
2.1.2 通信机制
概念 | 通信机制 | 复杂度 | 同步需求 |
---|---|---|---|
进程 | 通过IPC( 管道, 共享内存, 套接字等) 通信. | 高( 需系统调用, 如pipe() , shmget() ) . |
需处理进程间同步( 如信号量, 文件锁) . |
线程 | 直接读写共享内存, 或通过锁机制( 互斥锁, 条件变量) . | 低( 线程间共享内存, 但需避免竞争条件) . | 需同步机制( 如mutex , semaphore ) . |
协程 | 通过共享变量或通道( channel ) 通信. |
最低( 单线程内运行, 无需锁) . | 无需同步( 协程按顺序执行, 无竞争) . |
进程间的通信是最麻烦的, 协程最简单
2.1.3 适用场景
需要"等"的, 用多线程/异步; 需要榨干cpu
的, 用多进程.
概念 | 适用场景 | 典型案例 |
---|---|---|
进程 | 需要强隔离性, 独立资源的场景; 高性能计算. | 多用户应用( 如服务器隔离用户任务) , 数学模型的计算. |
线程 | 轻量CPU密集型任务或需共享资源的并发场景. | Web服务器( 处理多客户端连接) , 游戏逻辑( 多角色控制) . |
协程 | I/O密集型任务或高并发场景. | 网络请求. |
2.2 GIL
global interpreter lock
, 全局解释器锁, GlobalInterpreterLock - Python Wiki, python知名度最高的一个特性了吧.
In CPython, the global interpreter lock, or GIL, is a mutex that protects access to Python objects, preventing multiple threads from executing Python bytecodes at once. The GIL prevents race conditions and ensures thread safety. A nice explanation of how the Python GIL helps in these areas can be found here. In short, this mutex is necessary mainly because CPython's memory management is not thread-safe.
- cpython, 即python默认的解释器(python存在多种解释器), 在这个解释器种是启用GIL的.
- GIL的核心目的, 防止多个线程同时执行 Python 字节, GIL 可防止竞态条件并确保线程安全.
解析Python中的全局解释器锁( GIL) : 影响, 工作原理及解决方案_gil锁-CSDN博客
- CPU密集型任务受限: 对于CPU密集型任务, 由于GIL的存在, 多线程并不能有效地提高性能, 因为多个线程无法同时执行Python字节码.
- IO密集型任务相对不受限: 在IO密集型任务中, 线程在等待IO时会释放GIL, 允许其他线程执行Python字节码, 因此在这种情况下, 多线程能够发挥一定作用.
可以说, GIL
的存在某种程度也是python
这么亲民的一个原因, 毕竟单线程/同步的代码的理解是最为直观的.
2.3 线程
import threading
import time
def crawl(link, delay=3):
print(f"crawl started for {link}")
time.sleep(delay)
print(f"crawl ended for {link}")
links = [
"https://python.org",
"https://docs.python.org",
"https://peps.python.org",
]
threads = []
for link in links:
t = threading.Thread(target=crawl, args=(link,), kwargs={"delay": 2})
threads.append(t)
for t in threads:
t.start()
for t in threads:
t.join()
尽管 python
的多线程由于GIL
的存在一直被戏称假多线程
, 但还是有一定的用途的.
from concurrent.futures import ThreadPoolExecutor
import time
def task(n):
time.sleep(1)
return n * n
with ThreadPoolExecutor(max_workers=100) as executor:
start_time = time.time()
# 需要注意这里的操作不会立即执行
results = executor.map(task, [i for i in range(100)])
result_list = list(results) # 执行实际的任务
end_time = time.time()
print(f"耗时: {end_time - start_time:.2f}秒") # 耗时: 1.04秒
例如, 这种需要等待的并发执行, 尽管GIL
限制, 但是等待的时候会释放lock
, 即其他线程可以并行运算.
(当前系统运行中的线程数量)
from concurrent.futures import ThreadPoolExecutor
import time
def task(n):
result = 0
for i in range(10 7):
result += i
return n
if __name__ == '__main__':
# `max_workers`, 需要注意这里的参数设置, 不是计算机`cpu` 4核心4线程上所指的
with ThreadPoolExecutor(max_workers=4) as executor:
start_time = time.time()
# 需要注意这里的操作不会立即执行
results = executor.map(task, [i for i in range(4)])
result_list = list(results) # 执行实际的任务
end_time = time.time()
print(f"耗时: {end_time - start_time:.2f}秒")
'''
0.08165931701660156
0.12872791290283203
0.08131766319274902
0.11171102523803711
耗时: 0.19秒
'''
但面对的是cpu
密集型任务, 就只能一个个的算了, 完全无法发挥出多线程的并行计算的作用.
但, 转折点出现, python
最新的 3.14
版本正式推出官版free threading python
, Python3.14正式支持Free Threaded版本! - 个人文章 - SegmentFault 思否, 也许若干个版本迭代后, 真正并行的python将降临.
当 Python 默认 No GIL 后, Web 框架是不是要大洗牌了 - V2EX
2.4 协程
await - JavaScript | MDN, MDN文档更为详尽介绍await
机制, 对于 python
也是适用.
import time
import asyncio
def cpu_intensive_task():
s = time.time()
result = 0
for i in range(106):
result += i
print(time.time() - s)
async def test():
cpu_intensive_task()
return {"result": 'sync'}
async def main():
await asyncio.sleep(1)
return {"result": 'sync'}
async def test2():
await asyncio.sleep(1)
return 2
async def main2():
s = time.time()
await asyncio.gather(test(), main())
print('mix', time.time() - s)
s = time.time()
await asyncio.gather(test2(), main())
print('async', time.time() - s)
if __name__ == '__main__':
asyncio.run(main2())
"""
0.046948909759521484
mix 1.0554397106170654
async 1.007709264755249
"""
await asyncio.gather(test(), main())
, 当事件循环中存在cpu
密集型的任务, 将堵塞事件, 无法处理其他的协程任务导致总时间消耗的增长.
2.4.1 异步中的多线程/多进程
为了避免堵塞事件循环, 需要在事件中使用多进程/多线程处理可能堵塞事件的任务, 如IO.
loop.run_in_executor(executor, func, *args)
Arrange for func to be called in the specified executor.
The executor argument should be an
concurrent.futures.Executor
instance. The default executor is used if executor isNone
. The default executor can be set byloop.set_default_executor()
, otherwise, aconcurrent.futures.ThreadPoolExecutor
will be lazy-initialized and used byrun_in_executor()
if needed.
asyncio.to_thread
是一个比较新的api
, 在 python 3.9 引入.
async asyncio.to_thread(func, /, *args, kwargs)
Asynchronously run function func in a separate thread.
Any *args and kwargs supplied for this function are directly passed to func. Also, the current
contextvars.Context
is propagated, allowing context variables from the event loop thread to be accessed in the separate thread.Return a coroutine that can be awaited to get the eventual result of func.
This coroutine function is primarily intended to be used for executing IO-bound functions/methods that would otherwise block the event loop if they were run in the main thread.
import asyncio
import time
def blocking_task():
time.sleep(1)
return "Task result"
async def main_with_to_thread():
result = await asyncio.to_thread(blocking_task)
print(f"Result from to_thread: {result}")
async def main_with_run_in_executor():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, blocking_task)
print(f"Result from run_in_executor: {result}")
asyncio.run(main_with_to_thread())
asyncio.run(main_with_run_in_executor())
但是需要注意loop.run_in_executor
除了可以实现调用线程池执行任务之外, 同样支持调用from concurrent.futures import ProcessPoolExecutor
的进程池.
process_pool = ProcessPoolExecutor(max_workers=multiprocessing.cpu_count())
@app.get("/")
async def root():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(process_pool, cpu_intensive_task)
return {"result": result}
假如是向其他的事件循环提交任务, asyncio.run_coroutine_threadsafe
.
2.4.2 其他函数
其他用于协调任务执行的函数.
方法 | 调度时机 | 线程安全性 | 适用场景 |
---|---|---|---|
call_soon |
当前迭代末尾 | 否( 仅主线程) | 同一线程内的即时任务 |
call_at |
绝对时间点 | 否( 仅主线程) | 定时任务 |
call_soon_threadsafe |
尽快( 跨线程) | 是 | 从其他线程向事件循环发送任务 |
import asyncio
import time
async def main():
loop = asyncio.get_running_loop()
loop.call_soon(lambda: print("1"))
await asyncio.sleep(1)
print("2")
if __name__ == '__main__':
asyncio.run(main())
print("3")
import asyncio
async def main():
loop = asyncio.get_running_loop()
current_time = loop.time()
loop.call_at(current_time + 1.0, lambda: print("1"))
await asyncio.sleep(0.5) # 等待足够长时间让回调执行
print("2")
asyncio.run(main())
# 输出: Hello from call_at( 约1秒后)
print('3')
2.4.3 协程锁
相对于人尽皆知的线程锁, 异步锁相对而言就清冷很多, 除此之外, 对于多进程也有专属的锁.
对于异步更多的时候写的是http
并发请求, 对于异步锁也基本没有使用的机会, 但是当我用异步数据库连接器操作数据库时这个问题出来.
import time
import mysql.connector as sync_c
from mysql.connector import Error
from mysql.connector.aio import connect as async_c
from faker import Faker
from typing import List, Tuple
# 配置数据库连接信息
DB_CONFIG = {
'host': 'localhost',
'user': 'root',
'password': '123456',
'database': 'test',
'port': 3306,
}
# 生成测试数据
def generate_test_data(rows: int) -> List[Tuple]:
fake = Faker()
data = []
for _ in range(rows):
record = (
fake.uuid4(), # UUID
fake.name(), # 姓名
fake.email(), # 邮箱
fake.phone_number(), # 电话
fake.address().replace('\n', ', '), # 地址
fake.date_of_birth(minimum_age=18, maximum_age=90), # 生日
fake.random_element(elements=('M', 'F', 'O')), # 性别
fake.random_int(min=1000, max=999999), # 随机整数
fake.pyfloat(min_value=0, max_value=100, right_digits=2), # 随机浮点数
fake.boolean(), # 布尔值
)
data.append(record)
return data
async def insert_async(conn, cursor, batch, lock):
async with lock:
sql = """
INSERT INTO test_async (id, name, email, phone, address, birth_date, gender, random_int,
random_float, is_active)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) \
"""
await cursor.executemany(sql, batch)
await conn.commit()
# 使用 mysqlclient 批量插入
async def test_async_bulk_insert(data: List[Tuple], batch_size: int = 100):
try:
conn = await async_c(**DB_CONFIG)
cursor = await conn.cursor()
lock = asyncio.Lock()
# 分批插入数据
start_time = time.time()
tasks = [
insert_async(conn, cursor, data[i:i + batch_size], lock) for i in range(0, len(data), batch_size)
]
await asyncio.gather(*tasks)
total_time = time.time() - start_time
print(f"mysqlclient 插入 {len(data)} 条数据, 耗时: {total_time:.4f} 秒")
except Exception as e:
print(f"mysqlclient 插入错误: {e}")
finally:
if conn:
await cursor.close()
await conn.close()
if __name__ == "__main__":
# 生成10000行测试数据
import asyncio
print("正在生成测试数据...")
test_data = generate_test_data(10000)
# 执行测试
print("\n开始测试 mysqlclient 批量插入性能...")
asyncio.run(test_async_bulk_insert(test_data, batch_size=100))
我习惯的还是像爬虫拿到session
, 将session
作为参数传递到具体执行请求的函数中, 但是在操作数据库时, 这个方法出现问题.
insert_async(conn, cursor, data[i:i + batch_size], lock)
cursor.executemany()
当把cursor
作为参数传递到具体执行函数
cursor.executemany()
, 查看源码
await self.execute(operation, params)
if self.with_rows and self._have_unread_result():
await self.fetchall()
对比
await client.get(url)
源码
def get(
self, url: StrOrURL, *, allow_redirects: bool = True, kwargs: Any
) -> "_RequestContextManager":
"""Perform HTTP GET request."""
return _RequestContextManager(
self._request(hdrs.METH_GET, url, allow_redirects=allow_redirects, kwargs)
)
class _RequestContextManager(_BaseRequestContextManager[ClientResponse]):
这里返回的是独立实例化对象.
async def spider():
semaphore = asyncio.Semaphore(4)
# connector=aiohttp.TCPConnector(limit=4),
async with aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=30),
) as client:
tasks = [fetch_url(client, 'http://127.0.0.1:8000/test', semaphore=semaphore) for _ in range(4)]
return await asyncio.gather(*tasks)
async def fetch_url(client, url, semaphore):
async with semaphore:
try:
r = await client.get(url)
return await r.text()
except aiohttp.http_exceptions as exc:
print(f"Error fetching {url}: {exc}")
return None
和cursor
执行后不同
if self.with_rows and self._have_unread_result():
await self.fetchall()
多个协程事件需要共享cursor
, cursor
每次执行后都会携带不同的信息, 这使得cursor
无法如http
请求一般, 使用一个session
对象可以连续执行不同的任务.
即这里也出现了多线程中的问题, 竞态(racing).
这里举一个更为简单的例子:
import asyncio
ic = 0
async def operation(i): # 这实际上就是个同步函数
global ic
ic = ic + i
ic = ic - i
print(ic)
async def main():
await asyncio.gather(operation(1), operation(2))
print(ic)
if __name__ == '__main__':
asyncio.run(main())
'''
0
0
0
''' #预期结果
稍微修改一下
import asyncio
ic = 0
async def operation(i):
global ic
ic = ic + i
await asyncio.sleep(0) # await, 即暂时挂起当前执行, 那怕休眠时间为0, 将控制权交还事件循环, 执行下一个协程任务操作
ic = ic - i
print(ic)
async def main():
await asyncio.gather(operation(1), operation(2))
print(ic)
if __name__ == '__main__':
asyncio.run(main())
'''
2
0
0
'''# 非预期
这个时候, 要实现对上述的控制, 就需要异步锁.
import asyncio
ic = 0
async def operation(i, lock):
async with lock:
global ic
ic = ic + i
await asyncio.sleep(0)
ic = ic - i
print(ic)
async def main():
lock = asyncio.Lock()
await asyncio.gather(operation(1, lock), operation(2, lock))
print(ic)
if __name__ == '__main__':
asyncio.run(main())
但这也让异步变成了同步执行的函数, 丧失了并发的能力.
无法做到既要又要.
2.5 进程
(资源管理器, 可以看到相关的参数)
两种底层机制, 在不同平台是不一样的, 理论上fork
的性能更好, Windows
只支持spawn
机制.
注意这个特性, 不然很容踩到坑里都不知道.
- spawn
The parent process starts a fresh Python interpreter process. The child process will only inherit those resources necessary to run the process object’s
run()
method. In particular, unnecessary file descriptors and handles from the parent process will not be inherited. Starting a process using this method is rather slow compared to using fork or forkserver.Available on POSIX and Windows platforms. The default on Windows and macOS.
- fork
The parent process uses
os.fork()
to fork the Python interpreter. The child process, when it begins, is effectively identical to the parent process. All resources of the parent are inherited by the child process. Note that safely forking a multithreaded process is problematic.Available on POSIX systems. Currently the default on POSIX except macOS.
和线程一样, python的进程池的使用也分两种方式
需要注意的是, 下面的实现, multiprocessing
模块中提供的: multiprocessing.shared_memory --- 可跨进程直接访问的共享内存 - Python 3.13.5 文档
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Pool
特性 | ProcessPoolExecutor |
multiprocessing.Pool |
---|---|---|
抽象层级 | 高层 API, 统一接口 | 底层 API, 功能更丰富 |
异步管理 | 支持 Future 对象 |
支持回调函数和异步结果 |
共享资源 | 不直接支持 | 支持 Manager 和共享内存 |
适用场景 | 简单并行任务, 异步编程 | 复杂进程管理, 共享状态 |
需要稍微注意这两个函数:
- apply_async
- map_async
特性 | apply_async |
map_async |
---|---|---|
任务形式 | 单次执行一个函数, 参数需显式传递 | 对可迭代对象中的每个元素应用同一个函数 |
返回值 | 单个 AsyncResult 对象 |
单个 AsyncResult 对象( 封装结果列表) |
结果顺序 | 与调用顺序一致( 若不使用回调) | 结果顺序与输入迭代器一致 |
适用场景 | 任务参数不同, 异步执行独立任务 | 批量处理同类型数据, 并行映射操作 |
场景 | 推荐方法 |
---|---|
并行执行不同参数的独立任务 | apply_async |
批量处理同类型数据( 如数组) | map_async |
需要细粒度控制每个任务 | apply_async |
需要高效处理大量数据 | map_async + chunksize |
任务间有依赖关系 | 两者均不适用 |
from multiprocessing import Pool
# 这个函数不能改成 lambda x : x * x
def square(x):
return x ** 2
if __name__ == '__main__':
pool = Pool(processes=4)
result = pool.apply_async(square, args=(1, ))
print('start')
print(result.get())
# result = pool.apply_async(lambda x: x * x, args=(1, ))
# print(result) 不执行就没事
result = pool.apply(square, args=(1, ))
print(result)
results = [pool.apply_async(square, args=(x,)) for x in range(1, 5)]
print('next')
output = [result.get() for result in results] # 拿到结果
注意一些网站的提供的内容, 不加区分平台使用, Using Lambda Functions with Pool.map in Python 3 - DNMTechs - Sharing and Storing Technology Knowledge
(Python并发编程: 为什么传入进程池的目标函数不执行, 也没有报错? - 知乎
在Unix/Linux下,
multiprocessing
模块封装了fork()
调用, 使我们不需要关注fork()
的细节. 由于Windows没有fork
调用, 因此,multiprocessing
需要" 模拟" 出fork
的效果, 父进程所有Python对象都必须通过pickle序列化再传到子进程去, 所以, 如果multiprocessing
在Windows下调用失败了, 要先考虑是不是pickle失败了.
Python Python中可以使用pickle来序列化lambda函数吗|极客教程
import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
def square(x):
return x * x
class AsyncProcessPool:
def __init__(self, max_workers=None):
self.pool = ProcessPoolExecutor(max_workers)
async def apply_async(self, func, args=()):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(self.pool, func, *args)
async def map_async(self, func, iterable):
loop = asyncio.get_running_loop()
futures = [loop.run_in_executor(self.pool, func, item) for item in iterable]
return await asyncio.gather(*futures)
def close(self):
self.pool.shutdown(wait=False)
async def join(self):
self.pool.shutdown(wait=True)
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
self.close()
await self.join()
return False
# 使用示例
async def main():
async with AsyncProcessPool(2) as pool:
# 单个任务执行
result = await pool.apply_async(square, (10,))
# 多个任务执行
results = await pool.map_async(square, [1, 2, 3, 4])
print(f"结果: {result}, {results}")
if __name__ == "__main__":
asyncio.run(main())
在异步中实现类似功能.
import asyncio
import concurrent.futures
def blocking_io(): # io阻塞
with open('/dev/urandom', 'rb') as f:
return f.read(100)
def cpu_bound(): # cpu密集型
return sum(i * i for i in range(10 7))
async def main():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
None, blocking_io)
print('default thread pool', result)
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, blocking_io)
print('custom thread pool', result)
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, cpu_bound)
print('custom process pool', result)
if __name__ == '__main__':
asyncio.run(main())
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor
def cpu_intensive_task(n):
"""模拟CPU密集型任务( 如大量计算) """
print(f"Processing {n} in process {id()}")
time.sleep(1) # 模拟耗时计算
return n 2
async def async_task(task_id):
"""异步任务: 处理网络请求等IO操作"""
print(f"Starting async task {task_id}")
await asyncio.sleep(0.5) # 模拟IO等待
print(f"Async task {task_id} completed")
return task_id
async def main():
# 创建进程池( CPU核心数-1, 保留1个核心给事件循环)
with ProcessPoolExecutor(max_workers=3) as pool:
loop = asyncio.get_running_loop()
# 1. 提交CPU密集型任务到进程池
cpu_tasks = [loop.run_in_executor(pool, cpu_intensive_task, i) for i in range(5)]
# 2. 定义异步IO任务
io_tasks = [async_task(i) for i in range(5, 8)]
# 3. 并发执行所有任务
all_tasks = cpu_tasks + io_tasks
results = await asyncio.gather(*all_tasks)
# 4. 分离结果
cpu_results = results[:5]
io_results = results[5:]
print("\nCPU密集型任务结果:", cpu_results)
print("异步IO任务结果:", io_results)
if __name__ == "__main__":
start_time = time.time()
asyncio.run(main())
print(f"\n总耗时: {time.time() - start_time:.2f}秒")
2.6 小结
多线程/多进程之间的同步和相互通信.
类型 | 可重入 | 锁机制 | 线程通信方式 | 典型场景 |
---|---|---|---|---|
Lock |
否 | 基础互斥锁 | 无 | 简单资源互斥( 如计数器自增) |
RLock |
是 | 可重入互斥锁 | 无 | 递归函数, 方法嵌套调用 |
Event |
否 | 无锁 | 全局标志位通知 | 线程同步启动, 事件完成通知 |
Condition |
视传入锁 | 基于锁的条件等待 | 条件变量通知 | 生产者 - 消费者模型, 复杂状态依赖场景 |
Barrier |
否 | 内部使用锁( 不可重入) | 集体同步等待 | 并行计算中的阶段同步( 如多个子任务完成后才能进行汇总计算) |
Semaphore |
否 | 基于计数器的信号量机制 | 资源数量通知 | 限制并发访问数量( 如数据库连接池, 限流) |
以下简单介绍三个不是那么"常用/常见"的使用.
2.6.1 Event
手动控制线程的的具体执行.
import threading
import time
def worker(event):
print("Worker waiting for signal...")
event.wait() # 阻塞直到event被设置
print("Worker got the signal!")
def main():
# 初始化事件
event = threading.Event()
# 启动3个等待线程
threads = [threading.Thread(target=worker, args=(event,)) for _ in range(3)]
for t in threads:
t.start()
# 3秒后通知所有线程
time.sleep(3)
print("Main thread setting signal...")
event.set() # 唤醒所有等待的线程
if __name__ == '__main__':
main()
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
threadpool = ThreadPoolExecutor(max_workers=4)
async def task(event, loop):
def handle_task():
time.sleep(1)
loop.call_soon_threadsafe(lambda: event.set())
threadpool.submit(handle_task)
await event.wait()
async def main():
loop = asyncio.get_event_loop()
event = asyncio.Event()
await asyncio.gather(*[task(event, loop) for _ in range(2)])
if __name__ == '__main__':
start = time.time()
asyncio.run(main())
print(time.time() - start)
也可以通过手动控制线程的执行.
2.6.2 Barrier
不需要在主线程显示等待线程的执行完成.
import threading
import time
def worker(barrier, worker_id):
print(f"工作者 {worker_id} 开始工作...")
time.sleep(1)
print(f"工作者 {worker_id} 完成工作")
# 到达屏障点, 等待所有线程
barrier.wait()
def main():
# 创建屏障, 等待3个线程
barrier = threading.Barrier(3)
workers = []
# 启动3个线程
for i in range(3):
t = threading.Thread(target=worker, args=(barrier, i))
workers.append(t)
t.start()
print("主线程继续执行其他任务...")
if __name__ == "__main__":
main()
2.6.3 Condtion
线程之间的通信交互控制.
import threading
from queue import Queue
buffer = Queue(maxsize=5)
condition = threading.Condition()
def producer():
for i in range(10):
with condition:
while buffer.full():
condition.wait()
buffer.put(i)
print(f"生产者生产: {i}")
condition.notify_all() # 通知消费者
def consumer():
for _ in range(10):
with condition:
while buffer.empty():
condition.wait()
item = buffer.get()
print(f"消费者消费: {item}")
condition.notify_all() # 通知生产者
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
2.6.4 小结
类型 | 多线程 (threading ) |
多进程 (multiprocessing ) |
---|---|---|
数据共享 | 线程共享进程内存, 锁直接保护内存数据 | 进程内存隔离, 需通过Manager 或共享内存共享数据 |
底层机制 | 基于线程级互斥量( Mutex) | 基于系统信号量, 管道, 共享内存 |
创建方式 | 直接实例化( 如threading.Lock() ) |
必须使用multiprocessing 模块创建 |
性能开销 | 开销小( 同一进程内) | 开销大( 跨进程通信成本) |
可重入性 | RLock 支持同一线程递归获取 |
RLock 支持同一进程递归获取 |
Condition 依赖 | 内部锁为Lock ( 可传入RLock ) |
内部锁需适配跨进程通信, 通常与Manager 配合 |
三. HTTP版本
现役的各个版本http
协议, 主流1.1/2这两个版本.
需要注意不同版本的http
在浏览器中表现差异, 所以支持的http协议, 这对于选择哪个web框架是很重要的参考指标..
相关内容见: 浏览器并发数量限制 | Lian
特性 | HTTP/1.0 | HTTP/1.1 | HTTP/2 | HTTP/3 |
---|---|---|---|---|
连接管理 | 短连接 | 持久连接( 默认) | 多路复用( 单连接) | 基于QUIC( UDP) |
队头阻塞 | 存在 | 存在( 管道化) | 彻底解决 | 彻底解决 |
头部压缩 | 不支持 | 不支持 | 支持( HPACK) | 支持( 改进版) |
服务器推送 | 不支持 | 不支持 | 支持 | 支持 |
加密要求 | 可选 | 可选( HTTPS常见) | 默认HTTPS | 强制TLS 1.3 |
核心优化目标 | 基础功能 | 性能提升 | 并发效率 | 连接迁移与弱网优化 |
http3- PyPI, 目前python尚未正式支持 http3
HTTP3 is a next-generation HTTP client for Python 3.
Note: This project should be considered as an "alpha" release. It is substantially API complete, but there are still some areas that need more work.
四. 底层依赖
在了解fastapi
之前, 有一连串的概念需要先普及一下.
4.1 Web Server Gateway Interface
WSGI( Web Server Gateway Interface) 和 ASGI( Asynchronous Server Gateway Interface) , Web 服务器与应用通信的标准接口, uWSGI 则是另一个独立的服务器实现. 以下是它们的区别和关联:
4.1.1 WSGI
WSGI, Web Server Gateway Interface, Python 官方定义的同步 Web 服务器接口标准, What is WSGI? - WSGI.org
WSGI is the Web Server Gateway Interface. It is a specification that describes how a web server communicates with web applications, and how web applications can be chained together to process one request.
WSGI is a Python standard described in detail in PEP 3333.
-
作用: 规范 Web 服务器与 Python 应用( 如 Django) 之间的通信.
-
特点:
- 同步阻塞模型, 每个请求由一个线程/进程处理.
- 无法直接处理异步操作( 如 WebSocket) .
-
实现服务器: Gunicorn, uWSGI( 部分模式) .
4.1.2 ASGI
ASGI, Asynchronous Server Gateway Interface, WSGI 的异步扩展, Introduction - ASGI 3.0 documentation
ASGI is a spiritual successor to WSGI, the long-standing Python standard for compatibility between web servers, frameworks, and applications.
WSGI succeeded in allowing much more freedom and innovation in the Python web space, and ASGI’s goal is to continue this onward into the land of asynchronous Python.
-
作用: 允许 Python 应用处理异步请求( 如
WebSocket
) . -
特点:
- 基于事件循环( 如
asyncio
) , 支持并发 I/O 操作. - 兼容 WSGI, 但需应用显式支持异步.
- 基于事件循环( 如
-
实现服务器: Uvicorn, Daphne, Hypercorn.
4.1.3 uWSGI
WSGI扩展, 支持多种协议( 包括 WSGI, uwsgi 协议等) , Quickstart for Python/WSGI applications - uWSGI 2.0 documentation
The uWSGI project aims at developing a full stack for building hosting services.
Application servers (for various programming languages and protocols), proxies, process managers and monitors are all implemented using a common api and a common configuration style.
The " WSGI" part in the name is a tribute to the namesake Python standard, as it has been the first developed plugin for the project.
Versatility, performance, low-resource usage and reliability are the strengths of the project (and the only rules followed).
-
特点:
- 功能全面, 支持多线程, 多进程, 协程等模型.
- 配置复杂, 但性能和灵活性较高.
- 既可作为 WSGI 服务器, 也可作为反向代理或负载均衡器.
-
适用场景: 需要高度定制化的生产环境部署.
4.1.4 小结
组件 | 类型 | 协议支持 | 适用场景 | 性能特点 |
---|---|---|---|---|
Gunicorn | WSGI 服务器 | WSGI | 同步应用( Django/Flask) | 稳定, 配置简单 |
Uvicorn | ASGI 服务器 | ASGI | 异步应用( FastAPI/Starlette) | 高并发, 异步优先 |
Hypercorn | 多协议服务器 | WSGI + ASGI | 同步/异步混合场景 | 功能全面, 性能均衡 |
uWSGI | 通用服务器 | WSGI/uwsgi 等 | 复杂生产环境 | 灵活, 配置复杂 |
WSGI | 接口标准 | 同步通信 | 规范同步应用与服务器交互 | 基础标准, 无异步支持 |
ASGI | 接口标准 | 异步通信 | 规范异步应用与服务器交互 | 支持 WebSocket/异步 I/O |
一般结合nginx组合使用.
4.2 Web Server
Hypercorn/Gunicorn/Uvicorn
, Python 中常用的 Web
服务器(server
)/接口(interface
)实现, 沟通用户端的请求和响应端的web框架, 充当桥梁和入口的作用.
4.2.1 Gunicorn
经典的 WSGI 服务器, 专为同步(同步阻塞) 的 Python Web 应用设计( 如 Django, Flask), Gunicorn - Python WSGI HTTP Server for UNIX, 注意: 不支持Windows
.
Gunicorn 'Green Unicorn' is a Python WSGI HTTP Server for UNIX. It's a pre-fork worker model. The Gunicorn server is broadly compatible with various web frameworks, simply implemented, light on server resources, and fairly speedy.
-
特点:
- 基于预派生工作进程模型( Pre-fork Worker Model) , 通过多进程处理请求.
- 配置简单, 支持同步模式, 但不支持异步.
- 性能稳定, 适合传统同步框架.
-
适用场景: 同步代码( 如 Django ORM 操作) 或简单部署需求.
4.2.2 Uvicorn
Uvicorn is an ASGI web server implementation for Python.
Until recently Python has lacked a minimal low-level server/application interface for async frameworks. The ASGI specification fills this gap, and means we're now able to start building a common set of tooling usable across all async frameworks.
Uvicorn currently supports HTTP/1.1 and WebSockets.
Usage: uvicorn [OPTIONS] APP
Options:
--host TEXT Bind socket to this host. [default:
127.0.0.1]
--port INTEGER Bind socket to this port. If 0, an available
port will be picked. [default: 8000]
--uds TEXT Bind to a UNIX domain socket.
--fd INTEGER Bind to socket from this file descriptor.
--reload Enable auto-reload.
--reload-dir PATH Set reload directories explicitly, instead
of using the current working directory.
--reload-include TEXT Set glob patterns to include while watching
for files. Includes '*.py' by default; these
defaults can be overridden with `--reload-
exclude`. This option has no effect unless
watchfiles is installed.
--reload-exclude TEXT Set glob patterns to exclude while watching
for files. Includes '.*, .py[cod], .sw.*,
~*' by default; these defaults can be
overridden with `--reload-include`. This
option has no effect unless watchfiles is
installed.
--reload-delay FLOAT Delay between previous and next check if
application needs to be. Defaults to 0.25s.
[default: 0.25]
--workers INTEGER Number of worker processes. Defaults to the
$WEB_CONCURRENCY environment variable if
available, or 1. Not valid with --reload.
--loop [auto|asyncio|uvloop] Event loop implementation. [default: auto]
--http [auto|h11|httptools] HTTP protocol implementation. [default:
auto]
--ws [auto|none|websockets|websockets-sansio|wsproto]
WebSocket protocol implementation.
[default: auto]
--ws-max-size INTEGER WebSocket max size message in bytes
[default: 16777216]
--ws-max-queue INTEGER The maximum length of the WebSocket message
queue. [default: 32]
--ws-ping-interval FLOAT WebSocket ping interval in seconds.
[default: 20.0]
--ws-ping-timeout FLOAT WebSocket ping timeout in seconds.
[default: 20.0]
--ws-per-message-deflate BOOLEAN
WebSocket per-message-deflate compression
[default: True]
--lifespan [auto|on|off] Lifespan implementation. [default: auto]
--interface [auto|asgi3|asgi2|wsgi]
Select ASGI3, ASGI2, or WSGI as the
application interface. [default: auto]
--env-file PATH Environment configuration file.
--log-config PATH Logging configuration file. Supported
formats: .ini, .json, .yaml.
--log-level [critical|error|warning|info|debug|trace]
Log level. [default: info]
--access-log / --no-access-log Enable/Disable access log.
--use-colors / --no-use-colors Enable/Disable colorized logging.
--proxy-headers / --no-proxy-headers
Enable/Disable X-Forwarded-Proto,
X-Forwarded-For to populate url scheme and
remote address info.
--server-header / --no-server-header
Enable/Disable default Server header.
--date-header / --no-date-header
Enable/Disable default Date header.
--forwarded-allow-ips TEXT Comma separated list of IP Addresses, IP
Networks, or literals (e.g. UNIX Socket
path) to trust with proxy headers. Defaults
to the $FORWARDED_ALLOW_IPS environment
variable if available, or '127.0.0.1'. The
literal '*' means trust everything.
--root-path TEXT Set the ASGI 'root_path' for applications
submounted below a given URL path.
--limit-concurrency INTEGER Maximum number of concurrent connections or
tasks to allow, before issuing HTTP 503
responses.
--backlog INTEGER Maximum number of connections to hold in
backlog
--limit-max-requests INTEGER Maximum number of requests to service before
terminating the process.
--timeout-keep-alive INTEGER Close Keep-Alive connections if no new data
is received within this timeout. [default:
5]
--timeout-graceful-shutdown INTEGER
Maximum number of seconds to wait for
graceful shutdown.
--ssl-keyfile TEXT SSL key file
--ssl-certfile TEXT SSL certificate file
--ssl-keyfile-password TEXT SSL keyfile password
--ssl-version INTEGER SSL version to use (see stdlib ssl module's)
[default: 17]
--ssl-cert-reqs INTEGER Whether client certificate is required (see
stdlib ssl module's) [default: 0]
--ssl-ca-certs TEXT CA certificates file
--ssl-ciphers TEXT Ciphers to use (see stdlib ssl module's)
[default: TLSv1]
--header TEXT Specify custom default HTTP response headers
as a Name:Value pair
--version Display the uvicorn version and exit.
--app-dir TEXT Look for APP in the specified directory, by
adding this to the PYTHONPATH. Defaults to
the current working directory. [default:
""]
--h11-max-incomplete-event-size INTEGER
For h11, the maximum number of bytes to
buffer of an incomplete event.
--factory Treat APP as an application factory, i.e. a
() -> <ASGI app> callable.
--help Show this message and exit.
-
定位: 高性能的 ASGI 服务器, 专为异步(Async/Await) Python Web 应用设计(如 FastAPI, Starlette) .
-
特点:
- 基于
uvloop
( 高性能事件循环) 和httptools
( HTTP 解析库) 实现. - 支持异步代码( 如 WebSocket, 长轮询, 异步数据库操作) .
- 单线程异步模型, 适合 I/O 密集型任务.
- 基于
-
适用场景: 异步框架或需要高并发, 低延迟的场景.
4.2.3 Hypercorn
支持 WSGI 和 ASGI 的多协议服务器, 功能更全面, Hypercorn documentation - Hypercorn 0.17.3 documentation
Hypercorn is an ASGI web server based on the sans-io hyper, h11, h2, and wsproto libraries and inspired by Gunicorn. Hypercorn supports HTTP/1, HTTP/2, WebSockets (over HTTP/1 and HTTP/2), ASGI/2, and ASGI/3 specifications. Hypercorn can utilise asyncio, uvloop, or trio worker types.
相关使用见于: 浏览器并发数量限制 | Lian, 注意Hypercorn
是支持http/2
的
-
特点:
- 同时支持 WSGI( 同步) 和 ASGI( 异步) 协议.
- 支持 HTTP/1.1, HTTP/2, WebSocket 等协议.
- 配置灵活, 但性能略低于
Uvicorn
( 因功能更复杂) .
-
适用场景: 需要同时支持同步和异步应用, 或需要 HTTP/2 等高级协议.
五. wrk benchmark
http
压力测试工具
wg/wrk: Modern HTTP benchmarking tool
wrk is a modern HTTP benchmarking tool capable of generating significant load when run on a single multi-core CPU. It combines a multithreaded design with scalable event notification systems such as epoll and kqueue.
改进版, giltene/wrk2: A constant throughput, correct latency recording variant of wrk
wrk2 is wrk modifed to produce a constant throughput load, and accurate latency details to the high 9s (i.e. can produce accurate 99.9999%'ile when run long enough). In addition to wrk's arguments, wrk2 takes a throughput argument (in total requests per second) via either the --rate or -R parameters (default is 1000).
Linux下安装使用.
sudo apt install wrk -y
lian@jarvis:~$ wrk --version
wrk debian/4.1.0-4build2 [epoll] Copyright (C) 2012 Will Glozer
Usage: wrk <options> <url>
Options:
-c, --connections <N> Connections to keep open
-d, --duration <T> Duration of test
-t, --threads <N> Number of threads to use
-s, --script <S> Load Lua script file
-H, --header <H> Add header to request
--latency Print latency statistics
--timeout <T> Socket/request timeout
-v, --version Print version details
Numeric arguments may include a SI unit (1k, 1M, 1G)
Time arguments may include a time unit (2s, 2m, 2h)
wrk -t4 -c1000 -d30s
设置 4 线程, 1000 请求, 持续 30 秒.
5.1 FastAPI
不设置workers
参数.
uvicorn 1:app --host '0.0.0.0' --port '8000'
import asyncio
import time
from fastapi import FastAPI
app = FastAPI()
@app.get("/async_0")
async def test_async_0():
await asyncio.sleep(0.1)
return {"async": 0}
@app.get("/async_1")
async def test_async_1():
await asyncio.sleep(1)
return {"async": 1}
@app.get("/sync_0")
def test_sync_0():
time.sleep(0.1)
return {"sync": 0}
@app.get("/sync_1")
def test_sync_1():
time.sleep(1)
return {"sync": 0}
Running 30s test @ http://192.168.2.108:8000/async_0
4 threads and 1000 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 557.87ms 180.09ms 1.99s 69.12%
Req/Sec 263.74 166.70 0.98k 68.65%
30407 requests in 30.07s, 3.98MB read
Socket errors: connect 0, read 0, write 0, timeout 362
Requests/sec: 1011.22
Transfer/sec: 135.53KB
lian@jarvis:~$ wrk -t4 -c1000 -d30s http://192.168.2.108:8000/sync_0
Running 30s test @ http://192.168.2.108:8000/sync_0
4 threads and 1000 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 1.24s 484.48ms 1.97s 66.21%
Req/Sec 112.06 92.66 450.00 65.78%
9931 requests in 30.07s, 1.28MB read
Socket errors: connect 0, read 0, write 0, timeout 8981
Requests/sec: 330.27
Transfer/sec: 43.62KB
lian@jarvis:~$ wrk -t4 -c1000 -d30s http://192.168.2.108:8000/async_1
Running 30s test @ http://192.168.2.108:8000/async_1
4 threads and 1000 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 1.22s 139.17ms 1.99s 81.67%
Req/Sec 235.74 173.59 1.18k 67.81%
23743 requests in 30.07s, 3.08MB read
Requests/sec: 789.60
Transfer/sec: 104.87KB
lian@jarvis:~$ wrk -t4 -c1000 -d30s http://192.168.2.108:8000/sync_1
Running 30s test @ http://192.168.2.108:8000/sync_1
4 threads and 1000 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 1.10s 27.00ms 1.13s 62.50%
Req/Sec 54.28 77.05 310.00 82.93%
1160 requests in 30.09s, 152.93KB read
Socket errors: connect 0, read 0, write 0, timeout 1120
Requests/sec: 38.55 # 这一组的数据有问题?
Transfer/sec: 5.08KB
5.2 Flask
flask
使用的wsgi
是默认的werkzeug
, 这里只是为了简单看一下对比, 没有使用Gunicorn
# flask_app.py
from flask import Flask
import time
app = Flask(__name__)
@app.route("/sync_0")
def sync_0():
time.sleep(0.1)
return {"status": "success"}
@app.route("/sync_1")
def sync_1():
time.sleep(1)
return {"status": "success"}
if __name__ == "__main__":
app.run(host="0.0.0.0")
lian@jarvis:~$ wrk -t4 -c1000 -d30s http://192.168.2.108:5000/sync_0
Running 30s test @ http://192.168.2.108:5000/sync_0
4 threads and 1000 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 441.33ms 145.39ms 1.47s 88.46%
Req/Sec 124.75 65.74 555.00 75.35%
14483 requests in 30.06s, 2.58MB read
Socket errors: connect 0, read 190, write 0, timeout 0
Requests/sec: 481.78
Transfer/sec: 87.99KB
lian@jarvis:~$ wrk -t4 -c1000 -d30s http://192.168.2.108:5000/sync_1
Running 30s test @ http://192.168.2.108:5000/sync_1
4 threads and 1000 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 1.41s 128.74ms 2.00s 71.50%
Req/Sec 124.13 66.29 550.00 73.43%
13736 requests in 30.07s, 2.45MB read
Socket errors: connect 0, read 76, write 0, timeout 214
Requests/sec: 456.79
Transfer/sec: 83.42KB
六. aiohttp测试
aiohttp
并发请求测试, 测试各种状态下的影响.
import asyncio
import time
from fastapi import FastAPI
app = FastAPI()
@app.get("/async_0/{index}")
async def test_async_0(index: int):
await asyncio.sleep(0.1)
return {"async": index}
@app.get("/async_1/{index}")
async def test_async_1(index: int):
await asyncio.sleep(1)
return {"async": index}
@app.get("/sync_0/{index}")
def test_sync_0(index: int):
time.sleep(0.1)
return {"sync": index}
@app.get("/sync_1/{index}")
def test_sync_1(index: int):
time.sleep(1)
return {"sync": index}
20 并发, 1000请求, 不同 workers
状态下的时间消耗.
不指定workers
参数
(base) PS D:\Code\python_workspace> python test1.py
async_0 5.302882194519043
async_1 51.881797313690186
sync_0 6.76952052116394
sync_1 51.73477840423584
当请求等待时间越短, 切换线程的劣势将会非常明显.
2 workers
(base) PS D:\Code\python_workspace> python test1.py
async_0 5.568952560424805
async_1 50.979652881622314
sync_0 6.251736640930176
sync_1 51.50024890899658
4 workers
(base) PS D:\Code\python_workspace> python test1.py
async_0 5.548957560424805
async_1 50.721187114715576
sync_0 6.0700507164001465
sync_1 51.02872323989868
workers
数量设置基本不影响上述情况的四组请求响应速度, 从前面线程/进程/协程回顾, 也很容易理解, 上述的并发, 同步可以通过线程池来化解, 异步就更简单, 增加workers
基本不影响上述的请求响应.
6.1 堵塞事件循环
这里重点来看: 异步函数 + 堵塞
的场景
即在事件循环种执行以下任务:
- 直接休眠
- cpu密集型任务
async def test():
time.sleep()
async def test_c():
cpu_intensive()
@app.get("/mix_s_0/{index}")
async def test_mix(index: int):
time.sleep(0.1)
return {"mix": index}
@app.get("/mix_s_1/{index}")
async def test_mix(index: int):
time.sleep(1)
return {"mix": index}
def cpu_intensive_task():
result = 0
for i in range(10**7):
result += i
@app.get("/mix_c/{index}")
async def test_mix():
cpu_intensive_task()
return {"mix": index}
这里不需要大量的请求, 只需要少量请求即可, 设置为: 4 请求, 2并发
不指定workers
参数.
(base) PS D:\Code\python_workspace> python test1.py
mix_0 0.46183228492736816
mix_1 4.069617748260498
mix_c 1.929750919342041
2 workers, 在面对上述场景, 多workers
的作用就会发挥出来.
(base) PS D:\Code\python_workspace> python test1.py
mix_0 0.23683786392211914
mix_1 2.03873348236084
mix_c 0.9997785091400146
(所消耗的时间减半)
再看看两种状态, 异步和同步的差异
@app.get("/sync")
def sync_test():
time.sleep(1)
return {"result": 'sync'}
@app.get("/mix")
async def mix_test():
time.sleep(1)
return {"result": 'sync'} #这是代码A
def cpu_intensive_task():
s = time.time()
result = 0
for i in range(10**7):
result += i
print(time.time() - s)
@app.get("/sync")
def sync_test():
cpu_intensive_task()
return {"result": 'sync'}
@app.get("/mix")
async def mix_test():
cpu_intensive_task()
return {"result": 'sync'} #这是代码B
# 不设置`workers`参数
(base) PS D:\Code\python_workspace> python test1.py
sync 1.9449994564056396
mix 1.960463285446167 #这是代码B服务端时的运行时间消耗
(base) PS D:\Code\python_workspace> python test1.py
sync 2.034728765487671
mix 4.051820755004883 #这是这是代码A作为服务端时的运行时间消耗
对于cpu
密集型阻塞异步, 某种程度相当于将异步函异化成同步函数, 从B代码可以看到二者的最终消耗时间基本一致, 而对于time.sleep
这一组, 异步函数消耗时间是同步函数的两倍.
两种堵塞有什么区别呢
time.sleep
, 这个函数导致的是Suspend execution of the calling thread
, 整个线程被暂停执行, 那么事件循环也彻底暂停.
Suspend execution of the calling thread for the given number of seconds. The argument may be a floating-point number to indicate a more precise sleep time.
If the sleep is interrupted by a signal and no exception is raised by the signal handler, the sleep is restarted with a recomputed timeout.
The suspension time may be longer than requested by an arbitrary amount, because of the scheduling of other activity in the system.
Windows implementation
On Windows, if secs is zero, the thread relinquishes the remainder of its time slice to any other thread that is ready to run. If there are no other threads ready to run, the function returns immediately, and the thread continues execution. On Windows 8.1 and newer the implementation uses a high-resolution timer which provides resolution of 100 nanoseconds. If secs is zero,
Sleep(0)
is used.
相对比, 异步中暂停的是当前任务.
asyncio.sleep(delay, result=None)
Block for delay seconds.If result is provided, it is returned to the caller when the coroutine completes.
sleep()
always suspends the current task, allowing other tasks to run.Setting the delay to 0 provides an optimized path to allow other tasks to run. This can be used by long-running functions to avoid blocking the event loop for the full duration of the function call.
在阻塞事件的时候同时发起另外的请求
不指定workers
参数
(base) PS D:\Code\python_workspace> python test1.py
mix_1 16.300064086914062 # 发起阻塞事件sleep(1) x16
(base) PS D:\Code\python_workspace> python test.py
async_1 18.59671449661255 # 发起非阻塞异步 20并发100请求, 理论消耗时间 5s
2 workers, 分担了负载.
(base) PS D:\Code\python_workspace> python test1.py
mix_1 9.114005088806152
(base) PS D:\Code\python_workspace> python test.py
async_1 8.608720779418945
asyncio.to_thread
, 将阻塞事件的任务转移到其他的线程上.
def await_f():
time.sleep(1)
@app.get("/mix_s_1/{index}")
async def test_mix(index: int):
await asyncio.to_thread(await_f)
return {"mix": index}
当阻塞事件循环的任务被放到其他线程上执行, 事件循环所在的主线程就可以腾空出来处理其他的协程任务.
(base) PS D:\Code\python_workspace> python test1.py
mix_1 4.10187292098999
(base) PS D:\Code\python_workspace> python test.py
async_1 5.200965642929077
二者的时间消耗都将恢复到预期.
再换一种方式(不设置workers参数), 先发起一个同步请求, 需要处理cpu
密集型任务, 再发起请求, 执行异步操作
(base) PS D:\Code\python_workspace> python test1.py
sync_c 19.120626211166382
(base) PS D:\Code\python_workspace> python test.py
async_1 13.808538436889648 # 理论上等待5秒多, 实际消耗13秒多
七. 总结
如何让fastapi
真的fast
起来, 以下做个简单的总结.
7.1 禁用sleep
严禁在事件循环所在的(主)线程使用time.sleep()
7.2 保持主线程的低负载
避免在事务所在的主线程运行太多的阻塞任务, 将同步I/O任务转换为异步任务或者多线程执行, cpu密集型任务则由多进程负责, 或者在设置Uvicorn
的workers
参数相对数值大一点.
以读取本地文件为例:
# 简单例子, 需要在事务asyncio.to_thread()
def read_file_sync(file_path):
with open(file_path, 'r') as f:
print(f.read())
# 在线程池中执行
asyncio.to_thread(...)
亦或者使用专门的异步包来处理, 如: Tinche/aiofiles: File support for asyncio
async with aiofiles.open('filename', mode='r') as f:
contents = await f.read()
简单看一下open
这个函数的实现
return AiofilesContextManager(
_open(
file,
mode=mode,
buffering=buffering,
encoding=encoding,
errors=errors,
newline=newline,
closefd=closefd,
opener=opener,
loop=loop,
executor=executor,
)
)
一个上下文管理器包裹的_open
if loop is None:
loop = asyncio.get_running_loop() # 获得当前的事件循环
# 一个偏函数
cb = partial(
sync_open,
file,
mode=mode,
buffering=buffering,
encoding=encoding,
errors=errors,
newline=newline,
closefd=closefd,
opener=opener,
)
# 线程池中执行, executor默认为None
f = await loop.run_in_executor(executor, cb)
上下文管理器的实现.
class AiofilesContextManager(Awaitable, AbstractAsyncContextManager):
"""An adjusted async context manager for aiofiles."""
__slots__ = ("_coro", "_obj")
def __init__(self, coro):
self._coro = coro
self._obj = None
# 可等待的函数
def __await__(self):
if self._obj is None:
# 关键 yield
self._obj = yield from self._coro.__await__()
return self._obj
# with装饰器
async def __aenter__(self):
return await self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await get_running_loop().run_in_executor(
None, self._obj._file.__exit__, exc_type, exc_val, exc_tb
)
self._obj = None
7.3 资源控制
如限制开的线程/进程过多, 这可能导致程序崩溃.
semaphore = asyncio.Semaphore(20) # 限制信号量
async def fetch_data():
async with semaphore:
# 只有获取到信号量的协程才能进入线程池
return await asyncio.to_thread(some_blocking_io_function)
# -------
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
# 进程池处理 CPU 密集型任务
process_pool = ProcessPoolExecutor(max_workers=os.cpu_count())
# 线程池处理 I/O 密集型任务
thread_pool = ThreadPoolExecutor(max_workers=5)
async def handle_request():
# CPU 密集型
cpu_result = await loop.run_in_executor(process_pool, cpu_task)
# I/O 密集型
io_result = await loop.run_in_executor(thread_pool, io_task)
return cpu_result, io_result
7.4 小结
我用过
fastapi
, 这东西理论上限还行, 能跑出10K以上的秒响应, 不过其实python的协程架构一直都能跑出这样的性能, tornado早就能做到. 但是无论tornado还是fastapi
, 实际项目中都很难接近理论上限, 因为大部分程序员根本写不好协作式的并发逻辑, 更何况这种编程风格对一些业务很不友好, 强行要求业务适配技术这就削足适履了.
写过python
的异步和JavaScript
的异步, 会多少发现二者的差异, 就我个人而言, 简单来说就是写js
对于异步这个概念几乎是无感的, 就是不会感觉到异步和同步的明显差异, 但是对于python
的异步可谓是无处不在强调着这是异步的代码, 这种感觉总会让人很不舒服, 可谓一步异步步步异步, 对于如何组织代码的结构和梳理作业流程都异于同步作业的代码, 这意味着编写异步的代码对于使用者提出了较高要求.
import asyncio
import asyncmy
from faker import Faker
from typing import List, Tuple
import time
DB_CONFIG = {
'host': 'localhost',
'user': 'root',
'password': '123456',
'database': 'test',
'port': 3306,
}
def generate_test_data(rows: int) -> List[Tuple]:
fake = Faker()
data = []
for _ in range(rows):
record = (
fake.uuid4(),
fake.name(),
fake.email(),
fake.phone_number(),
fake.address().replace('\n', ', '),
fake.date_of_birth(minimum_age=18, maximum_age=90),
fake.random_element(elements=('M', 'F', 'O')),
fake.random_int(min=1000, max=999999),
fake.pyfloat(min_value=0, max_value=100, right_digits=2),
fake.boolean(),
)
data.append(record)
return data
# 限制并发数量
async def batch_insert_data(pool: asyncmy.Pool,
data: List[Tuple],
batch_size: int = 100,
max_concurrency: int = 5
):
semaphore = asyncio.Semaphore(max_concurrency)
async def insert_batch_with_semaphore(batch):
async with semaphore:
return await insert_batch(pool, batch)
tasks = []
for i in range(0, len(data), batch_size):
batch = data[i:i + batch_size]
tasks.append(insert_batch_with_semaphore(batch))
await asyncio.gather(*tasks)
async def insert_batch(pool: asyncmy.Pool, batch: List[Tuple]):
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
sql = """
INSERT INTO test_asyncmy (id, name, email, phone, address, birth_date, gender, random_int,
random_float, is_active)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) \
"""
await cursor.executemany(sql, batch)
await conn.commit()
async def main():
print("正在生成测试数据...")
test_data = generate_test_data(10000)
# 增大连接池并限制并发数
async with asyncmy.create_pool(**DB_CONFIG, maxsize=20) as pool:
print("\n开始测试异步批量插入性能...")
start_time = time.time()
# 限制最大并发数为5, 与连接池大小匹配
await batch_insert_data(pool, test_data, batch_size=100, max_concurrency=5)
total_time = time.time() - start_time
print(f"异步插入 {len(test_data)} 条数据, 耗时: {total_time:.4f} 秒")
if __name__ == "__main__":
asyncio.run(main())
并不会因为异步 + IO组合就能获得更好的性能, 但是换一种场景下就不一样: " MySQL性能大揭秘: 同步 vs 异步插入, 谁更快? " | 肥肥旭手记.
需要结合场景的合适选择方案, 而是如上所述的削足适履.
更多内容阅读扩展:
syrusakbary/promise: Ultra-performant Promise implementation in Python
7.5 one more thing
正如开头所说的这是探讨性能, 探讨fast
的文章, 下面看看和其他语言(JavaScript)的差距.
import random
import time
import string
def generate_random_strings(count, length, charset=string.ascii_letters + string.digits):
return [''.join(random.choice(charset) for _ in range(length)) for _ in range(count)]
data = generate_random_strings(10000, 9)
a = generate_random_strings(1000, 9)
sc = time.time()
r = [e in data for e in a]
print(time.time() - sc)
# 0.14486932754516602 * 1000 144.86ms
{
function generate_random_strings(count, length, charset = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') {
return Array.from({length: count}, () =>
Array.from({length: length}, () =>
charset.charAt(Math.floor(Math.random() * charset.length))
).join('')
);
}
const data = generate_random_strings(10000, 9);
const a = generate_random_strings(1000, 9);
console.time()
s = a.map(e => data.includes(e))
console.timeEnd()
}
// default: 34.965087890625 ms
两段相同的代码, 性能的差异, js
碾压式(浏览器环境执行)胜出.
再来看看在实际的请求处理, 由于遍历列表/数组太慢, 实际不会这样操作, 改成集合(set)
data = set(generate_random_strings(10000, 9))
npm install express cors fastify # express是另一个webframwork
npm i @fastify/cors # 组件需要额外安装
npm init -y # 初始化项目
nodejs
使用web框架为: Fastify - 快速并且低开销的 web 框架,专为 Node.js 平台量身打造 | Fastify中文网
启用http2
import Fastify from 'fastify';
import fs from 'fs';
import cors from '@fastify/cors';
// 启用http2支持
const app = Fastify({
http2: true,
https: {
key: fs.readFileSync('127.0.0.1-key.pem'),
cert: fs.readFileSync('127.0.0.1.pem'),
},
});
// 允许跨域
const origins = ["http://192.168.2.10:3002"];
await app.register(cors, {
origin: origins,
credentials: true,
methods: ["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"], // 允许的请求方法
});
const generate_random_string = (count, length) => {
const charset = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789',
charsetLength = charset.length;
return new Array(count).fill().map(() => new Array(length).fill().map(() => charset.charAt(Math.floor(Math.random() * charsetLength))).join(''))
}
const data = new Set(...generate_random_string(10000, 9));
// api接口
app.get('/key/:item', async (request, _reply) => {
const { item } = request.params;
return { data: data.has(item) };
});
// 启动服务器
const server = async () => {
try {
const configs = { port: 8012, host: '0.0.0.0' }
await app.listen(configs);
console.log(`server listening on https://${configs.host}:${configs.port} with HTTP/2 enabled`);
} catch (err) {
app.log.error(err);
process.exit(1);
}
};
server();
不设置workers
参数.
import random
import string
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI
app = FastAPI()
origins = [
"http://192.168.2.10:3002",
]
# 添加 CORS 中间件
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
def generate_random_strings(count, length, charset=string.ascii_letters + string.digits):
return [''.join(random.choice(charset) for _ in range(length)) for _ in range(count)]
data = set(generate_random_strings(10000, 9))
@app.get("/key/{item}")
async def test(item: str):
return {"data": item in data}
设置为32个请求(32并发)
const randomStrings = generateRandomStrings(32, 9);
console.time();
const urls = Array.from({length: 32}, (_, i) =>
`https://127.0.0.1:8000/key/${randomStrings[i]}`
);
const cr = new concurrentRequests(urls, 32);
const r = await cr.handle_fetch();
console.log(r);
console.timeEnd();
// default: 110.41015625 ms python - fastapi
// default: 131.510986328125 ms nodejs - fastify
在这一局中fastapi
为 python
扳回一局.