August Rush

一个还在努力成长的小火汁!

游龙当归海,海不迎我自来也。

We create our own demons.

You can reach me at augustrush0923@gmail.com
SQLAlchemy ORM 进阶
发布:2022年02月23日 | 作者:augustrush | 阅读量: 966

相关模块

# 创建连接相关
from sqlalchemy import create_engine

# 和 sqlapi 交互,执行转换后的 sql 语句,用于创建基类
from sqlalchemy.ext.declarative import declarative_base

# 创建表中的字段(列)
from sqlalchemy import Column

# 表中字段的属性
from sqlalchemy import Column, String, Integer, BigInteger, Boolean, Text, SmallInteger, Float, DateTime, Date
from sqlalchemy import UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship

创建连接对象,并使用 pymysql 引擎

conn_str = '{mysql_conn}://{username}:{password}@{host}/{dbname}?charset=utf8&autocommit=true'.format(
    mysql_conn=settings.DB_CONN if settings.DB_CONN else 'mysql',
    username=settings.DB_USERNAME,
    password=settings.DB_PASSWORD,
    host=settings.DB_HOST,
    dbname=settings.DB_NAME)

engine = create_engine(conn_str, pool_size=50, max_overflow=50,
                       pool_recycle=3600, echo_pool=False, echo=settings.DB_ECHO)

创建表

# 创建基类
Base = declarative_base()

# 创建单表
class Person(Base):
    __tablename__ = 'person' # 表名 - 必有属性
    id = Column(Integer, primary_key=True) # 每个表都要设定一个主键 - 必有属性

    # 设置单表字段
    name = Column(String(32))
    age = Column(Integer, default=0, nullable=True)  # 设置字段属性
    __table_args__ = (
    # 设置联合唯一
    UniqueConstraint('id', 'name', name='uix_id_name'),

    # 建立索引   
    Index('uix_id_name', 'name'),
    )


def int_db():
    """创建所有定义的表到数据库中"""
    Base.metadata.create_all(engine)


def drop_db():
    """从数据库中删除所有定义的表"""
    Base.metadata.drop_all(engine)


if __name__ == "__main__":
    # 执行创建表
    #init_db()

    # 创建会话实例对象
    Session = sessionmaker(bind=engine)
    session = Session()    

查询数据

  • session.query(类名/表名): 返回的是对象

  • session.query(类名.字段名): 返回的是含有该字段的元组对象

# 所有数据,且结果集中是一个一个的对象
ret = session.query(Person).all()
# 结果 [obj1, obj2, obj3]

#  指定字段查询,返回所有的数据,是一个列表,列表内是一个一个的元组
ret = session.query(Person.name, Person.age).all()
# 结果 [('augustrush', '18'), ('taylorswift', '19'), ('Jaychou', '23')]

迭代查询结果集

for name, age in session.query(Person.name, Person.age).all():
    print(name, age)
'''
# 输出结果
yangge 18
qiangge 19
shark 23
'''

给列起别名

可以使用label()给每个列名起别名

for row in session.query(Person.name.label('p_name')).all():
    print(row.p_name)

条件查询

filter_by()接收的是关键字参数, filter_by本质上最后还是要调用filter。

filter()允许使用Python的比较或关系运算符,实现更灵活的查询

# filter_by()
ret = session.query(Person).filter_by(name='yangge').first()
# 结果 Person(id=2, name=yangge, age=18, city=BeiJing)

# filter()
ret = session.query(Person).filter(Person.name=='yangge').first()
# 结果 Person(id=2, name=yangge, age=18, city=BeiJing)

关系运算符的查询

以下适用于filter()

query = session.query(Person)

以下查询都是以这个查询对象为基础的过滤

相等

results = query.filter(Person.name == 'august rush').all()

不相等

results = query.filter(Person.name != 'august rush').all()

LIKE

在某些数据库中,这个可能会不区分大小写,也有可能区分大小写。

results = query.filter(Person.name.like('%august%')).all()

ILIKE

确保忽略大小写, 大部分数据库不支持 ilike

results = query.filter(Person.name.ilike('%AUGUST%')).all()

IN

results = query.filter(Person.id.in_([1, 2])).all()

NOT IN

使用波浪号~ 表示非

results = query.filter(~Person.id.in_([2, 3])).all()

BETWEEN

使用 between 表示范围

results = query.filter(Person.id.between(1, 3)).all()  # 在这个范围内

results = query.filter(~Person.id.between(1, 3)).all()  # 不在这个范围内

IS NULL

数据库中的空字符串不是 NUll , python 中的 None 存到数据库中是 NULL。

results = query.filter(Person.name == None).all()
# 或者
results = query.filter(Person.name.is_(None)).all()

IS NOT NULL

results = query.filter(Person.name != None).all()
# 或者
results = query.filter(~Person.name.is_(None)).all()
# 或者
results = query.filter(Person.name.isnot(None)).all()

AND

# 使用 and_()
from sqlalchemy import and_
results = query.filter(and_(Person.name == 'august rush', Person.age == 23)).all()

# 或者使用逗号
query.filter(Person.name == 'august rush', Person.age == 23).all()

OR

from sqlalchemy import or_
results = query.filter(or_(Person.name == 'august rush', Person.age == 33)).all()

AND 和 OR 的综合使用

results = query.filter(or_(
    Person.id > 1,
    and_(Person.name == 'august rush', Person.id < 10)
)).all()

