当前位置: 首页 > article >正文

【python因果库实战9】TMLE - 目标最大似然估计2

这里写目录标题

    • TMLE in causallib
      • 数据
      • 双重稳健性
      • TMLE 变体
        • 简化形式与完整形式
        • 特征与加权

在这里插入图片描述

TMLE in causallib

让我们看一个例子,说明如何使用CausalLib中的TMLE。

import pandas as pd
import numpy as np

from causallib.estimation import TMLE
from causallib.estimation import Standardization, IPW
from mlxtend.classifier import StackingCVClassifier
from mlxtend.regressor import StackingCVRegressor

from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import PolynomialFeatures
from sklearn.model_selection import RandomizedSearchCV, GridSearchCV
from sklearn.ensemble import GradientBoostingClassifier, HistGradientBoostingClassifier, RandomForestClassifier
# from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import GradientBoostingRegressor, HistGradientBoostingRegressor, RandomForestRegressor
from sklearn.svm import SVR
from sklearn.linear_model import Lasso, Ridge, LinearRegression, LassoCV, RidgeCV

import matplotlib.pyplot as plt
import seaborn as sns

数据

我们合成数据,因此我们知道真实的因果效应,依据的是Schuler和Rose 2017年的研究。
这是一个相对简单的机制,其中暴露量依赖于3个伯努利变量的线性组合,而结果也依赖于第三个变量的效果修饰。

def generate_single_dataset(n, seed=None):
    if seed is not None:
        np.random.seed(seed)
    
    X = np.random.binomial(n=1, p=(0.55, 0.30, 0.25), size=(n, 3))
    
    a_logit = -0.5 + X @ np.array([0.75, 1, 1.5])
    propensity = 1 / (1 + np.exp(-a_logit))
    a = np.random.binomial(1, propensity)
    
    y_func = lambda z: 24 - 3*z + X@np.array([3, -4, -6]) - 1.5*z*X[:, -1]
    noise = np.random.normal(0, 4.5, size=n)
    y = y_func(a) + noise
    y0 = y_func(np.zeros_like(a)) + noise
    y1 = y_func(np.ones_like(a)) + noise
    sate = np.mean(y1 - y0)
    # sate = -3.38
    # cate = y1 - y0
    
    X = pd.DataFrame(X)
    a = pd.Series(a)
    y = pd.Series(y)
    return X, a, y, sate
def run_single_simulation(tmle, n_samples, seed=None):
    X, a, y, sate = generate_single_dataset(n_samples, seed=None)
    tmle.fit(X, a, y)
    
    tmle_po = tmle.estimate_population_outcome(X, a)
    tmle_ate = tmle.estimate_effect(tmle_po[1], tmle_po[0])['diff']
    
    outcome_model_po = tmle.outcome_model.estimate_population_outcome(X, a)
    outcome_model_ate = tmle.outcome_model.estimate_effect(outcome_model_po[1], outcome_model_po[0])['diff']
    
    return tmle_ate, outcome_model_ate, sate


def run_multiple_simulations(tmle, n_simulations, n_samples):
    true_ates = []
    tmle_ates = []
    om_ates = []
    for i in range(n_simulations):
        tmle_ate, outcome_model_ate, sate = run_single_simulation(tmle, n_samples, i)

        tmle_ates.append(tmle_ate)
        om_ates.append(outcome_model_ate)
        true_ates.append(sate)
    
    predictions = pd.DataFrame(
        {"tmle": tmle_ates, "initial_model": om_ates, "true": true_ates}
    ).rename_axis("simulation").reset_index()
    # true_ate = np.mean(true_ates)
    true_ate = -3.38
    return predictions, true_ate

def plot_multiple_simulations(results, true_ate):
    results = results.drop(columns=["true"])
    results = results.melt(id_vars="simulation", var_name="method", value_name="ate")
    
    # Plot inidividual experiments:
    fig, axes = plt.subplots(1, 2, sharey=True,  
                             gridspec_kw={'width_ratios': [3, 1]},
                             figsize=(8, 3))
    sns.scatterplot(
        x="simulation", y="ate", hue="method", style="method", 
        data=results, 
        ax=axes[0]
    )
    axes[0].axhline(y=true_ate, linestyle='--', color="grey")
    # axes[0].text(results["simulation"].max(), true_ate - 0.1, "True ATE", color="slategrey")
    # (results.set_index(["simulation", "method"]) - true_ate).groupby("method").mean()
    
    # Plot distribution summary:
    axes[1].axhline(y=true_ate, linestyle='--', color="grey")
    sns.kdeplot(
        y="ate", hue="method", 
        data=results,
        legend=False,
        ax=axes[1]
    )
    axes[1].text(axes[1].get_xlim()[1], true_ate - 0.2, "True ATE", color="slategrey")
    
    # mean_bias = (results.set_index(["simulation", "method"]) - true_ate).groupby("method").mean()
    # pd.plotting.table(axes[2], mean_bias)
    
    axes[0].legend(loc='center left', bbox_to_anchor=(1.35, 0.1))

    plt.subplots_adjust(wspace=0.01)
    return axes
    

