您的当前位置:首页>全部文章>文章详情

python封装mysql类(pymysql)

发表于:2019-10-30 10:30:55浏览:77次TAG: #python

安装 PyMySQL

pip3 install PyMySQL

官方包 https://pypi.org/project/PyMySQL/

封装类

文件路径: library/mysql.py

# -*- coding: utf-8 -*-
# @Author: yuchong
# @Date:   2019-10-29 15:43:55

import pymysql


class MysqlHelper:
    '''python操作mysql的增删改查的封装'''

    # 存储 pymysql.connect 对象
    conn = None
    # 存储 pymysql.connect.cursor()对象
    cur = None
    # 插入(insert)时返回自增主键字段的值
    last_id = 0

    def __init__(self, host, user, password, database, port=3306, charset='utf8'):
        '''
        初始化参数
        :param host: 主机
        :param user: 用户名
        :param password: 密码
        :param database: 数据库
        :param port: 端口号,默认是3306
        :param charset: 编码,默认是utf8
        '''
        self.host = host
        self.port = port
        self.database = database
        self.user = user
        self.password = password
        self.charset = charset

    def _connect(self):
        '''
        获取连接对象和执行对象
        :return:
        '''
        self.conn = pymysql.connect(
            host=self.host,
            user=self.user,
            password=self.password,
            database=self.database,
            port=self.port,
            charset=self.charset
        )

        self.cur = self.conn.cursor()

    def fetchone(self, sql, params=None):
        '''
        根据sql和参数获取一行数据
        :param sql: sql语句
        :param params: sql语句对象的参数元组,默认值为None
        :return: 查询的一行数据
        '''
        dataOne = None
        try:
            self._connect()
            count = self.cur.execute(sql, params)
            if count != 0:
                dataOne = self.cur.fetchone()
        except Exception as ex:
                print(ex)
        finally:
                self._close()
        return dataOne

    def fetchall(self, sql, params=None):
        '''
        根据sql和参数获取一行数据
        :param sql: sql语句
        :param params: sql语句对象的参数列表,默认值为None
        :return: 查询的一行数据
        '''
        dataall = None
        try:
            self._connect()
            count = self.cur.execute(sql, params)
            if count != 0:
                dataall = self.cur.fetchall()
        except Exception as ex:
            print(ex)
        finally:
            self._close()
        return dataall

    def insert(self, sql, params=None):
        '''
        执行新增
        :param sql: sql语句
        :param params: sql语句对象的参数列表,默认值为None
        :return: 受影响的行数
        '''
        count = 0
        try:
            self._connect()
            count = self.cur.execute(sql, params)
            self.conn.commit()
            self.last_id = self.cur.lastrowid
        except Exception as ex:
            print(ex)
        finally:
            self._close()
        return count

    def _execute(self, sql, params=None):
        '''
        执行删改
        :param sql: sql语句
        :param params: sql语句对象的参数列表,默认值为None
        :return: 受影响的行数
        '''
        count = 0
        try:
            self._connect()
            count = self.cur.execute(sql, params)
            self.conn.commit()
        except Exception as ex:
            print(ex)
        finally:
            self._close()
        return count

    def update(self, sql, params=None):
        '''
        执行修改
        :param sql: sql语句
        :param params: sql语句对象的参数列表,默认值为None
        :return: 受影响的行数
        '''
        return self._execute(sql, params)

    def delete(self, sql, params=None):
        '''
        执行删除
        :param sql: sql语句
        :param params: sql语句对象的参数列表,默认值为None
        :return: 受影响的行数
        '''
        return self._execute(sql, params)

    def _close(self):
        '''
        关闭执行工具和连接对象
        '''
        if self.cur is not None:
            self.cur.close()

        if self.conn is not None:
            self.conn.close()

使用

# -*- coding: utf-8 -*-
import time
from library.mysql import MysqlHelper

dbmysql = MysqlHelper(
    '127.0.0.1', 'root', '123456', 'yuchong_test', charset='utf8', port=3306
)

res = dbmysql.insert(
    "insert into `user`  `nickname`, `create_time`) values (%s, %s, %s)",
    params=['admin', '管理员', int(time.time())]
)
print(res)
print(dbmysql.last_id)

# -*- coding: utf-8 -*-
from library.mysql import MysqlHelper

dbmysql = MysqlHelper(
    '127.0.0.1', 'root', '123456', 'yuchong_test', charset='utf8', port=3306
)

res = dbmysql.delete(
    "delete from `user` where `uid`=%s",
    params=[1]
)
print(res)

# -*- coding: utf-8 -*-
from library.mysql import MysqlHelper

dbmysql = MysqlHelper(
    '127.0.0.1', 'root', '123456', 'yuchong_test', charset='utf8', port=3306
)

res = dbmysql.delete(
    "update `user` set `nickname`='andy' where `uid`=%s",
    params=[2]
)
print(res)

# -*- coding: utf-8 -*-
from library.mysql import MysqlHelper

dbmysql = MysqlHelper(
    '127.0.0.1', 'root', '123456', 'yuchong_test', charset='utf8', port=3306
)

user = dbmysql.fetchone(
    "select * from `user` where `uid`=%s",
    params=[2]
)
print(user)

user_list = dbmysql.fetchall(
    "select * from `user` limit 10"
)
print(user_list)

继承

使用user类来封装和继承mysql类

# -*- coding: utf-8 -*-
# @Author: yuchong
# @Date:   2019-10-29 17:08:00
from library.mysql import MysqlHelper


class User(MysqlHelper):
    # 存储db类
    db = None
    # 存储表名
    table_name = None

    def __init__(self, table_name):
        self.db = MysqlHelper(
            '192.168.80.24', 'root', '123456', 'yuchong_test', charset='utf8', port=3306
        )
        self.table_name = table_name

    # :uid 用户id
    # reutrn: 查询一个用户数据
    def get_user_info(self, uid):
        return self.db.fetchone(
            "select * from `{table_name}` where `uid`=%s"
            .format(table_name=self.table_name),
            params=[int(uid)]
        )

使用

# -*- coding: utf-8 -*-
from user import User

User = User('user')

# 常规查询
u = User.db.fetchone("select * from `user` where `uid`=%s", params=[2])
print(u)

# 使用 user类封装的方法
u = User.get_user_info(uid=2)
print(u)
栏目分类全部>
腾讯云采购季云服务器一折促销