feat(news_analyze.py): 添加新闻获取和分析功能,支持分批处理和结果保存

🐛 fix(data_downloader.py): 修复合并数据时返回空DataFrame的问题
📝 docs(utils.py): 更新配置文件说明,增加第三方API信息的提示
This commit is contained in:
Qihang Zhang 2025-04-09 17:19:19 +08:00
parent 19718bd59f
commit cb676b36f8
3 changed files with 342 additions and 6 deletions

View File

@ -226,12 +226,12 @@ def download_stock_data(pro, ts_code, start_date, end_date):
if daily_df.empty or daily_basic_df.empty or moneyflow_df.empty:
print(f"警告:{ts_code}daily数据不全无法合并")
return pd.DataFrame()
return None
# 确保每个DataFrame都有trade_date列作为合并键
if 'trade_date' not in daily_df.columns:
print(f"错误:{ts_code}的daily数据缺少trade_date列")
return pd.DataFrame()
return None
# 为方便处理,确保所有日期列是字符串类型
daily_df['trade_date'] = daily_df['trade_date'].astype(str)
@ -787,7 +787,7 @@ def perform_incremental_update(processes=4):
FROM
stock_metadata
WHERE
list_status = 'L' AND status = 1
(list_status = 'L' AND status = 1) OR (list_status = 'L' AND status = 3)
"""
result = conn.execute(text(query))
for row in result:
@ -999,7 +999,7 @@ def update_calendar(engine, pro):
def main():
parser = argparse.ArgumentParser(description='股票数据下载器')
parser.add_argument('--init', action='store_true', help='初始化元数据')
parser.add_argument('--mode', choices=['full', 'incremental', 'both'], default='full',
parser.add_argument('--mode', choices=['full', 'incremental', 'both'], default='incremental',
help='更新模式: full(全量), incremental(增量), both(两者都)')
parser.add_argument('--year', type=int, default=2020, help='起始年份(用于全量更新)')
parser.add_argument('--processes', type=int, default=4, help='并行进程数')

330
news_analyze.py Normal file
View File

@ -0,0 +1,330 @@
import datetime
import pandas as pd
import os
import math
from openai import OpenAI
from utils import load_config
def ensure_temp_dir():
    """Ensure the ./temp cache directory exists and return its path.

    Returns:
        str: Absolute path of the 'temp' directory under the current
        working directory (created if it did not exist).
    """
    temp_dir = os.path.join(os.getcwd(), 'temp')
    # exist_ok=True avoids the check-then-create race of
    # os.path.exists() followed by os.makedirs().
    os.makedirs(temp_dir, exist_ok=True)
    return temp_dir
def get_today_news(force_update=False, backup_file='news_raw_20250408.csv'):
    """Fetch today's macro news, caching results in chunked CSV files.

    News is fetched from the Tushare 'sina' source (channel 'hongguan')
    from midnight up to the current time.  Results are cached under
    ./temp as ``<YYYYMMDD>_<n>.csv`` chunks; on later calls the cache is
    reused unless ``force_update`` is set.

    Args:
        force_update (bool): If True, ignore any cached files for today
            and re-fetch from the API.  Defaults to False.
        backup_file (str): CSV file to fall back on when the API call
            fails.  Kept as a parameter so callers can point at a newer
            backup instead of the previously hard-coded snapshot.

    Returns:
        pandas.DataFrame | None: A frame with only the ``datetime`` and
        ``content`` columns, or None when no usable data is available.
    """
    config = load_config()
    # Lazy import keeps tushare out of module import time for callers
    # that never hit the API path.
    import tushare as ts
    ts.set_token(config['tushare_token'])
    pro = ts.pro_api()

    # Today's date/time pieces used for both the API window and filenames.
    today = datetime.datetime.now().strftime('%Y-%m-%d')
    today_file = datetime.datetime.now().strftime('%Y%m%d')
    current_time = datetime.datetime.now().strftime('%H:%M:%S')
    start_time = "00:00:00"
    # Full start/end timestamps for the API query window.
    start_datetime = f"{today} {start_time}"
    end_datetime = f"{today} {current_time}"
    print(f"获取从 {start_datetime}{end_datetime} 的新闻")

    temp_dir = ensure_temp_dir()

    # Try the on-disk cache first unless a refresh was requested.
    all_news_df = None
    today_files = []
    if not force_update:
        for file in os.listdir(temp_dir):
            if file.startswith(f"{today_file}_") and file.endswith(".csv"):
                today_files.append(os.path.join(temp_dir, file))
        if today_files:
            # Cached chunks exist for today: read and merge them.
            print(f"发现{len(today_files)}个今日新闻数据文件,正在读取...")
            dfs = []
            for file in today_files:
                try:
                    df = pd.read_csv(file, encoding='utf-8-sig')
                    dfs.append(df)
                except Exception as e:
                    # A single unreadable chunk should not abort the rest.
                    print(f"读取文件{file}时出错: {e}")
            if dfs:
                all_news_df = pd.concat(dfs, ignore_index=True)
    else:
        print("强制更新模式忽略现有今日数据文件从API重新获取")

    if all_news_df is None:
        # No usable cache: page through the API.
        try:
            all_news = []
            offset = 0
            batch_size = 1500  # per-call row limit imposed by the API
            while True:
                print(f"获取第{offset // batch_size + 1}批新闻数据...")
                df = pro.news(src='sina', start_date=start_datetime, end_date=end_datetime,
                              channel='hongguan', offset=offset, limit=batch_size)
                if df.empty:  # df.empty already covers len(df) == 0
                    break
                all_news.append(df)
                # A short page means we have reached the end of the data.
                if len(df) < batch_size:
                    break
                offset += batch_size
            if not all_news:
                print("今天暂无新闻数据")
                return None
            all_news_df = pd.concat(all_news, ignore_index=True)

            # Persist in chunks of 1000 rows so partial data survives
            # interruptions and later runs can reuse it.
            chunk_size = 1000
            num_chunks = math.ceil(len(all_news_df) / chunk_size)
            for i in range(num_chunks):
                start_idx = i * chunk_size
                end_idx = min((i + 1) * chunk_size, len(all_news_df))
                chunk_df = all_news_df.iloc[start_idx:end_idx]
                file_path = os.path.join(temp_dir, f"{today_file}_{i + 1}.csv")
                chunk_df.to_csv(file_path, index=False, encoding='utf-8-sig')
                print(f"已保存{len(chunk_df)}条新闻数据到 {file_path}")
        except Exception as e:
            print(f"获取新闻数据时出错: {e}")
            # Best effort: fall back to the local backup snapshot.
            try:
                print("尝试从备份文件读取数据...")
                all_news_df = pd.read_csv(backup_file, encoding='utf-8-sig')
            except Exception as sub_e:
                print(f"从备份文件读取数据时出错: {sub_e}")
                return None

    # Keep only the columns downstream analysis needs.
    if all_news_df is not None and not all_news_df.empty:
        if 'datetime' in all_news_df.columns and 'content' in all_news_df.columns:
            return all_news_df[['datetime', 'content']]
        else:
            print("返回的数据结构不包含所需列")
    # Explicit instead of the previous implicit fall-through return.
    return None
def analyze_news_in_batches(news_df):
    """Analyze news in batches with an LLM, then aggregate the results.

    The input frame is split into batches of 200 rows; each batch gets a
    first-pass policy-extraction prompt, and the per-batch answers are
    then fed into a final summarization prompt.

    Args:
        news_df (pandas.DataFrame | None): Frame with 'datetime' and
            'content' columns, as returned by get_today_news().

    Returns:
        dict | None: Keys 'news_count', 'batch_count', 'time_range',
        'batch_results', 'final_analysis'; None when the input is empty
        or the final API call fails.
    """
    if news_df is None or news_df.empty:
        return None

    config = load_config()
    openai_config = config['openai_api']

    # Initialize the OpenAI-compatible client (base_url allows
    # third-party gateways configured in config.yaml).
    client = OpenAI(
        api_key=openai_config['api_key'],
        base_url=openai_config['base_url']
    )

    # Split the news into groups of 200 per analysis request.
    batch_size = 200
    total_batches = math.ceil(len(news_df) / batch_size)
    batch_results = []
    print(f"{len(news_df)}条新闻分成{total_batches}批进行分析,每批{batch_size}")

    for i in range(total_batches):
        start_idx = i * batch_size
        end_idx = min((i + 1) * batch_size, len(news_df))
        batch_df = news_df.iloc[start_idx:end_idx]
        print(f"分析第{i + 1}/{total_batches}批新闻...")

        # Concatenate this batch's news items into one prompt body.
        batch_content = ""
        for _, row in batch_df.iterrows():
            batch_content += f"{row['datetime']}: {row['content']}\n\n"

        # First-pass prompt: extract key policies and affected sectors.
        prompt = f"""
