游乐游手机版
首页/AI教程/文章详情

DDPM源码逐行注释:正向加噪、逆向去噪、MSE损失全流程复现

时间:2026-06-23 15:28
扩散模型以正向加噪和逆向去噪为双过程,通过重参数化直接从数据计算任意时刻噪声状态,训练仅需预测噪声的均方误差损失。采用U-Net架构与正弦时间嵌入,结合线性或余弦噪声调度,实现了稳定高效的图像生成、音频合成等任务。

摘要

扩散模型(Diffusion Models)如今已是生成式AI领域当之无愧的核心范式,在图像生成、音频合成、分子设计等任务中,表现远胜GAN和VAE。本文从最底层的数学原理出发,逐步推导扩散过程与逆过程的核心公式,并提供一份完整、可运行的PyTorch代码实现。内容涵盖正向加噪、逆向去噪、损失函数设计、采样策略等关键环节,同时针对训练不稳定、采样速度慢、条件控制等常见问题,给出系统性的解决方案。逻辑严密,代码可直接运行,适合有一定深度学习基础、希望深入理解扩散模型内部机制的读者。

逐行注释DDPM源码:正向加噪、逆向去噪、MSE损失全流程复现

应用场景

扩散模型凭借强大的分布建模能力和稳定的训练过程,已在以下领域结出丰硕成果:

  1. 图像生成与编辑:DALL-E 2、Stable Diffusion、Imagen等主流文生图模型,均采用扩散架构,能够支撑高分辨率、高保真度的图像合成。
  2. 音频生成:WaveGrad、DiffWave等模型将扩散过程应用于语音波形生成,质量显著优于传统自回归方法。
  3. 分子构象生成:GeoDiff等模型利用扩散模型生成3D分子结构,在药物发现领域展现出广阔的应用前景。
  4. 时序数据预测:扩散模型也可用于金融时间序列、气象数据的概率预测,灵活度极高。
  5. 图像超分辨率与修复:SR3、Palette等模型在条件扩散框架下完成图像复原任务,效果相当出色。

核心原理

扩散模型的核心其实很简单,包含两个过程:前向扩散过程和逆向生成过程。

前向扩散过程

给定真实数据分布 \(x_0 \sim q(x)\),前向过程会逐步向数据添加高斯噪声,经过 \(T\) 步后得到近似标准正态分布的 \(x_T\)。这个过程被建模为马尔可夫链:

\[ q(x_t | x_{t-1}) = \mathcal{N}(x_t; \sqrt{1 - \beta_t} x_{t-1}, \beta_t \mathbf{I}) \]

其中 \(\beta_t\) 是预定义的噪声调度表,通常从 \(1 \times 10^{-4}\) 到 \(0.02\) 线性增长。这里有个关键技巧:利用重参数化方法,可以直接从 \(x_0\) 计算任意时刻 \(t\) 的 \(x_t\):

\[ x_t = \sqrt{\bar{\alpha}_t} x_0 + \sqrt{1 - \bar{\alpha}_t} \epsilon \]

其中 \(\alpha_t = 1 - \beta_t\),\(\bar{\alpha}_t = \prod_{i=1}^t \alpha_i\),\(\epsilon \sim \mathcal{N}(0, \mathbf{I})\)。这一公式让训练时无需逐步迭代,可直接采样任意时刻的噪声状态,效率大幅提升。

逆向生成过程

逆向过程需要学习从噪声 \(x_T\) 一步步还原出真实数据 \(x_0\)。同样被建模为马尔可夫链,不过转移概率需靠神经网络来近似:

\[ p_\theta(x_{t-1} | x_t) = \mathcal{N}(x_{t-1}; \mu_\theta(x_t, t), \sigma_t^2 \mathbf{I}) \]

其中 \(\sigma_t^2\) 通常固定为 \(\beta_t\) 或 \(\frac{1 - \bar{\alpha}_{t-1}}{1 - \bar{\alpha}_t} \beta_t\)。核心在于学习均值 \(\mu_\theta\)。根据DDPM的推导,最优均值可以表示为:

\[ \mu_\theta(x_t, t) = \frac{1}{\sqrt{\alpha_t}} \left( x_t - \frac{\beta_t}{\sqrt{1 - \bar{\alpha}_t}} \epsilon_\theta(x_t, t) \right) \]

因此,我们只需训练一个神经网络 \(\epsilon_\theta\) 来预测添加的噪声 \(\epsilon\),即可完成逆向过程。这一简化设计极为巧妙。

