Xenium Data Analysis | Data Preprocessing, Single-Cell Dimensionality Reduction and Clustering, Cell Type Annotation
In the previous post we downloaded the official 10x dataset and read it with the spatialdata framework. In this post we take the resulting single-cell table through a standard single-cell workflow: quality control, removal of low-quality cells, dimensionality reduction and clustering, marker gene selection, and cell type annotation based on the cell type markers reported in the reference publication.
The overall processing flow is as follows:
import os
import math
import textwrap
import threading
import numpy as np
import pandas as pd
import scanpy as sc
import matplotlib.pyplot as plt
import rapids_singlecell as rsc
import spatialdata as sd
from spatialdata_io import xenium
# Read the raw Xenium output of multiple samples with multithreading
def xenium_data_load_multithreaded(data_dir, sample_info):
def sd_read_xenium(sample_data, sample_name, sdata_dict):
sdata = xenium(path=sample_data, cells_boundaries=True, n_jobs=6)
sdata_dict[sample_name] = sdata
threads = []
sdata_dict = {}
sample_2_group = {}
with open(sample_info, 'r') as f:
for line in f:
            raw_name, sample_name, group_name = line.strip().split('\t')[:3]  # adjust to match the columns of your own sample sheet
sample_2_group[sample_name] = group_name
thread = threading.Thread(target=sd_read_xenium, args=(os.path.join(data_dir, raw_name),sample_name, sdata_dict,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
sdata = sd.concatenate(
sdata_dict,
        concatenate_tables=True,  # merge the per-sample single-cell tables into one table
obs_names_make_unique=True
)
sdata.tables['table'].obs["sample"] = sdata.tables['table'].obs["region"].str.replace('cell_circles-', '')
sdata.tables['table'].obs["group"] = sdata.tables['table'].obs["sample"].apply(lambda x: sample_2_group[x])
sdata.tables['table'].obs["cell_boundaries"] = sdata.tables['table'].obs["region"].str.replace('cell_circles', 'cell_boundaries')
sdata.set_table_annotates_spatialelement(table_name='table', region=[i for i in sdata.shapes.keys() if i.startswith('cell_boundaries-')], region_key='cell_boundaries')
return sdata
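The loader above expects sample_info.txt to be a tab-separated file with one row per sample: the raw Xenium output directory name (relative to data_dir), the sample name, and the group label. A hypothetical example (directory, sample, and group names below are placeholders, not from the actual dataset):
output-XETG00001__slide1__regionA	tumor_1	tumor
output-XETG00001__slide1__regionB	normal_1	normal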
sdata = xenium_data_load_multithreaded(data_dir='./Xenium_Prime_Human_Lung_Cancer_FFPE_outs', sample_info='sample_info.txt')
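After loading, a quick sanity check of the SpatialData object is worthwhile; printing it lists the images, points, shapes and tables, and the concatenated table should contain cells from every sample. This check is optional and not part of the original pipeline:
# Optional sanity check of the loaded SpatialData object
print(sdata)                                                 # lists images, points, shapes and tables
print(sdata.tables['table'])                                 # the concatenated AnnData across samples
print(sdata.tables['table'].obs['sample'].value_counts())    # number of cells per sample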
# Grab the single-cell expression table (an AnnData object)
adata = sdata.tables['table']
# Compute quality control metrics
sc.pp.calculate_qc_metrics(adata, percent_top=(10, 20, 50), inplace=True)
adata.obs['log10GenesPerUMI'] = np.log10(adata.obs['n_genes_by_counts']) / np.log10(adata.obs['total_counts'])
# Filter out low-quality cells and lowly detected genes
sc.pp.filter_cells(adata, min_counts=10)
sc.pp.filter_cells(adata, max_counts=4000)
sc.pp.filter_genes(adata, min_cells=10)
adata = adata[(adata.obs['log10GenesPerUMI'] > 0.85) & (adata.obs['log10GenesPerUMI'] < 0.99)].copy()
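The thresholds above (10 to 4,000 counts per cell, genes detected in at least 10 cells, complexity between 0.85 and 0.99) are dataset-specific choices; it helps to look at the QC metric distributions per sample before fixing them. A minimal sketch using standard scanpy plotting:
# Distributions of QC metrics per sample (a sketch; run before and/or after filtering)
sc.pl.violin(
    adata,
    keys=['total_counts', 'n_genes_by_counts', 'log10GenesPerUMI'],
    groupby='sample',
    jitter=0.4,
    multi_panel=True,
    show=False,
)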
# Dimensionality reduction and clustering
def rapids_singlecell_cluster(
adata: sc.AnnData,
n_hvg: int = 2000,
n_neighbors: int = 20,
n_pcs: int = 15,
res: float = 0.8,
) -> sc.AnnData:
"""
使用GPU加速的单细胞数据聚类分析
参数:
adata -- AnnData对象,包含单细胞数据
n_hvg -- 选择的高变基因数量,默认2000
n_neighbors -- 最近邻数量,默认20
n_pcs -- 使用的主成分数,默认15
res -- Leiden聚类的分辨率参数,默认0.8
random_state -- 随机种子,保证结果可重复,默认0
返回:
处理后的AnnData对象,包含聚类结果和可视化信息
"""
    # Move the data to the GPU for accelerated computation
rsc.get.anndata_to_GPU(adata, convert_all=True)
    # Total-count normalization and log transformation
rsc.pp.normalize_total(adata, inplace=True)
rsc.pp.log1p(adata)
adata.raw = adata.copy()
    # Select highly variable genes (batch-aware, per sample)
rsc.pp.highly_variable_genes(
adata,
n_top_genes=n_hvg,
flavor="seurat",
batch_key="sample",
)
adata = adata[:, adata.var['highly_variable']].copy()
    # Regress out total counts and scale the data
rsc.pp.regress_out(adata, keys=['total_counts'])
rsc.pp.scale(adata, max_value=10)
    # PCA dimensionality reduction
rsc.pp.pca(adata, n_comps=n_pcs)
    sc.pl.pca_variance_ratio(adata, log=True, n_pcs=n_pcs, show=False)
    # Batch correction with Harmony (when multiple samples are present)
rsc.pp.harmony_integrate(adata, key="sample")
rsc.pp.neighbors(adata, n_neighbors=n_neighbors, n_pcs=n_pcs, use_rep='X_pca_harmony', random_state=0)
rsc.tl.umap(adata, random_state=0)
    rsc.tl.leiden(adata, resolution=res, key_added=f'res_{res}', random_state=0)
rsc.get.anndata_to_CPU(adata, convert_all=True)
sc.pl.umap(
adata,
color=f'res_{res}',
legend_loc='on data',
legend_fontsize=9,
legend_fontoutline=3,
frameon=True,
show=False
)
return adata
adata = rapids_singlecell_cluster(adata)
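The intro above also mentions marker gene selection and cell type annotation, which the clustering function does not cover. A minimal sketch with standard scanpy calls: rank marker genes per Leiden cluster, compare them against the markers reported in the reference publication, then map each cluster to a cell type label. The cluster-to-celltype mapping below uses purely illustrative placeholder labels, not the actual annotation for this dataset.
# Marker gene selection: rank genes per Leiden cluster on the log-normalized data stored in .raw
sc.tl.rank_genes_groups(adata, groupby='res_0.8', method='wilcoxon', use_raw=True)
sc.pl.rank_genes_groups_dotplot(adata, n_genes=5, show=False)
# Cell type annotation: map cluster IDs to labels after checking markers against the reference paper
# (placeholder labels below; replace with your own assignment, one entry per cluster)
cluster2celltype = {
    '0': 'Epithelial cell',
    '1': 'T cell',
    '2': 'Macrophage',
}
adata.obs['celltype'] = adata.obs['res_0.8'].astype(str).map(cluster2celltype).astype('category')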
Next, wrap cell type composition plotting in a helper function that can draw pie charts or stacked bar charts, either for a single sample or split across multiple samples or groups.
def plot_cellular_composition(adata, column='celltype', group='sample', max_cols=4, label_threshold=2, show_labels=True, adjust_labels=False, plot_type="pie"):
if adjust_labels:
try:
from adjustText import adjust_text
except ImportError:
            raise ImportError("The 'adjustText' module is required for label adjustment. Please install it with `pip install adjustText` or set adjust_labels=False.")
cats = sorted([i for i in adata.obs[group].drop_duplicates()])
compositions = {}
for cat in cats:
idx = adata[adata.obs[group] == cat].obs.index
compositions[cat] = adata.obs[column].loc[idx].value_counts(normalize=True) * 100 # calculate percentage
    compositions = pd.DataFrame(compositions).fillna(0)  # cell types absent in a sample become 0 instead of NaN
# Define a function to display percentages above the threshold
def autopct_func(pct):
return ('%1.1f%%' % pct) if pct > label_threshold else ''
def get_nrows_maxcols(n_keys, max_cols):
if n_keys > max_cols:
n_rows = math.ceil(n_keys / max_cols)
else:
n_rows = 1
max_cols = n_keys
return n_keys, n_rows, max_cols
if plot_type == "pie":
# Plot pie charts for each area
n_plots, nrows, ncols = get_nrows_maxcols(len(cats), max_cols)
fig, axs = plt.subplots(nrows, ncols, figsize=(5*ncols, 5*nrows))
if n_plots > 1:
axs = axs.ravel()
else:
axs = [axs]
for i, area in enumerate(compositions.columns):
if show_labels:
wedges, texts, autotexts = axs[i].pie(compositions[area], autopct=autopct_func, pctdistance=1.15)
            else:
                wedges, texts = axs[i].pie(compositions[area])
                autotexts = []  # no percentage labels when show_labels is False
title_str = textwrap.fill(f'Proportions of Cell Types in {area}', width=20)
axs[i].set_title(title_str)
if adjust_labels:
# Adjust text to avoid overlap
adjust_text(texts + autotexts, ax=axs[i], arrowprops=dict(arrowstyle="->", color='k', lw=0.5))
# Add a legend
fig.legend(wedges, compositions.index, loc='center left', bbox_to_anchor=(1, 0, 0.5, 1))
elif plot_type in ["bar", "barh"]:
# Plot a single stacked bar plot
if plot_type == "bar":
fig_width = 0.9*len(cats)
fig_height = 4
ylabel = "%"
xlabel = group
else:
fig_width = 6
fig_height = 0.9*len(cats)
ylabel = group
xlabel = "%"
compositions.T.plot(kind=plot_type, stacked=True, figsize=(fig_width, fig_height), width=0.7)
plt.title('Cell type composition')
plt.ylabel(ylabel)
plt.xlabel(xlabel)
plt.grid(False)
plt.legend(title='Cell Types', bbox_to_anchor=(1.05, 1), loc='upper left')
plt.tight_layout()
plot_cellular_composition(adata, column='celltype', group='sample', max_cols=3, label_threshold=0, show_labels=True, adjust_labels=False, plot_type="pie")
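The same helper can also draw a stacked bar chart, for example split by experimental group instead of by sample, and the resulting figure can be saved afterwards (a usage example; the output filename is arbitrary):
# Stacked bar chart of cell type composition per group, then save the current figure
plot_cellular_composition(adata, column='celltype', group='group', plot_type="bar")
plt.savefig('celltype_composition_bar.pdf', bbox_inches='tight')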