你是一位资深财经分析师请从下面的新闻中提取重要的政策信息和可能对股市产生影响的信息
{batch_content}
请总结以下两点:
1. 这批新闻中提到的重要政策特别是中国颁布的法令等如果没有相关政策可以直接说明
2. 这些政策可能对哪些股市板块带来影响利好/利空
请只提取重要信息简明扼要地回答
"""
        try:
            # Low temperature for deterministic, extraction-style output.
            response = client.chat.completions.create(
                messages=[
                    {
                        "role": "user",
                        "content": prompt
                    }
                ],
                model=openai_config['model'],
                temperature=0.2
            )
            batch_analysis = response.choices[0].message.content
            batch_results.append({
                "batch_number": i + 1,
                "news_count": len(batch_df),
                "analysis": batch_analysis
            })
        except Exception as e:
            # A failed batch is skipped; remaining batches still run.
            print(f"分析第{i + 1}批新闻时出错: {e}")

    # Nothing succeeded: no basis for a final summary.
    if not batch_results:
        return None

    # Merge all per-batch analyses into one text for the final pass.
    all_batch_analyses = ""
    for result in batch_results:
        all_batch_analyses += f"{result['batch_number']}批({result['news_count']}条新闻)分析结果:\n"
        all_batch_analyses += result['analysis'] + "\n\n"

    # Final aggregation prompt: structured summary across all batches.
    final_prompt = f"""
