backtrader/main_force_strategy.py
Qihang Zhang b652322061 🗑️ remove(get_kpl_list.py): 删除涨停板数据获取旧脚本
 feat(main_force_strategy.py): 新增主力资金流向策略分析功能

🔧 refactor(utils.py): 增加数据库操作支持和辅助函数
2025-04-19 01:41:15 +08:00

313 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import time
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from tqdm import tqdm
from utils import load_config, get_trade_cal
from utils import save_df_to_db, load_df_from_db, get_existing_trade_dates
# Load the project configuration and initialise the tushare client.
# `config` must exist before `ts.set_token` is called, because the token is
# read from config['tushare_token']; `pro` is the module-wide API handle used
# by the data-fetching code below.
config = load_config()
import tushare as ts
import seaborn as sns
ts.set_token(config['tushare_token'])
pro = ts.pro_api()
def get_sector_moneyflow_data(start_date=None, end_date=None):
    """
    Fetch sector money-flow data, using the database as a cache.

    The dates returned by ``get_trade_cal(start_date, end_date)`` are compared
    with the dates reported by ``get_existing_trade_dates('sector_fund_flow')``;
    only the dates missing from the latter are requested from tushare.

    Args:
        start_date (str): start date, formatted 'YYYYMMDD'.
        end_date (str): end date, formatted 'YYYYMMDD'.

    Returns:
        pandas.DataFrame: the result of ``load_df_from_db('sector_fund_flow')``
        after any newly fetched rows have been appended.
        NOTE(review): the table is loaded without a date filter here, so the
        result may cover more than the requested window - confirm with
        ``load_df_from_db``.
    """
    table = 'sector_fund_flow'

    def fetch_day(trade_date):
        # Download one trading day; returns None when the day yields nothing
        # or the request fails (the failure is printed, not raised).
        try:
            day_df = pro.moneyflow_ind_dc(trade_date=trade_date)
            if day_df.empty:
                print(f"日期 {trade_date} 无数据")
                return None
            # Main-force amount = extra-large-order column + large-order column.
            day_df['main_force_amount'] = day_df['buy_elg_amount'] + day_df['buy_lg_amount']
            return day_df
        except Exception as e:
            print(f"获取 {trade_date} 的数据时出错: {e}")
            return None

    wanted_dates = get_trade_cal(start_date, end_date)
    cached_dates = get_existing_trade_dates(table)
    missing_dates = [day for day in wanted_dates if day not in cached_dates]

    # Nothing to download: serve straight from the cache.
    if not missing_dates:
        print("所有数据已在数据库中,无需更新")
        return load_df_from_db(table)

    print(f"需要获取 {len(missing_dates)} 个新交易日的数据")

    # tqdm wraps the date list to show a progress bar while downloading.
    fetched_frames = []
    for trade_date in tqdm(missing_dates):
        day_frame = fetch_day(trade_date)
        if day_frame is not None:
            fetched_frames.append(day_frame)

    if fetched_frames:
        # Merge all newly fetched days and append them to the cache table.
        new_df = pd.concat(fetched_frames, ignore_index=True)
        save_df_to_db(new_df, table_name=table, if_exists='append')
        print(f"已将 {len(new_df)} 条新记录保存到数据库")
    else:
        print("未获取到任何新数据")

    return load_df_from_db(table)
# Number of trading days tracked after each signal day. The strategy checks
# below read day1..day8, so this must stay >= 8; result column names assume 10.
_FORWARD_DAYS = 10


def _collect_forward_performance(df, indicator, correlation):
    """Build one record per signal day for the sector with the extreme indicator value.

    Args:
        df: money-flow rows with a datetime 'trade_date' column plus 'name',
            'pct_change' and the ``indicator`` column.
        indicator: name of the column used to rank sectors on the signal day.
        correlation: > 0 selects the sector with the highest indicator value,
            otherwise the one with the lowest.

    Returns:
        list[dict]: one dict per signal day with keys 'date', 'sector',
        ``indicator``, 'current_pct_change', 'day1_change'..'day10_change'
        and 'avg_10day_change'. Days on which the sector has no row are NaN;
        signal days with no forward data at all are skipped.
    """
    # Split the table by date once instead of rescanning it for every lookup.
    # groupby yields the dates in ascending order and keeps row order inside
    # each day's frame.
    daily_frames = list(df.groupby('trade_date'))
    pick_lowest = not (correlation > 0)

    records = []
    # The last _FORWARD_DAYS dates cannot be signal days: they lack a full
    # forward window.
    for position in range(len(daily_frames) - _FORWARD_DAYS):
        signal_date, signal_frame = daily_frames[position]
        top_row = signal_frame.sort_values(indicator, ascending=pick_lowest).iloc[0]
        sector = top_row['name']

        future_changes = []
        for offset in range(1, _FORWARD_DAYS + 1):
            future_frame = daily_frames[position + offset][1]
            matches = future_frame.loc[future_frame['name'] == sector, 'pct_change']
            future_changes.append(matches.iloc[0] if not matches.empty else np.nan)

        future_changes = np.asarray(future_changes, dtype=float)
        if np.isnan(future_changes).all():
            continue

        record = {
            # pd.Timestamp() makes strftime available whatever scalar type
            # pandas hands back for the group key.
            'date': pd.Timestamp(signal_date).strftime('%Y%m%d'),
            'sector': sector,
            f'{indicator}': top_row[indicator],
            'current_pct_change': top_row['pct_change'],
        }
        for day, change in enumerate(future_changes, start=1):
            record[f'day{day}_change'] = change
        record['avg_10day_change'] = np.nanmean(future_changes)
        records.append(record)
    return records


