diff --git a/news_analyze.py b/news_analyze.py
index 80a0369..b7d6af5 100644
--- a/news_analyze.py
+++ b/news_analyze.py
@@ -146,14 +146,34 @@ def get_news_by_date(date_str, force_update=False):
     return None


-def analyze_news_in_batches(news_df):
-    """将新闻按批次分析后汇总"""
+def analyze_news_in_batches(news_df, date_str=None, force_analyze=False):
+    """将新闻按批次分析后汇总,支持缓存分析结果"""
     if news_df is None or news_df.empty:
         return None

+    # 确保temp目录存在
+    temp_dir = ensure_temp_dir()
+
     config = load_config()
     openai_config = config['openai_api']

+    # 如果提供了日期,尝试读取缓存的分析结果
+    cache_file = None
+    if date_str:
+        date_file = datetime.datetime.strptime(date_str, '%Y-%m-%d').strftime('%Y%m%d')
+        cache_file = os.path.join(temp_dir, f"analysis_cache_{date_file}.json")
+
+        # 检查是否有缓存,且不强制重新分析
+        if os.path.exists(cache_file) and not force_analyze:
+            try:
+                import json
+                with open(cache_file, 'r', encoding='utf-8') as f:
+                    cached_analysis = json.load(f)
+                print(f"使用缓存的分析结果: {cache_file}")
+                return cached_analysis
+            except Exception as e:
+                print(f"读取缓存文件时出错: {e}")
+
     # 初始化 OpenAI 客户端
     client = OpenAI(
         api_key=openai_config['api_key'],
@@ -164,6 +184,8 @@ def analyze_news_in_batches(news_df):
     batch_size = 200
     total_batches = math.ceil(len(news_df) / batch_size)
     batch_results = []
+    # 保存每批次的重要新闻原文
+    important_news = []

     print(f"将{len(news_df)}条新闻分成{total_batches}批进行分析,每批{batch_size}条")

@@ -171,12 +193,41 @@ def analyze_news_in_batches(news_df):
         start_idx = i * batch_size
         end_idx = min((i + 1) * batch_size, len(news_df))
         batch_df = news_df.iloc[start_idx:end_idx]
-
         print(f"分析第{i + 1}/{total_batches}批新闻...")

+        # 生成缓存文件名,按批次缓存
+        batch_cache_file = None
+        if date_str:
+            date_file = datetime.datetime.strptime(date_str, '%Y-%m-%d').strftime('%Y%m%d')
+            batch_cache_file = os.path.join(temp_dir, f"batch_analysis_cache_{date_file}_{i + 1}.json")
+
+            # 检查是否有批次缓存,且不强制重新分析
+            if os.path.exists(batch_cache_file) and not force_analyze:
+                try:
+                    import json
+                    with open(batch_cache_file, 'r', encoding='utf-8') as f:
+                        cached_batch = json.load(f)
+                    print(f"使用缓存的批次分析结果: {batch_cache_file}")
+                    batch_results.append(cached_batch)
+
+                    # 添加此批次的重要新闻到列表
+                    if "important_news" in cached_batch:
+                        important_news.extend(cached_batch["important_news"])
+
+                    continue
+                except Exception as e:
+                    print(f"读取批次缓存文件时出错: {e}")
+
         # 合并当前批次新闻内容
         batch_content = ""
+        batch_news = []  # 存储此批次的新闻
+
         for _, row in batch_df.iterrows():
+            news_item = {
+                "datetime": row['datetime'],
+                "content": row['content']
+            }
+            batch_news.append(news_item)
             batch_content += f"{row['datetime']}: {row['content']}\n\n"

         # 对当前批次进行初步分析
@@ -185,13 +236,25 @@ def analyze_news_in_batches(news_df):

 {batch_content}

-请总结以下两点:
+请完成以下两项分析:
 1. 这批新闻中提到的重要政策(特别是中国颁布的法令等),如果没有相关政策可以直接说明
 2. 这些政策可能对哪些股市板块带来影响(利好/利空)

-请只提取重要信息,简明扼要地回答。
-"""
+对于重要的政策新闻,请在分析后提供一个"重要新闻列表",包含新闻的原文和发布时间。
+请只提取重要信息,过滤掉不相关的新闻(如普通的股价波动、ETF变动等)。
+回复格式:
+## 政策分析
+[您的分析内容]
+
+## 板块影响
+[您的影响分析]
+
+## 重要新闻列表
+1. [时间] [完整新闻原文]
+2. [时间] [完整新闻原文]
+...
+""" try: response = client.chat.completions.create( messages=[ @@ -203,14 +266,43 @@ def analyze_news_in_batches(news_df): model=openai_config['model'], temperature=0.2 ) - batch_analysis = response.choices[0].message.content - batch_results.append({ + # 提取重要新闻 + batch_important_news = [] + if "## 重要新闻列表" in batch_analysis: + news_section = batch_analysis.split("## 重要新闻列表", 1)[1].strip() + import re + # 匹配时间和新闻内容 + news_matches = re.findall(r'\d+\.\s+\[([^\]]+)\]\s+(.+?)(?=\n\d+\.|$)', news_section, re.DOTALL) + for time_str, content in news_matches: + batch_important_news.append({ + "datetime": time_str.strip(), + "content": content.strip() + }) + + batch_result = { "batch_number": i + 1, "news_count": len(batch_df), - "analysis": batch_analysis - }) + "analysis": batch_analysis, + "important_news": batch_important_news + } + + batch_results.append(batch_result) + + # 添加重要新闻到总列表 + important_news.extend(batch_important_news) + + # 保存批次缓存 + if batch_cache_file: + try: + import json + with open(batch_cache_file, 'w', encoding='utf-8') as f: + json.dump(batch_result, f, ensure_ascii=False, indent=2) + print(f"已缓存批次分析结果到: {batch_cache_file}") + except Exception as e: + print(f"保存批次缓存文件时出错: {e}") + except Exception as e: print(f"分析第{i + 1}批新闻时出错: {e}") @@ -236,6 +328,7 @@ def analyze_news_in_batches(news_df): - 提取3-5条最重要的国家层面政策(特别是中国的宏观经济政策) - 每条政策用一句话概括核心内容 - 标注政策发布部门/会议名称 + - 对每条重要政策,提供对应的原始新闻来源 2. 【板块影响分析】 - 对每条重要政策,分析直接影响的相关行业板块 @@ -251,17 +344,17 @@ def analyze_news_in_batches(news_df): ### 一、核心政策摘要 1. [政策部门/会议] 政策核心内容 + - 来源新闻: [对应的原始新闻] - 影响板块:利好:XXX;利空:XXX - 影响逻辑:... - 2. [政策部门/会议] 政策核心内容 + - 来源新闻: [对应的原始新闻] - 影响板块:利好:XXX;利空:XXX - 影响逻辑:... ### 二、综合市场影响 [整体分析,包含市场情绪预判和关键时间节点提醒] """ - try: final_response = client.chat.completions.create( messages=[ @@ -273,16 +366,29 @@ def analyze_news_in_batches(news_df): model=openai_config['model'], temperature=0.2 ) - final_analysis = final_response.choices[0].message.content - return { + analysis_result = { "news_count": len(news_df), "batch_count": total_batches, "time_range": f"{news_df['datetime'].iloc[0]} 至 {news_df['datetime'].iloc[-1]}" if not news_df.empty else "", "batch_results": batch_results, - "final_analysis": final_analysis + "final_analysis": final_analysis, + "important_news": important_news # 保存所有重要新闻原文 } + + # 保存完整分析缓存 + if cache_file: + try: + import json + with open(cache_file, 'w', encoding='utf-8') as f: + json.dump(analysis_result, f, ensure_ascii=False, indent=2) + print(f"已缓存完整分析结果到: {cache_file}") + except Exception as e: + print(f"保存缓存文件时出错: {e}") + + return analysis_result + except Exception as e: print(f"生成最终分析时出错: {e}") return None @@ -308,31 +414,53 @@ def compare_trend_analysis(yesterday_analysis, today_analysis): base_url=openai_config['base_url'] ) + # 提取今天和昨天的重要新闻 + yesterday_news = yesterday_analysis.get('important_news', []) + today_news = today_analysis.get('important_news', []) + + # 整理新闻字符串 + yesterday_news_str = "" + for i, news in enumerate(yesterday_news): + yesterday_news_str += f"{i + 1}. [{news['datetime']}] {news['content']}\n\n" + + today_news_str = "" + for i, news in enumerate(today_news): + today_news_str += f"{i + 1}. [{news['datetime']}] {news['content']}\n\n" + # 构建对比分析提示 - prompt = f""" + prompt = f""" 你是一位资深财经分析师,请对比昨天和今天的政策分析结果,找出从弱势转为强势的板块,分析未来市场趋势: ### 昨天的分析: {yesterday_analysis['final_analysis']} +### 昨天的重要新闻原文: +{yesterday_news_str} + ### 今天的分析: {today_analysis['final_analysis']} +### 今天的重要新闻原文: +{today_news_str} + 请完成以下分析: 1. 
    - 对比昨天的弱势板块和今天的强势板块,找出明显从弱势转为强势的板块
    - 分析这些板块转强的原因
    - 评估这种转变的持续性
+   - 关联对应的原始新闻,作为判断依据

 2. 【政策趋势分析】
    - 分析昨天和今天的政策,提取政策方向的变化或延续
    - 判断政策组合对未来市场的影响
    - 预测可能被持续关注和推动的行业方向
+   - 引用相关新闻作为证据

 3. 【投资机会预判】
    - 基于弱转强的板块和政策趋势,预判未来1-2周可能出现的投资机会
    - 提出值得重点关注的细分行业
+   - 提供相关的政策和新闻支持

 请用以下结构化格式输出:

@@ -341,19 +469,20 @@ def compare_trend_analysis(yesterday_analysis, today_analysis):
 ### 一、弱转强板块
 1. [板块名称]
    - 转变原因:...
    - 相关政策支持:...
    - 持续性评估:...
+   - 关键新闻依据:[引用原始新闻]

 2. [板块名称]
    - 转变原因:...
    - 相关政策支持:...
    - 持续性评估:...
+   - 关键新闻依据:[引用原始新闻]

 ### 二、政策趋势变化
-[分析政策方向的延续性和变化]
+[分析政策方向的延续性和变化,引用相关新闻作为证据]

 ### 三、投资机会与建议
-[提出具体的投资方向和关注重点]
+[提出具体的投资方向和关注重点,引用相关政策和新闻]
 """
-
     try:
         response = client.chat.completions.create(
             messages=[
@@ -365,13 +494,14 @@ def compare_trend_analysis(yesterday_analysis, today_analysis):
             model=openai_config['model'],
             temperature=0.3
         )
-
         trend_analysis = response.choices[0].message.content

         return {
             "yesterday_date": yesterday_analysis.get("time_range", "昨天"),
             "today_date": today_analysis.get("time_range", "今天"),
-            "trend_analysis": trend_analysis
+            "trend_analysis": trend_analysis,
+            "yesterday_news": yesterday_news,
+            "today_news": today_news
         }
     except Exception as e:
         print(f"生成趋势分析时出错: {e}")
@@ -391,7 +521,6 @@ def main():
     # 获取昨天的新闻,只有在不存在时才获取
     yesterday_file_exists = False
     temp_dir = ensure_temp_dir()
-
     for file in os.listdir(temp_dir):
         if file.startswith(f"{yesterday_file}_") and file.endswith(".csv"):
             yesterday_file_exists = True
@@ -407,7 +536,6 @@ def main():
         for file in os.listdir(temp_dir):
             if file.startswith(f"{yesterday_file}_") and file.endswith(".csv"):
                 yesterday_files.append(os.path.join(temp_dir, file))
-
         dfs = []
         for file in yesterday_files:
             try:
@@ -415,65 +543,86 @@ def main():
                 dfs.append(df)
             except Exception as e:
                 print(f"读取文件{file}时出错: {e}")
-
         if dfs:
             yesterday_news = pd.concat(dfs, ignore_index=True)
             if 'datetime' in yesterday_news.columns and 'content' in yesterday_news.columns:
                 yesterday_news = yesterday_news[['datetime', 'content']]

-    # 分析今天和昨天的新闻
+    # 分析今天和昨天的新闻,使用缓存机制
     today_analysis = None
     yesterday_analysis = None

     if today_news is not None:
         print(f"获取到 {len(today_news)} 条今日新闻,正在分析...")
-        today_analysis = analyze_news_in_batches(today_news)
+        today_analysis = analyze_news_in_batches(today_news, today, force_analyze=False)

         # 保存今天的原始新闻和分析结果
-        today_news.to_csv(f"news_raw_{today_file}.csv", index=False, encoding='utf-8-sig')
-
+        today_news.to_csv(f"temp/news_raw_{today_file}.csv", index=False, encoding='utf-8-sig')
         if today_analysis:
-            with open(f"news_analysis_{today_file}.txt", "w", encoding="utf-8") as f:
+            with open(f"temp/news_analysis_{today_file}.txt", "w", encoding="utf-8") as f:
                 f.write(
                     f"分析范围: {today_analysis['news_count']} 条新闻,分成 {today_analysis['batch_count']} 批处理\n")
                 f.write(f"时间范围: {today_analysis['time_range']}\n\n")
                 f.write("最终分析结果:\n")
                 f.write("-" * 80 + "\n")
                 f.write(today_analysis['final_analysis'])
-                f.write("\n" + "-" * 80 + "\n")
+                f.write("\n" + "-" * 80 + "\n\n")

-            print(f"今日分析结果已保存到 news_analysis_{today_file}.txt")
+                # 添加重要新闻原文
+                f.write("重要新闻原文:\n")
+                f.write("-" * 80 + "\n")
+                for i, news in enumerate(today_analysis.get('important_news', [])):
+                    f.write(f"{i + 1}. [{news['datetime']}] {news['content']}\n\n")
+                f.write("-" * 80 + "\n")
+
+            print(f"今日分析结果已保存到 temp/news_analysis_{today_file}.txt")
     else:
         print("无法获取今天的新闻数据")

     if yesterday_news is not None:
         print(f"获取到 {len(yesterday_news)} 条昨日新闻,正在分析...")
-        yesterday_analysis = analyze_news_in_batches(yesterday_news)
+        yesterday_analysis = analyze_news_in_batches(yesterday_news, yesterday, force_analyze=False)

         # 保存昨天的原始新闻和分析结果
-        yesterday_news.to_csv(f"news_raw_{yesterday_file}.csv", index=False, encoding='utf-8-sig')
-
+        yesterday_news.to_csv(f"temp/news_raw_{yesterday_file}.csv", index=False, encoding='utf-8-sig')
         if yesterday_analysis:
-            with open(f"news_analysis_{yesterday_file}.txt", "w", encoding="utf-8") as f:
+            with open(f"temp/news_analysis_{yesterday_file}.txt", "w", encoding="utf-8") as f:
                 f.write(
                     f"分析范围: {yesterday_analysis['news_count']} 条新闻,分成 {yesterday_analysis['batch_count']} 批处理\n")
                 f.write(f"时间范围: {yesterday_analysis['time_range']}\n\n")
                 f.write("最终分析结果:\n")
                 f.write("-" * 80 + "\n")
                 f.write(yesterday_analysis['final_analysis'])
-                f.write("\n" + "-" * 80 + "\n")
+                f.write("\n" + "-" * 80 + "\n\n")

-            print(f"昨日分析结果已保存到 news_analysis_{yesterday_file}.txt")
+                # 添加重要新闻原文
+                f.write("重要新闻原文:\n")
+                f.write("-" * 80 + "\n")
+                for i, news in enumerate(yesterday_analysis.get('important_news', [])):
+                    f.write(f"{i + 1}. [{news['datetime']}] {news['content']}\n\n")
+                f.write("-" * 80 + "\n")
+
+            print(f"昨日分析结果已保存到 temp/news_analysis_{yesterday_file}.txt")
     else:
         # 尝试读取昨天的分析结果文件
         try:
-            with open(f"news_analysis_{yesterday_file}.txt", "r", encoding="utf-8") as f:
-                content = f.read()
-                yesterday_analysis = {
-                    "final_analysis": content,
-                    "time_range": yesterday
-                }
-            print(f"已从文件中读取昨日分析结果")
+            # 首先尝试读取JSON缓存文件
+            cache_file = os.path.join(temp_dir, f"analysis_cache_{yesterday_file}.json")
+            if os.path.exists(cache_file):
+                import json
+                with open(cache_file, 'r', encoding='utf-8') as f:
+                    yesterday_analysis = json.load(f)
+                print(f"已从缓存中读取昨日分析结果")
+            else:
+                # 如果没有缓存,尝试读取文本文件
+                with open(f"temp/news_analysis_{yesterday_file}.txt", "r", encoding="utf-8") as f:
+                    content = f.read()
+                yesterday_analysis = {
+                    "final_analysis": content.split("-" * 80)[1].strip(),
+                    "time_range": yesterday,
+                    "important_news": []  # 从文本文件中难以结构化提取新闻,置为空列表
+                }
+                print(f"已从文本文件中读取昨日分析结果")
         except Exception as e:
             print(f"无法读取昨日分析结果: {e}")

@@ -481,17 +630,29 @@ def main():
     if today_analysis and yesterday_analysis:
         print("正在进行今日与昨日的趋势对比分析...")
         trend_analysis = compare_trend_analysis(yesterday_analysis, today_analysis)
-
         if trend_analysis:
             # 保存趋势分析结果
-            with open(f"trend_analysis_{today_file}.txt", "w", encoding="utf-8") as f:
+            with open(f"temp/trend_analysis_{today_file}.txt", "w", encoding="utf-8") as f:
                 f.write(f"对比分析范围: 昨日({yesterday}) 与 今日({today})\n\n")
                 f.write("趋势分析结果:\n")
                 f.write("-" * 80 + "\n")
                 f.write(trend_analysis['trend_analysis'])
-                f.write("\n" + "-" * 80 + "\n")
+                f.write("\n" + "-" * 80 + "\n\n")

-            print(f"趋势分析结果已保存到 trend_analysis_{today_file}.txt")
+                # 添加弱转强的新闻原文供参考
+                f.write("昨日重要新闻原文:\n")
+                f.write("-" * 80 + "\n")
+                for i, news in enumerate(trend_analysis.get('yesterday_news', [])):
+                    f.write(f"{i + 1}. [{news['datetime']}] {news['content']}\n\n")
+                f.write("-" * 80 + "\n\n")
+
+                f.write("今日重要新闻原文:\n")
+                f.write("-" * 80 + "\n")
+                for i, news in enumerate(trend_analysis.get('today_news', [])):
+                    f.write(f"{i + 1}. [{news['datetime']}] {news['content']}\n\n")
+                f.write("-" * 80 + "\n")
+
+            print(f"趋势分析结果已保存到 temp/trend_analysis_{today_file}.txt")

             # 打印趋势分析结果
             print("\n=== 今日与昨日趋势对比分析 ===\n")
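
Note: the parsing step this patch depends on (extracting the "## 重要新闻列表" section from each model reply) can be exercised without calling the API. The sketch below reuses the same split/regex logic as the diff; the helper name extract_important_news and the sample reply are assumptions invented for illustration only and are not part of the patch.

import re


def extract_important_news(batch_analysis):
    """Parse the '## 重要新闻列表' section of a model reply into
    [{"datetime": ..., "content": ...}, ...], mirroring the regex used in the patch."""
    items = []
    if "## 重要新闻列表" in batch_analysis:
        news_section = batch_analysis.split("## 重要新闻列表", 1)[1].strip()
        # Entries are expected to look like: "1. [2024-05-20 09:30] full news text..."
        matches = re.findall(r'\d+\.\s+\[([^\]]+)\]\s+(.+?)(?=\n\d+\.|$)',
                             news_section, re.DOTALL)
        for time_str, content in matches:
            items.append({"datetime": time_str.strip(), "content": content.strip()})
    return items


if __name__ == "__main__":
    # Invented sample reply, for illustration only.
    sample_reply = (
        "## 政策分析\n略\n\n## 板块影响\n略\n\n"
        "## 重要新闻列表\n"
        "1. [2024-05-20 09:30] 央行宣布下调金融机构存款准备金率0.5个百分点。\n"
        "2. [2024-05-20 10:15] 发改委发布支持新能源汽车下乡的若干措施。\n"
    )
    for item in extract_important_news(sample_reply):
        print(item["datetime"], "->", item["content"])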