-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmysql_pool.py
105 lines (88 loc) · 3.89 KB
/
mysql_pool.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
"""
------------------------------------------
@File : mysql_pool.py
@CreatedOn : 2022/9/16 9:52
------------------------------------------
MySQL 连接池
参考文章:https://blog.csdn.net/weixin_41447636/article/details/110453039
"""
from contextlib import contextmanager
import pymysql
from dbutils.pooled_db import PooledDB
from pymysql.cursors import DictCursor
class MySQLConnectionPool:
"""MySQL 基本功能封装 """
def __init__(self,
host: str,
port: int,
user: str,
passwd: str,
db: str):
# utf8mb4 是utf8的超集
self.__pool = self.gen_pool(host, port, user, passwd, db) # 返回类字典类型游标
@staticmethod
def gen_pool(host, port, user, passwd, db, charset='utf8'):
pool = PooledDB(
creator=pymysql, # 使用链接数据库的模块
mincached=0, # 初始化连接池时创建的连接数。默认为0,即初始化时不创建连接(建议默认0,假如非0的话,在某些数据库不可用时,整个项目会启动不了)
maxcached=0, # 池中空闲连接的最大数量。默认为0,即无最大数量限制(建议默认)
maxshared=0, # 池中共享连接的最大数量。默认为0,即每个连接都是专用的,不可共享(不常用,建议默认)
maxconnections=0, # 被允许的最大连接数。默认为0,无最大数量限制
blocking=True, # 连接数达到最大时,新连接是否可阻塞。默认False,即达到最大连接数时,再取新连接将会报错。(建议True,达到最大连接数时,新连接阻塞,等待连接数减少再连接)
maxusage=0, # 连接的最大使用次数。默认0,即无使用次数限制。(建议默认)
reset=True, # 当连接返回到池中时,重置连接的方式。默认True,总是执行回滚
ping=1, # 确定何时使用ping()检查连接。默认1,即当连接被取走,做一次ping操作。0是从不ping,1是默认,2是当该连接创建游标时ping,4是执行sql语句时ping,7是总是ping
host=host,
port=port,
user=user,
passwd=passwd,
db=db,
charset=charset,
use_unicode=True
)
return pool
@property
@contextmanager
def pool(self):
_conn = None
_cursor = None
try:
_conn = self.__pool.connection()
_cursor = _conn.cursor()
yield _cursor
finally:
_conn.commit()
_cursor.close()
_conn.close()
def execute(self, sql, args=None):
with self.pool as cursor:
cursor.execute(sql, args)
def executemany(self, sql, args):
with self.pool as cursor:
cursor.executemany(sql, args)
def fetchall(self, sql, args=None):
with self.pool as cursor:
cursor.execute(sql, args)
return cursor.fetchall()
def fetchone(self, sql, args=None):
with self.pool as cursor:
cursor.execute(sql, args)
return cursor.fetchone()
def has_table(self, table_name: str) -> bool:
"""
该用户下是否存在表table_name
"""
sql = "SELECT count(*) total FROM information_schema.TABLES WHERE table_name =%(table_name)s"
arg = {'table_name': table_name}
return self.fetchone(sql, arg).get('total') == 1
def exist_data_by_kw(self, table_name: str, data: dict):
"""表table_name中是否存在where key = value的数据"""
k, = data
exist_sql = "select * from {0} where {1} = %({1})s"
sql = exist_sql.format(table_name, k)
f_data = self.fetchone(sql, data)
return f_data
def close(self):
self.__pool.close()
def __del__(self):
self.close()