def _positive_rate(series):
    """Return the percentage of non-missing values in ``series`` that are > 0."""
    # Drop missing observations first; otherwise NaN compares as "not > 0"
    # and silently drags the win rate down.
    observed = series.dropna()
    return (observed > 0).mean() * 100


def _configure_cjk_fonts():
    """Point matplotlib at CJK-capable fonts; return False if that fails."""
    try:
        plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'WenQuanYi Micro Hei']
        # Render minus signs with ASCII '-' so they display with CJK fonts.
        plt.rcParams['axes.unicode_minus'] = False
    except Exception:
        # NOTE(review): assigning rcParams does not appear to check whether
        # the fonts are installed, so this fallback may never trigger - confirm.
        return False
    return True


def _plot_strategy_chart(values, indicator, indicator_name, use_cjk):
    """Save the average T..T+10 path with both strategy holding windows highlighted."""
    if use_cjk:
        title = f'{indicator_name}极值后交易策略示意图'
        xlabel = '交易日'
        ylabel = '指数变化率 (%)'
        strategy_names = ['策略A: T+1买入,T+2卖出', '策略B: T+4买入,T+8卖出']
    else:
        title = f'Trading Strategy after {indicator_name} Extreme Value'
        xlabel = 'Trading Day'
        ylabel = 'Index Change Rate (%)'
        strategy_names = ['Strategy A: Buy T+1, Sell T+2', 'Strategy B: Buy T+4, Sell T+8']

    fig = plt.figure(figsize=(14, 8))
    try:
        days = range(len(values))  # 0 = signal day T, 1..10 = T+1..T+10
        plt.plot(days, values, marker='o', color='blue', linewidth=2, label='平均表现')
        # Strategy A: buy T+1, sell T+2.
        plt.plot([1, 2], [values[1], values[2]], color='green', linewidth=4, alpha=0.7, label=strategy_names[0])
        plt.scatter([1, 2], [values[1], values[2]], color='green', s=100)
        # Strategy B: buy T+4, sell T+8 (markers sit on the days named in the label).
        plt.plot([4, 8], [values[4], values[8]], color='red', linewidth=4, alpha=0.7, label=strategy_names[1])
        plt.scatter([4, 8], [values[4], values[8]], color='red', s=100)
        plt.axhline(y=0, color='gray', linestyle='--')
        plt.title(title, fontsize=14)
        plt.ylabel(ylabel)
        plt.xlabel(xlabel)
        plt.xticks(days, ['T'] + [f'T+{i}' for i in range(1, len(values))])
        plt.grid(True)
        plt.legend()

        strategy_image = f'result/{indicator}_strategy.png'
        plt.savefig(strategy_image, dpi=300, bbox_inches='tight')
    finally:
        # Close the figure so figures do not pile up across indicators.
        plt.close(fig)
    print(f"交易策略示意图已保存至{strategy_image}")


def _plot_performance_chart(values, indicator, indicator_name, correlation, use_cjk):
    """Save a line chart of the average change on the signal day and the ten days after."""
    if use_cjk:
        days = ['当天'] + [f'{i}' for i in range(1, len(values))]
        direction = "最高" if correlation > 0 else "最低"
        title = f'{indicator_name}{direction}后的平均表现 (10天)'
        xlabel = '时间'
        ylabel = '指数变化率 (%)'
    else:
        days = ['Current'] + [f'Day+{i}' for i in range(1, len(values))]
        direction = "Highest" if correlation > 0 else "Lowest"
        title = f'Average Performance After {indicator_name} {direction} (10 Days)'
        xlabel = 'Time'
        ylabel = 'Index Change Rate (%)'

    fig = plt.figure(figsize=(14, 8))
    try:
        plt.plot(days, values, marker='o', linewidth=2)
        plt.axhline(y=0, color='r', linestyle='--')
        plt.title(title, fontsize=14)
        plt.ylabel(ylabel)
        plt.xlabel(xlabel)
        plt.grid(True)
        plt.xticks(rotation=45)  # rotate tick labels so they do not overlap

        output_image = f'result/{indicator}_performance.png'
        # bbox_inches='tight' keeps the rotated labels inside the saved image.
        plt.savefig(output_image, dpi=300, bbox_inches='tight')
    finally:
        plt.close(fig)
    print(f"{indicator_name}表现图表已保存至{output_image}")


