告别数学恐惧:用Python和NumPy手把手实现Gibbs采样(附完整代码)
用Python实战Gibbs采样:零数学推导的代码实现指南
第一次接触Gibbs采样时,我被那些复杂的条件概率公式和马尔可夫链理论彻底绕晕了。直到在自然语言处理项目中需要实现主题模型,才不得不硬着头皮重新学习。本文记录了我如何用NumPy从零实现Gibbs采样的完整过程——跳过数学证明,直接通过代码理解算法本质。即使你完全不懂马尔可夫链的细致平稳条件,也能跟着代码一步步构建出可用的采样器。
1. 准备工作:理解Gibbs采样的核心思想
Gibbs采样是一种特殊的马尔可夫链蒙特卡洛(MCMC)方法,用于从复杂的高维概率分布中抽取样本。与直接采样不同,它通过轮流固定其他维度,在单个维度上进行条件采样,最终收敛到目标分布。
想象你在一个多峰的高维山地中蒙眼行走:
- 每次只能沿一个坐标轴移动
- 移动距离由当前其他坐标位置决定的山势坡度决定
- 长期游走后,你在每个位置停留的时间比例就是该处的真实概率密度
关键优势在于:
- 避免直接计算复杂的高维积分
- 只需知道各维度上的条件分布即可
- 特别适合贝叶斯统计中的后验分布采样
import numpy as np import matplotlib.pyplot as plt from scipy.stats import norm # 设置随机种子保证结果可复现 np.random.seed(42)2. 案例设计:双峰高斯混合分布
我们用一个具体的二维案例来演示Gibbs采样。假设目标分布是两个高斯分布的混合:
$$ p(x,y) = 0.5 \cdot N(\mu_1, \Sigma_1) + 0.5 \cdot N(\mu_2, \Sigma_2) $$
其中:
- $\mu_1 = [0, 0]$, $\Sigma_1 = \begin{bmatrix}1.5 & 0.2\0.2 & 1.4\end{bmatrix}$
- $\mu_2 = [2, 3]$, $\Sigma_2 = \begin{bmatrix}1.1 & 0.4\0.4 & 1.3\end{bmatrix}$
# 定义两个高斯分布的参数 mu1 = np.array([0, 0]) cov1 = np.array([[1.5, 0.2], [0.2, 1.4]]) mu2 = np.array([2, 3]) cov2 = np.array([[1.1, 0.4], [0.4, 1.3]]) # 混合权重 weights = np.array([0.5, 0.5])3. 条件分布计算:Gibbs采样的核心
Gibbs采样需要知道每个变量在其他变量给定时的条件分布。对于高斯混合模型,条件分布仍然是高斯混合,但权重会变化。
x在y给定时的条件分布: $$ p(x|y) = w_1(y) \cdot N(\mu_{x|y}^{(1)}, \sigma_{x|y}^{(1)}) + w_2(y) \cdot N(\mu_{x|y}^{(2)}, \sigma_{x|y}^{(2)}) $$
其中权重$w_i(y)$与$p_Y^{(i)}(y)$成正比。
def conditional_x_given_y(y): # 计算两个分量在y处的边缘密度 py1 = norm.pdf(y, mu1[1], np.sqrt(cov1[1,1])) py2 = norm.pdf(y, mu2[1], np.sqrt(cov2[1,1])) # 计算权重 w1 = py1 * weights[0] / (py1 * weights[0] + py2 * weights[1]) w2 = 1 - w1 # 计算条件高斯参数 # 对于第一个分量 cond_mu1 = mu1[0] + cov1[0,1]/cov1[1,1] * (y - mu1[1]) cond_var1 = cov1[0,0] - cov1[0,1]**2/cov1[1,1] # 对于第二个分量 cond_mu2 = mu2[0] + cov2[0,1]/cov2[1,1] * (y - mu2[1]) cond_var2 = cov2[0,0] - cov2[0,1]**2/cov2[1,1] return [(w1, cond_mu1, np.sqrt(cond_var1)), (w2, cond_mu2, np.sqrt(cond_var2))] def conditional_y_given_x(x): # 类似于conditional_x_given_y的实现 # 省略详细代码...4. Gibbs采样实现:完整代码解析
Gibbs采样的核心是一个循环,在每次迭代中:
- 固定y,从p(x|y)采样新的x
- 固定x,从p(y|x)采样新的y
def gibbs_sampling(initial_point, n_samples, burn_in): samples = np.zeros((n_samples + burn_in, 2)) current = np.array(initial_point) for i in range(n_samples + burn_in): # 采样x给定y components = conditional_x_given_y(current[1]) # 根据权重选择分量 choice = np.random.choice([0,1], p=[c[0] for c in components]) # 从选中的高斯分量中采样 current[0] = np.random.normal(components[choice][1], components[choice][2]) # 采样y给定x components = conditional_y_given_x(current[0]) choice = np.random.choice([0,1], p=[c[0] for c in components]) current[1] = np.random.normal(components[choice][1], components[choice][2]) samples[i] = current # 丢弃burn-in期的样本 return samples[burn_in:] # 运行采样 samples = gibbs_sampling(initial_point=[0,0], n_samples=10000, burn_in=1000)5. 结果可视化与诊断
采样完成后,我们需要验证样本是否真的来自目标分布。
边缘分布对比:
plt.figure(figsize=(12,5)) # x的边缘分布 plt.subplot(121) plt.hist(samples[:,0], bins=50, density=True, alpha=0.6) # 理论边缘分布 x = np.linspace(-5, 7, 500) px = weights[0]*norm.pdf(x, mu1[0], np.sqrt(cov1[0,0])) + \ weights[1]*norm.pdf(x, mu2[0], np.sqrt(cov2[0,0])) plt.plot(x, px, 'r-', lw=2) plt.title('x的边缘分布') # y的边缘分布 plt.subplot(122) plt.hist(samples[:,1], bins=50, density=True, alpha=0.6) y = np.linspace(-5, 8, 500) py = weights[0]*norm.pdf(y, mu1[1], np.sqrt(cov1[1,1])) + \ weights[1]*norm.pdf(y, mu2[1], np.sqrt(cov2[1,1])) plt.plot(y, py, 'r-', lw=2) plt.title('y的边缘分布') plt.tight_layout() plt.show()采样路径展示:
plt.figure(figsize=(10,8)) plt.plot(samples[:200,0], samples[:200,1], 'b-', alpha=0.3) plt.plot(samples[:200,0], samples[:200,1], 'ro', markersize=2) plt.title('Gibbs采样路径 (前200次迭代)') plt.xlabel('x') plt.ylabel('y') plt.grid(True)6. 性能优化与实用技巧
在实际应用中,Gibbs采样可能会遇到收敛慢、自相关高等问题。以下是几个实用优化技巧:
1. 稀疏采样:
# 每隔k个样本保留一个,减少自相关 thin = 5 thinned_samples = samples[::thin]2. 并行链诊断:
# 运行多条独立链检查收敛性 chains = [gibbs_sampling(np.random.randn(2), 2000, 500) for _ in range(4)] # 绘制不同链的轨迹 plt.figure(figsize=(10,6)) for i, chain in enumerate(chains): plt.plot(chain[:,0], label=f'Chain {i+1}') plt.title('多条链的x值轨迹') plt.legend()3. 自适应调整:
# 动态调整采样步数 def adaptive_gibbs(initial_point, min_samples=5000, conv_threshold=0.01): samples = [] current = np.array(initial_point) converged = False batch_size = 1000 while not converged: batch = gibbs_sampling(current, batch_size, 0) samples.extend(batch) # 检查最后两批次的均值差异 if len(samples) >= 2*batch_size: prev_mean = np.mean(samples[-2*batch_size:-batch_size], axis=0) curr_mean = np.mean(samples[-batch_size:], axis=0) if np.all(np.abs(prev_mean - curr_mean) < conv_threshold): converged = True current = batch[-1] return np.array(samples)7. 在主题模型中的应用实例
Gibbs采样最著名的应用之一是Latent Dirichlet Allocation(LDA)主题模型。虽然完整的LDA实现较复杂,但我们可以看一个简化的核心采样步骤:
# 伪代码展示LDA中的Gibbs采样 def lda_gibbs_sample(documents, n_topics, vocab_size, alpha, beta, n_iterations): # 初始化计数矩阵 topic_word_counts = np.zeros((n_topics, vocab_size)) + beta doc_topic_counts = np.zeros((len(documents), n_topics)) + alpha assignments = [] # 随机初始化主题分配 for d, doc in enumerate(documents): doc_assignments = [] for w in doc: topic = np.random.randint(n_topics) doc_assignments.append(topic) topic_word_counts[topic, w] += 1 doc_topic_counts[d, topic] += 1 assignments.append(doc_assignments) # Gibbs采样迭代 for _ in range(n_iterations): for d, doc in enumerate(documents): for i, w in enumerate(doc): # 移除当前词的计数 old_topic = assignments[d][i] topic_word_counts[old_topic, w] -= 1 doc_topic_counts[d, old_topic] -= 1 # 计算新主题的条件概率 topic_probs = ( (doc_topic_counts[d] / (doc_topic_counts[d].sum() + n_topics * alpha)) * (topic_word_counts[:, w] / (topic_word_counts.sum(axis=1) + vocab_size * beta)) ) topic_probs /= topic_probs.sum() # 从条件分布中采样新主题 new_topic = np.random.choice(n_topics, p=topic_probs) assignments[d][i] = new_topic topic_word_counts[new_topic, w] += 1 doc_topic_counts[d, new_topic] += 1 return topic_word_counts, doc_topic_counts实现Gibbs采样器时,最常遇到的坑是条件分布计算错误。我在第一次尝试时曾混淆了联合分布和条件分布的参数,导致采样结果完全偏离目标分布。另一个教训是burn-in期的选择——太短会导致样本不收敛,太长又会浪费计算资源。通过观察多条链的轨迹和统计量稳定性来确定合适的burn-in期是最可靠的方法。