训练损失函数

基于上述推导,训练目标变得极其简单:

\[ L_{\text{simple}} = \mathbb{E}_{t, x_0, \epsilon} \left[ \| \epsilon - \epsilon_\theta(x_t, t) \|^2 \right] \]

这便是一个简单的均方误差损失,其中 \(x_t\) 由 \(x_0\) 和 \(\epsilon\) 通过前向公式直接计算得到。这样的设计使训练异常稳定,完全无需对抗训练或变分下界近似,非常省心。

详细步骤

步骤1:定义噪声调度表

常用的是线性调度(Linear Schedule)或余弦调度(Cosine Schedule)。线性调度在 \(T=1000\) 时效果不错,而余弦调度在高分辨率任务中表现更佳。

步骤2:构建神经网络

首选架构是U-Net,包含下采样、中间块、上采样三个部分,每个块内包含残差卷积层和自注意力机制。时间步 \(t\) 通过正弦位置编码嵌入到每一层中。

步骤3:训练循环

  1. 从数据集中采样 \(x_0\)
  2. 随机采样时间步 \(t \sim \text{Uniform}(1, T)\)
  3. 采样噪声 \(\epsilon \sim \mathcal{N}(0, \mathbf{I})\)
  4. 计算 \(x_t = \sqrt{\bar{\alpha}_t} x_0 + \sqrt{1 - \bar{\alpha}_t} \epsilon\)
  5. 输入 \(x_t\) 和时间步 \(t\) 到网络,预测噪声 \(\hat{\epsilon}\)
  6. 计算损失 \(L = \text{MSE}(\epsilon, \hat{\epsilon})\)
  7. 反向传播更新网络参数

步骤4:采样生成

  1. 从标准正态分布采样 \(x_T\)
  2. 从 \(t=T\) 到 \(1\) 迭代:
    a. 采样 \(z \sim \mathcal{N}(0, \mathbf{I})\)(当 \(t>1\) 时)
    b. 预测噪声 \(\hat{\epsilon} = \epsilon_\theta(x_t, t)\)
    c. 计算 \(x_{t-1} = \frac{1}{\sqrt{\alpha_t}} \left( x_t - \frac{\beta_t}{\sqrt{1 - \bar{\alpha}_t}} \hat{\epsilon} \right) + \sigma_t z\)
  3. 返回 \(x_0\)

完整可运行代码(带注释)

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt

# -------------------- 工具函数 --------------------
def sinusoidal_embedding(timesteps, embedding_dim):
    """
    时间步正弦位置编码
    timesteps: [batch_size] 或 [batch_size, 1]
    embedding_dim: 编码维度,必须是偶数
    """
    half_dim = embedding_dim // 2
    emb = np.log(10000) / (half_dim - 1)
    emb = torch.exp(torch.arange(half_dim, dtype=torch.float32) * -emb)
    emb = timesteps.float() * emb.unsqueeze(0)  # [batch, half_dim]
    emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=-1)
    return emb

# -------------------- 噪声调度表 --------------------
class NoiseSchedule:
    """
    线性噪声调度表
    T: 总步数
    beta_start, beta_end: beta的起始和结束值
    """
    def __init__(self, T=1000, beta_start=1e-4, beta_end=0.02):
        self.T = T
        self.beta = torch.linspace(beta_start, beta_end, T)
        self.alpha = 1.0 - self.beta
        self.alpha_bar = torch.cumprod(self.alpha, dim=0)  # alpha_bar_t

    def get_alpha_bar(self, t):
        """获取指定时间步的alpha_bar"""
        return self.alpha_bar[t]

# -------------------- U-Net 网络 --------------------
class ResidualBlock(nn.Module):
    """残差卷积块,包含时间步嵌入"""
    def __init__(self, in_ch, out_ch, time_emb_dim):
        super().__init__()
        self.conv1 = nn.Conv2d(in_ch, out_ch, 3, padding=1)
        self.conv2 = nn.Conv2d(out_ch, out_ch, 3, padding=1)
        self.time_mlp = nn.Linear(time_emb_dim, out_ch)
        self.relu = nn.ReLU()
        self.norm1 = nn.BatchNorm2d(out_ch)
        self.norm2 = nn.BatchNorm2d(out_ch)
        self.skip = nn.Conv2d(in_ch, out_ch, 1) if in_ch != out_ch else nn.Identity()

    def forward(self, x, t_emb):
        h = self.relu(self.norm1(self.conv1(x)))
        # 将时间嵌入加到特征图上
        time_shift = self.time_mlp(t_emb).unsqueeze(-1).unsqueeze(-1)
        h = h + time_shift
        h = self.relu(self.norm2(self.conv2(h)))
        return h + self.skip(x)


