python封装mysql类(pymysql)
发表于:2019-10-30 10:30:55浏览:77次
安装 PyMySQL
pip3 install PyMySQL
官方包 https://pypi.org/project/PyMySQL/
封装类
文件路径: library/mysql.py
# -*- coding: utf-8 -*-
# @Author: yuchong
# @Date: 2019-10-29 15:43:55
import pymysql
class MysqlHelper:
'''python操作mysql的增删改查的封装'''
# 存储 pymysql.connect 对象
conn = None
# 存储 pymysql.connect.cursor()对象
cur = None
# 插入(insert)时返回自增主键字段的值
last_id = 0
def __init__(self, host, user, password, database, port=3306, charset='utf8'):
'''
初始化参数
:param host: 主机
:param user: 用户名
:param password: 密码
:param database: 数据库
:param port: 端口号,默认是3306
:param charset: 编码,默认是utf8
'''
self.host = host
self.port = port
self.database = database
self.user = user
self.password = password
self.charset = charset
def _connect(self):
'''
获取连接对象和执行对象
:return:
'''
self.conn = pymysql.connect(
host=self.host,
user=self.user,
password=self.password,
database=self.database,
port=self.port,
charset=self.charset
)
self.cur = self.conn.cursor()
def fetchone(self, sql, params=None):
'''
根据sql和参数获取一行数据
:param sql: sql语句
:param params: sql语句对象的参数元组,默认值为None
:return: 查询的一行数据
'''
dataOne = None
try:
self._connect()
count = self.cur.execute(sql, params)
if count != 0:
dataOne = self.cur.fetchone()
except Exception as ex:
print(ex)
finally:
self._close()
return dataOne
def fetchall(self, sql, params=None):
'''
根据sql和参数获取一行数据
:param sql: sql语句
:param params: sql语句对象的参数列表,默认值为None
:return: 查询的一行数据
'''
dataall = None
try:
self._connect()
count = self.cur.execute(sql, params)
if count != 0:
dataall = self.cur.fetchall()
except Exception as ex:
print(ex)
finally:
self._close()
return dataall
def insert(self, sql, params=None):
'''
执行新增
:param sql: sql语句
:param params: sql语句对象的参数列表,默认值为None
:return: 受影响的行数
'''
count = 0
try:
self._connect()
count = self.cur.execute(sql, params)
self.conn.commit()
self.last_id = self.cur.lastrowid
except Exception as ex:
print(ex)
finally:
self._close()
return count
def _execute(self, sql, params=None):
'''
执行删改
:param sql: sql语句
:param params: sql语句对象的参数列表,默认值为None
:return: 受影响的行数
'''
count = 0
try:
self._connect()
count = self.cur.execute(sql, params)
self.conn.commit()
except Exception as ex:
print(ex)
finally:
self._close()
return count
def update(self, sql, params=None):
'''
执行修改
:param sql: sql语句
:param params: sql语句对象的参数列表,默认值为None
:return: 受影响的行数
'''
return self._execute(sql, params)
def delete(self, sql, params=None):
'''
执行删除
:param sql: sql语句
:param params: sql语句对象的参数列表,默认值为None
:return: 受影响的行数
'''
return self._execute(sql, params)
def _close(self):
'''
关闭执行工具和连接对象
'''
if self.cur is not None:
self.cur.close()
if self.conn is not None:
self.conn.close()
使用
增
# -*- coding: utf-8 -*-
import time
from library.mysql import MysqlHelper
dbmysql = MysqlHelper(
'127.0.0.1', 'root', '123456', 'yuchong_test', charset='utf8', port=3306
)
res = dbmysql.insert(
"insert into `user` `nickname`, `create_time`) values (%s, %s, %s)",
params=['admin', '管理员', int(time.time())]
)
print(res)
print(dbmysql.last_id)
删
# -*- coding: utf-8 -*-
from library.mysql import MysqlHelper
dbmysql = MysqlHelper(
'127.0.0.1', 'root', '123456', 'yuchong_test', charset='utf8', port=3306
)
res = dbmysql.delete(
"delete from `user` where `uid`=%s",
params=[1]
)
print(res)
改
# -*- coding: utf-8 -*-
from library.mysql import MysqlHelper
dbmysql = MysqlHelper(
'127.0.0.1', 'root', '123456', 'yuchong_test', charset='utf8', port=3306
)
res = dbmysql.delete(
"update `user` set `nickname`='andy' where `uid`=%s",
params=[2]
)
print(res)
查
# -*- coding: utf-8 -*-
from library.mysql import MysqlHelper
dbmysql = MysqlHelper(
'127.0.0.1', 'root', '123456', 'yuchong_test', charset='utf8', port=3306
)
user = dbmysql.fetchone(
"select * from `user` where `uid`=%s",
params=[2]
)
print(user)
user_list = dbmysql.fetchall(
"select * from `user` limit 10"
)
print(user_list)
继承
使用user类来封装和继承mysql类
# -*- coding: utf-8 -*-
# @Author: yuchong
# @Date: 2019-10-29 17:08:00
from library.mysql import MysqlHelper
class User(MysqlHelper):
# 存储db类
db = None
# 存储表名
table_name = None
def __init__(self, table_name):
self.db = MysqlHelper(
'192.168.80.24', 'root', '123456', 'yuchong_test', charset='utf8', port=3306
)
self.table_name = table_name
# :uid 用户id
# reutrn: 查询一个用户数据
def get_user_info(self, uid):
return self.db.fetchone(
"select * from `{table_name}` where `uid`=%s"
.format(table_name=self.table_name),
params=[int(uid)]
)
使用
# -*- coding: utf-8 -*-
from user import User
User = User('user')
# 常规查询
u = User.db.fetchone("select * from `user` where `uid`=%s", params=[2])
print(u)
# 使用 user类封装的方法
u = User.get_user_info(uid=2)
print(u)

