diff --git a/news_analyze.py b/news_analyze.py
index 5671e62..80a0369 100644
--- a/news_analyze.py
+++ b/news_analyze.py
@@ -5,6 +5,7 @@
 import math
 from openai import OpenAI
 from utils import load_config
+
 def ensure_temp_dir():
     """Make sure the temp directory exists"""
     temp_dir = os.path.join(os.getcwd(), 'temp')
@@ -13,12 +14,12 @@ def ensure_temp_dir():
     return temp_dir


-def get_today_news(force_update=False):
+def get_news_by_date(date_str, force_update=False):
     """
-    Fetch today's news data and store it in batches
-
+    Fetch the news data for a given date
     Args:
-        force_update (bool): whether to force a fresh fetch of today's data from the API; defaults to False
+        date_str (str): date string in 'YYYY-MM-DD' format
+        force_update (bool): whether to force a fresh fetch from the API
     """
     config = load_config()
     # Initialize Tushare
@@ -26,98 +27,114 @@
     ts.set_token(config['tushare_token'])
     pro = ts.pro_api()

-    # Get today's date and the current time
-    today = datetime.datetime.now().strftime('%Y-%m-%d')
-    today_file = datetime.datetime.now().strftime('%Y%m%d')
-    current_time = datetime.datetime.now().strftime('%H:%M:%S')
-    start_time = "00:00:00"
+    # Convert the date format
+    date_obj = datetime.datetime.strptime(date_str, '%Y-%m-%d')
+    date_file = date_obj.strftime('%Y%m%d')

-    # Build the full start and end timestamps
-    start_datetime = f"{today} {start_time}"
-    end_datetime = f"{today} {current_time}"
-    print(f"Fetching news from {start_datetime} to {end_datetime}")
+    # Build the full start and end timestamps
+    start_datetime = f"{date_str} 00:00:00"
+    end_datetime = f"{date_str} 23:59:59"
+
+    print(f"Fetching news data for {date_str}")

     # Make sure the temp directory exists
     temp_dir = ensure_temp_dir()

-    # Check whether today's data files already exist
+    # Check whether data files for this date already exist
     all_news_df = None
-    today_files = []
+    date_files = []

     if not force_update:
         for file in os.listdir(temp_dir):
-            if file.startswith(f"{today_file}_") and file.endswith(".csv"):
-                today_files.append(os.path.join(temp_dir, file))
+            if file.startswith(f"{date_file}_") and file.endswith(".csv"):
+                date_files.append(os.path.join(temp_dir, file))

-        if today_files:
-            # If today's data files exist, read and merge them directly
-            print(f"Found {len(today_files)} news data files for today, reading...")
+        if date_files:
+            # If data files exist for this date, read and merge them directly
+            print(f"Found {len(date_files)} news data files for {date_str}, reading...")
             dfs = []
-            for file in today_files:
+            for file in date_files:
                 try:
                     df = pd.read_csv(file, encoding='utf-8-sig')
                     dfs.append(df)
                 except Exception as e:
                     print(f"Error reading file {file}: {e}")
+
             if dfs:
                 all_news_df = pd.concat(dfs, ignore_index=True)
+                return all_news_df
     else:
-        print("Force-update mode: ignoring today's existing data files and refetching from the API")
+        print(f"Force-update mode: ignoring existing data files for {date_str} and refetching from the API")

-    if all_news_df is None:
-        # The data needs to be fetched from the API
-        try:
-            # Fetch the API data
-            all_news = []
-            offset = 0
-            batch_size = 1500  # limit per API call
+    # Fetch the data from the API
+    try:
+        # Fetch the API data
+        all_news = []
+        last_timestamp = None
+        batch_size = 1500  # fetch 1500 records per call

-            while True:
-                print(f"Fetching news batch {offset // batch_size + 1}...")
-                df = pro.news(src='sina', start_date=start_datetime, end_date=end_datetime,
-                              channel='hongguan', offset=offset, limit=batch_size)
+        while True:
+            print(f"Fetching a batch of news for {date_str}...")