class SimpleUNet(nn.Module):
    """
    简化的U-Net,适用于小规模数据集(如MNIST)
    输入: [batch, 1, 28, 28]
    """
    def __init__(self, in_channels=1, base_channels=64, time_emb_dim=128):
        super().__init__()
        self.time_emb_dim = time_emb_dim

        # 下采样路径
        self.enc1 = ResidualBlock(in_channels, base_channels, time_emb_dim)
        self.enc2 = ResidualBlock(base_channels, base_channels*2, time_emb_dim)
        self.enc3 = ResidualBlock(base_channels*2, base_channels*4, time_emb_dim)

        # 中间层
        self.mid = ResidualBlock(base_channels*4, base_channels*4, time_emb_dim)

        # 上采样路径
        self.dec3 = ResidualBlock(base_channels*4*2, base_channels*2, time_emb_dim)
        self.dec2 = ResidualBlock(base_channels*2*2, base_channels, time_emb_dim)
        self.dec1 = ResidualBlock(base_channels*2, in_channels, time_emb_dim)

        # 池化和上采样
        self.pool = nn.MaxPool2d(2)
        self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=False)

    def forward(self, x, t):
        # 时间嵌入
        t_emb = sinusoidal_embedding(t, self.time_emb_dim)

        # 下采样
        x1 = self.enc1(x, t_emb)
        x2 = self.enc2(self.pool(x1), t_emb)
        x3 = self.enc3(self.pool(x2), t_emb)

        # 中间
        x_mid = self.mid(self.pool(x3), t_emb)

        # 上采样(带跳跃连接)
        x = self.upsample(x_mid)
        x = torch.cat([x, x3], dim=1)
        x = self.dec3(x, t_emb)

        x = self.upsample(x)
        x = torch.cat([x, x2], dim=1)
        x = self.dec2(x, t_emb)

        x = self.upsample(x)
        x = torch.cat([x, x1], dim=1)
        x = self.dec1(x, t_emb)

        return x


# -------------------- 扩散模型 --------------------
class DiffusionModel:
    def __init__(self, model, noise_schedule, device='cpu'):
        self.model = model.to(device)
        self.noise_schedule = noise_schedule
        self.device = device

    def train_step(self, x0, optimizer):
        """
        单步训练
        x0: 真实数据 [batch, channels, H, W]
        """
        batch_size = x0.shape[0]

        # 随机采样时间步
        t = torch.randint(0, self.noise_schedule.T, (batch_size,), device=self.device)

        # 采样噪声
        epsilon = torch.randn_like(x0, device=self.device)

        # 计算 x_t
        alpha_bar = self.noise_schedule.get_alpha_bar(t).view(-1, 1, 1, 1)
        x_t = torch.sqrt(alpha_bar) * x0 + torch.sqrt(1 - alpha_bar) * epsilon

        # 预测噪声
        epsilon_hat = self.model(x_t, t)

        # 计算损失
        loss = F.mse_loss(epsilon_hat, epsilon)

        # 反向传播
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        return loss.item()

    @torch.no_grad()
    def sample(self, batch_size=1, image_shape=(1, 28, 28)):
        """从噪声生成样本"""
        # 初始噪声 x_T
        x = torch.randn(batch_size, *image_shape, device=self.device)

        # 逆向迭代
        for t in reversed(range(self.noise_schedule.T)):
            t_tensor = torch.full((batch_size,), t, device=self.device, dtype=torch.long)

            # 预测噪声
            epsilon_hat = self.model(x, t_tensor)

            # 计算 x_{t-1}
            beta = self.noise_schedule.beta[t].to(self.device)
            alpha = self.noise_schedule.alpha[t].to(self.device)
            alpha_bar = self.noise_schedule.alpha_bar[t].to(self.device)

            # 系数
            coef1 = 1.0 / torch.sqrt(alpha)
            coef2 = beta / torch.sqrt(1 - alpha_bar)

            # 均值
            mu = coef1 * (x - coef2 * epsilon_hat)

            # 添加噪声(t>0时)
            if t > 0:
                sigma = torch.sqrt(beta)
                z = torch.randn_like(x)
                x = mu + sigma * z
            else:
                x = mu

        return x


