2025-04-19 15:30:55 +08:00
|
|
|
|
import os
|
|
|
|
|
import traceback
|
|
|
|
|
import matplotlib.pyplot as plt
|
|
|
|
|
import numpy as np
|
|
|
|
|
import pandas as pd
|
|
|
|
|
from config_manager import get_config_manager
|
|
|
|
|
from database_manager import DatabaseManager
|
2025-04-19 19:31:58 +08:00
|
|
|
|
from logger_manager import get_logger
|
2025-04-19 15:30:55 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class MoneyflowAnalyzer:
    """
    Market money-flow analyzer.

    Studies how sector money-flow indicators relate to the performance of the
    selected sector over the following 1..N trading days.
    """

    def __init__(self):
        """Create the database, configuration and logging collaborators."""
        self.db_manager = DatabaseManager()
        self.config = get_config_manager()
        self.logger = get_logger()

    def main_flow_analyze(self, days_forward=10, use_consistent_samples=False, max_days=10):
        """
        Analyze how money-flow indicators relate to sector performance over
        the following 1..N trading days.

        For every sampled trading day the sector with the extreme indicator
        value is selected, and its ``pct_change`` on each of the following
        ``days_forward`` trading days is recorded. Per indicator, the detail
        table is written to ``result/<indicator>_performance_<N>days.xlsx`` and
        two charts (average change + win rate, win-rate bars) are saved as PNG.

        Args:
            days_forward (int): Number of future trading days to analyze.
                Defaults to 10.
            use_consistent_samples (bool): When True the number of sampled
                days is ``len(dates) - max_days`` so that runs with different
                horizons share one sample set; ``days_forward`` is raised to
                ``max_days`` when ``max_days`` is larger.
            max_days (int): Horizon that fixes the sample set when
                ``use_consistent_samples`` is True.

        Returns:
            bool: False when loading the data fails or the loaded table is
            empty, True otherwise (including when an indicator produced no
            results).
        """
        self.logger.info(f"开始分析资金流数据,未来{days_forward}天的表现...")
        self.logger.info(f"使用一致样本集: {use_consistent_samples}")

        # Load the raw money-flow table.
        try:
            df = self.db_manager.load_df_from_db('moneyflow_ind_dc')
            self.logger.info(f"成功从数据库加载资金流数据,共计{len(df)}条记录")
        except Exception as e:
            self.logger.error(f"从数据库读取数据失败: {e}")
            self.logger.debug(f"完整的错误追踪信息:\n{traceback.format_exc()}")
            return False

        if df.empty:
            self.logger.info("获取的资金流数据为空,无法进行分析")
            return False

        # Pre-processing: main-force amount is the sum of the two buy columns.
        df['main_force_amount'] = df['buy_elg_amount'] + df['buy_lg_amount']

        # Parse dates, drop rows whose date could not be parsed, sort by date.
        df['trade_date'] = pd.to_datetime(df['trade_date'], format='%Y%m%d', errors='coerce')
        df = df[~df['trade_date'].isna()]
        df = df.sort_values('trade_date')

        # DatetimeIndex guarantees Timestamp elements (with .strftime) whether
        # Series.unique() hands back a NumPy array or a DatetimeArray.
        all_dates = pd.DatetimeIndex(df['trade_date'].unique())

        # Split the table by trading day once so per-day lookups do not
        # rescan the whole frame; row order inside each day is preserved.
        frames_by_date = {day: frame for day, frame in df.groupby('trade_date')}

        # Decide how many leading trading days are sampled.
        if use_consistent_samples:
            # When max_days exceeds days_forward, analyze max_days ahead.
            if max_days > days_forward:
                days_forward = max_days
                self.logger.info("max_days大于days_forward,使用max_days进行分析")
            # A fixed maximum horizon keeps the sample set identical across runs.
            sample_limit = max(len(all_dates) - max_days, 0)
            self.logger.info(f"使用统一样本集进行分析,样本量: {sample_limit}")
        else:
            # Use the horizon required by this run.
            sample_limit = max(len(all_dates) - days_forward, 0)
            self.logger.info(f"使用动态样本集进行分析,样本量: {sample_limit}")

        # Indicators to analyze as (column, direction, display name).
        # direction > 0 picks the sector with the highest value, otherwise the lowest.
        flow_indicators = [
            ('main_force_amount', 1, '主力净额')
        ]

        # Make sure the output directory exists.
        os.makedirs('result', exist_ok=True)

        for indicator, correlation, indicator_name in flow_indicators:
            self.logger.info(f"分析 {indicator_name} 与未来指数关系...")

            results = self._collect_results(
                frames_by_date, all_dates, sample_limit, indicator, correlation, days_forward)
            results_df = pd.DataFrame(results)
            if results_df.empty:
                self.logger.info(f"没有足够的数据来分析{indicator_name}与后续表现的关系")
                continue

            # Persist the per-sample detail table.
            output_file = f'result/{indicator}_performance_{days_forward}days.xlsx'
            results_df.to_excel(output_file, index=False)
            self.logger.info(f"{indicator_name}表现分析已保存至{output_file}")

            avg_performance, success_rates = self._summarize_results(results_df, days_forward)

            # Report the per-day figures.
            self.logger.info(f"{indicator_name}极值行业在未来{days_forward}天的表现分析:")
            self.logger.info("日期\t\t平均收益率\t\t胜率")
            for day in range(1, days_forward + 1):
                self.logger.info(f"T+{day}\t\t{avg_performance[f'day{day}']:.5f}%\t\t{success_rates[f'day{day}']:.2f}%")
            self.logger.info(f"{days_forward}天平均: {avg_performance[f'avg_{days_forward}day']:.5f}%")

            direction = self._plot_performance(
                results_df, avg_performance, success_rates,
                indicator, indicator_name, correlation, days_forward)
            self._plot_win_rates(success_rates, indicator, indicator_name, direction, days_forward)

        return True

    @staticmethod
    def _collect_results(frames_by_date, all_dates, sample_limit, indicator, correlation, days_forward):
        """Build one result row per sampled day for the sector with the extreme indicator value."""
        results = []
        total_dates = len(all_dates)

        for i in range(sample_limit):
            current_date = all_dates[i]
            current_day_data = frames_by_date[current_date]

            # Highest value for a positive direction, lowest otherwise.
            ranked = current_day_data.sort_values(indicator, ascending=(correlation <= 0))
            top_sectors = ranked.head(1)['name'].tolist()

            for sector in top_sectors:
                sector_current = current_day_data[current_day_data['name'] == sector]
                if sector_current.empty:
                    continue

                current_pct_change = sector_current['pct_change'].values[0]
                current_indicator_value = sector_current[indicator].values[0]

                # pct_change of the same sector on each following trading day;
                # None marks a day with no row for the sector (or past the data).
                future_changes = []
                for day_offset in range(1, days_forward + 1):
                    change = None
                    future_index = i + day_offset
                    if future_index < total_dates:
                        future_frame = frames_by_date[all_dates[future_index]]
                        matches = future_frame.loc[future_frame['name'] == sector, 'pct_change']
                        if not matches.empty:
                            change = matches.values[0]
                    future_changes.append(change)

                # Keep the sample only if at least one future day has data.
                observed = [x for x in future_changes if x is not None]
                if not observed:
                    continue

                result_entry = {
                    'date': current_date.strftime('%Y%m%d'),  # formatted as YYYYMMDD
                    'sector': sector,
                    indicator: current_indicator_value,
                    'current_pct_change': current_pct_change,
                }
                for day, change in enumerate(future_changes, start=1):
                    result_entry[f'day{day}_change'] = change
                result_entry[f'avg_{days_forward}day_change'] = np.nanmean(observed)
                results.append(result_entry)

        return results

    @staticmethod
    def _summarize_results(results_df, days_forward):
        """Return (average change, win rate in percent) per future day, computed over observed values only."""
        avg_performance = {}
        success_rates = {}
        for day in range(1, days_forward + 1):
            # Missing observations must not count as losses, so drop them
            # before taking the share of positive changes.
            changes = pd.to_numeric(results_df[f'day{day}_change'], errors='coerce').dropna()
            avg_performance[f'day{day}'] = changes.mean()
            success_rates[f'day{day}'] = (changes > 0).mean() * 100

        avg_performance[f'avg_{days_forward}day'] = results_df[f'avg_{days_forward}day_change'].mean()
        return avg_performance, success_rates

    def _plot_performance(self, results_df, avg_performance, success_rates,
                          indicator, indicator_name, correlation, days_forward):
        """Save the dual-axis chart (average change and win rate) and return the direction label used."""
        day_range = range(1, days_forward + 1)

        # Prefer Chinese labels; fall back to English ones if font setup fails.
        try:
            plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'WenQuanYi Micro Hei']
            plt.rcParams['axes.unicode_minus'] = False  # render the minus sign correctly
            days_labels = ['当天'] + [f'第{i}天' for i in day_range]
            direction = "最高" if correlation > 0 else "最低"
            title = f'{indicator_name}{direction}后的平均表现 ({days_forward}天)'
            xlabel = '时间'
        except Exception:
            days_labels = ['Current'] + [f'Day+{i}' for i in day_range]
            direction = "Highest" if correlation > 0 else "Lowest"
            title = f'Average Performance After {indicator_name} {direction} ({days_forward} Days)'
            xlabel = 'Time'

        fig, ax1 = plt.subplots(figsize=(14, 8))
        try:
            # Average change curve on the left y-axis; the first point is the signal day.
            values = [results_df['current_pct_change'].mean()] + [avg_performance[f'day{i}'] for i in day_range]
            ax1.plot(days_labels, values, marker='o', linewidth=2, color='blue', label='平均收益率')
            ax1.axhline(y=0, color='r', linestyle='--')
            ax1.set_xlabel(xlabel)
            ax1.set_ylabel('收益率 (%)', color='blue')
            ax1.tick_params(axis='y', labelcolor='blue')

            # Win-rate curve on the right y-axis; the signal day is pinned to 50%.
            ax2 = ax1.twinx()
            win_rates = [50] + [success_rates[f'day{i}'] for i in day_range]
            ax2.plot(days_labels, win_rates, marker='s', linewidth=2, color='green', label='胜率')
            ax2.set_ylabel('胜率 (%)', color='green')
            ax2.tick_params(axis='y', labelcolor='green')
            ax2.axhline(y=50, color='green', linestyle='--', alpha=0.5)

            # One legend covering both axes.
            lines1, labels1 = ax1.get_legend_handles_labels()
            lines2, labels2 = ax2.get_legend_handles_labels()
            ax1.legend(lines1 + lines2, labels1 + labels2, loc='best')

            plt.title(title, fontsize=14)
            plt.grid(True)
            plt.xticks(rotation=45)  # rotate x labels to avoid overlap

            output_image = f'result/{indicator}_performance_{days_forward}days.png'
            plt.savefig(output_image, dpi=300, bbox_inches='tight')
        finally:
            # Figures stay registered with pyplot until closed explicitly.
            plt.close(fig)

        self.logger.info(f"{indicator_name}表现图表已保存至{output_image}")
        return direction

    def _plot_win_rates(self, success_rates, indicator, indicator_name, direction, days_forward):
        """Save the bar chart of win rates for each future day."""
        positions = range(1, days_forward + 1)
        rates = [success_rates[f'day{i}'] for i in positions]

        fig = plt.figure(figsize=(14, 8))
        try:
            plt.bar(positions, rates, color='green', alpha=0.7)
            plt.axhline(y=50, color='r', linestyle='--')
            plt.title(f'{indicator_name}{direction}后的未来胜率分布 ({days_forward}天)', fontsize=14)
            plt.xlabel('未来天数')
            plt.ylabel('胜率 (%)')
            plt.xticks(positions, [f'T+{i}' for i in positions])
            plt.grid(True, axis='y')

            # Annotate each bar; a day without observations has no finite rate to place.
            for position, rate in zip(positions, rates):
                if np.isfinite(rate):
                    plt.text(position, rate + 1, f"{rate:.1f}%",
                             ha='center', va='bottom', fontsize=9)

            win_rate_image = f'result/{indicator}_win_rate_{days_forward}days.png'
            plt.savefig(win_rate_image, dpi=300, bbox_inches='tight')
        finally:
            plt.close(fig)

        self.logger.info(f"{indicator_name}胜率分布图已保存至{win_rate_image}")
|