-                if df.empty or len(df) == 0:
-                    break
+            # Use the date range on the first call, then the timestamp cursor
+            if last_timestamp is None:
+                df = pro.news(
+                    src='sina',
+                    start_date=start_datetime,
+                    end_date=end_datetime,
+                    channel='hongguan',
+                    limit=batch_size
+                )
+            else:
+                # Use the earliest timestamp of the previous batch as the new end time
+                temp_end_time = last_timestamp
+                df = pro.news(
+                    src='sina',
+                    start_date=start_datetime,
+                    end_date=temp_end_time,
+                    channel='hongguan',
+                    limit=batch_size
+                )

-                all_news.append(df)
+            if df.empty or len(df) == 0:
+                break

-                # Check whether all the data has been fetched
-                if len(df) < batch_size:
-                    break
+            all_news.append(df)

-                offset += batch_size
+            # Update the cursor for the next request
+            if 'datetime' in df.columns and not df.empty:
+                # Take the earliest timestamp in the current batch
+                last_timestamp = df['datetime'].min()
+            else:
+                break
-            if not all_news:
-                print("No news data for today yet")
-                return None
+            # If fewer records than the batch size came back, everything has been fetched
+            if len(df) < batch_size:
+                break

-            all_news_df = pd.concat(all_news, ignore_index=True)
+        if not all_news:
+            print(f"No news data for {date_str}")
+            return None

-            # Store the data in chunks, one file per 1000 records
-            chunk_size = 1000
-            num_chunks = math.ceil(len(all_news_df) / chunk_size)
+        all_news_df = pd.concat(all_news, ignore_index=True)

-            for i in range(num_chunks):
-                start_idx = i * chunk_size
-                end_idx = min((i + 1) * chunk_size, len(all_news_df))
-                chunk_df = all_news_df.iloc[start_idx:end_idx]
+        # Store the data in chunks, one file per 1000 records
+        chunk_size = 1000
+        num_chunks = math.ceil(len(all_news_df) / chunk_size)

-                # Save to the temp directory
-                file_path = os.path.join(temp_dir, f"{today_file}_{i + 1}.csv")
-                chunk_df.to_csv(file_path, index=False, encoding='utf-8-sig')
-                print(f"Saved {len(chunk_df)} news records to {file_path}")
+        for i in range(num_chunks):
+            start_idx = i * chunk_size
+            end_idx = min((i + 1) * chunk_size, len(all_news_df))
+            chunk_df = all_news_df.iloc[start_idx:end_idx]

-        except Exception as e:
-            print(f"Error fetching news data: {e}")
+            # Save to the temp directory
+            file_path = os.path.join(temp_dir, f"{date_file}_{i + 1}.csv")
+            chunk_df.to_csv(file_path, index=False, encoding='utf-8-sig')
+            print(f"Saved {len(chunk_df)} news records to {file_path}")

-            # Try reading from an existing file instead
-            try:
-                print("Trying to read data from the backup file...")
-                all_news_df = pd.read_csv('news_raw_20250408.csv', encoding='utf-8-sig')
-            except Exception as sub_e:
-                print(f"Error reading data from the backup file: {sub_e}")
-                return None
+    except Exception as e:
+        print(f"Error fetching news data for {date_str}: {e}")
+        return None

     # Keep only the datetime and content columns
     if all_news_df is not None and not all_news_df.empty:
@@ -163,7 +180,7 @@ def analyze_news_in_batches(news_df):
             batch_content += f"{row['datetime']}: {row['content']}\n\n"

         # Run a preliminary analysis on the current batch
-        prompt = f"""
+        prompt = f"""
 You are a senior financial analyst. From the news below, extract the important policy information and any information that could affect the stock market:

 {batch_content}
