Таблицы, выражения, выполнение запросов через Core API
Core — это фундамент SQLAlchemy. Это «SQL в Python»: выразительный, типизированный, безопасный. Понимание Core сделает вас лучшим ORM-разработчиком.
Даже если вы планируете использовать ORM, понимание Core критически важно:
select(), insert(), update() работают одинаковоMetaData — это коллекция объектов Table, описывающих схему базы данных.
from sqlalchemy import MetaData
metadata = MetaData()
# Метаданные — это контейнер
# Таблицы добавляются через Table()from sqlalchemy import Table, Column, Integer, String, DateTime, ForeignKey
from datetime import datetime
users = Table(
'users', metadata,
Column('id', Integer, primary_key=True, autoincrement=True),
Column('email', String(255), nullable=False, unique=True),
Column('name', String(100), nullable=False),
Column('created_at', DateTime, default=datetime.utcnow),
)
posts = Table(
'posts', metadata,
Column('id', Integer, primary_key=True),
Column('title', String(200), nullable=False),
Column('content', String, nullable=True), # String без длины = TEXT
Column('user_id', Integer, ForeignKey('users.id'), nullable=False),
Column('published', Boolean, default=False),
)SQLAlchemy сопоставляет Python-типы с SQL-типами:
| SQLAlchemy | PostgreSQL | Python |
|---|---|---|
Integer | INTEGER | int |
String(n) | VARCHAR(n) | str |
String (без n) | TEXT | str |
Text | TEXT | str |
Boolean | BOOLEAN | bool |
DateTime | TIMESTAMP | datetime |
Date | DATE | date |
Time | TIME | time |
Float | FLOAT | float |
Numeric(p, s) | NUMERIC(p, s) | Decimal |
JSON | JSON | dict/list |
LargeBinary | BYTEA | bytes |
from sqlalchemy import Integer, String, Boolean, DateTime, Date, Time
from sqlalchemy import Float, Numeric, JSON, LargeBinary, Text
from decimal import Decimalfrom sqlalchemy import create_engine
engine = create_engine('postgresql+psycopg://user:pass@localhost/db')
# Создать все таблицы из metadata
metadata.create_all(engine)
# Создать конкретную таблицу
users.create(engine)
# Создать только отсутствующие таблицы
metadata.create_all(engine, checkfirst=True) # по умолчанию True# Удалить все таблицы (осторожно!)
metadata.drop_all(engine)
# Удалить конкретную
users.drop(engine)
# Порядок важен: сначала дочерние таблицы с FK
# SQLAlchemy автоматически определяет порядок через foreign keysfrom sqlalchemy import select
# SELECT * FROM users
stmt = select(users)
# SELECT id, email FROM users
stmt = select(users.c.id, users.c.email)
# SELECT * FROM users WHERE id = 1
stmt = select(users).where(users.c.id == 1)
# WHERE email = 'test@example.com'
stmt = select(users).where(users.c.email == 'test@example.com')from sqlalchemy import and_, or_, not_
# AND: WHERE age > 18 AND active = true
stmt = select(users).where(
and_(users.c.age > 18, users.c.active == True)
)
# OR: WHERE id = 1 OR id = 2
stmt = select(users).where(
or_(users.c.id == 1, users.c.id == 2)
)
# IN: WHERE id IN (1, 2, 3)
stmt = select(users).where(users.c.id.in_([1, 2, 3]))
# LIKE: WHERE email LIKE '%@gmail.com'
stmt = select(users).where(users.c.email.like('%@gmail.com'))
# IS NULL: WHERE deleted_at IS NULL
stmt = select(users).where(users.c.deleted_at.is_(None))
# BETWEEN: WHERE age BETWEEN 18 AND 65
stmt = select(users).where(users.c.age.between(18, 65))from sqlalchemy import insert
# INSERT INTO users (email, name) VALUES ('test@example.com', 'Alice')
stmt = insert(users).values(email='test@example.com', name='Alice')
# Массовая вставка
stmt = insert(users).values([
{'email': 'a@example.com', 'name': 'Alice'},
{'email': 'b@example.com', 'name': 'Bob'},
])from sqlalchemy import update
# UPDATE users SET name = 'Alice Updated' WHERE id = 1
stmt = update(users).where(users.c.id == 1).values(name='Alice Updated')
# UPDATE users SET active = true WHERE active = false
stmt = update(users).where(users.c.active == False).values(active=True)from sqlalchemy import delete
# DELETE FROM users WHERE id = 1
stmt = delete(users).where(users.c.id == 1)
# DELETE FROM users WHERE created_at < '2023-01-01'
stmt = delete(users).where(users.c.created_at < datetime(2023, 1, 1))from sqlalchemy import text
# Получение соединения
with engine.connect() as conn:
# Выполнение SELECT
result = conn.execute(select(users))
# Получение всех строк
rows = result.fetchall()
# Получение одной строки
row = result.fetchone()
# Итерация
for row in result:
print(row.id, row.email)result = conn.execute(select(users.c.id, users.c.email))
for row in result:
# По индексу
print(row[0], row[1])
# По имени атрибута
print(row.id, row.email)
# По имени колонки (как dict)
print(row['id'], row['email'])# Явная транзакция
with engine.connect() as conn:
trans = conn.begin()
try:
conn.execute(insert(users).values(email='test@example.com', name='Test'))
trans.commit() # Явный commit
except:
trans.rollback() # Явный rollback
# Context manager (рекомендуется)
with engine.begin() as conn:
# Автоматический commit при успехе, rollback при ошибке
conn.execute(insert(users).values(email='test@example.com', name='Test'))# Один скаляр (одно значение)
result = conn.execute(select(users.c.name).where(users.c.id == 1))
name = result.scalar() # 'Alice'
# Все скаляры (список)
result = conn.execute(select(users.c.name))
names = result.scalars().all() # ['Alice', 'Bob', ...]
# Первая строка
result = conn.execute(select(users))
row = result.first()
# Одна строка или None
result = conn.execute(select(users).where(users.c.id == 999))
row = result.one_or_none() # None если не найдено
# Одна строка или ошибка
result = conn.execute(select(users).where(users.c.id == 1))
row = result.one() # Ошибка если 0 или >1 строкPostgreSQL поддерживает RETURNING для получения вставленных/обновлённых данных:
# INSERT с возвратом id
stmt = insert(users).values(email='test@example.com', name='Test').returning(users.c.id)
result = conn.execute(stmt)
new_id = result.scalar()
# UPDATE с возвратом
stmt = update(users).where(users.c.id == 1).values(name='Updated').returning(users.c.name)
result = conn.execute(stmt)
new_name = result.scalar()from sqlalchemy import join
# INNER JOIN
stmt = select(users, posts).select_from(
users.join(posts, users.c.id == posts.c.user_id)
)
# LEFT OUTER JOIN
stmt = select(users, posts).select_from(
users.outerjoin(posts, users.c.id == posts.c.user_id)
)
# С условием
stmt = select(users, posts).select_from(
users.join(
posts,
and_(users.c.id == posts.c.user_id, posts.c.published == True)
)
)from sqlalchemy import func, select
# COUNT
stmt = select(func.count(users.c.id))
count = conn.execute(stmt).scalar()
# COUNT с GROUP BY
stmt = select(
posts.c.user_id,
func.count(posts.c.id).label('post_count')
).group_by(posts.c.user_id)
# HAVING
stmt = select(
posts.c.user_id,
func.count(posts.c.id).label('post_count')
).group_by(posts.c.user_id).having(
func.count(posts.c.id) > 5
)
# AVG, SUM, MIN, MAX
stmt = select(
func.avg(posts.c.rating),
func.sum(posts.c.views),
func.min(posts.c.created_at),
func.max(posts.c.created_at),
)# ORDER BY name ASC
stmt = select(users).order_by(users.c.name)
# ORDER BY created_at DESC
stmt = select(users).order_by(users.c.created_at.desc())
# ORDER BY name ASC, created_at DESC
stmt = select(users).order_by(users.c.name, users.c.created_at.desc())
# LIMIT 10
stmt = select(users).limit(10)
# OFFSET 20 (пропустить 20)
stmt = select(users).offset(20)
# LIMIT 10 OFFSET 20 (пагинация)
stmt = select(users).limit(10).offset(20)Автоматическая загрузка существующей схемы:
from sqlalchemy import MetaData, Table
metadata = MetaData()
# Загрузить все таблицы
metadata.reflect(engine)
# Доступ к загруженным таблицам
users = metadata.tables['users']
posts = metadata.tables['posts']
# Загрузить конкретную таблицу
users = Table('users', metadata, autoload_with=engine)
# Инспектор для детальной информации
from sqlalchemy import inspect
inspector = inspect(engine)
# Список таблиц
table_names = inspector.get_table_names()
# Колонки таблицы
columns = inspector.get_columns('users')
for col in columns:
print(col['name'], col['type'], col['nullable'])
# Foreign keys
fks = inspector.get_foreign_keys('posts')
# Indexes
indexes = inspector.get_indexes('users')# Используйте bindparam для параметров
stmt = select(users).where(users.c.id == bindparam('user_id'))
result = conn.execute(stmt, {'user_id': 1})
# Используйте .returning() для получения данных после INSERT/UPDATE
stmt = insert(users).values(email='test@example.com').returning(users.c.id)
new_id = conn.execute(stmt).scalar()
# Проверяйте существование таблиц перед drop
metadata.drop_all(engine, checkfirst=True)
# Используйте context manager для транзакций
with engine.begin() as conn:
conn.execute(...)# Никогда не используйте f-strings для SQL (SQL injection!)
# ПЛОХО:
stmt = text(f"SELECT * FROM users WHERE id = {user_id}")
# Используйте параметры:
stmt = text("SELECT * FROM users WHERE id = :id")
result = conn.execute(stmt, {"id": user_id})
# Не создавайте Table с тем же именем дважды в metadata
# ПЛОХО:
users1 = Table('users', metadata, ...)
users2 = Table('users', metadata, ...) # Ошибка!В следующей теме вы изучите ORM Basics — декларативные модели, сессии и CRUD операции. ORM построен на Core, поэтому вы увидите много знакомых концепций.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.