目录

PythonFlask-使用-DBUtils-创建通用连接池

Python&Flask 使用 DBUtils 创建通用连接池

DBUtils 是 Python 中一个轻量级且高效的数据库连接池工具,支持多种数据库

1、安装

首先,需要确保安装了 DBUtils 库以及你的数据库驱动,比如 pymysql


pip install DBUtils pymysql

2、创建连接池

以下是使用 DBUtils 创建连接池的基本步骤和示例代码:


from DBUtils.PooledDB import PooledDB
import pymysql

# 配置数据库连接参数
db_config = {
    'host': 'localhost',      # 数据库主机
    'user': 'your_username',  # 数据库用户名
    'password': 'your_password',  # 数据库密码
    'database': 'your_db',    # 数据库名称
    'charset': 'utf8mb4'      # 字符编码
}

# 创建连接池
pool = PooledDB(
    creator=pymysql,          # 使用 pymysql 驱动
    maxconnections=5,         # 最大连接数
    mincached=2,              # 初始化时的最小连接数
    maxcached=5,              # 连接池中最大的空闲连接数
    blocking=True,            # 当连接数耗尽时是否阻塞等待
    **db_config               # 数据库连接配置参数
)
2.1连接池配置参数详解
参数说明推荐值必填
creator数据库驱动模块pymysql/psycopg2等
maxconnections最大连接数10-100(根据服务器配置)
mincached初始空闲连接数2-5
maxcached最大空闲连接数5-10
maxshared最大共享连接数3-5
blocking连接池满时是否阻塞True(推荐)
ping连接检查方式0(不检查)/1(使用前检查)/2/3/4/7
setsession初始化SQL命令[‘SET NAMES utf8mb4’]
reset连接返回池时是否重置True/False

3、使用连接池


# 使用连接池获取连接
def perform_query():
    conn = pool.connection()  # 取连接
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM your_table")  # 执行查询
        result = cursor.fetchall()
        print(result)
    finally:
        cursor.close()
        conn.close()  # 将连接归还连接池

# 执行数据库查询
perform_query()

4、Flask集成示例:


from flask import Flask, g
# g是 Flask 提供的请求级别的"临时全局存储",特别适合存储数据库连接、用户认证信息等需要在同一个请求的不同函数间共享的数据。

app = Flask(__name__)

# 初始化连接池
mysql_pool = PooledDB(
    creator=pymysql,
    maxconnections=20,
    host='localhost',
    user='flask_user',
    password='password',
    database='flask_db'
)

# 获取数据库连接
def get_db():
    if 'db' not in g:
        g.db = mysql_pool.connection()
    return g.db

# 关闭数据库连接
@app.teardown_appcontext
def close_db(error):
    db = g.pop('db', None)
    if db is not None:
        db.close()

# 路由示例
@app.route('/users/<int:user_id>')
def get_user(user_id):
    db = get_db()
    with db.cursor() as cursor:
        cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
        user = cursor.fetchone()
    return {'user': user} if user else (