@@ -188,12 +205,12 @@
         )

         batch_analysis = response.choices[0].message.content
+
         batch_results.append({
             "batch_number": i + 1,
             "news_count": len(batch_df),
             "analysis": batch_analysis
         })
-
     except Exception as e:
         print(f"Error analyzing news batch {i + 1}: {e}")
@@ -208,7 +225,7 @@ def analyze_news_in_batches(news_df):
         all_batch_analyses += result['analysis'] + "\n\n"

     # Final consolidated analysis
-    final_prompt = f"""
+    final_prompt = f"""
 You are a senior financial analyst. I am providing the preliminary analyses of several batches of news; please produce the final, in-depth consolidated analysis:

 {all_batch_analyses}
@@ -226,13 +243,13 @@
    - Briefly explain the impact logic (1-2 sentences)

 3. [Market impact outlook]
-   - Analyze the overall impact of today's policy mix on the A-share market
+   - Analyze the overall impact of the policy mix on the A-share market
    - Anticipate the likely short-term (within one week) market reaction
    - Point out the policy implementation dates that need close attention

 Please use the following structured output format:

-### I. Summary of today's core policies
+### I. Summary of core policies
 1. [Policy body/meeting] core policy content
    - Affected sectors: bullish: XXX; bearish: XXX
    - Impact logic: ...
@@ -266,64 +283,227 @@
             "batch_results": batch_results,
             "final_analysis": final_analysis
         }
-
     except Exception as e:
         print(f"Error generating the final analysis: {e}")
         return None


+def compare_trend_analysis(yesterday_analysis, today_analysis):
+    """
+    Compare yesterday's and today's analysis results and identify weak-to-strong sectors
+    Args:
+        yesterday_analysis: yesterday's analysis result
+        today_analysis: today's analysis result
+    """
+    if not yesterday_analysis or not today_analysis:
+        print("Analysis data is missing; cannot run the trend comparison")
+        return None
+
+    config = load_config()
+    openai_config = config['openai_api']
+
+    # Initialize the OpenAI client
+    client = OpenAI(
+        api_key=openai_config['api_key'],
+        base_url=openai_config['base_url']
+    )
+
+    # Build the comparison prompt
+    prompt = f"""
+You are a senior financial analyst. Compare yesterday's and today's policy analysis results, identify the sectors that turned from weak to strong, and analyze the future market trend:
+
+### Yesterday's analysis:
+{yesterday_analysis['final_analysis']}
+
+### Today's analysis:
+{today_analysis['final_analysis']}
+
+Please complete the following analysis:
+
+1. [Weak-to-strong sector identification]
+   - Compare yesterday's weak sectors with today's strong sectors and identify those that clearly turned from weak to strong
+   - Analyze why these sectors turned strong
+   - Assess how sustainable the turnaround is
+
+2. [Policy trend analysis]
+   - Analyze yesterday's and today's policies and extract the changes or continuity in policy direction
+   - Judge how the policy mix will affect the future market
+   - Predict the industry directions likely to receive continued attention and support
+
+3. [Investment opportunity outlook]
+   - Based on the weak-to-strong sectors and the policy trends, anticipate the investment opportunities that may emerge over the next 1-2 weeks
+   - Propose the sub-industries that deserve close attention
+
+Please use the following structured output format:
+
+### I. Weak-to-strong sector identification
+1. [Sector name]
+   - Reason for the turnaround: ...
+   - Supporting policies: ...
+   - Sustainability assessment: ...
+
+2. [Sector name]
+   - Reason for the turnaround: ...
+   - Supporting policies: ...
+   - Sustainability assessment: ...
+
+### II. Policy trend changes
+[Analyze the continuity of and changes in policy direction]
+
+### III. Investment opportunities and recommendations
+[Propose concrete investment directions and focus points]
+"""
+
+    try:
+        response = client.chat.completions.create(
+            messages=[
+                {
+                    "role": "user",
+                    "content": prompt
+                }
+            ],
+            model=openai_config['model'],
+            temperature=0.3
+        )
+
+        trend_analysis = response.choices[0].message.content
+
+        return {
+            "yesterday_date": yesterday_analysis.get("time_range", "yesterday"),
+            "today_date": today_analysis.get("time_range", "today"),
+            "trend_analysis": trend_analysis
+        }
+    except Exception as e:
+        print(f"Error generating the trend analysis: {e}")
+        return None
+
+
 def main():