# -------------------- 训练与测试 --------------------
def train_mnist_diffusion(epochs=50, batch_size=128, device='cuda'):
    """在MNIST上训练扩散模型"""
    from torchvision import datasets, transforms

    # 加载MNIST
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,))  # 归一化到[-1, 1]
    ])
    dataset = datasets.MNIST('./data', train=True, download=True, transform=transform)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=2)

    # 初始化模型
    noise_schedule = NoiseSchedule(T=1000)
    unet = SimpleUNet(in_channels=1)
    diffusion = DiffusionModel(unet, noise_schedule, device=device)
    optimizer = torch.optim.Adam(diffusion.model.parameters(), lr=1e-4)

    # 训练循环
    for epoch in range(epochs):
        total_loss = 0
        for batch_idx, (x, _) in enumerate(dataloader):
            x = x.to(device)
            loss = diffusion.train_step(x, optimizer)
            total_loss += loss
        a vg_loss = total_loss / len(dataloader)
        print(f'Epoch {epoch+1}/{epochs}, Loss: {a vg_loss:.6f}')

        # 每个epoch生成样本
        if (epoch+1) % 10 == 0:
            samples = diffusion.sample(batch_size=16).cpu()
            # 反归一化到[0,1]
            samples = (samples + 1) / 2
            # 保存或显示(此处仅打印)
            print(f'Sample shape: {samples.shape}, min: {samples.min():.3f}, max: {samples.max():.3f}')

    return diffusion


# -------------------- 主程序 --------------------
if __name__ == '__main__':
    device = 'cuda' if torch.cuda.is_a vailable() else 'cpu'
    print(f'Using device: {device}')

    # 训练模型(可注释掉以节省时间,直接使用预训练)
    diffusion = train_mnist_diffusion(epochs=50, batch_size=128, device=device)

    # 生成更多样本
    samples = diffusion.sample(batch_size=64)
    samples = (samples + 1) / 2  # 反归一化
    print(f'Generated {samples.shape[0]} samples.')

    # 保存模型
    torch.sa ve(diffusion.model.state_dict(), 'diffusion_mnist.pth')
    print('Model sa ved.')

运行结果说明

  1. 训练损失:初始损失通常在0.5-1.0之间,随着训练深入逐渐降至0.05以下。MNIST数据集较为简单,50个epoch即可生成清晰的数字。

  2. 生成样本质量:生成的图像为28x28的灰度数字,轮廓清晰,数字形态符合MNIST数据分布。早期epoch生成的样本较模糊,后期逐渐锐利。

  3. 采样速度:T=1000步时,单次生成64个样本大约需要5-10秒(GPU上)。若需提速,可通过减少采样步数(例如使用DDIM采样器)来加速。

  4. 注意事项:训练时必须将数据归一化到[-1, 1]区间,否则前向过程会出现数值溢出。生成样本后记得反归一化到[0, 1]才能正确显示。

常见问题与避坑

问题1:训练损失不下降

  • 原因:学习率过大或过小、网络初始化不当、数据未归一化。
  • 解决方案:使用Adam优化器,学习率设为1e-4;检查输入数据是否在[-1, 1]区间;加入BatchNorm以稳定训练。

问题2:生成样本全黑或全白

  • 原因:噪声调度表参数不合理,或采样过程中 \(\sigma_t\) 计算错误。
  • 解决方案:确保 beta_start=1e-4, beta_end=0.02;检查采样代码中 \(\sigma_t\) 是否随 t 变化;亦可尝试余弦调度。

问题3:生成样本模糊,缺乏细节

  • 原因:训练步数不足、网络容量太小、T值过大导致去噪困难。
  • 解决方案:增加 epoch 数;提升U-Net通道数(例如将 base_channels 从64改为128);采用DDIM采样器减少步数。

问题4:采样速度极慢

  • 原因:T=1000步意味着需执行1000次网络前向传播。
  • 解决方案:使用DDIM采样(步数减少至50-100);采用DPM-Solver等快速采样器;推理时使用FP16混合精度。

问题5:条件生成时控制力不足

  • 原因:未正确注入条件信息(如类别标签、文本嵌入)。
  • 解决方案:在U-Net中增加条件嵌入,通过交叉注意力或加法融合;使用Classifier-Free Guidance增强条件强度。

