游乐游手机版
首页/数据库/文章详情

SQLAlchemy中UPSERT操作的实现方法与使用技巧

时间:2026-06-14 07:02
前言 SQLite 和 PostgreSQL 都支持 UPSERT 操作,简单说就是“有则更新,无则新增”。不过要注意,冲突列必须设置唯一约束,否则这个功能不会生效。 两者的语法长得非常像,但细节上藏着小坑。直接上语法对比: PostgreSQL: INSERT ON CONFLICT (c

前言

SQLite 和 PostgreSQL 都支持 UPSERT 操作,简单说就是“有则更新,无则新增”。不过要注意,冲突列必须设置唯一约束,否则这个功能不会生效。

SQLAlchemy中使用UPSERT的操作方法

两者的语法长得非常像,但细节上藏着小坑。直接上语法对比:

  • PostgreSQL: INSERT ... ON CONFLICT (column) DO UPDATE/NOTHING
  • SQLite: INSERT ... ON CONFLICT(column) DO UPDATE/NOTHING。括号位置紧贴CONFLICT,这个区别要是搞混了,SQLite会直接报错。
场景PostgreSQLSQLite说明
基本 UPSERTON CONFLICT (col) DO UPDATE SET ...ON CONFLICT(col) DO UPDATE SET ...就这一个括号位置的差别
冲突忽略ON CONFLICT (col) DO NOTHINGON CONFLICT(col) DO NOTHING行为完全一致
引用新值EXCLUDED.colexcluded.col大小写要区分:PG大写,SQLite小写
返回结果RETURNING *RETURNING *语法一致,但SQLite需要3.35+版本
条件更新WHERE condition不支持 WHERE这是SQLite的一个明确限制

注意事项

  • 冲突列必须有唯一约束——这个点再怎么强调也不为过,很多人踩坑就是忘了建唯一索引。
  • 语法看着相似,但细节差异容易翻车。如果在不同数据库之间切换,原生SQL一定要逐句校对。
  • SQLite不支持UPSERT后面的WHERE子句,必须用CASE表达式或者在应用层做过滤。
  • SQLite 3.35+版本才开始支持RETURNING,如果还在用旧版本,这一块得留意。

EXCLUDED 和 RETURNING

EXCLUDED

EXCLUDED 是什么?它代表冲突发生时、被“拦截”下来的那批新数据。换句话说,就是你想往表里插入、但因为唯一约束冲突而被迫转向更新的那组值。

INSERT INTO users (email, name, age)VALUES ('test@example.com', '新名字', 30)ON CONFLICT (email) DO UPDATE SET    name = EXCLUDED.name,   -- ← 这里引用的就是新值 "新名字"    age = EXCLUDED.age      -- ← 这里引用的是新值 30
场景表达式含义示例值
原表字段users.name冲突行的当前值"老名字"
新值字段EXCLUDED.name试图插入的新值"新名字"
混合计算users.age + EXCLUDED.age原值 + 新值25 + 30 = 55

示例 1:累加库存

-- 商品库存累加:原库存 100 + 新增 50 = 150INSERT INTO products (sku, stock)VALUES ('IPHONE15', 50)ON CONFLICT (sku) DO UPDATE SET    stock = products.stock + EXCLUDED.stock  -- 100 + 50RETURNING stock;

示例 2:仅更新非空字段

-- 如果新值为 NULL,保留原值,这种写法在实际业务中很常用INSERT INTO users (email, name, age)VALUES ('test@example.com', '新名字', NULL)ON CONFLICT (email) DO UPDATE SET    name = COALESCE(EXCLUDED.name, users.name),  -- 新名字    age = COALESCE(EXCLUDED.age, users.age)      -- 保留原 age

示例 3:时间戳更新

-- 更新时顺手刷新 updated_atINSERT INTO users (email, name)VALUES ('test@example.com', '新名字')ON CONFLICT (email) DO UPDATE SET    name = EXCLUDED.name,    updated_at = NOW()  -- PostgreSQL 写法    -- updated_at = CURRENT_TIMESTAMP  -- SQLite 写法

RETURNING

RETURNING 是少跑一趟 SELECT 的利器。在执行 INSERT/UPDATE/DELETE 之后,直接返回你想要的列。

INSERT INTO users (email, name)VALUES ('test@example.com', '张三')RETURNING id, email, name, created_at;

示例 1:插入后立即获取 ID

# PostgreSQL / SQLite 3.35+sql = text("""    INSERT INTO users (email, name)    VALUES (:email, :name)    RETURNING id, email, created_at""")result = await session.execute(sql, {"email": "test@example.com", "name": "张三"})user = result.mappings().first()print(user["id"])  # 直接获取 ID

示例 2:UPSERT 后统一返回

-- 无论插入还是更新,都返回最终状态INSERT INTO users (email, name, login_count)VALUES ('test@example.com', '张三', 1)ON CONFLICT (email) DO UPDATE SET    name = EXCLUDED.name,    login_count = users.login_count + 1  -- 累加登录次数RETURNING     id,    email,    name,    login_count,    CASE         WHEN xmax = 0 THEN 'inserted'  -- PostgreSQL 特有:xmax=0 表示插入        ELSE 'updated'    END AS action

示例 3:批量操作返回所有结果

-- PostgreSQL 支持批量 RETURNINGINSERT INTO users (email, name)VALUES     ('a@example.com', 'A'),    ('b@example.com', 'B')ON CONFLICT (email) DO UPDATE SET    name = EXCLUDED.nameRETURNING id, email, name;

Python 处理批量返回:

result = await session.execute(sql)users = [dict(row) for row in result.mappings().all()]# [{'id': 1, 'email': 'a@example.com', 'name': 'A'}, ...]

示例:用户登录计数器

async def record_user_login(session: AsyncSession, email: str, name: str) -> dict:    """    用户登录计数器:    - 新用户:插入,login_count = 1    - 老用户:更新,login_count += 1    - 返回最终状态 + 操作类型    """    sql = text("""        INSERT INTO users (            email, name, login_count, last_login, created_at        ) VALUES (            :email, :name, 1, :now, :now        )        ON CONFLICT (email) DO UPDATE SET            name = EXCLUDED.name,                          -- 更新用户名            login_count = users.login_count + 1,           -- 累加登录次数            last_login = EXCLUDED.last_login               -- 更新最后登录时间        RETURNING            id,            email,            name,            login_count,            last_login,            created_at,            CASE                 WHEN xmax = 0 THEN 'inserted'                 ELSE 'updated'             END AS action  -- PostgreSQL 特有:区分插入/更新    """)    now = datetime.utcnow()    result = await session.execute(        sql,        {"email": email, "name": name, "now": now}    )    row = result.mappings().first()    return dict(row) if row else None# 使用示例user = await record_user_login(session, "test@example.com", "张三")print(f"{user['action']} user {user['email']} with {user['login_count']} logins")# 输出: inserted user test@example.com with 1 logins# 或: updated user test@example.com with 5 logins

示例数据模型类

from sqlalchemy import Column, Integer, String, UniqueConstraintfrom sqlalchemy.orm import DeclarativeBaseclass Base(DeclarativeBase):    passclass User(Base):    __tablename__ = "users"    id = Column(Integer, primary_key=True, autoincrement=True)    email = Column(String(100), unique=True, nullable=False)  # 唯一约束    name = Column(String(50))    age = Column(Integer)    balance = Column(Integer, default=0)    __table_args__ = (        UniqueConstraint("email", name="uq_users_email"),    )class Product(Base):    __tablename__ = "products"    id = Column(Integer, primary_key=True)    sku = Column(String(50), unique=True, nullable=False)  # 唯一 SKU    name = Column(String(100))    stock = Column(Integer, default=0)    price = Column(Integer)

ORM 方式

用ORM时,insert 的导入路径一定要搞清楚,尤其是跨数据库的场景。

基本示例

from sqlalchemy.dialects.postgresql import insert as pg_insertfrom sqlalchemy.dialects.sqlite import insert as sqlite_insertfrom sqlalchemy import insertasync def upsert_user_orm(session: AsyncSession, user_data: dict) -> dict:    """    UPSERT 用户(ORM 风格)    如果 email 冲突则更新,否则插入    """    # 方式 1:使用通用 insert(推荐)    # SQLAlchemy 会根据方言自动选择正确的语法    stmt = (        insert(User)        .values(**user_data)        .on_conflict_do_update(            index_elements=["email"],  # 冲突检测列(唯一约束)            set_={                "name": user_data["name"],                "age": user_data.get("age"),                "updated_at": func.now()  # 假设有 updated_at 列            }        )        .returning(User)  # 返回插入/更新后的行    )    result = await session.execute(stmt)    user = result.scalar_one()    return {        "id": user.id,        "email": user.email,        "name": user.name,        "age": user.age    }async def upsert_user_ignore(session: AsyncSession, user_data: dict) -> bool:    """    UPSERT 但冲突时忽略(DO NOTHING)    """    stmt = (        insert(User)        .values(**user_data)        .on_conflict_do_nothing(            index_elements=["email"]        )    )    result = await session.execute(stmt)    return result.rowcount > 0  # 返回是否插入成功

条件更新:仅更新特定字段

async def upsert_user_conditional(session: AsyncSession, user_data: dict) -> dict:    """    UPSERT:冲突时只更新非空字段    """    stmt = (        insert(User)        .values(**user_data)        .on_conflict_do_update(            index_elements=["email"],            set_={                "name": user_data["name"],                # 条件:只有提供了 age 才更新                "age": user_data.get("age", User.age),  # 保持原值            },            # 可选:添加 WHERE 条件            where=User.email == user_data["email"]        )        .returning(User)    )    result = await session.execute(stmt)    return result.mappings().first()

批量 UPSERT

async def bulk_upsert_users(session: AsyncSession, users: list[dict]) -> int:    """    批量 UPSERT 用户    """    stmt = (        insert(User)        .values(users)        .on_conflict_do_update(            index_elements=["email"],            set_={                "name": insert(User).excluded.name,  # 使用 excluded 表示新值                "age": insert(User).excluded.age,            }        )    )    result = await session.execute(stmt)    return result.rowcount

使用 EXCLUDED 引用新值

async def upsert_product_with_stock(session: AsyncSession, product_data: dict) -> dict:    """    UPSERT 产品:冲突时累加库存    """    stmt = (        insert(Product)        .values(**product_data)        .on_conflict_do_update(            index_elements=["sku"],            set_={                # 累加库存:原库存 + 新库存                "stock": Product.stock + insert(Product).excluded.stock,                # 更新其他字段                "name": insert(Product).excluded.name,                "price": insert(Product).excluded.price,            }        )        .returning(Product)    )    result = await session.execute(stmt)    return result.mappings().first()

用户服务

class UserService:    """用户服务(支持 UPSERT)"""    def __init__(self, session: AsyncSession):        self.session = session    async def create_or_update(self, email: str, name: str, age: int | None = None) -> dict:        """创建或更新用户"""        stmt = (            insert(User)            .values(                email=email,                name=name,                age=age,                created_at=datetime.utcnow()            )            .on_conflict_do_update(                index_elements=["email"],                set_={                    "name": name,                    "age": age,                    "updated_at": datetime.utcnow()                }            )            .returning(User)        )        result = await self.session.execute(stmt)        user = result.scalar_one()        return {            "id": user.id,            "email": user.email,            "name": user.name,            "age": user.age        }    async def bulk_create_or_update(self, users: list[dict]) -> int:        """批量创建或更新"""        stmt = (            insert(User)            .values(users)            .on_conflict_do_update(                index_elements=["email"],                set_={                    "name": insert(User).excluded.name,                    "age": insert(User).excluded.age,                    "updated_at": datetime.utcnow()                }            )        )        result = await self.session.execute(stmt)        return result.rowcount    async def create_if_not_exists(self, email: str, name: str) -> bool:        """仅当不存在时创建"""        stmt = (            insert(User)            .values(                email=email,                name=name,                created_at=datetime.utcnow()            )            .on_conflict_do_nothing(                index_elements=["email"]            )        )        result = await self.session.execute(stmt)        return result.rowcount > 0  # True = 插入成功,False = 已存在

原生 SQL

基本示例

PostgreSQL

async def upsert_user_pg(session: AsyncSession, user_data: dict) -> dict | None:    """    PostgreSQL 原生 UPSERT    """    sql = text("""        INSERT INTO users (email, name, age, created_at)        VALUES (:email, :name, :age, :created_at)        ON CONFLICT (email) DO UPDATE  -- 冲突列        SET             name = EXCLUDED.name,      -- EXCLUDED 表示新插入的值            age = EXCLUDED.age,            updated_at = NOW()        RETURNING id, email, name, age    """)    result = await session.execute(        sql,        {            "email": user_data["email"],            "name": user_data["name"],            "age": user_data.get("age"),            "created_at": datetime.utcnow()        }    )    row = result.mappings().first()    return dict(row) if row else None

SQLite

async def upsert_user_sqlite(session: AsyncSession, user_data: dict) -> dict | None:    """    SQLite 原生 UPSERT(语法与 PostgreSQL 几乎相同)    """    sql = text("""        INSERT INTO users (email, name, age, created_at)        VALUES (:email, :name, :age, :created_at)        ON CONFLICT(email) DO UPDATE SET  -- SQLite 语法稍有不同            name = excluded.name,            age = excluded.age,            updated_at = CURRENT_TIMESTAMP        RETURNING id, email, name, age    """)    result = await session.execute(        sql,        {            "email": user_data["email"],            "name": user_data["name"],            "age": user_data.get("age"),            "created_at": datetime.utcnow()        }    )    row = result.mappings().first()    return dict(row) if row else None

冲突时忽略

async def insert_or_ignore_user(session: AsyncSession, user_data: dict) -> bool:    """    插入用户,如果冲突则忽略    """    # PostgreSQL    sql = text("""        INSERT INTO users (email, name, age, created_at)        VALUES (:email, :name, :age, :created_at)        ON CONFLICT (email) DO NOTHING    """)    # SQLite(语法相同)    # sql = text("""    #     INSERT INTO users (email, name, age, created_at)    #     VALUES (:email, :name, :age, :created_at)    #     ON CONFLICT(email) DO NOTHING    # """)    result = await session.execute(        sql,        {            "email": user_data["email"],            "name": user_data["name"],            "age": user_data.get("age"),            "created_at": datetime.utcnow()        }    )    return result.rowcount > 0  # 返回是否插入成功

批量 UPSERT

async def bulk_upsert_products(session: AsyncSession, products: list[dict]) -> int:    """    批量 UPSERT 产品(原生 SQL)    """    # PostgreSQL    sql = text("""        INSERT INTO products (sku, name, stock, price, created_at)        VALUES (            :sku, :name, :stock, :price, :created_at        )        ON CONFLICT (sku) DO UPDATE SET            name = EXCLUDED.name,            stock = products.stock + EXCLUDED.stock,  -- 累加库存            price = EXCLUDED.price,            updated_at = NOW()    """)    # 批量执行    for product in products:        await session.execute(            sql,            {                "sku": product["sku"],                "name": product["name"],                "stock": product.get("stock", 0),                "price": product.get("price", 0),                "created_at": datetime.utcnow()            }        )    return len(products)

部分更新 + 条件判断

async def upsert_user_smart(session: AsyncSession, user_data: dict) -> dict | None:    """    智能 UPSERT:    - 如果提供了 age,才更新 age    - 如果提供了 name,才更新 name    - 更新 updated_at    """    sql = text("""        INSERT INTO users (email, name, age, created_at)        VALUES (:email, :name, :age, :created_at)        ON CONFLICT (email) DO UPDATE SET            name = COALESCE(:name, users.name),  -- 如果新值为 NULL,保持原值            age = COALESCE(:age, users.age),            updated_at = NOW()        RETURNING id, email, name, age, updated_at    """)    result = await session.execute(        sql,        {            "email": user_data["email"],            "name": user_data.get("name"),  # 可能为 None            "age": user_data.get("age"),    # 可能为 None            "created_at": datetime.utcnow()        }    )    row = result.mappings().first()    return dict(row) if row else None

用户注册/登录:存在则更新最后登录时间

async def register_or_login(session: AsyncSession, email: str, name: str) -> dict:    """    用户注册或登录:    - 新用户:插入    - 老用户:更新最后登录时间    """    sql = text("""        INSERT INTO users (email, name, last_login, created_at)        VALUES (:email, :name, :now, :now)        ON CONFLICT (email) DO UPDATE SET            last_login = EXCLUDED.last_login,            name = EXCLUDED.name  -- 可选:更新用户名        RETURNING id, email, name, last_login, created_at    """)    now = datetime.utcnow()    result = await session.execute(        sql,        {"email": email, "name": name, "now": now}    )    return dict(result.mappings().first())

库存累加

async def add_product_stock(session: AsyncSession, sku: str, quantity: int) -> bool:    """    增加商品库存:    - 商品不存在:插入    - 商品存在:累加库存    """    sql = text("""        INSERT INTO products (sku, stock, created_at)        VALUES (:sku, :quantity, :now)        ON CONFLICT (sku) DO UPDATE SET            stock = products.stock + EXCLUDED.stock,            updated_at = NOW()    """)    result = await session.execute(        sql,        {            "sku": sku,            "quantity": quantity,            "now": datetime.utcnow()        }    )    return result.rowcount > 0

用户积分累加

async def add_user_points(session: AsyncSession, user_id: int, points: int) -> dict | None:    """    增加用户积分(累加)    """    sql = text("""        INSERT INTO user_points (user_id, points, created_at)        VALUES (:user_id, :points, :now)        ON CONFLICT (user_id) DO UPDATE SET            points = user_points.points + EXCLUDED.points,            updated_at = NOW()        RETURNING user_id, points    """)    result = await session.execute(        sql,        {            "user_id": user_id,            "points": points,            "now": datetime.utcnow()        }    )    row = result.mappings().first()    return dict(row) if row else None

标签计数

存在则 +1,不存在则创建:

async def increment_tag_count(session: AsyncSession, tag_name: str) -> int:    """    标签计数:    - 标签不存在:插入 count=1    - 标签存在:count += 1    """    sql = text("""        INSERT INTO tags (name, count, created_at)        VALUES (:name, 1, :now)        ON CONFLICT (name) DO UPDATE SET            count = tags.count + 1,            updated_at = NOW()        RETURNING count    """)    result = await session.execute(        sql,        {"name": tag_name, "now": datetime.utcnow()}    )    return result.scalar() or 0
来源:https://www.jb51.net/database/3585148h5.htm
上一篇详解SQL Server触发器常见应用场景与注意事项 下一篇Navicat生成ER关系图并导出完整教程
本站内容用于信息整理与展示,如有侵权或内容问题请及时联系处理。

相关推荐

补充同频道和同主题内容,方便继续浏览更多相关内容。

同类最新

继续查看同栏目最近更新的文章。

更多
Redis 7.0增量AOF重写RDB前导码配置详解
数据库 · 2026-07-02

Redis 7.0增量AOF重写RDB前导码配置详解

先说一个几乎所有人都踩过的典型误区:很多人把 aof-use-rdb-preamble yes 当作开启“增量重写”的开关。实际上,这个配置只干了一件事——让重写后的 AOF 文件头部带上 RDB 快照。它解决的是加载速度问题,跟“增量重写”本身的概念压根不是一回事。真正的增量重写,依赖的是 Red

在Python Tornado异步框架中安全执行SQL命令的方法与最佳实践
数据库 · 2026-07-02

在Python Tornado异步框架中安全执行SQL命令的方法与最佳实践

直接在Tornado里用SQLAlchemy同步执行SQL,结果就是阻塞IOLoop,所谓“异步框架里写同步数据库代码”,等于白搭。安全执行的关键不是“怎么写SQL”,而是“怎么不卡住事件循环”。 为什么不能在RequestHandler里直接调用session execute() 因为sessio

利用SQL触发器实现在INSERT数据时自动同步到审计表
数据库 · 2026-07-02

利用SQL触发器实现在INSERT数据时自动同步到审计表

先说结论:可以用触发器把 INSERT 数据同步到审计表,但必须用 AFTER INSERT,并且审计表的字段顺序、类型、字符集得和源表严格一致。否则,轻则写入错位、数据截断,重则直接报错、丢数据。下面把这些坑一个一个掰开说。 能,但必须用 AFTER INSERT,且审计表字段顺序、类型、字符集要

如何用SQL编写按不同工作日统计员工出勤率
数据库 · 2026-07-02

如何用SQL编写按不同工作日统计员工出勤率

在实际业务中,统计不同工作日的出勤率是HR系统里的高频需求。如果直接按日期函数分组,很容易掉进语言环境、索引失效或分母口径的坑里。下面就来拆解具体的实现要点。 必须用 CASE WHEN 将日期映射为固定 weekday 标签(如 Mon )再分组,避免语言环境导致的分组断裂;需过滤 DOW IN

Spring Boot 3动态拼接SQL为何引发严重安全漏洞
数据库 · 2026-07-02

Spring Boot 3动态拼接SQL为何引发严重安全漏洞

SQL注入漏洞的核心成因,本质上是因为用户输入直接参与了SQL语句的字符串拼接,而未采用参数化绑定机制。在MyBatis中使用${}、QueryWrapper中调用apply()与last()、JPA的@Query注解进行拼接等操作,都会绕过PreparedStatement的安全防护。动态字段必须