控制返回的查询结果集

all() 返回的是所有的结果集,是列表

first() 返回的是所有结果集中的第一个数据

ret = session.query(Person).all()
# 结果 [obj1,obj2,obj3]

ret = session.query(Person).first()
# 结果 obj1

one

​提取结果集中的所有数据,假如没有或者数据多于一条则会报错

​找到后返回的是一个元组

try:
    results = query.filter(Person.id == 3).one()
    print(results)
except sqlalchemy.exc.MultipleResultsFound as e:
    print(e)
except sqlalchemy.exc.NoResultFound as e:
    print(e)

one_or_none

和 one() 一样,但是没找到返回 None

try:
    results = query.filter(Person.id == 3).one_or_none()
    print(results)
except sqlalchemy.exc.MultipleResultsFound as e:
    print(e)

scalar

​scalar() 调用 one() 方法,找不到,返回 None

​找到后返回的是赤裸裸的数据

try:
    results = query.filter(Person.id > 0).scalar()
    print(results)
except sqlalchemy.exc.MultipleResultsFound as e:
    print(e)

limit

使用 python 的切片控制输出多少行

results = query.all()[0:1]
print(results)

order by 排序

# 正序
results = query.order_by(Person.id).all()

# 倒序
results = query.order_by(Person.id.desc()).all()

# 先按名字排序,假如有相同的再按照 id 排序
results = query.order_by(Person.name, Person.id).all()

count 统计

count = query.count()

session.query(Person).filter(Person.age=='18').count()

嵌套的查询

results = query.filter(Person.id.in_(
    session.query(Person.id).filter(Person.id > 0)
)).all()

分组统计查询

from sqlalchemy.sql import func
# 统计表中所有的数据

results = session.query(func.count('*')).select_from(Person).first()

# 以年龄分组,并统计每组的数据数量
results = session.query(func.count(Person.age)).group_by(Person.age).all()

# 以年龄为分组,并统计每组的最大/最小 id 号,年龄总和/平均值,
results = session.query(func.max(Person.id), func.min(Person.id), func.sum(Person.age), func.avg(Person.age))\
     .group_by(Person.age).all()

# 从分组的数据中再查找需要的数据
results = session.query(func.max(Person.id), func.min(Person.id), func.sum(Person.age), func.avg(Person.age))\
    .group_by(Person.age).having(func.min(Person.id) > 1).all()

组合

# 再创建一个表
class Student(Base):
    __tablename__ = 'student'
    id = Column(Integer,primary_key=True)
    name = Column(String(12))
    age = Column(String(2))
    city = Column(String(16))

# 组合  用一条数据将两个表中的要查询的数据组合在一张表里展示出来
q1 = session.query(Teacher.name).filter(Teacher.id > 2)
q2 = session.query(Student.name).filter(Student.id < 2)
## 去重
ret = q1.union(q2).all()
## 不去重 
q1 = session.query(Teacher.name).filter(Teacher.id > 2)
q2 = session.query(Student.name).filter(Student.id < 2)
ret = q1.union_all(q2).all()

关于union联合查询有一个说法很形象:join查询就像是横向扩展,将多张表的数据横向组合在一起,而union像是纵向扩展,将多张表数据纵向排列起来

join

# 多表联查
results = session.query(Person, Job).filter(Person.job_id == Job.id).all()
''' 结果
[(Person(id=1, name=august rush, age=23), Job(id=1, name=python developer, salary=10000.0, location=zhengzhou)), (Person(id=2, name=taylor swift, age=33), Job(id=5, name=singer, salary=100000.0, location=new york))]
'''

# 使用join() inner join
results = session.query(Person, Job).join(Job, Person.job_id == Job.id).all()

''' sql
SELECT person.id AS person_id, person.job_id AS person_job_id, person.name AS person_name, person.age AS person_age, job.id AS job_id, job.name AS job_name, job.salary AS job_salary, job.location AS job_location 
FROM person INNER JOIN job ON person.job_id = job.id
'''
''' 结果
[(Person(id=1, name=august rush, age=23), Job(id=1, name=python developer, salary=10000.0, location=zhengzhou)), (Person(id=2, name=taylor swift, age=33), Job(id=5, name=singer, salary=100000.0, location=new york))]
'''

# 使用join() left join
results = session.query(Person, Job).join(Job, Person.job_id == Job.id, isouter=True).all()

results = session.query(Person, Job).outerjoin(Job, Person.job_id == Job.id).all()
'''sql
SELECT person.id AS person_id, person.job_id AS person_job_id, person.name AS person_name, person.age AS person_age, job.id AS job_id, job.name AS job_name, job.salary AS job_salary, job.location AS job_location 
FROM person LEFT OUTER JOIN job ON person.job_id = job.id
'''
'''结果
[(Person(id=1, name=august rush, age=23), Job(id=1, name=python developer, salary=10000.0, location=zhengzhou)), (Person(id=2, name=taylor swift, age=33), Job(id=5, name=singer, salary=100000.0, location=new york))]
'''


  • 标签云

  • 支付宝扫码支持一下

  • 微信扫码支持一下



基于Nginx+Supervisord+uWSGI+Django1.11.1+Python3.6.5构建

京ICP备20007446号-1 & 豫公网安备 41100202000460号

网站地图 & RSS | Feed