问题6:内存溢出(OOM)

  • 原因:batch_size 过大,或图像分辨率过高。
  • 解决方案:减小 batch_size;使用梯度累积;利用混合精度训练(AMP);采样时逐步生成。

总结

扩散模型通过将数据分布逐步转化为噪声分布,再学习逆向去噪过程,实现了稳定且高质量的生成。从数学推导到代码实现,本文完整呈现了DDPM的核心机制。关键要点总结如下:

  1. 前向过程是固定的马尔可夫链,借助重参数化可直接计算任意时刻的噪声状态。
  2. 训练目标简化为预测添加的噪声,使用MSE损失即可稳定收敛,十分省心。
  3. 逆向过程需从 \(x_T\) 逐步去噪,每一步根据预测噪声计算均值,并添加随机噪声。
  4. 网络架构通常采用U-Net,时间步通过正弦位置编码嵌入。
  5. 实际应用时需注意数据归一化、噪声调度表选择、采样加速等工程细节。

扩散模型已成为生成式AI的基石技术,理解其内部机制对后续研究(如Stable Diffusion、DALL-E 2)至关重要。建议读完代码后,尝试修改网络架构以适应更高分辨率图像,或引入条件控制实现文生图功能——那才是真正有趣的地方。

来源:https://juejin.cn/post/7653415367206141994
上一篇FAISS核心场景与工程选型:从RAG到千万级向量检索 下一篇生产级RAG架构设计实战:完整流程与最佳实践
本站内容用于信息整理与展示,如有侵权或内容问题请及时联系处理。

相关推荐

补充同频道和同主题内容,方便继续浏览更多相关内容。

同类最新

继续查看同栏目最近更新的文章。

更多
Windows Docker Desktop RabbitMQ生产级部署完整指南
AI教程 · 2026-06-29

Windows Docker Desktop RabbitMQ生产级部署完整指南

前言 在 Windows 本地开发环境中,直接安装 RabbitMQ 确实颇为周折:需要单独配置 Erlang 运行环境、手动管理环境变量、服务启停全凭手工操作。更令人困扰的是,版本兼容冲突、端口占用、环境不一致等问题层出不穷。笔者见过不少开发者为搭建环境就得耗费整整半天时间。 相比之下,借助 Do

AI搜索重构制造业采购逻辑的阿里云企业级GEOCMS优化实践
AI教程 · 2026-06-29

AI搜索重构制造业采购逻辑的阿里云企业级GEOCMS优化实践

先分享一个切实感受。过去两年,我们与福建制造企业合作较为频繁,发现一个非常突出的现象:超过80%的企业官网,产品参数仍然存放在PDF或图片中。AI爬虫?根本无法抓取。这些企业技术实力不弱、资质证照齐全、应用案例也丰富,但在AI搜索这一全新战场上,它们几乎处于隐身状态。 一、一个正在发生的行业变化 A

阿里云Token Plan团队版功能价格与省钱购买指南
AI教程 · 2026-06-29

阿里云Token Plan团队版功能价格与省钱购买指南

阿里云百炼近期推出了名为“Token Plan 团队版”的全新服务,这一服务专为企业与开发者量身打造,定位为AI大模型订阅平台。通过引入Credits作为统一计量单位,将文本生成、图像生成等多模态AI能力纳入单一计费体系,同时无缝兼容主流AI编程工具及智能体(Agent)生态系统。其核心亮点包括:全

阿里云物联网.NET Core客户端位置信息上报
AI教程 · 2026-06-29

阿里云物联网.NET Core客户端位置信息上报

阿里云物联网平台的位置服务并非一个完全独立的功能模块。位置信息可包含二维坐标与三维坐标,而位置数据的来源本质上是借助设备属性进行上传。换言之,若要让设备上报位置,您需先将其视为一个普通属性进行处理。 1)添加二维位置数据 操作过程十分简洁。进入数据分析 → 空间数据可视化 → 二维数据,点击添加,将

年阿里云服务器选型配置与网站部署全攻略
AI教程 · 2026-06-29

年阿里云服务器选型配置与网站部署全攻略

2026年,阿里云服务器生态已高度成熟,形成了清晰的轻量应用服务器与ECS云服务器两大产品阵营。无论你是计划搭建个人博客、企业官网,还是运营电商平台、进行应用开发,基本都能找到理想的解决方案。本指南将从服务器选型、配置选择、部署流程到安全运维,系统梳理2026年最实用的操作要点,帮助你少走弯路,让网