-    # Fetch today's news; no forced update by default
-    news_df = get_today_news(force_update=False)
+    # Get today's and yesterday's dates
+    today = datetime.datetime.now().strftime('%Y-%m-%d')
+    yesterday = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y-%m-%d')
+    today_file = datetime.datetime.now().strftime('%Y%m%d')
+    yesterday_file = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y%m%d')

-    # Analyze the news
-    if news_df is not None:
-        print(f"Fetched {len(news_df)} news items, analyzing in batches...")
+    # Fetch today's news; a forced update can be requested here
+    today_news = get_news_by_date(today, force_update=False)

-        # Use the batched analysis method
-        analysis_result = analyze_news_in_batches(news_df)
+    # Fetch yesterday's news only when it is not already cached
+    yesterday_file_exists = False
+    temp_dir = ensure_temp_dir()

-        if analysis_result:
-            # Print the analysis result
-            print("\n=== Summary of today's news analysis ===\n")
-            print(f"Analysis scope: {analysis_result['news_count']} news items in {analysis_result['batch_count']} batches")
-            print(f"Time range: {analysis_result['time_range']}")
-            print("\nFinal analysis:")
-            print("-" * 80)
-            print(analysis_result['final_analysis'])
-            print("-" * 80)
+    for file in os.listdir(temp_dir):
+        if file.startswith(f"{yesterday_file}_") and file.endswith(".csv"):
+            yesterday_file_exists = True
+            break

-            # Save the raw news and the analysis result
-            today = datetime.datetime.now().strftime('%Y%m%d')
+    yesterday_news = None
+    if not yesterday_file_exists:
+        print("Yesterday's news data was not found, fetching...")
+        yesterday_news = get_news_by_date(yesterday, force_update=False)
+    else:
+        print("Yesterday's news data was found, reading...")
+        yesterday_files = []
+        for file in os.listdir(temp_dir):
+            if file.startswith(f"{yesterday_file}_") and file.endswith(".csv"):
+                yesterday_files.append(os.path.join(temp_dir, file))

-            # Save the raw news
-            news_df.to_csv(f"news_raw_{today}.csv", index=False, encoding='utf-8-sig')
+        dfs = []
+        for file in yesterday_files:
+            try:
+                df = pd.read_csv(file, encoding='utf-8-sig')
+                dfs.append(df)
+            except Exception as e:
+                print(f"Error reading file {file}: {e}")

