1. Introduction

There are many Python packages for working with MySQL; the table below lists the more common ones.
Library | Key features | Typical scenarios |
---|---|---|
PyMySQL | Basic connection pooling | General small-to-medium projects |
mysql-connector | Official support / transaction management | Scenarios that want the official driver's backing |
mysqlclient | High-performance C extension | OLTP systems with extreme performance requirements |
SQLAlchemy | ORM / multi-database support | Complex business logic / rapid development |
DBUtils | Connection-pool management | Optimizing high-concurrency database access |
Personally I favor the official connector, though you have to watch out for the pitfalls its async support has dug. But because the official connector arrived so late, and Oracle, the company behind MySQL, is not exactly well liked, it has never become all that popular.
MySQL :: MySQL Connector/Python Developer Guide
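Despite their differences, the first three drivers all implement the same DB-API 2.0 interface, so a minimal round trip looks almost identical across them. A sketch with the official connector (assuming a local server and the `test` database used throughout this post; swap the import for `pymysql` or `MySQLdb` and little else changes):

```python
import mysql.connector

# The standard DB-API 2.0 flow: connect -> cursor -> execute -> fetch -> close
conn = mysql.connector.connect(
    host="localhost", port=3306,
    user="root", password="123456", database="test",
)
cursor = conn.cursor()
cursor.execute("SELECT VERSION()")
print(cursor.fetchone())  # e.g. ('8.0.33',)
cursor.close()
conn.close()
```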
1.1 mysql-connector-python
It supports two different types of connector (the classic protocol and the X DevAPI).
>>> import mysql.connector.version as v
>>>
>>> print(v.VERSION)
(8, 0, 33, '', 1)
Installing via the MySQL Installer or plain pip still gives you the classic version by default.
$ pip install mysql-connector-python
1.1.1 xdevapi
from mysqlsh import mysqlx

# Connect to server
mySession = mysqlx.get_session({
    'host': 'localhost', 'port': 33060,
    'user': 'user', 'password': 'password'})
myDb = mySession.get_schema('test')

# Create a new collection 'my_collection'
myColl = myDb.create_collection('my_collection')

# Insert documents
myColl.add({'name': 'Laurie', 'age': 19}).execute()
myColl.add({'name': 'Nadya', 'age': 54}).execute()
myColl.add({'name': 'Lukas', 'age': 32}).execute()

# Find a document
docs = myColl.find('name like :param1 AND age < :param2') \
    .limit(1) \
    .bind('param1', 'L%') \
    .bind('param2', 20) \
    .execute()

# Print document
doc = docs.fetch_one()
print(doc)

# Drop the collection
myDb.drop_collection('my_collection')
Anyone who has used MongoDB will find the syntax above familiar. (Note that `from mysqlsh import mysqlx` is the MySQL Shell import; with Connector/Python itself the X DevAPI is exposed as `import mysqlx`.)

Or with SQLAlchemy:
from sqlalchemy import create_engine, Column, Integer, String, select
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import func

# Create the database connection
engine = create_engine('sqlite:///users_example.db')
Base = declarative_base()

# Define the model class
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    age = Column(Integer)

# Create the table
Base.metadata.create_all(engine)

# Create the Session class
Session = sessionmaker(bind=engine)
session = Session()

# Clear existing data first
session.query(User).delete()
session.commit()

# Insert some data
users_data = [
    {'name': 'Charlie', 'age': 28},
    {'name': 'David', 'age': 32},
    {'name': 'Eve', 'age': 27},
    {'name': 'gemm', 'age': 43},
    {'name': 'riyu', 'age': 43}
]
for user_data in users_data:
    user = User(**user_data)  # unpack the dict into keyword arguments
    session.add(user)
session.commit()

# Query data
query = session.query(User)

# Add a filter condition with filter()
result = query.filter(User.age > 25).all()
print("Users older than 25: ", [(user.name, user.age) for user in result])

# Use func.group_concat() to merge the names within each age into one string
result = session.query(User.age, func.group_concat(User.name)).group_by(User.age).all()
print("Grouped by age: ")
for age, names in result:
    name_list = names.split(',')
    print(f"Users aged {age}: {', '.join(name_list)}")

result = query.order_by(User.age).all()
print("Sorted by age: ", [(user.name, user.age) for user in result])

# Count the query results
count = query.count()
print("Number of results: ", count)

# Delete records matching a condition
session.query(User).filter(User.name == 'Charlie').delete()
session.commit()

# Update records matching a condition
session.query(User).filter(User.name == 'David').update({User.age: 35})
session.commit()

# Run a SELECT query
# select(User) builds a SELECT against the User model (table),
# i.e. it retrieves all columns of the users table
with engine.connect() as connection:
    stmt = select(User).where(User.age > 25)
    result = connection.execute(stmt)
    for row in result:
        print(row)
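A side note on that last query (my addition, reflecting SQLAlchemy 1.4+ behavior): executing the statement on a `Connection` yields plain rows; if you want mapped `User` objects back, run the same statement through the session instead:

```python
# Same statement via the session: yields User instances rather than raw rows.
for user in session.scalars(select(User).where(User.age > 25)):
    print(user.name, user.age)
```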
In JavaScript, this API feels even more "natural":
const mysqlx = require('@mysql/xdevapi');
const config = { collection: 'myCollection', schema: 'mySchema', user: 'root' };

mysqlx.getSession({ user: config.user })
    .then(session => {
        const schema = session.getSchema(config.schema);
        return schema.existsInDatabase()
            .then(exists => {
                if (exists) {
                    return schema;
                }
                return session.createSchema(config.schema);
            })
            .then(schema => {
                return schema.createCollection(config.collection, { reuseExisting: true });
            })
            .then(collection => {
                return collection.add([{ name: 'foo', age: 42 }])
                    .execute()
                    .then(() => {
                        return collection.find()
                            .fields('name', 'age')
                            .execute();
                    })
                    .then(res => {
                        console.log(res.fetchOne()); // { name: 'foo', age: 42 }
                    })
                    .then(() => {
                        return collection.modify('age = :value')
                            .bind('value', 42)
                            .set('name', 'bar')
                            .execute();
                    })
                    .then(() => {
                        return collection.find()
                            .fields('name', 'age')
                            .execute();
                    })
                    .then(res => {
                        console.log(res.fetchOne()); // { name: 'bar', age: 42 }
                    })
                    .then(() => {
                        return collection.remove('true')
                            .execute();
                    })
                    .then(() => {
                        return collection.find()
                            .fields('name', 'age')
                            .execute();
                    })
                    .then(res => {
                        console.log(res.fetchAll()); // []
                    });
            })
            .then(() => {
                return schema.dropCollection(config.collection);
            })
            .then(() => {
                return session.dropSchema(config.schema);
            })
            .then(() => {
                return session.close();
            });
    });
2. ORM
ORM框架详解: 为什么不直接写SQL? _orm和sql-CSDN博客
Generally speaking, compared with traditional hand-written SQL, an ORM's relative advantages are as follows:

- A revolution in development efficiency
  - Auto-generated SQL: the ORM maps object operations onto SQL statements (e.g. `user.save()` produces an `INSERT`), cutting 80%+ of the hand-written SQL.
  - Simplified CRUD:

        # ORM style
        user = User.query.filter_by(name="Alice").first()
        user.email = "new@email.com"
        db.session.commit()

        # The raw-SQL equivalent
        cursor.execute("UPDATE users SET email = %s WHERE name = %s", ("new@email.com", "Alice"))
        conn.commit()
But what if your SQL statements are not used only inside Python code, and have to run elsewhere as well? (See the `text()` sketch right after this list for one middle ground.)
- Code maintainability soars
  - Business-logic focus: database operations are wrapped in object methods, keeping the code close to the business semantics (e.g. `order.calculate_total()` rather than a hand-written aggregate query).
  - Lower coupling: when the table schema changes, only the ORM model needs editing; no global search-and-replace over SQL strings.
- Cross-database compatibility
  - Seamless switching: migrate from MySQL to PostgreSQL by changing configuration; the ORM smooths over dialect differences (e.g. `LIMIT` vs `TOP`) automatically.
  - Multi-database support: one project can drive MySQL, SQLite and more at the same time (useful for SaaS multi-tenant scenarios).
- Hardened security
  - SQL-injection protection: ORMs issue parameterized queries by default and escape special characters automatically, shutting out injections like `' OR 1=1 --` (which would turn a lookup into `SELECT * FROM article WHERE id = -1 OR 1=1;`).
  - Permission control: field-level permissions can be managed uniformly in the ORM layer (e.g. forbidding direct modification of an `admin` field).
- Gentle learning curve
  - Object-oriented mindset: developers can work with the database without mastering SQL, lowering the entry bar for newcomers.
  - Progressive adoption: a team can ship quickly with the ORM first and hand-optimize specific SQL later where bottlenecks appear.
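Back to the question raised above, of SQL that must also live outside Python: one pragmatic middle ground is to keep such statements as plain text (shareable with Navicat, reporting tools, other services) and still execute them through the ORM's engine. A minimal sketch using SQLAlchemy's `text()` construct and the `users_example.db` database from earlier (table and column names as defined there):

```python
from sqlalchemy import create_engine, text

engine = create_engine('sqlite:///users_example.db')

# The same SQL string could live in a .sql file and be reused by other tools verbatim.
stmt = text("SELECT name, age FROM users WHERE age > :min_age")

with engine.connect() as connection:
    # Parameters are still bound safely, just like ORM-generated queries.
    for row in connection.execute(stmt, {"min_age": 25}):
        print(row)
```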
Scenario | Recommended approach | Typical cases |
---|---|---|
Rapid prototyping | Pure ORM | MVP validation, internal tooling |
Mid-to-large enterprise applications | ORM + raw SQL mix | ORM for core business, hand-written SQL for reporting |
Data-intensive applications | Mostly raw SQL | Financial risk control, real-time analytics platforms |
Multi-database support | ORM | SaaS multi-tenant systems, open-source projects |
There is no need to mythologize the ORM. Use it as needed: for work that deals with the data over and over (simplest case: when the same SQL also has to run in Navicat), plain SQL should remain the mainstay. Don't force an ORM in just for the sake of it; rather than climbing a tree to catch fish, adapt when the situation calls for it.
3. Async

Like MySQL's official connector itself, this new feature has almost no presence on the Simplified-Chinese web; even CSDN, the most enthusiastic content lifter of them all, turns up nothing relevant.

The feature was added back in January 2024, yet search engines return only scattered mentions of it. MySQL's developers really do move this slowly... not a promising outlook!
3.1 MySQL Installer caveats
MySQL Asynchronous Connectivity with MySQL Connector/Python
Note that the connector version installed by the MySQL Installer is not the latest one; most likely it is the connector pinned to that MySQL Server release, even though a newer connector already existed when the bundle shipped, and that newer version already supports async.
- Added support for asynchronous execution as per the Python Database API Specification v2.0 (PEP 249). The MySQL Connector/Python asyncio implementation allows non-blocking asynchronous interaction with a MySQL server using a new package named `mysql.connector.aio`. This new package only supports the pure Python implementation. The `mysql.connector.aio` package is fully compatible with the existing `mysql.connector` package implementation, and exposes the same interface but with the ability to use asynchronous execution. Asynchronous methods return asyncio coroutines that can await results. (WL #15523)
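As a quick taste of the new package (a minimal sketch assembled from the same calls the scripts below rely on; assumes a local server with the credentials used throughout this post):

```python
import asyncio
from mysql.connector.aio import connect


async def main():
    # The familiar DB-API steps, each awaited: connect, cursor, execute, fetch.
    conn = await connect(host="localhost", port=3306,
                         user="root", password="123456", database="test")
    cursor = await conn.cursor()
    await cursor.execute("SELECT VERSION()")
    print(await cursor.fetchone())
    await cursor.close()
    await conn.close()


asyncio.run(main())
```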
Yet what actually gets installed is still the older version.
3.2 Libraries for async MySQL access

(Generated with gemini.google.com.)
Package Name | Description | Key Features |
---|---|---|
`mysql.connector.aio` | The official asynchronous extension for `mysql-connector-python`, provided by Oracle. It's part of the standard `mysql-connector-python` installation. | Pure Python implementation, integrates with `asyncio`, official support. |
`aiomysql` | A library for accessing MySQL databases from the `asyncio` framework. It's based on PyMySQL and aims for an API similar to `aiopg`. | Reuses PyMySQL components, supports connection pooling, SQLAlchemy integration. |
`asyncmy` | A fast asyncio MySQL/MariaDB driver. It reuses most of PyMySQL and aiomysql but rewrites the core protocol with Cython for speed. | Focus on performance (Cython optimized), API compatible with `aiomysql`, supports MySQL replication protocol with asyncio. |
SQLAlchemy (asyncio) | While not a direct MySQL driver, SQLAlchemy (version 1.4+) provides an asyncio extension that works with asynchronous database drivers like `aiomysql` or `asyncmy`. | ORM capabilities, works with various async drivers, consistent API for different databases. |
(Tongyi's answer, screenshot omitted.) Tongyi, Baidu... Tongyi, what on earth is this! (Both with deep-thinking mode enabled.)

(Baidu's answer, screenshot omitted.)

Doubao's, at least, was normal:
Scenario | Recommended library |
---|---|
High-performance raw SQL | aiomysql / asyncmy |
ORM preference | peewee-async |
Complex business logic | SQLAlchemy Async |
Lightweight deployment / cross-platform | asyncmy |
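Of these, aiomysql is the one with the most established pooling story; for orientation, its canonical pattern looks like this (a sketch following its documented pool usage; note the `db=` argument rather than `database=`):

```python
import asyncio
import aiomysql


async def main():
    # aiomysql hands out pooled connections through async context managers.
    pool = await aiomysql.create_pool(host='localhost', port=3306,
                                      user='root', password='123456', db='test')
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT 42;")
            print(await cur.fetchone())  # (42,)
    pool.close()
    await pool.wait_closed()


asyncio.run(main())
```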
3.3 mysql.connector.aio pitfalls

First, a look at pymongo's async support (MongoDB, at least as a standalone instance, does not support transactions).
import asyncio
import time
from pymongo import AsyncMongoClient
from faker import Faker
from typing import List


def generate_test_data(rows: int) -> List[dict]:
    fake = Faker()
    data = []
    for _ in range(rows):
        record = {
            "id": fake.uuid4(),
            "name": fake.name(),
            "email": fake.email(),
            "phone": fake.phone_number(),
            "address": fake.address().replace('\n', ', '),
            "birth_date": fake.date_of_birth(minimum_age=18, maximum_age=90).isoformat(),
            "gender": fake.random_element(elements=('M', 'F', 'O')),
            "random_int": fake.random_int(min=1000, max=999999),
            "random_float": fake.pyfloat(min_value=0, max_value=100, right_digits=2),
            "is_active": fake.boolean(),
        }
        data.append(record)
    return data


async def exe(semaphore, table, data):
    async with semaphore:
        await table.insert_many(data)


async def main():
    host = "localhost"
    port = 27017
    data = generate_test_data(10000)
    try:
        client = AsyncMongoClient(host, port, maxPoolSize=50)
        semaphore = asyncio.Semaphore(25)
        table = client['test']['test_table1']
        batch_size = 100
        batches = [data[i:i + batch_size] for i in range(0, len(data), batch_size)]
        s = time.time()
        await asyncio.gather(*[exe(semaphore, table, batch) for batch in batches])
        print(time.time() - s)
    except Exception as error:
        print(error)
    finally:
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())
The key step is `table.insert_many`. Checking the source, each call returns its own independently instantiated `InsertManyResult`:

    await blk.execute(write_concern, session, _Op.INSERT)
    return InsertManyResult(inserted_ids, write_concern.acknowledged)
With MySQL, however, the same pattern fails.
import asyncio
import time

from mysql.connector.aio import connect as async_c
from faker import Faker
from typing import List, Tuple

# Database connection settings
DB_CONFIG = {
    'host': 'localhost',
    'user': 'root',
    'password': '123456',
    'database': 'test',
    'port': 3306,
}


# Generate test data
def generate_test_data(rows: int) -> List[Tuple]:
    fake = Faker()
    data = []
    for _ in range(rows):
        record = (
            fake.uuid4(),                                        # UUID
            fake.name(),                                         # name
            fake.email(),                                        # email
            fake.phone_number(),                                 # phone
            fake.address().replace('\n', ', '),                  # address
            fake.date_of_birth(minimum_age=18, maximum_age=90),  # birth date
            fake.random_element(elements=('M', 'F', 'O')),       # gender
            fake.random_int(min=1000, max=999999),               # random int
            fake.pyfloat(min_value=0, max_value=100, right_digits=2),  # random float
            fake.boolean(),                                      # boolean
        )
        data.append(record)
    return data


async def insert_async(conn, cursor, batch, lock=None):
    sql = """
        INSERT INTO test_async (id, name, email, phone, address, birth_date, gender, random_int,
                                random_float, is_active)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    await cursor.executemany(sql, batch)
    await conn.commit()


# Bulk insert with mysql.connector.aio: all tasks share one connection and one cursor
async def test_async_bulk_insert(data: List[Tuple], batch_size: int = 100):
    try:
        conn = await async_c(**DB_CONFIG)
        cursor = await conn.cursor()
        # lock = asyncio.Lock()
        # Insert in batches
        start_time = time.time()
        tasks = [
            insert_async(conn, cursor, data[i:i + batch_size])
            for i in range(0, len(data), batch_size)
        ]
        await asyncio.gather(*tasks)
        total_time = time.time() - start_time
        print(f"mysql.connector.aio inserted {len(data)} rows in {total_time:.4f} s")
    except Exception as e:
        print(f"mysql.connector.aio insert error: {e}")
    finally:
        if conn:
            await cursor.close()
            await conn.close()


if __name__ == "__main__":
    # Generate 10000 rows of test data
    print("Generating test data...")
    test_data = generate_test_data(10000)
    print("\nTesting mysql.connector.aio bulk-insert performance...")
    asyncio.run(test_async_bulk_insert(test_data, batch_size=100))
    Failed processing format-parameters; read() called while another coroutine is already waiting for incoming data

The above error appears. Looking at the source of `cursor.executemany()`:
    await self.execute(operation, params)
    if self.with_rows and self._have_unread_result():
        await self.fetchall()
Because the cursor is shared state across the coroutines, a race arises here, and that race produces the exception.
async def insert_async(conn, cursor, batch, lock):
    async with lock:
        sql = """
            INSERT INTO test_async (id, name, email, phone, address, birth_date, gender, random_int,
                                    random_float, is_active)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        await cursor.executemany(sql, batch)
        await conn.commit()
An asyncio lock does make the problem go away, but it also serializes the coroutines, so the async code effectively degenerates into synchronous execution.
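Short of a pool, one escape hatch is to give every task its own connection so that no cursor is ever shared. A sketch reusing `DB_CONFIG` and the `test_async` table from the script above (opening a connection per batch is wasteful, but it removes the race while keeping the batches concurrent):

```python
import asyncio
from mysql.connector.aio import connect as async_c

SQL = """
    INSERT INTO test_async (id, name, email, phone, address, birth_date, gender, random_int,
                            random_float, is_active)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""


async def insert_own_conn(semaphore, batch):
    # A private connection and cursor per task: no shared state to race on.
    async with semaphore:
        conn = await async_c(**DB_CONFIG)
        try:
            cursor = await conn.cursor()
            await cursor.executemany(SQL, batch)
            await conn.commit()
            await cursor.close()
        finally:
            await conn.close()
```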
This naturally raises the question: can a connection pool do better? That is the standard solution in a web environment.
import asyncio
import asyncmy
from faker import Faker
from typing import List, Tuple
import time

DB_CONFIG = {
    'host': 'localhost',
    'user': 'root',
    'password': '123456',
    'database': 'test',
    'port': 3306,
}


def generate_test_data(rows: int) -> List[Tuple]:
    fake = Faker()
    data = []
    for _ in range(rows):
        record = (
            fake.uuid4(),
            fake.name(),
            fake.email(),
            fake.phone_number(),
            fake.address().replace('\n', ', '),
            fake.date_of_birth(minimum_age=18, maximum_age=90),
            fake.random_element(elements=('M', 'F', 'O')),
            fake.random_int(min=1000, max=999999),
            fake.pyfloat(min_value=0, max_value=100, right_digits=2),
            fake.boolean(),
        )
        data.append(record)
    return data


# Cap the number of concurrent tasks
async def batch_insert_data(pool: asyncmy.Pool,
                            data: List[Tuple],
                            batch_size: int = 100,
                            max_concurrency: int = 5):
    semaphore = asyncio.Semaphore(max_concurrency)

    async def insert_batch_with_semaphore(batch):
        async with semaphore:
            return await insert_batch(pool, batch)

    tasks = []
    for i in range(0, len(data), batch_size):
        batch = data[i:i + batch_size]
        tasks.append(insert_batch_with_semaphore(batch))
    await asyncio.gather(*tasks)


async def insert_batch(pool: asyncmy.Pool, batch: List[Tuple]):
    async with pool.acquire() as conn:
        async with conn.cursor() as cursor:
            sql = """
                INSERT INTO test_asyncmy (id, name, email, phone, address, birth_date, gender, random_int,
                                          random_float, is_active)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """
            await cursor.executemany(sql, batch)
            await conn.commit()


async def main():
    print("Generating test data...")
    test_data = generate_test_data(10000)
    # Enlarge the pool and cap concurrency
    async with asyncmy.create_pool(**DB_CONFIG, maxsize=20) as pool:
        print("\nTesting async bulk-insert performance...")
        start_time = time.time()
        # Cap concurrency at 5 to match the pool size
        await batch_insert_data(pool, test_data, batch_size=100, max_concurrency=5)
        total_time = time.time() - start_time
        print(f"Async insert of {len(test_data)} rows took {total_time:.4f} s")


if __name__ == "__main__":
    asyncio.run(main())
mysql-connector, however, looks like this:
async def connect(*args: Any, **kwargs: Any) -> MySQLConnectionAbstract:
The source opens with this note:

> When any connection pooling arguments are given, for example `pool_name` or `pool_size`, a pool is created or a previously one is used to return a `PooledMySQLConnection`.
And in practice:
import mysql.connector.aio as cpy_async
from faker import Faker
from typing import List, Tuple
import asyncio
import time


def generate_test_data(rows: int) -> List[Tuple]:
    fake = Faker()
    data = []
    for _ in range(rows):
        record = (
            fake.uuid4(),
            fake.name(),
            fake.email(),
            fake.phone_number(),
            fake.address().replace('\n', ', '),
            fake.date_of_birth(minimum_age=18, maximum_age=90),
            fake.random_element(elements=('M', 'F', 'O')),
            fake.random_int(min=1000, max=999999),
            fake.pyfloat(min_value=0, max_value=100, right_digits=2),
            fake.boolean(),
        )
        data.append(record)
    return data


async def exe(semaphore, con, cursor, batch):
    sql = """
        INSERT INTO test_mysqlconnector (id, name, email, phone, address, birth_date, gender, random_int,
                                         random_float, is_active)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    async with semaphore:
        await cursor.executemany(sql, batch)
        await con.commit()


async def main():
    db_config = {
        'host': 'localhost',
        'user': 'root',
        'password': '123456',
        'database': 'test',
        'port': 3306,
        'pool_size': 20  # this is what triggers the TypeError shown below
    }
    con = await cpy_async.connect(**db_config)
    cursor = await con.cursor()
    data = generate_test_data(1000)
    semaphore = asyncio.Semaphore(10)
    batch_size = 100
    batches = [data[i:i + batch_size] for i in range(0, len(data), batch_size)]
    try:
        s = time.time()
        await asyncio.gather(*[exe(semaphore, con, cursor, batch) for batch in batches])
        print(time.time() - s)
    except Exception as ex:
        print(ex)
    finally:
        if con:
            await cursor.close()
            await con.close()


if __name__ == '__main__':
    asyncio.run(main())
    TypeError: MySQLConnectionAbstract.__init__() got an unexpected keyword argument 'pool_size'

The `pool_size` argument is simply not supported. I combed the MySQL documentation and found no example of using a connection pool with the async connector.
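Until official pooling materializes, a pool can be approximated by hand with an `asyncio.Queue` of pre-opened connections. This is purely my own sketch, not an API of the connector; it only recycles connections and does no health checks or reconnection:

```python
import asyncio
from mysql.connector.aio import connect


class NaivePool:
    """A minimal hand-rolled pool: a queue of pre-opened connections."""

    def __init__(self, size: int, **config):
        self._size = size
        self._config = config
        self._queue: asyncio.Queue = asyncio.Queue()

    async def open(self):
        for _ in range(self._size):
            await self._queue.put(await connect(**self._config))

    async def acquire(self):
        # Suspends the caller until a connection is free, which also caps concurrency.
        return await self._queue.get()

    def release(self, conn):
        self._queue.put_nowait(conn)

    async def close(self):
        while not self._queue.empty():
            await self._queue.get_nowait().close()


async def insert_batch(pool: NaivePool, sql: str, batch):
    conn = await pool.acquire()
    try:
        cursor = await conn.cursor()
        await cursor.executemany(sql, batch)
        await conn.commit()
        await cursor.close()
    finally:
        pool.release(conn)
```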
4. A simple benchmark

Run a simple performance test against mysqlclient and mysql-connector respectively.
CREATE TABLE IF NOT EXISTS test_mysqlconnector (
    id VARCHAR(36) PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(100),
    phone VARCHAR(32),
    address TEXT,
    birth_date DATE,
    gender ENUM('M', 'F', 'O'),
    random_int INT,
    random_float FLOAT,
    is_active BOOLEAN
)
CREATE TABLE IF NOT EXISTS test_mysqlclient (
    id VARCHAR(36) PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(100),
    phone VARCHAR(32),
    address TEXT,
    birth_date DATE,
    gender ENUM('M', 'F', 'O'),
    random_int INT,
    random_float FLOAT,
    is_active BOOLEAN
)
Use faker to generate random rows (the script below generates 10,000).
import time
import mysql.connector
from mysql.connector import Error
import MySQLdb
from faker import Faker
from typing import List, Tuple

# Database connection settings
DB_CONFIG = {
    'host': 'localhost',
    'user': 'root',
    'password': '123456',
    'database': 'test',
    'port': 3306,
}


# Generate test data
def generate_test_data(rows: int) -> List[Tuple]:
    fake = Faker()
    data = []
    for _ in range(rows):
        record = (
            fake.uuid4(),                                        # UUID
            fake.name(),                                         # name
            fake.email(),                                        # email
            fake.phone_number(),                                 # phone
            fake.address().replace('\n', ', '),                  # address
            fake.date_of_birth(minimum_age=18, maximum_age=90),  # birth date
            fake.random_element(elements=('M', 'F', 'O')),       # gender
            fake.random_int(min=1000, max=999999),               # random int
            fake.pyfloat(min_value=0, max_value=100, right_digits=2),  # random float
            fake.boolean(),                                      # boolean
        )
        data.append(record)
    return data


# Bulk insert with mysqlclient
def test_mysqlclient_bulk_insert(data: List[Tuple], batch_size: int = 100):
    try:
        conn = MySQLdb.connect(**DB_CONFIG)
        cursor = conn.cursor()
        sql = """
            INSERT INTO test_mysqlclient (
                id, name, email, phone, address, birth_date, gender, random_int, random_float, is_active
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        start_time = time.time()
        # Insert in batches
        for i in range(0, len(data), batch_size):
            batch = data[i:i + batch_size]
            cursor.executemany(sql, batch)
            conn.commit()
        total_time = time.time() - start_time
        print(f"mysqlclient inserted {len(data)} rows in {total_time:.4f} s")
    except Exception as e:
        print(f"mysqlclient insert error: {e}")
    finally:
        if conn:
            cursor.close()
            conn.close()


# Bulk insert with mysql-connector
def test_mysql_connector_bulk_insert(data: List[Tuple], batch_size: int = 100):
    try:
        conn = mysql.connector.connect(**DB_CONFIG)
        cursor = conn.cursor()
        sql = """
            INSERT INTO test_mysqlconnector (
                id, name, email, phone, address, birth_date, gender, random_int, random_float, is_active
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        start_time = time.time()
        # Insert in batches
        for i in range(0, len(data), batch_size):
            batch = data[i:i + batch_size]
            cursor.executemany(sql, batch)
            conn.commit()
        total_time = time.time() - start_time
        print(f"mysql-connector inserted {len(data)} rows in {total_time:.4f} s")
    except Error as e:
        print(f"mysql-connector insert error: {e}")
    finally:
        if conn:
            cursor.close()
            conn.close()


if __name__ == "__main__":
    # Generate 10000 rows of test data
    print("Generating test data...")
    test_data = generate_test_data(10000)

    print("\nTesting mysqlclient bulk-insert performance...")
    test_mysqlclient_bulk_insert(test_data, batch_size=100)

    print("\nTesting mysql-connector bulk-insert performance...")
    test_mysql_connector_bulk_insert(test_data, batch_size=100)
10,000 random rows are generated and inserted in batches of 100. A performance gap remains: in this simple test, mysqlclient comes out slightly ahead.
Mind how you install mysqlclient: install it from a prebuilt wheel (whl) directly, otherwise the install may fail with build errors.
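(From experience, and assuming a platform for which wheels are published: a plain `pip install mysqlclient` on a recent release will usually resolve to a prebuilt wheel; if pip falls back to a source build and fails, download the `.whl` matching your Python version and platform and `pip install` that file directly.)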
5. Summary

Unless you must cater to special scenarios such as high performance or async, the official connector can be used flexibly as the need arises.