feat(news_analyser): 改进Excel处理以保留格式并优化新闻分析评分标准

This commit is contained in:
Qihang Zhang 2025-04-21 13:43:32 +08:00
parent a2e85ff1a6
commit ea8801a6a5

View File

@ -15,16 +15,15 @@ import os
logger = get_logger() logger = get_logger()
def create_concept_analysis_excel(force_update=False): def create_concept_analysis_excel(force_update=False):
"""创建或更新板块分析Excel文件根据需要从20250101或现有表格最后日期到今天 """创建或更新板块分析Excel文件根据需要从20250101或现有表格最后日期到今天
参数: 参数:
force_update: 布尔值如果为True则强制从20250101开始重新获取所有数据 force_update: 布尔值如果为True则强制从20250101开始重新获取所有数据
""" """
# 检查文件是否存在 # 检查文件是否存在
excel_path = 'concept_analysis.xlsx' excel_path = 'concept_analysis.xlsx'
today = date.today() today = date.today()
# 如果强制更新直接设置起始日期为20250101 # 如果强制更新直接设置起始日期为20250101
if force_update: if force_update:
start_date = '20250101' start_date = '20250101'
@ -38,17 +37,14 @@ def create_concept_analysis_excel(force_update=False):
existing_z_t_df = pd.read_excel(excel_path, sheet_name='涨停板', index_col=0) existing_z_t_df = pd.read_excel(excel_path, sheet_name='涨停板', index_col=0)
# 将索引转换为日期时间类型 # 将索引转换为日期时间类型
existing_z_t_df.index = pd.to_datetime(existing_z_t_df.index) existing_z_t_df.index = pd.to_datetime(existing_z_t_df.index)
# 找出有实际数据(非空)的最后一个日期 # 找出有实际数据(非空)的最后一个日期
z_t_last_date = None z_t_last_date = None
# 检查z_t_df中有实际数据的最后一天 # 检查z_t_df中有实际数据的最后一天
for idx in sorted(existing_z_t_df.index, reverse=True): for idx in sorted(existing_z_t_df.index, reverse=True):
row = existing_z_t_df.loc[idx] row = existing_z_t_df.loc[idx]
if not row.isnull().all() and not (row == '').all(): if not row.isnull().all() and not (row == '').all():
z_t_last_date = idx z_t_last_date = idx
break break
# 如果没有找到有效的最后日期(可能是空表),则从头开始 # 如果没有找到有效的最后日期(可能是空表),则从头开始
if z_t_last_date is None: if z_t_last_date is None:
start_date = '20250101' start_date = '20250101'
@ -74,10 +70,8 @@ def create_concept_analysis_excel(force_update=False):
# 检查是否需要获取新数据 # 检查是否需要获取新数据
need_data_update = pd.to_datetime(start_date, format='%Y%m%d') <= pd.Timestamp(today) need_data_update = pd.to_datetime(start_date, format='%Y%m%d') <= pd.Timestamp(today)
# 定义空的新数据框架 # 定义空的新数据框架
new_z_t_df = pd.DataFrame() new_z_t_df = pd.DataFrame()
# 只有在需要更新数据且今天或之前有日期需要获取时才从数据库获取数据 # 只有在需要更新数据且今天或之前有日期需要获取时才从数据库获取数据
if need_data_update: if need_data_update:
# 获取新的板块数据 # 获取新的板块数据
@ -88,7 +82,6 @@ def create_concept_analysis_excel(force_update=False):
end_date=None, end_date=None,
filter_main_board=False filter_main_board=False
) )
# 如果获取到了新数据,处理这些数据 # 如果获取到了新数据,处理这些数据
if not kpl_concept.empty: if not kpl_concept.empty:
logger.info(f"获取到 {len(kpl_concept)} 条新数据") logger.info(f"获取到 {len(kpl_concept)} 条新数据")
@ -96,7 +89,6 @@ def create_concept_analysis_excel(force_update=False):
kpl_concept['z_t_num'] = pd.to_numeric(kpl_concept['z_t_num'], errors='coerce') kpl_concept['z_t_num'] = pd.to_numeric(kpl_concept['z_t_num'], errors='coerce')
# 将交易日期转换为日期类型 # 将交易日期转换为日期类型
kpl_concept['trade_date'] = pd.to_datetime(kpl_concept['trade_date'], format='%Y%m%d') kpl_concept['trade_date'] = pd.to_datetime(kpl_concept['trade_date'], format='%Y%m%d')
# 创建新数据的透视表 # 创建新数据的透视表
new_z_t_df = kpl_concept.set_index(['trade_date', 'name'])['z_t_num'].unstack(level='name') new_z_t_df = kpl_concept.set_index(['trade_date', 'name'])['z_t_num'].unstack(level='name')
new_z_t_df = new_z_t_df.replace(0, np.nan) # 将0替换为NaN而不是空字符串 new_z_t_df = new_z_t_df.replace(0, np.nan) # 将0替换为NaN而不是空字符串
@ -112,9 +104,11 @@ def create_concept_analysis_excel(force_update=False):
z_t_df = pd.concat([existing_z_t_df, new_z_t_df]) z_t_df = pd.concat([existing_z_t_df, new_z_t_df])
# 处理可能的重复行 # 处理可能的重复行
z_t_df = z_t_df[~z_t_df.index.duplicated(keep='last')] z_t_df = z_t_df[~z_t_df.index.duplicated(keep='last')]
logger.info(f"合并了现有数据和新数据,共 {len(z_t_df)}")
else: else:
# 如果没有新数据,仍使用现有数据 # 如果没有新数据,仍使用现有数据
z_t_df = existing_z_t_df.copy() z_t_df = existing_z_t_df.copy()
logger.info(f"没有新数据,保留原有数据 {len(z_t_df)} 行,仅添加新日期的空行")
else: else:
# 如果是第一次创建表格且有新数据 # 如果是第一次创建表格且有新数据
if not new_z_t_df.empty: if not new_z_t_df.empty:
@ -127,7 +121,6 @@ def create_concept_analysis_excel(force_update=False):
all_columns = set() all_columns = set()
if not z_t_df.empty: if not z_t_df.empty:
all_columns.update(z_t_df.columns) all_columns.update(z_t_df.columns)
# 如果没有任何列但需要创建表格,尝试从数据库获取列名 # 如果没有任何列但需要创建表格,尝试从数据库获取列名
if not all_columns and existing_z_t_df is None: if not all_columns and existing_z_t_df is None:
logger.info("尝试从数据库获取板块名称以创建空表格...") logger.info("尝试从数据库获取板块名称以创建空表格...")
@ -153,7 +146,6 @@ def create_concept_analysis_excel(force_update=False):
# 确保最晚日期为今天 # 确保最晚日期为今天
max_date = pd.Timestamp(today) max_date = pd.Timestamp(today)
logger.info(f"创建从 {min_date.strftime('%Y%m%d')}{max_date.strftime('%Y%m%d')} 的日期范围") logger.info(f"创建从 {min_date.strftime('%Y%m%d')}{max_date.strftime('%Y%m%d')} 的日期范围")
# 创建一个包含所有日期的连续序列(包括非交易日) # 创建一个包含所有日期的连续序列(包括非交易日)
@ -168,6 +160,9 @@ def create_concept_analysis_excel(force_update=False):
if col not in z_t_df.columns: if col not in z_t_df.columns:
z_t_df[col] = np.nan z_t_df[col] = np.nan
# 记录处理前的行数
original_row_count = len(z_t_df)
# 确保透视表包含所有日期 # 确保透视表包含所有日期
# 首先获取唯一的交易日 # 首先获取唯一的交易日
unique_trade_dates = set(z_t_df.index) if not z_t_df.empty else set() unique_trade_dates = set(z_t_df.index) if not z_t_df.empty else set()
@ -183,11 +178,62 @@ def create_concept_analysis_excel(force_update=False):
# 重新排序索引,确保日期按顺序排列 # 重新排序索引,确保日期按顺序排列
z_t_df = z_t_df.sort_index() z_t_df = z_t_df.sort_index()
# 创建Excel文件并写入数据 # 记录处理后的行数
with pd.ExcelWriter(excel_path) as writer: new_row_count = len(z_t_df)
z_t_df.to_excel(writer, sheet_name='涨停板') logger.info(
f"原有数据行数: {original_row_count}, 添加空行后总行数: {new_row_count}, 新增空行: {new_row_count - original_row_count}")
# 保存Excel文件
sheet_name = '涨停板'
# 如果文件存在且非强制更新模式使用openpyxl保留格式
if os.path.exists(excel_path) and not force_update:
import openpyxl
from openpyxl.utils.dataframe import dataframe_to_rows
# 加载现有Excel文件
logger.info("使用openpyxl加载Excel文件以保留格式...")
wb = openpyxl.load_workbook(excel_path)
# 检查是否存在目标工作表,如不存在则创建
if sheet_name in wb.sheetnames:
ws = wb[sheet_name]
# 清除工作表内容但保留格式
for row in ws.iter_rows(min_row=2): # 跳过标题行
for cell in row:
cell.value = None
else:
ws = wb.create_sheet(sheet_name)
# 将DataFrame写入工作表
# 首先写入索引名称到A1单元格
ws.cell(row=1, column=1, value="日期")
# 写入列名从第2列开始
for col_idx, col_name in enumerate(z_t_df.columns, start=2):
ws.cell(row=1, column=col_idx, value=col_name)
# 写入索引和数据从第2行开始
for row_idx, (idx, row_data) in enumerate(z_t_df.iterrows(), start=2):
# 写入索引日期到第1列
ws.cell(row=row_idx, column=1, value=idx)
# 写入数据从第2列开始
for col_idx, value in enumerate(row_data, start=2):
if pd.isna(value):
ws.cell(row=row_idx, column=col_idx, value=None)
else:
ws.cell(row=row_idx, column=col_idx, value=value)
# 保存工作簿
wb.save(excel_path)
logger.info(f"Excel文件已更新并保留原格式{excel_path}")
else:
# 对于新文件或强制更新使用pandas的to_excel
with pd.ExcelWriter(excel_path) as writer:
z_t_df.to_excel(writer, sheet_name=sheet_name)
logger.info(f"Excel文件已{'更新' if os.path.exists(excel_path) else '创建'}{excel_path}")
logger.info(f"Excel文件已成功{'更新' if os.path.exists(excel_path) else '创建'}{excel_path}")
logger.info(f"已包含 {len(missing_dates)} 个非交易日或未来日期") logger.info(f"已包含 {len(missing_dates)} 个非交易日或未来日期")
@ -317,10 +363,9 @@ def analyze_sectors_from_news(force_update=False):
# 6. 创建系统提示词,强调格式必须严格遵守 # 6. 创建系统提示词,强调格式必须严格遵守
sector_list_text = ", ".join(sector_names) sector_list_text = ", ".join(sector_names)
system_prompt = """你是一位资深宏观新闻分析师和股票板块研究专家。 system_prompt = """你是一位资深宏观新闻分析师和股票板块研究专家。
请分析以下一天内的宏观新闻集合识别出对股票板块的潜在影响 请分析以下一天内的新闻集合识别出对股票板块的潜在影响
你只能从以下列出的板块名称中选择必须使用完全一致的板块名称一个字都不能改变: 你只能从以下列出的板块名称中选择必须使用完全一致的板块名称一个字都不能改变:
{} ===输出格式要求极其重要===
===格式要求极其重要===
你必须严格按照以下格式输出结果不要添加任何额外的标点或修改格式: 你必须严格按照以下格式输出结果不要添加任何额外的标点或修改格式:
## 利好板块 ## 利好板块
[板块名称]:[影响评分]-[简要解释] [板块名称]:[影响评分]-[简要解释]
@ -329,9 +374,6 @@ def analyze_sectors_from_news(force_update=False):
## 利空板块 ## 利空板块
[板块名称]:[影响评分]-[简要解释] [板块名称]:[影响评分]-[简要解释]
[板块名称]:[影响评分]-[简要解释] [板块名称]:[影响评分]-[简要解释]
...
## 宏观分析摘要
[简要分析新闻对市场的整体影响]
===格式示例精确参考=== ===格式示例精确参考===
如下所示不要修改格式只需替换内容: 如下所示不要修改格式只需替换内容:
## 利好板块 ## 利好板块
@ -340,20 +382,46 @@ def analyze_sectors_from_news(force_update=False):
## 利空板块 ## 利空板块
[板块C]:[-7]-[这是对板块C影响的解释] [板块C]:[-7]-[这是对板块C影响的解释]
[板块D]:[-9]-[这是对板块D影响的解释] [板块D]:[-9]-[这是对板块D影响的解释]
## 宏观分析摘要
[这里是宏观分析摘要内容]
===重要规则=== ===重要规则===
1. 严格遵循上述格式不要添加额外的冒号或任何其他标点符号 1. 严格遵循上述格式不要添加额外的冒号或任何其他标点符号
2. 方括号[]在最终输出中必须保留 2. 方括号[]在最终输出中必须保留
3. 评分必须是-10到10之间的整数且不为0 3. 评分必须是-10到10之间的整数且不为0
4. 利好板块评分为正数(1-10)利空板块评分为负数(-1-10) 4. 利好板块评分为正数(1-10)利空板块评分为负数(-1-10)
5. 只能使用提供的板块名称不得修改一个字""".format(sector_list_text) 5. 评分标准如下:
===利好/利空评分标准===
A. 政策与监管因素 (基于发布主体和政策确定性):
* 10/(-10): 国家级明确政策 - 国务院中央部委或央行发布的正式政策法规或措施
* 9/(-9): 省级明确政策 - 省级政府部门发布的正式政策法规或措施
* 8/(-8): 市级明确政策 - 市级政府部门发布的正式政策法规或措施
* 7/(-7): 行业协会明确指导 - 全国性行业协会发布的规范性文件或指导意见
* 6/(-6): 国家级政策导向 - 国家级部门的讲话会议精神或指导性意见
* 5/(-5): 省级政策导向 - 省级部门的讲话会议精神或指导性意见
* 4/(-4): 市级政策导向 - 市级部门的讲话会议精神或指导性意见
* 3/(-3): 行业协会政策导向 - 行业协会的讲话会议精神或建议
B. 市场与行业事件 (基于事件影响范围与程度):
* 10/(-10): 重大市场事件 - 对整个行业产生全局性长期影响的重大事件
* 8/(-8): 显著市场事件 - 对行业产生明显影响的重要事件
* 6/(-6): 中等市场事件 - 对行业产生一定影响的事件
* 4/(-4): 一般市场事件 - 对行业产生有限影响的普通事件
* 2/(-2): 轻微市场事件 - 对行业产生轻微影响的小型事件
C. 信息来源可信度与影响程度 (其他情况):
* 5/(-5): 权威专家观点 - 行业公认的顶级专家或研究机构的深度分析
* 3/(-3): 一般专家观点 - 行业内专家学者的分析观点
* 2/(-2): 市场分析 - 主流媒体或分析师的市场分析
* 1/(-1): 一般信息 - 普通媒体报道或一般性市场信息
6. 根据新闻性质从以上三类标准中选择最适合的一类进行评分
7. 只能使用提供的板块名称不得修改一个字确保你输出的板块名称能够在""" + sector_list_text + """中找到并且选择的板块名称和你判断的以及有着高度相关性不要延伸概念实事求是"""
# 7. 存储分析结果 # 7. 存储分析结果
sector_impact_results = {} sector_impact_results = {}
# 8. 对每天的新闻进行分析 # 8. 对每天的新闻进行分析
logger.info("\n开始分析每日宏观新闻对板块的影响...") logger.info("开始分析每日宏观新闻对板块的影响...")
for index, row in result.iterrows(): for index, row in result.iterrows():
date_str = row['date_str'] date_str = row['date_str']
content_list = row['content'] content_list = row['content']
@ -374,16 +442,16 @@ def analyze_sectors_from_news(force_update=False):
'sector_scores': sector_scores 'sector_scores': sector_scores
} }
# 输出分析结果 # 输出分析结果
logger.info(f"\n=============== {date_str} 板块影响分析 ({len(content_list)}条新闻) ===============") logger.info(f"=============== {date_str} 板块影响分析 ({len(content_list)}条新闻) ===============")
logger.info("利好板块:") logger.info("利好板块:")
for sector, score in sorted([(k, v) for k, v in sector_scores.items() if v > 0], key=lambda x: -x[1]): for sector, score in sorted([(k, v) for k, v in sector_scores.items() if v > 0], key=lambda x: -x[1]):
logger.info(f" {sector}: +{score}") logger.info(f" {sector}: +{score}")
logger.info("\n利空板块:") logger.info("利空板块:")
for sector, score in sorted([(k, v) for k, v in sector_scores.items() if v < 0], key=lambda x: x[1]): for sector, score in sorted([(k, v) for k, v in sector_scores.items() if v < 0], key=lambda x: x[1]):
logger.info(f" {sector}: {score}") logger.info(f" {sector}: {score}")
logger.info("=" * 60) logger.info("=" * 60)
logger.info(f"\n完成分析! 共分析了 {len(sector_impact_results)} 天的宏观新闻") logger.info(f"完成分析! 共分析了 {len(sector_impact_results)} 天的宏观新闻")
# 9. 将结果更新到Excel中 # 9. 将结果更新到Excel中
update_excel_with_colors(sector_impact_results) update_excel_with_colors(sector_impact_results)
@ -406,7 +474,7 @@ def parse_sector_impact(analysis_text):
matches = re.findall(pattern, analysis_text) matches = re.findall(pattern, analysis_text)
# 处理匹配结果 # 处理匹配结果
logger.info("\n== 提取的评分 ==") logger.info("== 提取的评分 ==")
for sector_name, score in matches: for sector_name, score in matches:
sector_name = sector_name.strip() sector_name = sector_name.strip()
try: try:
@ -428,7 +496,7 @@ def update_excel_with_colors(sector_impact_results):
logger.info(f"错误: {excel_path} 文件不存在,无法更新颜色") logger.info(f"错误: {excel_path} 文件不存在,无法更新颜色")
return return
logger.info(f"\n正在更新Excel文件 {excel_path} 的颜色标注...") logger.info(f"正在更新Excel文件 {excel_path} 的颜色标注...")
# 加载Excel文件 # 加载Excel文件
workbook = load_workbook(excel_path) workbook = load_workbook(excel_path)
@ -540,4 +608,4 @@ if __name__ == '__main__':
create_concept_analysis_excel() create_concept_analysis_excel()
# 执行宏观新闻分析并更新Excel # 执行宏观新闻分析并更新Excel
sector_impact_results = analyze_sectors_from_news() sector_impact_results = analyze_sectors_from_news(force_update=False)