-            # Save the analysis result
-            with open(f"news_analysis_{today}.txt", "w", encoding="utf-8") as f:
+        if dfs:
+            yesterday_news = pd.concat(dfs, ignore_index=True)
+            if 'datetime' in yesterday_news.columns and 'content' in yesterday_news.columns:
+                yesterday_news = yesterday_news[['datetime', 'content']]
+
+    # Analyze today's and yesterday's news
+    today_analysis = None
+    yesterday_analysis = None
+
+    if today_news is not None:
+        print(f"Fetched {len(today_news)} news items for today, analyzing...")
+        today_analysis = analyze_news_in_batches(today_news)
+
+        # Save today's raw news and analysis result
+        today_news.to_csv(f"news_raw_{today_file}.csv", index=False, encoding='utf-8-sig')
+
+        if today_analysis:
+            with open(f"news_analysis_{today_file}.txt", "w", encoding="utf-8") as f:
                 f.write(
-                    f"Analysis scope: {analysis_result['news_count']} news items in {analysis_result['batch_count']} batches\n")
-                f.write(f"Time range: {analysis_result['time_range']}\n\n")
-
-                # Write the per-batch analysis results
- f.write("各批次分析结果:\n") - for batch in analysis_result['batch_results']: - f.write(f"\n--- 第{batch['batch_number']}批({batch['news_count']}条新闻)---\n") - f.write(batch['analysis']) - f.write("\n") - - # 写入最终分析结果 - f.write("\n\n最终分析结果:\n") + f"分析范围: {today_analysis['news_count']} 条新闻,分成 {today_analysis['batch_count']} 批处理\n") + f.write(f"时间范围: {today_analysis['time_range']}\n\n") + f.write("最终分析结果:\n") f.write("-" * 80 + "\n") - f.write(analysis_result['final_analysis']) + f.write(today_analysis['final_analysis']) f.write("\n" + "-" * 80 + "\n") - print(f"原始新闻已保存到 news_raw_{today}.csv") - print(f"分析结果已保存到 news_analysis_{today}.txt") - else: - print("无法获取分析结果") + print(f"今日分析结果已保存到 news_analysis_{today_file}.txt") else: - print("无法获取新闻数据") + print("无法获取今天的新闻数据") + + if yesterday_news is not None: + print(f"获取到 {len(yesterday_news)} 条昨日新闻,正在分析...") + yesterday_analysis = analyze_news_in_batches(yesterday_news) + + # 保存昨天的原始新闻和分析结果 + yesterday_news.to_csv(f"news_raw_{yesterday_file}.csv", index=False, encoding='utf-8-sig') + + if yesterday_analysis: + with open(f"news_analysis_{yesterday_file}.txt", "w", encoding="utf-8") as f: + f.write( + f"分析范围: {yesterday_analysis['news_count']} 条新闻,分成 {yesterday_analysis['batch_count']} 批处理\n") + f.write(f"时间范围: {yesterday_analysis['time_range']}\n\n") + f.write("最终分析结果:\n") + f.write("-" * 80 + "\n") + f.write(yesterday_analysis['final_analysis']) + f.write("\n" + "-" * 80 + "\n") + + print(f"昨日分析结果已保存到 news_analysis_{yesterday_file}.txt") + else: + # 尝试读取昨天的分析结果文件 + try: + with open(f"news_analysis_{yesterday_file}.txt", "r", encoding="utf-8") as f: + content = f.read() + yesterday_analysis = { + "final_analysis": content, + "time_range": yesterday + } + print(f"已从文件中读取昨日分析结果") + except Exception as e: + print(f"无法读取昨日分析结果: {e}") + + # 进行趋势对比分析 + if today_analysis and yesterday_analysis: + print("正在进行今日与昨日的趋势对比分析...") + trend_analysis = compare_trend_analysis(yesterday_analysis, today_analysis) + + if trend_analysis: + # 保存趋势分析结果 + with open(f"trend_analysis_{today_file}.txt", "w", encoding="utf-8") as f: + f.write(f"对比分析范围: 昨日({yesterday}) 与 今日({today})\n\n") + f.write("趋势分析结果:\n") + f.write("-" * 80 + "\n") + f.write(trend_analysis['trend_analysis']) + f.write("\n" + "-" * 80 + "\n") + + print(f"趋势分析结果已保存到 trend_analysis_{today_file}.txt") + + # 打印趋势分析结果 + print("\n=== 今日与昨日趋势对比分析 ===\n") + print(f"对比分析范围: 昨日({yesterday}) 与 今日({today})") + print("\n趋势分析结果:") + print("-" * 80) + print(trend_analysis['trend_analysis']) + print("-" * 80) + else: + print("无法生成趋势分析结果") + else: + print("缺少今日或昨日分析结果,无法进行趋势对比") if __name__ == "__main__":