import os
import traceback

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from config_manager import get_config_manager
from database_manager import DatabaseManager
from logger_manager import get_logger


class MoneyflowAnalyzer:
    """
    Market money-flow analyzer: for each trading date it picks the sector with
    the extreme value of a money-flow indicator and measures that sector's
    index change over the following 1..N trading days.
    """

    # Fonts (in preference order) used to render Chinese chart labels.
    _CJK_FONTS = ('SimHei', 'Microsoft YaHei', 'WenQuanYi Micro Hei')

    def __init__(self):
        self.db_manager = DatabaseManager()
        self.config = get_config_manager()
        self.logger = get_logger()

    def main_flow_analyze(self, days_forward=10, use_consistent_samples=False, max_days=10):
        """
        Analyze how money-flow indicators relate to sector performance over the
        following 1..N trading days.

        Results are written to ``result/<indicator>_performance_<N>days.xlsx``,
        plus a performance/win-rate line chart and a win-rate bar chart (PNG).

        Parameters:
            days_forward (int): number of future trading days to analyze (default 10).
            use_consistent_samples (bool): if True, the sample set size is
                ``len(all_dates) - max_days`` independently of ``days_forward``,
                so runs with different horizons use the same sample dates. If
                ``max_days`` exceeds ``days_forward``, the horizon is raised to
                ``max_days``.
            max_days (int): horizon used to size the consistent sample set.

        Returns:
            bool: False if the data could not be loaded or is empty, else True.
        """
        self.logger.info(f"开始分析资金流数据,未来{days_forward}天的表现...")
        self.logger.info(f"使用一致样本集: {use_consistent_samples}")

        try:
            df = self.db_manager.load_df_from_db('moneyflow_ind_dc')
            self.logger.info(f"成功从数据库加载资金流数据,共计{len(df)}条记录")
        except Exception as e:
            self.logger.error(f"从数据库读取数据失败: {e}")
            self.logger.debug(f"完整的错误追踪信息:\n{traceback.format_exc()}")
            return False

        if df.empty:
            self.logger.info("获取的资金流数据为空,无法进行分析")
            return False

        df = self._prepare_data(df)
        # A DatetimeIndex guarantees pd.Timestamp elements (with strftime);
        # on pandas < 2.0 Series.unique() returns a raw datetime64 ndarray.
        all_dates = pd.DatetimeIndex(df['trade_date'].unique())
        days_forward, sample_limit = self._resolve_sample_window(
            len(all_dates), days_forward, use_consistent_samples, max_days)

        # (column, correlation, display name). correlation 1 selects the sector
        # with the highest value, -1 the sector with the lowest value.
        flow_indicators = [
            ('main_force_amount', 1, '主力净额'),
        ]

        os.makedirs('result', exist_ok=True)

        # Split the frame by date once instead of re-filtering it per lookup.
        frames_by_date = {date: frame for date, frame in df.groupby('trade_date')}

        for indicator, correlation, indicator_name in flow_indicators:
            self.logger.info(f"分析 {indicator_name} 与未来指数关系...")

            results_df = pd.DataFrame(self._collect_results(
                frames_by_date, all_dates, sample_limit, days_forward, indicator, correlation))
            if results_df.empty:
                self.logger.info(f"没有足够的数据来分析{indicator_name}与后续表现的关系")
                continue

            output_file = f'result/{indicator}_performance_{days_forward}days.xlsx'
            results_df.to_excel(output_file, index=False)
            self.logger.info(f"{indicator_name}表现分析已保存至{output_file}")

            avg_performance, success_rates = self._summarize_performance(results_df, days_forward)

            self.logger.info(f"{indicator_name}极值行业在未来{days_forward}天的表现分析:")
            self.logger.info("日期\t\t平均收益率\t\t胜率")
            for day in range(1, days_forward + 1):
                self.logger.info(
                    f"T+{day}\t\t{avg_performance[f'day{day}']:.5f}%\t\t{success_rates[f'day{day}']:.2f}%")
            self.logger.info(f"{days_forward}天平均: {avg_performance[f'avg_{days_forward}day']:.5f}%")

            labels = self._chart_labels(indicator_name, correlation, days_forward)

            output_image = f'result/{indicator}_performance_{days_forward}days.png'
            self._plot_performance(results_df, avg_performance, success_rates,
                                   days_forward, labels, output_image)
            self.logger.info(f"{indicator_name}表现图表已保存至{output_image}")

            win_rate_image = f'result/{indicator}_win_rate_{days_forward}days.png'
            self._plot_win_rate(success_rates, days_forward, labels, win_rate_image)
            self.logger.info(f"{indicator_name}胜率分布图已保存至{win_rate_image}")

        return True

    @staticmethod
    def _prepare_data(df):
        """Add the main-force column, parse trade dates, drop unparseable dates, sort by date."""
        # Main force = extra-large + large order amounts (presumably net-inflow
        # columns of moneyflow_ind_dc -- confirm against the table schema).
        df['main_force_amount'] = df['buy_elg_amount'] + df['buy_lg_amount']
        df['trade_date'] = pd.to_datetime(df['trade_date'], format='%Y%m%d', errors='coerce')
        df = df[df['trade_date'].notna()]
        return df.sort_values('trade_date')

    def _resolve_sample_window(self, n_dates, days_forward, use_consistent_samples, max_days):
        """Return ``(days_forward, sample_limit)`` after applying the sampling mode."""
        if use_consistent_samples:
            if max_days > days_forward:
                days_forward = max_days
                self.logger.info("max_days大于days_forward,使用max_days进行分析")
            # A fixed horizon keeps the sample set identical across analyses.
            sample_limit = n_dates - max_days
            self.logger.info(f"使用统一样本集进行分析,样本量: {sample_limit}")
        else:
            sample_limit = n_dates - days_forward
            self.logger.info(f"使用动态样本集进行分析,样本量: {sample_limit}")
        return days_forward, sample_limit

    @staticmethod
    def _collect_results(frames_by_date, all_dates, sample_limit, days_forward, indicator, correlation):
        """Build one result record per sample date for the extreme-indicator sector."""
        results = []
        n_dates = len(all_dates)
        # Positive correlation -> take the highest value, otherwise the lowest.
        ascending = not correlation > 0

        for i in range(sample_limit):
            current_date = all_dates[i]
            current_day_data = frames_by_date[current_date]
            top_sectors = current_day_data.sort_values(indicator, ascending=ascending).head(1)['name'].tolist()

            for sector in top_sectors:
                sector_current = current_day_data[current_day_data['name'] == sector]
                if sector_current.empty:
                    continue
                current_pct_change = sector_current['pct_change'].values[0]
                current_indicator_value = sector_current[indicator].values[0]

                # None marks a future day past the data or without a row for the sector.
                future_changes = []
                for day_offset in range(1, days_forward + 1):
                    change = None
                    if i + day_offset < n_dates:
                        future_day = frames_by_date[all_dates[i + day_offset]]
                        future_rows = future_day.loc[future_day['name'] == sector, 'pct_change']
                        if not future_rows.empty:
                            change = future_rows.values[0]
                    future_changes.append(change)

                observed = [x for x in future_changes if x is not None]
                if not observed:
                    continue

                entry = {
                    'date': current_date.strftime('%Y%m%d'),
                    'sector': sector,
                    indicator: current_indicator_value,
                    'current_pct_change': current_pct_change,
                }
                for day, change in enumerate(future_changes, start=1):
                    entry[f'day{day}_change'] = change
                entry[f'avg_{days_forward}day_change'] = np.nanmean(observed)
                results.append(entry)
        return results

    @staticmethod
    def _summarize_performance(results_df, days_forward):
        """Return ``(avg_performance, success_rates)`` dicts keyed by ``day{k}``."""
        avg_performance = {}
        success_rates = {}
        for day in range(1, days_forward + 1):
            changes = pd.to_numeric(results_df[f'day{day}_change'], errors='coerce')
            avg_performance[f'day{day}'] = changes.mean()
            # Win rate over days that actually have data: missing values
            # (NaN) must not be counted as losses.
            observed = changes.dropna()
            success_rates[f'day{day}'] = (observed > 0).mean() * 100 if not observed.empty else np.nan
        avg_performance[f'avg_{days_forward}day'] = results_df[f'avg_{days_forward}day_change'].mean()
        return avg_performance, success_rates

    @classmethod
    def _chinese_font_available(cls):
        """Return True if matplotlib can find any of the configured CJK fonts."""
        from matplotlib import font_manager
        installed = {font.name for font in font_manager.fontManager.ttflist}
        return any(name in installed for name in cls._CJK_FONTS)

    @classmethod
    def _chart_labels(cls, indicator_name, correlation, days_forward):
        """Return chart texts: Chinese when a CJK font is installed, English otherwise."""
        plt.rcParams['axes.unicode_minus'] = False  # render the minus sign correctly
        if cls._chinese_font_available():
            plt.rcParams['font.sans-serif'] = list(cls._CJK_FONTS)
            direction = "最高" if correlation > 0 else "最低"
            return {
                'days': ['当天'] + [f'第{i}天' for i in range(1, days_forward + 1)],
                'title': f'{indicator_name}{direction}后的平均表现 ({days_forward}天)',
                'xlabel': '时间',
                'return_axis': '收益率 (%)',
                'return_legend': '平均收益率',
                'win_axis': '胜率 (%)',
                'win_legend': '胜率',
                'win_title': f'{indicator_name}{direction}后的未来胜率分布 ({days_forward}天)',
                'win_xlabel': '未来天数',
            }
        direction = "Highest" if correlation > 0 else "Lowest"
        return {
            'days': ['Current'] + [f'Day+{i}' for i in range(1, days_forward + 1)],
            'title': f'Average Performance After {indicator_name} {direction} ({days_forward} Days)',
            'xlabel': 'Time',
            'return_axis': 'Return (%)',
            'return_legend': 'Average Return',
            'win_axis': 'Win Rate (%)',
            'win_legend': 'Win Rate',
            'win_title': f'Win Rate Distribution After {indicator_name} {direction} ({days_forward} Days)',
            'win_xlabel': 'Days Ahead',
        }

    @staticmethod
    def _plot_performance(results_df, avg_performance, success_rates, days_forward, labels, output_image):
        """Save a dual-axis chart: mean return (left axis) and win rate (right axis)."""
        fig, ax1 = plt.subplots(figsize=(14, 8))
        try:
            values = [results_df['current_pct_change'].mean()] + [
                avg_performance[f'day{day}'] for day in range(1, days_forward + 1)]
            ax1.plot(labels['days'], values, marker='o', linewidth=2, color='blue',
                     label=labels['return_legend'])
            ax1.axhline(y=0, color='r', linestyle='--')
            ax1.set_xlabel(labels['xlabel'])
            ax1.set_ylabel(labels['return_axis'], color='blue')
            ax1.tick_params(axis='y', labelcolor='blue')
            # twinx() hides the twin's x-axis, so rotation and grid must be
            # applied to ax1, whose x tick labels are the ones actually drawn.
            ax1.tick_params(axis='x', labelrotation=45)
            ax1.grid(True)

            ax2 = ax1.twinx()
            # The current day has no win rate; 50% is a neutral placeholder.
            win_rates = [50] + [success_rates[f'day{day}'] for day in range(1, days_forward + 1)]
            ax2.plot(labels['days'], win_rates, marker='s', linewidth=2, color='green',
                     label=labels['win_legend'])
            ax2.set_ylabel(labels['win_axis'], color='green')
            ax2.tick_params(axis='y', labelcolor='green')
            ax2.axhline(y=50, color='green', linestyle='--', alpha=0.5)

            # Merge both axes' entries into a single legend.
            lines1, labels1 = ax1.get_legend_handles_labels()
            lines2, labels2 = ax2.get_legend_handles_labels()
            ax1.legend(lines1 + lines2, labels1 + labels2, loc='best')
            ax1.set_title(labels['title'], fontsize=14)

            fig.savefig(output_image, dpi=300, bbox_inches='tight')
        finally:
            plt.close(fig)  # release the figure even if saving fails

    @staticmethod
    def _plot_win_rate(success_rates, days_forward, labels, output_image):
        """Save a bar chart of the win rate for each future day."""
        days = list(range(1, days_forward + 1))
        rates = [success_rates[f'day{day}'] for day in days]
        fig, ax = plt.subplots(figsize=(14, 8))
        try:
            ax.bar(days, rates, color='green', alpha=0.7)
            ax.axhline(y=50, color='r', linestyle='--')
            ax.set_title(labels['win_title'], fontsize=14)
            ax.set_xlabel(labels['win_xlabel'])
            ax.set_ylabel(labels['win_axis'])
            ax.set_xticks(days)
            ax.set_xticklabels([f'T+{day}' for day in days])
            ax.grid(True, axis='y')
            # Annotate each bar; skip days without any observations (NaN).
            for day, rate in zip(days, rates):
                if pd.notna(rate):
                    ax.text(day, rate + 1, f"{rate:.1f}%", ha='center', va='bottom', fontsize=9)
            fig.savefig(output_image, dpi=300, bbox_inches='tight')
        finally:
            plt.close(fig)  # release the figure even if saving fails