双重稳健性

我们将看到,无论是结果模型指定错误,TMLE都能对其进行修正,并导致整体上良好的估计。

我们首先将结果模型指定为L1正则化的逻辑回归(LASSO)。这个模型也是被误指定的,因为它没有考虑到治疗与第三个指示变量之间的交互作用。

outcome_model = Standardization(Lasso(random_state=0))
weight_model = IPW(LogisticRegression(penalty="none", random_state=0))
tmle = TMLE(
    outcome_model=outcome_model,
    weight_model=weight_model,
    reduced=True
)

n_samples = 500
n_simulations = 20

results, true_ate = run_multiple_simulations(tmle, n_simulations, n_samples)
plot_multiple_simulations(results, true_ate);

在这里插入图片描述
我们发现结果模型的预测偏离了实际效果,可能甚至偶尔忽略了治疗变量(在治疗效果接近0的情况下)。

我们可以通过允许更灵活的模型(通过使用交互项)来改进这一点,并使用交叉验证来找到所有新多项式参数正确的正则化程度。

outcome_model = Standardization(make_pipeline(PolynomialFeatures(2), LassoCV(random_state=0)))
tmle = TMLE(
    outcome_model=outcome_model,
    weight_model=weight_model,
    reduced=True
)

n_samples = 500
n_simulations = 20

results, true_ate = run_multiple_simulations(tmle, n_simulations, n_samples)
plot_multiple_simulations(results, true_ate);

在这里插入图片描述
初始的结果模型的表现已经好多了。虽然我们可以看到TMLE能够对其进行轻微的修正。

最后,当我们有了一个正确指定的结果模型后,我们可以极度限制权重模型,使其被误指定,来看看会发生什么:

weight_model = IPW(make_pipeline(PolynomialFeatures(2), LogisticRegression(penalty="l1", C=0.00001, solver="saga")))
tmle = TMLE(
    outcome_model=outcome_model,
    weight_model=weight_model,
    reduced=True
)

n_samples = 500
n_simulations = 20

results, true_ate = run_multiple_simulations(tmle, n_simulations, n_samples)
plot_multiple_simulations(results, true_ate);

在这里插入图片描述
我们发现TMLE知道忽略被误指定的倾向性模型。它几乎不做任何修正,实际上就是结果模型的效果。

最终,TMLE通常与“超级学习”结合使用(因为两者都来自Mark van der Laan和他的学生),这基本上是一个包含丰富基估计器集合的堆叠元学习器。我们可以看看当结果模型和治疗模型都是高度数据适应性时,TMLE会有怎样的表现。
需要注意的是,我们使用了堆叠交叉验证估计器,这样可以确保基估计器和元估计器都使用交叉验证。这种嵌套交叉验证减少了基估计器和元估计器之间信息泄露的风险,进一步确保不会出现过拟合。如果你还记得的话,TMLE对初始估计器的残差进行回归,所以初始结果模型不过拟合、不虚假地隐藏残差偏差信息是非常重要的。

outcome_model = StackingCVRegressor(
    [
        GradientBoostingRegressor(n_estimators=10),
        GradientBoostingRegressor(n_estimators=30),
        GradientBoostingRegressor(n_estimators=50),
        HistGradientBoostingRegressor(max_iter=10),
        HistGradientBoostingRegressor(max_iter=30),
        HistGradientBoostingRegressor(max_iter=50),
        RandomForestRegressor(n_estimators=25),
        RandomForestRegressor(n_estimators=50),
        RandomForestRegressor(n_estimators=100),
        SVR(kernel="rbf", C=0.01),
        SVR(kernel="rbf", C=0.1),
        SVR(kernel="rbf", C=1),
        SVR(kernel="rbf", C=10),
        SVR(kernel="poly", C=0.01, degree=2),
        SVR(kernel="poly", C=0.1, degree=2),
        SVR(kernel="poly", C=1, degree=2),
        SVR(kernel="poly", C=10, degree=2),
        SVR(kernel="poly", C=0.01, degree=3),
        SVR(kernel="poly", C=0.1, degree=3),
        SVR(kernel="poly", C=1, degree=3),
        SVR(kernel="poly", C=10, degree=3),
        make_pipeline(PolynomialFeatures(degree=2), Lasso(alpha=0.01)),
        make_pipeline(PolynomialFeatures(degree=2), Lasso(alpha=0.1)),
        make_pipeline(PolynomialFeatures(degree=2), Lasso(alpha=1)),
        make_pipeline(PolynomialFeatures(degree=2), Lasso(alpha=10)),
        make_pipeline(PolynomialFeatures(degree=2), Ridge(alpha=0.01)),
        make_pipeline(PolynomialFeatures(degree=2), Ridge(alpha=0.1)),
        make_pipeline(PolynomialFeatures(degree=2), Ridge(alpha=1)),
        make_pipeline(PolynomialFeatures(degree=2), Ridge(alpha=10)),
    ],
    LinearRegression(),
    cv=10,
    random_state=0,
    # verbose=1,
)