def analyze_money_flow():
    """
    Analyse how money-flow indicators relate to sector performance over the next 1-10 days.

    For every configured indicator, each trading day (except the last ten) is
    treated as a signal day: the sector with the extreme indicator value is
    selected and its daily 'pct_change' over the following ten trading days is
    recorded. The records are written to ``result/<indicator>_performance.xlsx``,
    summary statistics and two fixed trading strategies are printed, and two
    charts are saved under ``result/``.

    Returns:
        True when the analysis ran; None when the data could not be loaded
        from the database (the error is printed).
    """
    try:
        df = load_df_from_db('sector_fund_flow')
        print(f"成功从数据库加载资金流数据,共计{len(df)}条记录")
    except Exception as e:
        print(f"从数据库读取数据失败:{e}")
        return

    # Dates are expected as 'YYYYMMDD' strings; rows that fail to parse are dropped.
    df['trade_date'] = pd.to_datetime(df['trade_date'], format='%Y%m%d', errors='coerce')
    df = df.dropna(subset=['trade_date']).sort_values('trade_date')

    # Indicators to analyse, as (column, correlation, display name).
    # correlation: 1 = take the highest value, -1 = take the lowest value.
    flow_indicators = [
        ('main_force_amount', 1, '主力净额')
    ]

    os.makedirs('result', exist_ok=True)
    use_cjk = _configure_cjk_fonts()

    for indicator, correlation, indicator_name in flow_indicators:
        print(f"\n\n分析 {indicator_name} 与未来指数关系...")

        results_df = pd.DataFrame(_collect_forward_performance(df, indicator, correlation))
        if results_df.empty:
            print(f"没有足够的数据来分析{indicator_name}与后续表现的关系")
            continue

        output_file = f'result/{indicator}_performance.xlsx'
        results_df.to_excel(output_file, index=False)
        print(f"{indicator_name}表现分析已保存至{output_file}")

        # Average change per forward day, plus the average of the per-record means.
        avg_performance = {
            f'day{day}': results_df[f'day{day}_change'].mean()
            for day in range(1, _FORWARD_DAYS + 1)
        }
        avg_performance['avg_10day'] = results_df['avg_10day_change'].mean()
        print(f"\n{indicator_name}极值行业的平均表现:")
        for day, perf in avg_performance.items():
            print(f"{day}: {perf:.4f}%")

        # Share of signal days followed by a positive change on each forward day.
        success_rates = {
            f'day{day}': _positive_rate(results_df[f'day{day}_change'])
            for day in range(1, _FORWARD_DAYS + 1)
        }
        print(f"\n{indicator_name}极值后上涨的概率:")
        for day, rate in success_rates.items():
            print(f"{day}: {rate:.2f}%")

        # ------------------ Strategy validation ------------------
        # Assumption: positions are opened and closed at the daily close, so a
        # holding return is the sum of the daily changes AFTER the entry day up
        # to and including the exit day (simple sum, compounding ignored). The
        # entry day's own change happened before the position existed and is
        # therefore not part of the return.
        print("\n交易策略验证:")

        # Strategy A: buy T+1, sell T+2.
        day1_to_day2_change = results_df['day2_change']
        avg_change_1_to_2 = day1_to_day2_change.mean()
        win_rate_1_to_2 = _positive_rate(day1_to_day2_change)
        print(f"策略A - T+1(第1日)买入T+2(第2日)卖出的平均收益: {avg_change_1_to_2:.4f}%")
        print(f"策略A - T+1(第1日)买入T+2(第2日)卖出的盈利概率: {win_rate_1_to_2:.2f}%")

        # Strategy B: buy T+4, sell T+8. skipna=False keeps a record's return
        # missing when any day in the holding window is missing.
        holding_columns = [f'day{day}_change' for day in range(5, 9)]
        day4_to_day8_change = results_df[holding_columns].sum(axis=1, skipna=False)
        avg_change_4_to_8 = day4_to_day8_change.mean()
        win_rate_4_to_8 = _positive_rate(day4_to_day8_change)
        print(f"策略B - T+4(第4日)买入T+8(第8日)卖出的平均收益: {avg_change_4_to_8:.4f}%")
        print(f"策略B - T+4(第4日)买入T+8(第8日)卖出的盈利概率: {win_rate_4_to_8:.2f}%")

        # Combined: run strategy A and then strategy B on the same signal.
        combined_change = day1_to_day2_change + day4_to_day8_change
        avg_combined_change = combined_change.mean()
        win_rate_combined = _positive_rate(combined_change)
        print(f"组合策略 - 完整策略组合的平均总收益: {avg_combined_change:.4f}%")
        print(f"组合策略 - 完整策略至少盈利的概率: {win_rate_combined:.2f}%")

        # Index 0 is the signal day itself, indices 1..10 are T+1..T+10.
        values = [results_df['current_pct_change'].mean()] + [
            avg_performance[f'day{day}'] for day in range(1, _FORWARD_DAYS + 1)
        ]
        _plot_strategy_chart(values, indicator, indicator_name, use_cjk)
        _plot_performance_chart(values, indicator, indicator_name, correlation, use_cjk)

    return True
if __name__ == "__main__":
    # Date window to refresh: from 2023-09-12 with no explicit end date.
    start_date, end_date = '20230912', None
    # Refresh the cached sector money-flow table, then analyse it.
    get_sector_moneyflow_data(start_date, end_date)
    analyze_money_flow()