backtrader/money_flow_analyzer.py

250 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import traceback
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from config_manager import get_config_manager
from database_manager import DatabaseManager
from logger_manager import get_logger
class MoneyflowAnalyzer:
"""
市场资金流分析器分析各类资金流向指标对行业在随后1-N天表现的影响
"""
def __init__(self):
self.db_manager = DatabaseManager()
self.config = get_config_manager()
self.logger = get_logger()
def main_flow_analyze(self, days_forward=10, use_consistent_samples=False, max_days=10):
"""
分析各类资金流向指标对行业在随后1-N天表现的影响
参数:
days_forward (int): 要分析的未来天数默认为10天
"""
self.logger.info(f"开始分析资金流数据,未来{days_forward}天的表现...")
self.logger.info(f"使用一致样本集: {use_consistent_samples}")
# 读取资金流数据
try:
df = self.db_manager.load_df_from_db('moneyflow_ind_dc')
self.logger.info(f"成功从数据库加载资金流数据,共计{len(df)}条记录")
except Exception as e:
self.logger.error(f"从数据库读取数据失败: {e}")
self.logger.debug(f"完整的错误追踪信息:\n{traceback.format_exc()}")
return False
# 检查是否成功获取数据
if df.empty:
self.logger.info("获取的资金流数据为空,无法进行分析")
return False
# 数据预处理 - 计算主力资金
df['main_force_amount'] = df['buy_elg_amount'] + df['buy_lg_amount']
# 将日期格式转换为datetime
df['trade_date'] = pd.to_datetime(df['trade_date'], format='%Y%m%d', errors='coerce')
df = df[~df['trade_date'].isna()]
# 按日期排序
df = df.sort_values('trade_date')
# 获取所有交易日期
all_dates = df['trade_date'].unique()
# 确定样本范围
if use_consistent_samples:
# 如果max_days大于days_forward则预测天数使用max_days
if max_days > days_forward:
days_forward = max_days
self.logger.info(f"max_days大于days_forward使用max_days进行分析")
# 使用固定的最大预测天数来确保所有分析使用相同的样本集
sample_limit = len(all_dates) - max_days
self.logger.info(f"使用统一样本集进行分析,样本量: {sample_limit}")
else:
# 使用当前分析所需的预测天数
sample_limit = len(all_dates) - days_forward
self.logger.info(f"使用动态样本集进行分析,样本量: {sample_limit}")
# 定义要分析的资金流指标
# 格式: (指标名, 排序方向, 关联性)
# 关联性: 正相关=1, 负相关=-1 (用于确定是取最高还是最低)
flow_indicators = [
('main_force_amount', 1, '主力净额')
]
# 确保结果目录存在
os.makedirs('result', exist_ok=True)
# 为每个指标进行分析
for indicator, correlation, indicator_name in flow_indicators:
self.logger.info(f"分析 {indicator_name} 与未来指数关系...")
# 创建结果数据结构
results = []
# 遍历每个交易日期(固定样本集大小)
for i in range(sample_limit):
current_date = all_dates[i]
# 获取当前日期的数据
current_day_data = df[df['trade_date'] == current_date]
# 找出该指标排名靠前的行业
if correlation > 0:
# 正相关,找最高值
top_sectors = current_day_data.sort_values(indicator, ascending=False).head(1)['name'].tolist()
else:
# 负相关,找最低值
top_sectors = current_day_data.sort_values(indicator, ascending=True).head(1)['name'].tolist()
# 分析每个行业在随后1-N天的表现
for sector in top_sectors:
# 获取该行业当天的指数变化和指标值
sector_current = current_day_data[current_day_data['name'] == sector]
if sector_current.empty:
continue
current_pct_change = sector_current['pct_change'].values[0]
current_indicator_value = sector_current[indicator].values[0]
# 分析随后1-N天的表现
future_changes = []
for day_offset in range(1, days_forward + 1):
if i + day_offset < len(all_dates):
future_date = all_dates[i + day_offset]
future_data = df[(df['trade_date'] == future_date) & (df['name'] == sector)]
if not future_data.empty:
future_changes.append(future_data['pct_change'].values[0])
else:
future_changes.append(None)
else:
future_changes.append(None)
# 如果至少有一个未来日期有数据
if any(x is not None for x in future_changes):
result_entry = {
'date': current_date.strftime('%Y%m%d'), # 将日期格式化为YYYYMMDD字符串
'sector': sector,
f'{indicator}': current_indicator_value,
'current_pct_change': current_pct_change,
}
# 添加1-N天的变化
for day in range(1, days_forward + 1):
result_entry[f'day{day}_change'] = future_changes[day - 1]
# 计算平均变化
result_entry[f'avg_{days_forward}day_change'] = np.nanmean(
[x for x in future_changes if x is not None])
results.append(result_entry)
# 转换为DataFrame
results_df = pd.DataFrame(results)
if results_df.empty:
self.logger.info(f"没有足够的数据来分析{indicator_name}与后续表现的关系")
continue
# 保存结果
os.makedirs('result', exist_ok=True)
output_file = f'result/{indicator}_performance_{days_forward}days.xlsx'
results_df.to_excel(output_file, index=False)
self.logger.info(f"{indicator_name}表现分析已保存至{output_file}")
# 分析整体表现
avg_performance = {}
success_rates = {}
for day in range(1, days_forward + 1):
# 计算每一天的变化均值
avg_performance[f'day{day}'] = results_df[f'day{day}_change'].mean()
# 计算每一天的胜率(正收益的概率)
success_rates[f'day{day}'] = (results_df[f'day{day}_change'] > 0).mean() * 100
avg_performance[f'avg_{days_forward}day'] = results_df[f'avg_{days_forward}day_change'].mean()
# 输出每天的表现数据
self.logger.info(f"{indicator_name}极值行业在未来{days_forward}天的表现分析:")
self.logger.info("日期\t\t平均收益率\t\t胜率")
for day in range(1, days_forward + 1):
self.logger.info(f"T+{day}\t\t{avg_performance[f'day{day}']:.5f}%\t\t{success_rates[f'day{day}']:.2f}%")
self.logger.info(f"{days_forward}天平均: {avg_performance[f'avg_{days_forward}day']:.5f}%")
# 绘制折线图显示未来N天的平均表现和胜率
plt.figure(figsize=(14, 8))
# 设置中文字体
try:
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'WenQuanYi Micro Hei']
plt.rcParams['axes.unicode_minus'] = False # 解决负号显示问题
days_labels = ['当天'] + [f'{i}' for i in range(1, days_forward + 1)]
direction = "最高" if correlation > 0 else "最低"
title = f'{indicator_name}{direction}后的平均表现 ({days_forward}天)'
xlabel = '时间'
ylabel = '指数变化率 (%)'
except:
# 如果没有中文字体,使用英文
days_labels = ['Current'] + [f'Day+{i}' for i in range(1, days_forward + 1)]
direction = "Highest" if correlation > 0 else "Lowest"
title = f'Average Performance After {indicator_name} {direction} ({days_forward} Days)'
xlabel = 'Time'
ylabel = 'Index Change Rate (%)'
# 创建双y轴图表
fig, ax1 = plt.subplots(figsize=(14, 8))
# 设置收益率曲线 (左y轴)
values = [results_df['current_pct_change'].mean()] + [avg_performance[f'day{i}'] for i in
range(1, days_forward + 1)]
ax1.plot(days_labels, values, marker='o', linewidth=2, color='blue', label='平均收益率')
ax1.axhline(y=0, color='r', linestyle='--')
ax1.set_xlabel(xlabel)
ax1.set_ylabel('收益率 (%)', color='blue')
ax1.tick_params(axis='y', labelcolor='blue')
# 设置胜率曲线 (右y轴)
ax2 = ax1.twinx()
win_rates = [50] # 当天默认为50%
for i in range(1, days_forward + 1):
win_rates.append(success_rates[f'day{i}'])
ax2.plot(days_labels, win_rates, marker='s', linewidth=2, color='green', label='胜率')
ax2.set_ylabel('胜率 (%)', color='green')
ax2.tick_params(axis='y', labelcolor='green')
ax2.axhline(y=50, color='green', linestyle='--', alpha=0.5)
# 添加图例
lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax1.legend(lines1 + lines2, labels1 + labels2, loc='best')
plt.title(title, fontsize=14)
plt.grid(True)
plt.xticks(rotation=45) # 旋转x轴标签以避免重叠
# 保存图表
output_image = f'result/{indicator}_performance_{days_forward}days.png'
plt.savefig(output_image, dpi=300, bbox_inches='tight')
self.logger.info(f"{indicator_name}表现图表已保存至{output_image}")
# 额外保存胜率图表
plt.figure(figsize=(14, 8))
plt.bar(range(1, days_forward + 1), [success_rates[f'day{i}'] for i in range(1, days_forward + 1)],
color='green', alpha=0.7)
plt.axhline(y=50, color='r', linestyle='--')
plt.title(f'{indicator_name}{direction}后的未来胜率分布 ({days_forward}天)', fontsize=14)
plt.xlabel('未来天数')
plt.ylabel('胜率 (%)')
plt.xticks(range(1, days_forward + 1), [f'T+{i}' for i in range(1, days_forward + 1)])
plt.grid(True, axis='y')
# 在柱状图上标注具体数值
for i in range(1, days_forward + 1):
plt.text(i, success_rates[f'day{i}'] + 1, f"{success_rates[f'day{i}']:.1f}%",
ha='center', va='bottom', fontsize=9)
win_rate_image = f'result/{indicator}_win_rate_{days_forward}days.png'
plt.savefig(win_rate_image, dpi=300, bbox_inches='tight')
self.logger.info(f"{indicator_name}胜率分布图已保存至{win_rate_image}")
return True