weight_model = StackingCVClassifier(
    [
        GradientBoostingClassifier(n_estimators=10),
        GradientBoostingClassifier(n_estimators=30),
        GradientBoostingClassifier(n_estimators=50),
        make_pipeline(PolynomialFeatures(degree=2), LogisticRegression(penalty="l1", C=0.01, solver="saga", max_iter=5000)),
        make_pipeline(PolynomialFeatures(degree=2), LogisticRegression(penalty="l1", C=0.1, solver="saga", max_iter=5000)),
        make_pipeline(PolynomialFeatures(degree=2), LogisticRegression(penalty="l1", C=1, solver="saga", max_iter=5000)),
        make_pipeline(PolynomialFeatures(degree=2), LogisticRegression(penalty="l1", C=10, solver="saga", max_iter=5000)),
        make_pipeline(PolynomialFeatures(degree=2), LogisticRegression(penalty="l2", C=0.01)),
        make_pipeline(PolynomialFeatures(degree=2), LogisticRegression(penalty="l2", C=0.1)),
        make_pipeline(PolynomialFeatures(degree=2), LogisticRegression(penalty="l2", C=1)),
        make_pipeline(PolynomialFeatures(degree=2), LogisticRegression(penalty="l2", C=10)),
    ],
    LogisticRegression(penalty="none"),
    cv=10,
    random_state=0,
    # verbose=1,
)

outcome_model = Standardization(outcome_model)
weight_model = IPW(weight_model)

tmle = TMLE(
    outcome_model=outcome_model,
    weight_model=weight_model,
    reduced=True
)

n_samples = 500
n_simulations = 20

results, true_ate = run_multiple_simulations(tmle, n_simulations, n_samples)
plot_multiple_simulations(results, true_ate);

在这里插入图片描述
以下是对提供的英文内容的中文翻译,注意保持了Markdown格式的标题和保留了代码块:


TMLE 变体

如果你一直进行到第三步,也就是我们构建“聪明协变量”的地方,你会看到一个星号。这是因为这种聪明协变量有多种变体。

所有变体都使用来自治疗机制的逆倾向性权重信息。然而,它们使用这种信息的确切方式可能会略有不同。

简化形式与完整形式

首先,“聪明协变量”可以是单个协变量或矩阵。

TMLE 需要 ( P r [ A = a ∣ X = i ] ) ( Pr[A = a | X = i] ) (Pr[A=aX=i]) 对于所有个体以及所有可能的治疗水平 —— 我们需要估计每个个体的概率以及每种可能的治疗水平。对于二元治疗,这更直接,因为我们有一个值(并且我们可以轻松计算它的补数)。然而,对于三个或更多的治疗水平,我们需要保留更多信息。

因此,对于二元情况,我们可以使用简化形式,这是单向量形式。由于我们只有两组,我们可以:

  • 完整形式,可以捕捉二元和多元治疗,将聪明协变量编码为矩阵。对于每个单元,我们将有一个向量 ( P r [ A = a ∣ X ] ) ( Pr[A = a | X] ) (Pr[A=aX]) 为 1,对于所有 ( a ≠ a i ) ( a ≠ a_i ) (a=ai) 为 0。然后我们使用矩阵回归来回归残差结果,并估计多个 ( E [ a ] ) ( E[a] ) (E[a]) 值而不是单个 ( e ) ( e ) (e) 值。
特征与加权

第二种选择是我们使用逆倾向性信息的地方。到目前为止,我们将其描述为我们要回归的特征(预测器/协变量)。然而,我们也可以将逆倾向性信息用作目标步骤逻辑模型的权重。

这意味着我们可以将“聪明协变量”分解为两个组成部分:

  1. 来自治疗机制的实际逆倾向性信息,
  2. 治疗组的编码。

