diff --git a/main.py b/main.py index aa7d250..979a0cb 100644 --- a/main.py +++ b/main.py @@ -1,8 +1,9 @@ from data_fetcher import DataFetcher +from money_flow_analyzer import MoneyflowAnalyzer if __name__ == "__main__": # 指定日期范围 - start_date = '20250401' + start_date = '20230912' end_date = None # 创建数据获取器实例 @@ -10,4 +11,7 @@ if __name__ == "__main__": # 获取板块资金流向数据 # 可以通过force_update=True参数强制更新指定日期范围的数据 - df = data_fetcher.get_moneyflow_ind_dc(start_date, end_date, force_update=False) \ No newline at end of file + df = data_fetcher.get_moneyflow_ind_dc(start_date, end_date, force_update=False) + + analyzer = MoneyflowAnalyzer() + analyzer.main_flow_analyze(days_forward=2,use_consistent_samples=True) \ No newline at end of file diff --git a/money_flow_analyzer.py b/money_flow_analyzer.py new file mode 100644 index 0000000..8350af7 --- /dev/null +++ b/money_flow_analyzer.py @@ -0,0 +1,246 @@ +import os +import traceback +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +from config_manager import get_config_manager +from database_manager import DatabaseManager +from logger import get_logger + + +class MoneyflowAnalyzer: + """ + 市场资金流分析器,分析各类资金流向指标对行业在随后1-N天表现的影响 + """ + + def __init__(self): + self.db_manager = DatabaseManager() + self.config = get_config_manager() + self.logger = get_logger() + + def main_flow_analyze(self, days_forward=10, use_consistent_samples=False, max_days=20): + """ + 分析各类资金流向指标对行业在随后1-N天表现的影响 + + 参数: + days_forward (int): 要分析的未来天数,默认为10天 + """ + self.logger.info(f"开始分析资金流数据,未来{days_forward}天的表现...") + self.logger.info(f"使用一致样本集: {use_consistent_samples}") + + # 读取资金流数据 + try: + df = self.db_manager.load_df_from_db('moneyflow_ind_dc') + self.logger.info(f"成功从数据库加载资金流数据,共计{len(df)}条记录") + except Exception as e: + self.logger.error(f"从数据库读取数据失败: {e}") + self.logger.debug(f"完整的错误追踪信息:\n{traceback.format_exc()}") + return False + + # 检查是否成功获取数据 + if df.empty: + self.logger.info("获取的资金流数据为空,无法进行分析") + return False + + # 数据预处理 - 计算主力资金 + df['main_force_amount'] = df['buy_elg_amount'] + df['buy_lg_amount'] + + # 将日期格式转换为datetime + df['trade_date'] = pd.to_datetime(df['trade_date'], format='%Y%m%d', errors='coerce') + df = df[~df['trade_date'].isna()] + + # 按日期排序 + df = df.sort_values('trade_date') + + # 获取所有交易日期 + all_dates = df['trade_date'].unique() + + # 确定样本范围 + if use_consistent_samples: + # 使用固定的最大预测天数来确保所有分析使用相同的样本集 + sample_limit = len(all_dates) - max_days + self.logger.info(f"使用统一样本集进行分析,样本量: {sample_limit}") + else: + # 使用当前分析所需的预测天数 + sample_limit = len(all_dates) - days_forward + self.logger.info(f"使用动态样本集进行分析,样本量: {sample_limit}") + + # 定义要分析的资金流指标 + # 格式: (指标名, 排序方向, 关联性) + # 关联性: 正相关=1, 负相关=-1 (用于确定是取最高还是最低) + flow_indicators = [ + ('main_force_amount', 1, '主力净额') + ] + + # 确保结果目录存在 + os.makedirs('result', exist_ok=True) + + # 为每个指标进行分析 + for indicator, correlation, indicator_name in flow_indicators: + self.logger.info(f"分析 {indicator_name} 与未来指数关系...") + + # 创建结果数据结构 + results = [] + + # 遍历每个交易日期(固定样本集大小) + for i in range(sample_limit): + current_date = all_dates[i] + + # 获取当前日期的数据 + current_day_data = df[df['trade_date'] == current_date] + + # 找出该指标排名靠前的行业 + if correlation > 0: + # 正相关,找最高值 + top_sectors = current_day_data.sort_values(indicator, ascending=False).head(1)['name'].tolist() + else: + # 负相关,找最低值 + top_sectors = current_day_data.sort_values(indicator, ascending=True).head(1)['name'].tolist() + + # 分析每个行业在随后1-N天的表现 + for sector in top_sectors: + # 获取该行业当天的指数变化和指标值 + sector_current = current_day_data[current_day_data['name'] == sector] + if sector_current.empty: + continue + + current_pct_change = sector_current['pct_change'].values[0] + current_indicator_value = sector_current[indicator].values[0] + + # 分析随后1-N天的表现 + future_changes = [] + for day_offset in range(1, days_forward + 1): + if i + day_offset < len(all_dates): + future_date = all_dates[i + day_offset] + future_data = df[(df['trade_date'] == future_date) & (df['name'] == sector)] + if not future_data.empty: + future_changes.append(future_data['pct_change'].values[0]) + else: + future_changes.append(None) + else: + future_changes.append(None) + + # 如果至少有一个未来日期有数据 + if any(x is not None for x in future_changes): + result_entry = { + 'date': current_date.strftime('%Y%m%d'), # 将日期格式化为YYYYMMDD字符串 + 'sector': sector, + f'{indicator}': current_indicator_value, + 'current_pct_change': current_pct_change, + } + + # 添加1-N天的变化 + for day in range(1, days_forward + 1): + result_entry[f'day{day}_change'] = future_changes[day - 1] + + # 计算平均变化 + result_entry[f'avg_{days_forward}day_change'] = np.nanmean( + [x for x in future_changes if x is not None]) + results.append(result_entry) + + # 转换为DataFrame + results_df = pd.DataFrame(results) + if results_df.empty: + self.logger.info(f"没有足够的数据来分析{indicator_name}与后续表现的关系") + continue + + # 保存结果 + os.makedirs('result', exist_ok=True) + output_file = f'result/{indicator}_performance_{days_forward}days.xlsx' + results_df.to_excel(output_file, index=False) + self.logger.info(f"{indicator_name}表现分析已保存至{output_file}") + + # 分析整体表现 + avg_performance = {} + success_rates = {} + for day in range(1, days_forward + 1): + # 计算每一天的变化均值 + avg_performance[f'day{day}'] = results_df[f'day{day}_change'].mean() + # 计算每一天的胜率(正收益的概率) + success_rates[f'day{day}'] = (results_df[f'day{day}_change'] > 0).mean() * 100 + + avg_performance[f'avg_{days_forward}day'] = results_df[f'avg_{days_forward}day_change'].mean() + + # 输出每天的表现数据 + self.logger.info(f"{indicator_name}极值行业在未来{days_forward}天的表现分析:") + self.logger.info("日期\t\t平均收益率\t\t胜率") + for day in range(1, days_forward + 1): + self.logger.info(f"T+{day}\t\t{avg_performance[f'day{day}']:.5f}%\t\t{success_rates[f'day{day}']:.2f}%") + self.logger.info(f"{days_forward}天平均: {avg_performance[f'avg_{days_forward}day']:.5f}%") + + # 绘制折线图显示未来N天的平均表现和胜率 + plt.figure(figsize=(14, 8)) + # 设置中文字体 + try: + plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'WenQuanYi Micro Hei'] + plt.rcParams['axes.unicode_minus'] = False # 解决负号显示问题 + days_labels = ['当天'] + [f'第{i}天' for i in range(1, days_forward + 1)] + direction = "最高" if correlation > 0 else "最低" + title = f'{indicator_name}{direction}后的平均表现 ({days_forward}天)' + xlabel = '时间' + ylabel = '指数变化率 (%)' + except: + # 如果没有中文字体,使用英文 + days_labels = ['Current'] + [f'Day+{i}' for i in range(1, days_forward + 1)] + direction = "Highest" if correlation > 0 else "Lowest" + title = f'Average Performance After {indicator_name} {direction} ({days_forward} Days)' + xlabel = 'Time' + ylabel = 'Index Change Rate (%)' + + # 创建双y轴图表 + fig, ax1 = plt.subplots(figsize=(14, 8)) + + # 设置收益率曲线 (左y轴) + values = [results_df['current_pct_change'].mean()] + [avg_performance[f'day{i}'] for i in + range(1, days_forward + 1)] + ax1.plot(days_labels, values, marker='o', linewidth=2, color='blue', label='平均收益率') + ax1.axhline(y=0, color='r', linestyle='--') + ax1.set_xlabel(xlabel) + ax1.set_ylabel('收益率 (%)', color='blue') + ax1.tick_params(axis='y', labelcolor='blue') + + # 设置胜率曲线 (右y轴) + ax2 = ax1.twinx() + win_rates = [50] # 当天默认为50% + for i in range(1, days_forward + 1): + win_rates.append(success_rates[f'day{i}']) + ax2.plot(days_labels, win_rates, marker='s', linewidth=2, color='green', label='胜率') + ax2.set_ylabel('胜率 (%)', color='green') + ax2.tick_params(axis='y', labelcolor='green') + ax2.axhline(y=50, color='green', linestyle='--', alpha=0.5) + + # 添加图例 + lines1, labels1 = ax1.get_legend_handles_labels() + lines2, labels2 = ax2.get_legend_handles_labels() + ax1.legend(lines1 + lines2, labels1 + labels2, loc='best') + + plt.title(title, fontsize=14) + plt.grid(True) + plt.xticks(rotation=45) # 旋转x轴标签以避免重叠 + + # 保存图表 + output_image = f'result/{indicator}_performance_{days_forward}days.png' + plt.savefig(output_image, dpi=300, bbox_inches='tight') + self.logger.info(f"{indicator_name}表现图表已保存至{output_image}") + + # 额外保存胜率图表 + plt.figure(figsize=(14, 8)) + plt.bar(range(1, days_forward + 1), [success_rates[f'day{i}'] for i in range(1, days_forward + 1)], + color='green', alpha=0.7) + plt.axhline(y=50, color='r', linestyle='--') + plt.title(f'{indicator_name}{direction}后的未来胜率分布 ({days_forward}天)', fontsize=14) + plt.xlabel('未来天数') + plt.ylabel('胜率 (%)') + plt.xticks(range(1, days_forward + 1), [f'T+{i}' for i in range(1, days_forward + 1)]) + plt.grid(True, axis='y') + + # 在柱状图上标注具体数值 + for i in range(1, days_forward + 1): + plt.text(i, success_rates[f'day{i}'] + 1, f"{success_rates[f'day{i}']:.1f}%", + ha='center', va='bottom', fontsize=9) + + win_rate_image = f'result/{indicator}_win_rate_{days_forward}days.png' + plt.savefig(win_rate_image, dpi=300, bbox_inches='tight') + self.logger.info(f"{indicator_name}胜率分布图已保存至{win_rate_image}") + + return True \ No newline at end of file