你是一位资深财经分析师现在我提供给你多批新闻的初步分析结果请你进行最终的汇总和深入分析
{all_batch_analyses}
基于以上所有批次的分析请完成以下分析任务
1. 政策要点提炼
- 提取3-5条最重要的国家层面政策特别是中国的宏观经济政策
- 每条政策用一句话概括核心内容
- 标注政策发布部门/会议名称
2. 板块影响分析
- 对每条重要政策分析直接影响的相关行业板块
- 明确标注"利好板块""利空板块"
- 简要说明影响逻辑1-2句话
3. 市场影响预判
- 综合分析今日政策组合对A股市场的整体影响
- 预判短期1周内可能的市场反应
- 指出需要重点关注的政策执行时间节点
请用以下结构化格式输出
### 一、今日核心政策摘要
1. [政策部门/会议] 政策核心内容
- 影响板块利好XXX利空XXX
- 影响逻辑...
2. [政策部门/会议] 政策核心内容
- 影响板块利好XXX利空XXX
- 影响逻辑...
### 二、综合市场影响
[整体分析包含市场情绪预判和关键时间节点提醒]
"""
    try:
        final_response = client.chat.completions.create(
            messages=[
                {
                    "role": "user",
                    "content": final_prompt
                }
            ],
            model=openai_config['model'],
            temperature=0.2
        )
        final_analysis = final_response.choices[0].message.content
        return {
            "news_count": len(news_df),
            "batch_count": total_batches,
            # First/last timestamps of the analyzed window.
            "time_range": f"{news_df['datetime'].iloc[0]}{news_df['datetime'].iloc[-1]}" if not news_df.empty else "",
            "batch_results": batch_results,
            "final_analysis": final_analysis
        }
    except Exception as e:
        print(f"生成最终分析时出错: {e}")
        return None
def main():
    """Fetch today's news, run the batched analysis, and persist results.

    Side effects: writes ``news_raw_<YYYYMMDD>.csv`` (raw news) and
    ``news_analysis_<YYYYMMDD>.txt`` (analysis report) in the current
    working directory, and prints a summary to stdout.
    """
    # Fetch today's news; reuse cached files when available.
    news_df = get_today_news(force_update=False)

    if news_df is not None:
        print(f"获取到 {len(news_df)} 条新闻,正在分批分析...")
        # Run the batched LLM analysis pipeline.
        analysis_result = analyze_news_in_batches(news_df)
        if analysis_result:
            # Print the analysis summary to the console.
            print("\n=== 今日新闻分析摘要 ===\n")
            print(f"分析范围: {analysis_result['news_count']} 条新闻,分成 {analysis_result['batch_count']} 批处理")
            print(f"时间范围: {analysis_result['time_range']}")
            print("\n最终分析结果:")
            print("-" * 80)
            print(analysis_result['final_analysis'])
            print("-" * 80)

            # Persist both the raw news and the analysis report.
            today = datetime.datetime.now().strftime('%Y%m%d')
            # Raw news as CSV (utf-8-sig so Excel opens it correctly).
            news_df.to_csv(f"news_raw_{today}.csv", index=False, encoding='utf-8-sig')
            # Analysis report as plain text.
            with open(f"news_analysis_{today}.txt", "w", encoding="utf-8") as f:
                f.write(
                    f"分析范围: {analysis_result['news_count']} 条新闻,分成 {analysis_result['batch_count']} 批处理\n")
                f.write(f"时间范围: {analysis_result['time_range']}\n\n")
                # Per-batch analysis sections.
                f.write("各批次分析结果:\n")
                for batch in analysis_result['batch_results']:
                    f.write(f"\n--- 第{batch['batch_number']}批({batch['news_count']}条新闻)---\n")
                    f.write(batch['analysis'])
                    f.write("\n")
                # Final aggregated analysis.
                f.write("\n\n最终分析结果:\n")
                f.write("-" * 80 + "\n")
                f.write(analysis_result['final_analysis'])
                f.write("\n" + "-" * 80 + "\n")
            print(f"原始新闻已保存到 news_raw_{today}.csv")
            print(f"分析结果已保存到 news_analysis_{today}.txt")
        else:
            print("无法获取分析结果")
    else:
        print("无法获取新闻数据")


if __name__ == "__main__":
    main()

View File

@ -3,7 +3,6 @@ import yaml
import tushare as ts
import pandas as pd
from datetime import datetime, timedelta
from sqlalchemy import create_engine
# 模块级单例
@ -27,15 +26,21 @@ def load_config():
'port': 3306,
'database': 'tushare',
'charset': 'utf8'
},
'openai_api': {
'api_key': 'sk-your-api-key-here',
'base_url': 'https://api.tu-zi.com/v1',
'model': 'gpt-4'
}
}
with open(config_path, 'w') as f:
yaml.dump(config, f)
print(f"请在 {config_path} 中填入您的 tushare token 和 MySQL 连接信息")
print(f"请在 {config_path} 中填入您的 tushare token、MySQL 连接信息以及第三方 API 信息")
exit(1)
with open(config_path, 'r') as f:
_config = yaml.safe_load(f)
return _config
@ -47,4 +52,5 @@ def get_engine():
mysql = config['mysql']
connection_string = f"mysql+pymysql://{mysql['user']}:{mysql['password']}@{mysql['host']}:{mysql['port']}/{mysql['database']}?charset={mysql['charset']}&use_unicode=1"
_engine = create_engine(connection_string)
return _engine