回到简化/完整形式 —— 使用向量或矩阵本质上是指示编码:

  • 在向量(简化)版本中,治疗的编码为 1,对照组为 -1。
  • 在矩阵(完整)版本中,编码为一位有效(完全虚拟)编码。

causallib 允许用户使用两个二元参数控制这 4 种变体:

  • reduced=True 将创建一个单向量协变量,而 reduced=False 将创建一个矩阵,
  • ImportanceSampling=True 将使用逆倾向性作为回归权重,而 ImportanceSampling=False 将将其作为协变量使用。
from itertools import product
from collections import defaultdict

def compare_TMLE_flavors(outcome_model, weight_model, n_simulations, n_samples):
    true_ates = []
    pred_ates = defaultdict(list)
    for i in range(n_simulations):
        X, a, y, sate = generate_single_dataset(n_samples, seed=i)
        
        for reduced, imp_samp in product([True, False], [True, False]):
            tmle = TMLE(
                outcome_model=outcome_model,
                weight_model=weight_model,
                reduced=reduced,
                importance_sampling=imp_samp,
            )
            tmle.fit(X, a, y)
            potential_outcomes = tmle.estimate_population_outcome(X, a)
            effect = potential_outcomes[1] - potential_outcomes[0]
            
            name = str(type(tmle.clever_covariate_)).split(".")[-1][:-2]
            name = name.lstrip("CleverCovariate")  # shorten name
            pred_ates[name].append(effect)

        true_ates.append(sate)
    
    # # true_ate = np.mean(true_ates)
    # true_ate = -3.38
    pred_ates = pd.DataFrame(pred_ates).rename_axis("simulation").reset_index()
    return pred_ates, true_ates

def plot_tmle_flavor_comparison(pred_ates, true_ates):
    pred_ates = pred_ates.melt(id_vars="simulation", value_name="ate", var_name="method")
    fig, ax = plt.subplots(figsize=(8,5))
    sns.violinplot(y="method", x="ate", hue="method", 
                   data=pred_ates, orient="h", dodge=False, ax=ax)
    sns.stripplot(y="method", x="ate", 
                  color="lightgrey", alpha=0.5, 
                  data=pred_ates, orient="h", ax=ax)
    ax.legend([])  # ax.legend(loc='center left', bbox_to_anchor=(1, 0.5)) 
    ax.axvline(x=np.mean(true_ates), linestyle="--", color="grey")
    ax.text(np.mean(true_ates), ax.get_ylim()[0] + 0.2, "True ATE", color="grey");
    # ax.text(true_ate + 0.02, (ax.get_ylim()[1] + ax.get_ylim()[0]) / 2 + 0.1, "True ATE", color="grey");
outcome_model = Standardization(make_pipeline(PolynomialFeatures(2), LassoCV(random_state=0)))
weight_model = IPW(LogisticRegression(penalty="none", random_state=0))

pred_ates, true_ates = compare_TMLE_flavors(outcome_model, weight_model, 50, 5000)
plot_tmle_flavor_comparison(pred_ates, true_ates)

在这里插入图片描述
我们发现对于当前这个相对简单的情况,所有的方法表现都很相近。


http://www.kler.cn/a/447324.html

相关文章:

  • Windows中运行Linux(WSL)
  • “宏“知识详解
  • 使用二分查找法找出给定点距离给定点集合距离最近的点
  • 单元测试使用记录
  • pycharm debug
  • 使用Python从阿里云物联网平台获取STM32温度数据
  • 如何使用 Python 连接 SQLite 数据库?
  • MicroPython+ESP32:五.PC远程控制LED灯
  • 36.2 内置的k8s采集任务分析
  • AI呼入机器人详解
  • ubuntu 执行sh脚本出现报错:source:not found
  • 界面控件DevExpress v24.2.3全新发布——正式支持.NET 9
  • 算法—回文链表
  • Docker的网络
  • 大语言模型的常用微调方法
  • 单片机上电后程序不运行怎么排查问题?
  • Soul Preserver
  • 圣诞快乐(h5 css js(圣诞树))
  • 大数据之Hbase环境安装
  • 【Linux】usb内核设备信息
  • Elixir Supervisor
  • 青少年编程与数学 02-004 Go语言Web编程 12课题、本地数据存储
  • 智能电动汽车游智能化与电动化
  • IIC I2C子协议 SMBus协议 通信协议原理 时序 SMBus深度剖析
  • 智慧养老系统源码医院陪诊代办买药就医陪护上门护理小程序
  • 【蓝桥杯每日一题】扫描游戏——线段树