8. 马科维茨资产组合模型+FF5+ARCH风险模型优化方案(理论+Python实战)
目录
- 0. 承前
- 1. 核心风险函数代码讲解
- 1.1 数据准备和初始化
- 1.2 单资产ARCH建模
- 1.3 模型拟合和波动率预测
- 1.4 异常处理机制
- 1.5 相关系数矩阵计算
- 1.6 构建波动率矩阵
- 1.7 计算协方差矩阵
- 1.8 确保矩阵对称性
- 1.9 确保矩阵半正定性
- 1.10 格式转换和返回
- 1.11 calculate_covariance_matrix函数汇总
- 2. 代码汇总
- 3. 反思
- 3.1 不足之处
- 3.2 提升思路
- 4. 启后
0. 承前
本篇博文是对之前的篇文章,链接:
4. 马科维茨资产组合模型+Fama-French五因子优化方案(理论+Python实战)
的风险模型协方差矩阵的计算流程进行优化。
本文使用ARCH (Autoregressive Conditional Heteroskedasticity) 自回归条件异方差模型计量风险,目的是:
- 风险计量动态化,能够捕捉波动率的时变特征;
- 体验高可用、低耦合的开发风格,本文汇总代码与上文4.的代码区别仅在于函数:
calculate_covariance_matrix
,且输入输出参数格式一致。
本文主要要素:
- 马科维茨资产组合模型;
- Fama-French五因子模型预期收益率;
- ARCH金融风险计量模型。
如果想更加全面清晰地了解金融资产组合模型进化论的体系架构,可参考:
0. 金融资产组合模型进化全图鉴
- 金融工程流程图:
1. 核心风险函数代码讲解
1.1 数据准备和初始化
初始化存储波动率的数组和获取收益率数据
n_assets = len(monthly_log_returns.columns)
volatilities = np.zeros(n_assets)
returns_array = monthly_log_returns.values
1.2 单资产ARCH建模
对每个资产进行数据缩放和ARCH模型拟合
scale_factor = 100 # 将收益率转换为百分比
scaled_returns = returns_array[:, i] * scale_factor
model = arch.arch_model(
scaled_returns,
vol='Arch',
p=1,
mean='Zero',
dist='normal',
rescale=False
)
1.3 模型拟合和波动率预测
拟合模型并获取最新的条件波动率预测
result = model.fit(disp='off', show_warning=False)
forecast = result.forecast()
last_variance = forecast.variance.values[-1][0]
volatilities[i] = np.sqrt(last_variance) / scale_factor
1.4 异常处理机制
当ARCH模型拟合失败时,使用传统方法计算波动率
except Exception as e:
volatilities[i] = np.std(returns_array[:, i])
1.5 相关系数矩阵计算
计算资产间的相关系数矩阵
correlation_matrix = monthly_log_returns.corr().values
1.6 构建波动率矩阵
将单资产波动率转换为对角矩阵
vol_matrix = np.diag(volatilities)
1.7 计算协方差矩阵
结合波动率矩阵和相关系数矩阵计算最终的协方差矩阵
covariance_matrix = vol_matrix @ correlation_matrix @ vol_matrix
1.8 确保矩阵对称性
通过矩阵运算确保协方差矩阵的对称性
covariance_matrix = (covariance_matrix + covariance_matrix.T) / 2
1.9 确保矩阵半正定性
检查并调整特征值确保矩阵的半正定性
min_eigenval = np.min(np.linalg.eigvals(covariance_matrix))
if min_eigenval < 0:
covariance_matrix -= 1.2 * min_eigenval * np.eye(n_assets)
1.10 格式转换和返回
将结果转换为DataFrame并保持原始数据的索引结构
cov_df = pd.DataFrame(
covariance_matrix,
index=monthly_log_returns.columns,
columns=monthly_log_returns.columns
)
return cov_df
1.11 calculate_covariance_matrix函数汇总
def calculate_covariance_matrix(monthly_log_returns):
"""
使用ARCH模型计算协方差矩阵
参数:
monthly_log_returns: DataFrame, 月度对数收益率数据
返回:
DataFrame: 协方差矩阵
"""
n_assets = len(monthly_log_returns.columns)
volatilities = np.zeros(n_assets)
returns_array = monthly_log_returns.values
# 计算每个资产的条件波动率
for i in range(n_assets):
try:
# 对数据进行缩放(解决DataScaleWarning)
scale_factor = 100 # 将收益率转换为百分比
scaled_returns = returns_array[:, i] * scale_factor
# 使用GARCH(1,1)模型
model = arch.arch_model(
scaled_returns,
vol='Arch',
p=1,
mean='Zero',
dist='normal',
rescale=False # 禁用自动缩放
)
# 拟合模型
result = model.fit(disp='off', show_warning=False)
# 获取最新的条件波动率(修复DeprecationWarning)
forecast = result.forecast()
last_variance = forecast.variance.values[-1][0] # 明确获取标量值
# 将波动率转换回原始比例
volatilities[i] = np.sqrt(last_variance) / scale_factor
except Exception as e:
# 如果ARCH模型拟合失败,退化为使用样本标准差
volatilities[i] = np.std(returns_array[:, i])
# 计算相关系数矩阵
correlation_matrix = monthly_log_returns.corr().values
# 构建波动率矩阵
vol_matrix = np.diag(volatilities)
# 计算协方差矩阵
covariance_matrix = vol_matrix @ correlation_matrix @ vol_matrix
# 确保矩阵是对称的
covariance_matrix = (covariance_matrix + covariance_matrix.T) / 2
# 确保矩阵是半正定的
min_eigenval = np.min(np.linalg.eigvals(covariance_matrix))
if min_eigenval < 0:
covariance_matrix -= 1.2 * min_eigenval * np.eye(n_assets)
# 转换为DataFrame,保持与原始数据相同的索引和列名
cov_df = pd.DataFrame(
covariance_matrix,
index=monthly_log_returns.columns,
columns=monthly_log_returns.columns
)
return cov_df
2. 代码汇总
import tushare as ts
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from scipy.optimize import minimize
import backtrader as bt
import statsmodels.api as sm
import arch
from arch.univariate import ARCH, GARCH
# 参数集##############################################################################
ts.set_token('token')
pro = ts.pro_api()
industry = '银行'
end_date = '20240101'
years = 5 # 数据时长
risk_free_rate = 0.03 # 无风险利率参数
top_holdings = 10 # 持仓数量参数
index_code = '000300.SH' # 市场指数代码参数
# 参数集##############################################################################
def get_industry_stocks(industry):
"""获取指定行业的股票列表"""
df = pro.stock_basic(fields=["ts_code", "name", "industry"])
industry_stocks = df[df["industry"]==industry].copy()
industry_stocks.sort_values(by='ts_code', inplace=True)
industry_stocks.reset_index(drop=True, inplace=True)
return industry_stocks['ts_code'].tolist()
def get_data(code_list, end_date, years):
"""获取指定行业名称的历史收盘价数据"""
ts_code_list = code_list
end_date_dt = datetime.strptime(end_date, '%Y%m%d')
start_date_dt = end_date_dt - timedelta(days=years*365)
start_date = start_date_dt.strftime('%Y%m%d')
all_data = []
for stock in ts_code_list:
df = pro.daily(ts_code=stock, start_date=start_date, end_date=end_date)
all_data.append(df)
combined_df = pd.concat(all_data).sort_values(by=['ts_code', 'trade_date'])
combined_df.reset_index(drop=True, inplace=True)
combined_df.rename(columns={'trade_date': 'date'}, inplace=True)
return combined_df
def get_market_data(index_code='000300.SH', start_date=None, end_date=None):
"""获取市场指数数据用于计算贝塔"""
df_market = pro.index_daily(ts_code=index_code,
start_date=start_date,
end_date=end_date,
fields=['trade_date', 'close'])
df_market['date'] = pd.to_datetime(df_market['trade_date'])
df_market.set_index('date', inplace=True)
df_market = df_market.sort_index()
monthly_last_close = df_market['close'].resample('M').last()
monthly_log_returns = np.log(monthly_last_close).diff().dropna()
return monthly_log_returns
def get_factor_data(stock_codes, start_date=None, end_date=None):
"""获取指定股票的因子数据(市值和PB)"""
all_factor_data = []
for stock in stock_codes:
try:
df = pro.daily_basic(
ts_code=stock,
start_date=start_date,
end_date=end_date,
fields=['ts_code', 'trade_date', 'total_mv', 'pb']
)
all_factor_data.append(df)
except Exception as e:
print(f"获取股票 {stock} 的因子数据失败: {str(e)}")
continue
factor_data = pd.concat(all_factor_data, ignore_index=True)
factor_data['trade_date'] = pd.to_datetime(factor_data['trade_date'])
return factor_data
def get_fina_data(stock_codes, start_date=None, end_date=None):
"""获取指定股票的财务指标数据(ROE和资产增长率)"""
all_fina_data = []
for stock in stock_codes:
try:
df = pro.fina_indicator(
ts_code=stock,
start_date=start_date,
end_date=end_date,
fields=['ts_code', 'end_date', 'roe_dt', 'assets_yoy', 'update_flag']
)
all_fina_data.append(df)
except Exception as e:
print(f"获取股票 {stock} 的财务数据失败: {str(e)}")
continue
# 合并数据
fina_data = pd.concat(all_fina_data, ignore_index=True)
# 处理update_flag,保留最新数据
fina_data = (fina_data.groupby(['ts_code', 'end_date'])
.agg({'roe_dt': 'first',
'assets_yoy': 'first',
'update_flag': 'max'})
.reset_index())
# 将end_date转换为datetime
fina_data['end_date'] = pd.to_datetime(fina_data['end_date'])
# 创建季度到月度的映射
monthly_data = []
for _, row in fina_data.iterrows():
quarter_end = row['end_date']
if quarter_end.month == 3: # Q1
months = [quarter_end + pd.DateOffset(months=i) for i in range(1, 4)]
elif quarter_end.month == 6: # Q2
months = [quarter_end + pd.DateOffset(months=i) for i in range(1, 4)]
elif quarter_end.month == 9: # Q3
months = [quarter_end + pd.DateOffset(months=i) for i in range(1, 4)]
else: # Q4
months = [quarter_end + pd.DateOffset(months=i) for i in range(1, 4)]
for month in months:
monthly_data.append({
'ts_code': row['ts_code'],
'trade_date': month,
'roe_dt': row['roe_dt'],
'assets_yoy': row['assets_yoy']
})
monthly_df = pd.DataFrame(monthly_data)
return monthly_df
def calculate_monthly_log_returns(df):
"""计算每月的对数收益率"""
df['date'] = pd.to_datetime(df['date'])
monthly_last_close = df.groupby(['ts_code', pd.Grouper(key='date', freq='M')])['close'].last().unstack(level=-1)
monthly_log_returns = np.log(monthly_last_close).diff().dropna()
return monthly_log_returns.T
def calculate_expected_returns(monthly_log_returns):
"""使用Fama-French五因子模型计算各股票的预期收益率"""
start_date = monthly_log_returns.index.min().strftime('%Y%m%d')
end_date = monthly_log_returns.index.max().strftime('%Y%m%d')
# 获取财务数据时,将start_date往前推一个季度,以确保有完整的季度数据
fina_start_date = (datetime.strptime(start_date, '%Y%m%d') - timedelta(days=90)).strftime('%Y%m%d')
# 获取市场收益率
market_returns = get_market_data(index_code, start_date, end_date)
# 获取股票的市值和PB数据
stock_data = get_factor_data(
monthly_log_returns.columns.tolist(),
start_date,
end_date
)
# 获取财务指标数据,使用提前的start_date
fina_data = get_fina_data(
monthly_log_returns.columns.tolist(),
fina_start_date,
end_date
)
# 确保所有数据的日期对齐
aligned_dates = monthly_log_returns.index.intersection(market_returns.index)
market_returns = market_returns[aligned_dates]
stock_returns = monthly_log_returns.loc[aligned_dates].copy() # 使用copy()避免SettingWithCopyWarning
def calculate_size_factor(date):
date_data = stock_data[stock_data['trade_date'].dt.to_period('M') == date.to_period('M')]
median_mv = date_data['total_mv'].median()
small_returns = stock_returns.loc[date, date_data[date_data['total_mv'] <= median_mv]['ts_code']]
big_returns = stock_returns.loc[date, date_data[date_data['total_mv'] > median_mv]['ts_code']]
return small_returns.mean() - big_returns.mean()
def calculate_value_factor(date):
date_data = stock_data[stock_data['trade_date'].dt.to_period('M') == date.to_period('M')]
# 创建date_data的副本并计算bm_ratio
date_data = date_data.copy()
date_data.loc[:, 'bm_ratio'] = 1 / date_data['pb']
median_bm = date_data['bm_ratio'].median()
high_returns = stock_returns.loc[date, date_data[date_data['bm_ratio'] > median_bm]['ts_code']]
low_returns = stock_returns.loc[date, date_data[date_data['bm_ratio'] <= median_bm]['ts_code']]
return high_returns.mean() - low_returns.mean()
def calculate_profitability_factor(date):
date_data = fina_data[fina_data['trade_date'].dt.to_period('M') == date.to_period('M')]
median_roe = date_data['roe_dt'].median()
robust_returns = stock_returns.loc[date, date_data[date_data['roe_dt'] > median_roe]['ts_code']]
weak_returns = stock_returns.loc[date, date_data[date_data['roe_dt'] <= median_roe]['ts_code']]
return robust_returns.mean() - weak_returns.mean()
def calculate_investment_factor(date):
date_data = fina_data[fina_data['trade_date'].dt.to_period('M') == date.to_period('M')]
median_growth = date_data['assets_yoy'].median()
conservative_returns = stock_returns.loc[date, date_data[date_data['assets_yoy'] <= median_growth]['ts_code']]
aggressive_returns = stock_returns.loc[date, date_data[date_data['assets_yoy'] > median_growth]['ts_code']]
return conservative_returns.mean() - aggressive_returns.mean()
# 计算每个月的因子收益
smb_factor = pd.Series([calculate_size_factor(date) for date in aligned_dates], index=aligned_dates)
hml_factor = pd.Series([calculate_value_factor(date) for date in aligned_dates], index=aligned_dates)
rmw_factor = pd.Series([calculate_profitability_factor(date) for date in aligned_dates], index=aligned_dates)
cma_factor = pd.Series([calculate_investment_factor(date) for date in aligned_dates], index=aligned_dates)
# 使用OLS回归计算每个股票的因子载荷
factor_loadings = {}
for stock in stock_returns.columns:
X = sm.add_constant(pd.concat([
market_returns - risk_free_rate,
smb_factor,
hml_factor,
rmw_factor,
cma_factor
], axis=1))
y = stock_returns[stock] - risk_free_rate
model = sm.OLS(y, X).fit()
factor_loadings[stock] = model.params[1:]
# 计算因子风险溢价
market_premium = market_returns.mean() - risk_free_rate
smb_premium = smb_factor.mean()
hml_premium = hml_factor.mean()
rmw_premium = rmw_factor.mean()
cma_premium = cma_factor.mean()
# 使用FF5模型计算预期收益率
expected_returns = pd.Series({
stock: (risk_free_rate +
loadings.iloc[0] * market_premium +
loadings.iloc[1] * smb_premium +
loadings.iloc[2] * hml_premium +
loadings.iloc[3] * rmw_premium +
loadings.iloc[4] * cma_premium)
for stock, loadings in factor_loadings.items()
})
return expected_returns
# def calculate_covariance_matrix(monthly_log_returns):
# """计算收益率协方差矩阵"""
# return monthly_log_returns.cov()
def calculate_covariance_matrix(monthly_log_returns):
"""
使用ARCH模型计算协方差矩阵
参数:
monthly_log_returns: DataFrame, 月度对数收益率数据
返回:
DataFrame: 协方差矩阵
"""
n_assets = len(monthly_log_returns.columns)
volatilities = np.zeros(n_assets)
returns_array = monthly_log_returns.values
# 计算每个资产的条件波动率
for i in range(n_assets):
try:
# 对数据进行缩放(解决DataScaleWarning)
scale_factor = 100 # 将收益率转换为百分比
scaled_returns = returns_array[:, i] * scale_factor
# 使用GARCH(1,1)模型
model = arch.arch_model(
scaled_returns,
vol='Arch',
p=1,
mean='Zero',
dist='normal',
rescale=False # 禁用自动缩放
)
# 拟合模型
result = model.fit(disp='off', show_warning=False)
# 获取最新的条件波动率(修复DeprecationWarning)
forecast = result.forecast()
last_variance = forecast.variance.values[-1][0] # 明确获取标量值
# 将波动率转换回原始比例
volatilities[i] = np.sqrt(last_variance) / scale_factor
except Exception as e:
# 如果ARCH模型拟合失败,退化为使用样本标准差
volatilities[i] = np.std(returns_array[:, i])
# 计算相关系数矩阵
correlation_matrix = monthly_log_returns.corr().values
# 构建波动率矩阵
vol_matrix = np.diag(volatilities)
# 计算协方差矩阵
covariance_matrix = vol_matrix @ correlation_matrix @ vol_matrix
# 确保矩阵是对称的
covariance_matrix = (covariance_matrix + covariance_matrix.T) / 2
# 确保矩阵是半正定的
min_eigenval = np.min(np.linalg.eigvals(covariance_matrix))
if min_eigenval < 0:
covariance_matrix -= 1.2 * min_eigenval * np.eye(n_assets)
# 转换为DataFrame,保持与原始数据相同的索引和列名
cov_df = pd.DataFrame(
covariance_matrix,
index=monthly_log_returns.columns,
columns=monthly_log_returns.columns
)
return cov_df
def portfolio_performance(weights, mean_returns, cov_matrix):
"""计算投资组合的表现"""
returns = np.sum(mean_returns * weights)
std_dev = np.sqrt(np.dot(weights.T, np.dot(cov_matrix, weights)))
return returns, std_dev
def negative_sharpe_ratio(weights, mean_returns, cov_matrix, risk_free_rate):
"""计算负夏普比率"""
p_ret, p_std = portfolio_performance(weights, mean_returns, cov_matrix)
sharpe_ratio = (p_ret - risk_free_rate) / p_std
return -sharpe_ratio
def max_sharpe_ratio(mean_returns, cov_matrix, risk_free_rate):
"""计算最大夏普比率的投资组合权重"""
num_assets = len(mean_returns)
args = (mean_returns, cov_matrix, risk_free_rate)
constraints = ({'type': 'eq', 'fun': lambda x: np.sum(x) - 1})
bounds = tuple((0, 1) for asset in range(num_assets))
result = minimize(negative_sharpe_ratio, num_assets*[1./num_assets], args=args,
method='SLSQP', bounds=bounds, constraints=constraints)
return result.x
def calculate_top_holdings_weights(optimal_weights, monthly_log_returns_columns, top_n):
"""计算前N大持仓的权重占比"""
result_dict = {asset: weight for asset, weight in zip(monthly_log_returns_columns, optimal_weights)}
top_n_holdings = sorted(result_dict.items(), key=lambda item: item[1], reverse=True)[:top_n]
top_n_sum = sum(value for _, value in top_n_holdings)
updated_result = {key: value / top_n_sum for key, value in top_n_holdings}
return updated_result
def main():
# 获取数据
code_list = get_industry_stocks(industry)
df = get_data(code_list, end_date, years)
# 计算每月的对数收益率
monthly_log_returns = calculate_monthly_log_returns(df)
# 使用FF5模型计算预期收益率
mean_returns = calculate_expected_returns(monthly_log_returns)
# 计算收益率协方差矩阵
cov_matrix = calculate_covariance_matrix(monthly_log_returns)
# 优化权重
optimal_weights = max_sharpe_ratio(mean_returns, cov_matrix, risk_free_rate)
# 计算前N大持仓权重
updated_result = calculate_top_holdings_weights(
optimal_weights,
monthly_log_returns.columns,
top_holdings
)
# 打印更新后的资产占比
print(f"\n{end_date}最优资产前{top_holdings}占比:")
print(updated_result)
if __name__ == "__main__":
main()
注意:tushare接口需要自行申请。
运行结果:
{'601998.SH': 0.6639486870806041, '601997.SH': 0.12331601907118481, '603323.SH': 0.07328978581111818, '002839.SZ': 0.03539439950421193, '600016.SH': 0.029211516071868344
,'600919.SH': 0.018224215392246165, '600015.SH': 0.017269471180238454, '601288.SH': 0.015789914005622727, '002936.SZ': 0.015148528476267218, '002948.SZ': 0.008407463406638267}
股票代码 | 股票占比 |
---|---|
601998.SH | 0.6639487 |
601997.SH | 0.1233160 |
603323.SH | 0.0732898 |
002839.SZ | 0.0353944 |
600016.SH | 0.0292115 |
600919.SH | 0.0182242 |
600015.SH | 0.0172695 |
601288.SH | 0.0157899 |
002936.SZ | 0.0151485 |
002948.SZ | 0.0084075 |
最大的权重分配给了601998.SH(占比约66.4%),这表明根据模型计算,这只股票可能提供了最佳的风险调整后收益或与其他股票相比具有更优的特征。
其他尝试:
- 使用日度数据替代月度数据,以获得更多的观测样本,提高ARCH模型的拟合效果
- 尝试其他波动率模型,如GARCH、EGARCH等,以捕捉不同的波动率特征
3. 反思
3.1 不足之处
- 月度数据样本量较少(5年约60个观测值),可能影响ARCH模型的拟合效果
- 单一行业(银行业)的资产相关性较高,可能导致风险分散效果不理想
- 权重过于集中(第一大持仓超过66%),存在较大的集中度风险
- ARCH模型计算复杂,可能增加组合调整的实时性要求
3.2 提升思路
-
数据优化:
- 增加历史数据长度(8-10年)
- 扩大资产池范围,增加跨行业选择
-
模型优化:
- 引入交易成本约束
- 添加权重上限约束
4. 启后
-
优化,尝试其他波动率模型,GARCH:,可参考下一篇文章:
8. 马科维茨资产组合模型+FF5+ARCH风险模型优化方案(理论+Python实战) -
量化回测实现,可参考下一篇文章:
pass