import datetime import pandas as pd import os import math from openai import OpenAI from utils import load_config def ensure_temp_dir(): """确保temp目录存在""" temp_dir = os.path.join(os.getcwd(), 'temp') if not os.path.exists(temp_dir): os.makedirs(temp_dir) return temp_dir def get_news_by_date(date_str, force_update=False): """ 根据指定日期获取新闻数据 Args: date_str (str): 日期字符串,格式为'YYYY-MM-DD' force_update (bool): 是否强制从API重新获取数据 """ config = load_config() # 初始化 Tushare import tushare as ts ts.set_token(config['tushare_token']) pro = ts.pro_api() # 日期格式转换 date_obj = datetime.datetime.strptime(date_str, '%Y-%m-%d') date_file = date_obj.strftime('%Y%m%d') # 获取完整的开始和结束时间 start_datetime = f"{date_str} 00:00:00" end_datetime = f"{date_str} 23:59:59" print(f"获取 {date_str} 的新闻数据") # 确保temp目录存在 temp_dir = ensure_temp_dir() # 检查是否已有该日期的数据文件 all_news_df = None date_files = [] if not force_update: for file in os.listdir(temp_dir): if file.startswith(f"{date_file}_") and file.endswith(".csv"): date_files.append(os.path.join(temp_dir, file)) if date_files: # 如果有该日期的数据文件,直接读取合并 print(f"发现{len(date_files)}个 {date_str} 的新闻数据文件,正在读取...") dfs = [] for file in date_files: try: df = pd.read_csv(file, encoding='utf-8-sig') dfs.append(df) except Exception as e: print(f"读取文件{file}时出错: {e}") if dfs: all_news_df = pd.concat(dfs, ignore_index=True) return all_news_df else: print(f"强制更新模式:忽略现有 {date_str} 数据文件,从API重新获取") # 从API获取数据 try: # 获取API数据 all_news = [] last_timestamp = None batch_size = 1500 # 每次获取1500条 while True: print(f"获取 {date_str} 的新闻数据批次...") # 第一次调用使用日期范围,后续调用使用时间戳 if last_timestamp is None: df = pro.news( src='sina', start_date=start_datetime, end_date=end_datetime, channel='hongguan', limit=batch_size ) else: # 使用上一批次的最早时间戳作为新的结束时间 temp_end_time = last_timestamp df = pro.news( src='sina', start_date=start_datetime, end_date=temp_end_time, channel='hongguan', limit=batch_size ) if df.empty or len(df) == 0: break all_news.append(df) # 更新时间戳,用于下一次请求 if 'datetime' in df.columns and not df.empty: # 获取当前批次中最早的时间戳 last_timestamp = df['datetime'].min() else: break # 如果返回的数据量小于批次大小,说明已获取完毕 if len(df) < batch_size: break if not all_news: print(f"{date_str} 暂无新闻数据") return None all_news_df = pd.concat(all_news, ignore_index=True) # 分批存储数据,每1000条存一个文件 chunk_size = 1000 num_chunks = math.ceil(len(all_news_df) / chunk_size) for i in range(num_chunks): start_idx = i * chunk_size end_idx = min((i + 1) * chunk_size, len(all_news_df)) chunk_df = all_news_df.iloc[start_idx:end_idx] # 保存到temp目录 file_path = os.path.join(temp_dir, f"{date_file}_{i + 1}.csv") chunk_df.to_csv(file_path, index=False, encoding='utf-8-sig') print(f"已保存{len(chunk_df)}条新闻数据到 {file_path}") except Exception as e: print(f"获取 {date_str} 新闻数据时出错: {e}") return None # 只保留datetime和content列 if all_news_df is not None and not all_news_df.empty: if 'datetime' in all_news_df.columns and 'content' in all_news_df.columns: return all_news_df[['datetime', 'content']] else: print("返回的数据结构不包含所需列") return None def analyze_news_in_batches(news_df): """将新闻按批次分析后汇总""" if news_df is None or news_df.empty: return None config = load_config() openai_config = config['openai_api'] # 初始化 OpenAI 客户端 client = OpenAI( api_key=openai_config['api_key'], base_url=openai_config['base_url'] ) # 将新闻按每批200条进行分组 batch_size = 200 total_batches = math.ceil(len(news_df) / batch_size) batch_results = [] print(f"将{len(news_df)}条新闻分成{total_batches}批进行分析,每批{batch_size}条") for i in range(total_batches): start_idx = i * batch_size end_idx = min((i + 1) * batch_size, len(news_df)) batch_df = news_df.iloc[start_idx:end_idx] print(f"分析第{i + 1}/{total_batches}批新闻...") # 合并当前批次新闻内容 batch_content = "" for _, row in batch_df.iterrows(): batch_content += f"{row['datetime']}: {row['content']}\n\n" # 对当前批次进行初步分析 prompt = f""" 你是一位资深财经分析师,请从下面的新闻中提取重要的政策信息和可能对股市产生影响的信息: {batch_content} 请总结以下两点: 1. 这批新闻中提到的重要政策(特别是中国颁布的法令等),如果没有相关政策可以直接说明 2. 这些政策可能对哪些股市板块带来影响(利好/利空) 请只提取重要信息,简明扼要地回答。 """ try: response = client.chat.completions.create( messages=[ { "role": "user", "content": prompt } ], model=openai_config['model'], temperature=0.2 ) batch_analysis = response.choices[0].message.content batch_results.append({ "batch_number": i + 1, "news_count": len(batch_df), "analysis": batch_analysis }) except Exception as e: print(f"分析第{i + 1}批新闻时出错: {e}") # 汇总所有批次的分析结果 if not batch_results: return None # 合并所有分析结果,进行最终汇总分析 all_batch_analyses = "" for result in batch_results: all_batch_analyses += f"第{result['batch_number']}批({result['news_count']}条新闻)分析结果:\n" all_batch_analyses += result['analysis'] + "\n\n" # 最终的汇总分析 final_prompt = f""" 你是一位资深财经分析师,现在我提供给你多批新闻的初步分析结果,请你进行最终的汇总和深入分析: {all_batch_analyses} 基于以上所有批次的分析,请完成以下分析任务: 1. 【政策要点提炼】 - 提取3-5条最重要的国家层面政策(特别是中国的宏观经济政策) - 每条政策用一句话概括核心内容 - 标注政策发布部门/会议名称 2. 【板块影响分析】 - 对每条重要政策,分析直接影响的相关行业板块 - 明确标注"利好板块"和"利空板块" - 简要说明影响逻辑(1-2句话) 3. 【市场影响预判】 - 综合分析政策组合对A股市场的整体影响 - 预判短期(1周内)可能的市场反应 - 指出需要重点关注的政策执行时间节点 请用以下结构化格式输出: ### 一、核心政策摘要 1. [政策部门/会议] 政策核心内容 - 影响板块:利好:XXX;利空:XXX - 影响逻辑:... 2. [政策部门/会议] 政策核心内容 - 影响板块:利好:XXX;利空:XXX - 影响逻辑:... ### 二、综合市场影响 [整体分析,包含市场情绪预判和关键时间节点提醒] """ try: final_response = client.chat.completions.create( messages=[ { "role": "user", "content": final_prompt } ], model=openai_config['model'], temperature=0.2 ) final_analysis = final_response.choices[0].message.content return { "news_count": len(news_df), "batch_count": total_batches, "time_range": f"{news_df['datetime'].iloc[0]} 至 {news_df['datetime'].iloc[-1]}" if not news_df.empty else "", "batch_results": batch_results, "final_analysis": final_analysis } except Exception as e: print(f"生成最终分析时出错: {e}") return None def compare_trend_analysis(yesterday_analysis, today_analysis): """ 比较昨天和今天的分析结果,找出弱转强的板块 Args: yesterday_analysis: 昨天的分析结果 today_analysis: 今天的分析结果 """ if not yesterday_analysis or not today_analysis: print("缺少分析数据,无法进行趋势对比") return None config = load_config() openai_config = config['openai_api'] # 初始化 OpenAI 客户端 client = OpenAI( api_key=openai_config['api_key'], base_url=openai_config['base_url'] ) # 构建对比分析提示 prompt = f""" 你是一位资深财经分析师,请对比昨天和今天的政策分析结果,找出从弱势转为强势的板块,分析未来市场趋势: ### 昨天的分析: {yesterday_analysis['final_analysis']} ### 今天的分析: {today_analysis['final_analysis']} 请完成以下分析: 1. 【弱转强板块识别】 - 对比昨天的弱势板块和今天的强势板块,找出明显从弱势转为强势的板块 - 分析这些板块转强的原因 - 评估这种转变的持续性 2. 【政策趋势分析】 - 分析昨天和今天的政策,提取政策方向的变化或延续 - 判断政策组合对未来市场的影响 - 预测可能被持续关注和推动的行业方向 3. 【投资机会预判】 - 基于弱转强的板块和政策趋势,预判未来1-2周可能出现的投资机会 - 提出值得重点关注的细分行业 请用以下结构化格式输出: ### 一、弱转强板块识别 1. [板块名称] - 转变原因:... - 相关政策支持:... - 持续性评估:... 2. [板块名称] - 转变原因:... - 相关政策支持:... - 持续性评估:... ### 二、政策趋势变化 [分析政策方向的延续性和变化] ### 三、投资机会与建议 [提出具体的投资方向和关注重点] """ try: response = client.chat.completions.create( messages=[ { "role": "user", "content": prompt } ], model=openai_config['model'], temperature=0.3 ) trend_analysis = response.choices[0].message.content return { "yesterday_date": yesterday_analysis.get("time_range", "昨天"), "today_date": today_analysis.get("time_range", "今天"), "trend_analysis": trend_analysis } except Exception as e: print(f"生成趋势分析时出错: {e}") return None def main(): # 获取当前日期和昨天的日期 today = datetime.datetime.now().strftime('%Y-%m-%d') yesterday = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y-%m-%d') today_file = datetime.datetime.now().strftime('%Y%m%d') yesterday_file = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y%m%d') # 获取今天的新闻,可以强制更新 today_news = get_news_by_date(today, force_update=False) # 获取昨天的新闻,只有在不存在时才获取 yesterday_file_exists = False temp_dir = ensure_temp_dir() for file in os.listdir(temp_dir): if file.startswith(f"{yesterday_file}_") and file.endswith(".csv"): yesterday_file_exists = True break yesterday_news = None if not yesterday_file_exists: print("未找到昨天的新闻数据,正在获取...") yesterday_news = get_news_by_date(yesterday, force_update=False) else: print("已找到昨天的新闻数据,正在读取...") yesterday_files = [] for file in os.listdir(temp_dir): if file.startswith(f"{yesterday_file}_") and file.endswith(".csv"): yesterday_files.append(os.path.join(temp_dir, file)) dfs = [] for file in yesterday_files: try: df = pd.read_csv(file, encoding='utf-8-sig') dfs.append(df) except Exception as e: print(f"读取文件{file}时出错: {e}") if dfs: yesterday_news = pd.concat(dfs, ignore_index=True) if 'datetime' in yesterday_news.columns and 'content' in yesterday_news.columns: yesterday_news = yesterday_news[['datetime', 'content']] # 分析今天和昨天的新闻 today_analysis = None yesterday_analysis = None if today_news is not None: print(f"获取到 {len(today_news)} 条今日新闻,正在分析...") today_analysis = analyze_news_in_batches(today_news) # 保存今天的原始新闻和分析结果 today_news.to_csv(f"news_raw_{today_file}.csv", index=False, encoding='utf-8-sig') if today_analysis: with open(f"news_analysis_{today_file}.txt", "w", encoding="utf-8") as f: f.write( f"分析范围: {today_analysis['news_count']} 条新闻,分成 {today_analysis['batch_count']} 批处理\n") f.write(f"时间范围: {today_analysis['time_range']}\n\n") f.write("最终分析结果:\n") f.write("-" * 80 + "\n") f.write(today_analysis['final_analysis']) f.write("\n" + "-" * 80 + "\n") print(f"今日分析结果已保存到 news_analysis_{today_file}.txt") else: print("无法获取今天的新闻数据") if yesterday_news is not None: print(f"获取到 {len(yesterday_news)} 条昨日新闻,正在分析...") yesterday_analysis = analyze_news_in_batches(yesterday_news) # 保存昨天的原始新闻和分析结果 yesterday_news.to_csv(f"news_raw_{yesterday_file}.csv", index=False, encoding='utf-8-sig') if yesterday_analysis: with open(f"news_analysis_{yesterday_file}.txt", "w", encoding="utf-8") as f: f.write( f"分析范围: {yesterday_analysis['news_count']} 条新闻,分成 {yesterday_analysis['batch_count']} 批处理\n") f.write(f"时间范围: {yesterday_analysis['time_range']}\n\n") f.write("最终分析结果:\n") f.write("-" * 80 + "\n") f.write(yesterday_analysis['final_analysis']) f.write("\n" + "-" * 80 + "\n") print(f"昨日分析结果已保存到 news_analysis_{yesterday_file}.txt") else: # 尝试读取昨天的分析结果文件 try: with open(f"news_analysis_{yesterday_file}.txt", "r", encoding="utf-8") as f: content = f.read() yesterday_analysis = { "final_analysis": content, "time_range": yesterday } print(f"已从文件中读取昨日分析结果") except Exception as e: print(f"无法读取昨日分析结果: {e}") # 进行趋势对比分析 if today_analysis and yesterday_analysis: print("正在进行今日与昨日的趋势对比分析...") trend_analysis = compare_trend_analysis(yesterday_analysis, today_analysis) if trend_analysis: # 保存趋势分析结果 with open(f"trend_analysis_{today_file}.txt", "w", encoding="utf-8") as f: f.write(f"对比分析范围: 昨日({yesterday}) 与 今日({today})\n\n") f.write("趋势分析结果:\n") f.write("-" * 80 + "\n") f.write(trend_analysis['trend_analysis']) f.write("\n" + "-" * 80 + "\n") print(f"趋势分析结果已保存到 trend_analysis_{today_file}.txt") # 打印趋势分析结果 print("\n=== 今日与昨日趋势对比分析 ===\n") print(f"对比分析范围: 昨日({yesterday}) 与 今日({today})") print("\n趋势分析结果:") print("-" * 80) print(trend_analysis['trend_analysis']) print("-" * 80) else: print("无法生成趋势分析结果") else: print("缺少今日或昨日分析结果,无法进行趋势对比") if __name__ == "__main__": main()