diff --git a/data_downloader.py b/data_downloader.py index 1370e34..8dcffc4 100644 --- a/data_downloader.py +++ b/data_downloader.py @@ -226,12 +226,12 @@ def download_stock_data(pro, ts_code, start_date, end_date): if daily_df.empty or daily_basic_df.empty or moneyflow_df.empty: print(f"警告:{ts_code}daily数据不全,无法合并") - return pd.DataFrame() + return None # 确保每个DataFrame都有trade_date列作为合并键 if 'trade_date' not in daily_df.columns: print(f"错误:{ts_code}的daily数据缺少trade_date列") - return pd.DataFrame() + return None # 为方便处理,确保所有日期列是字符串类型 daily_df['trade_date'] = daily_df['trade_date'].astype(str) @@ -787,7 +787,7 @@ def perform_incremental_update(processes=4): FROM stock_metadata WHERE - list_status = 'L' AND status = 1 + (list_status = 'L' AND status = 1) OR (list_status = 'L' AND status = 3) """ result = conn.execute(text(query)) for row in result: @@ -999,7 +999,7 @@ def update_calendar(engine, pro): def main(): parser = argparse.ArgumentParser(description='股票数据下载器') parser.add_argument('--init', action='store_true', help='初始化元数据') - parser.add_argument('--mode', choices=['full', 'incremental', 'both'], default='full', + parser.add_argument('--mode', choices=['full', 'incremental', 'both'], default='incremental', help='更新模式: full(全量), incremental(增量), both(两者都)') parser.add_argument('--year', type=int, default=2020, help='起始年份(用于全量更新)') parser.add_argument('--processes', type=int, default=4, help='并行进程数') diff --git a/news_analyze.py b/news_analyze.py new file mode 100644 index 0000000..5671e62 --- /dev/null +++ b/news_analyze.py @@ -0,0 +1,330 @@ +import datetime +import pandas as pd +import os +import math +from openai import OpenAI +from utils import load_config + +def ensure_temp_dir(): + """确保temp目录存在""" + temp_dir = os.path.join(os.getcwd(), 'temp') + if not os.path.exists(temp_dir): + os.makedirs(temp_dir) + return temp_dir + + +def get_today_news(force_update=False): + """ + 获取今天的新闻数据,分批存储 + + Args: + force_update (bool): 是否强制从API重新获取今日数据,默认为False + """ + config = load_config() + # 初始化 Tushare + import tushare as ts + ts.set_token(config['tushare_token']) + pro = ts.pro_api() + + # 获取今天的日期和当前时间 + today = datetime.datetime.now().strftime('%Y-%m-%d') + today_file = datetime.datetime.now().strftime('%Y%m%d') + current_time = datetime.datetime.now().strftime('%H:%M:%S') + start_time = "00:00:00" + + # 完整的开始和结束时间格式 + start_datetime = f"{today} {start_time}" + end_datetime = f"{today} {current_time}" + print(f"获取从 {start_datetime} 到 {end_datetime} 的新闻") + + # 确保temp目录存在 + temp_dir = ensure_temp_dir() + + # 检查是否已有今日数据文件 + all_news_df = None + today_files = [] + + if not force_update: + for file in os.listdir(temp_dir): + if file.startswith(f"{today_file}_") and file.endswith(".csv"): + today_files.append(os.path.join(temp_dir, file)) + + if today_files: + # 如果有今日数据文件,直接读取合并 + print(f"发现{len(today_files)}个今日新闻数据文件,正在读取...") + dfs = [] + for file in today_files: + try: + df = pd.read_csv(file, encoding='utf-8-sig') + dfs.append(df) + except Exception as e: + print(f"读取文件{file}时出错: {e}") + if dfs: + all_news_df = pd.concat(dfs, ignore_index=True) + else: + print("强制更新模式:忽略现有今日数据文件,从API重新获取") + + if all_news_df is None: + # 需要从API获取数据 + try: + # 获取API数据 + all_news = [] + offset = 0 + batch_size = 1500 # 每次API调用的限制 + + while True: + print(f"获取第{offset // batch_size + 1}批新闻数据...") + df = pro.news(src='sina', start_date=start_datetime, end_date=end_datetime, + channel='hongguan', offset=offset, limit=batch_size) + + if df.empty or len(df) == 0: + break + + all_news.append(df) + + # 判断是否已获取全部数据 + if len(df) < batch_size: + break + + offset += batch_size + + if not all_news: + print("今天暂无新闻数据") + return None + + all_news_df = pd.concat(all_news, ignore_index=True) + + # 分批存储数据,每1000条存一个文件 + chunk_size = 1000 + num_chunks = math.ceil(len(all_news_df) / chunk_size) + + for i in range(num_chunks): + start_idx = i * chunk_size + end_idx = min((i + 1) * chunk_size, len(all_news_df)) + chunk_df = all_news_df.iloc[start_idx:end_idx] + + # 保存到temp目录 + file_path = os.path.join(temp_dir, f"{today_file}_{i + 1}.csv") + chunk_df.to_csv(file_path, index=False, encoding='utf-8-sig') + print(f"已保存{len(chunk_df)}条新闻数据到 {file_path}") + + except Exception as e: + print(f"获取新闻数据时出错: {e}") + + # 尝试从现有文件读取 + try: + print("尝试从备份文件读取数据...") + all_news_df = pd.read_csv('news_raw_20250408.csv', encoding='utf-8-sig') + except Exception as sub_e: + print(f"从备份文件读取数据时出错: {sub_e}") + return None + + # 只保留datetime和content列 + if all_news_df is not None and not all_news_df.empty: + if 'datetime' in all_news_df.columns and 'content' in all_news_df.columns: + return all_news_df[['datetime', 'content']] + else: + print("返回的数据结构不包含所需列") + + return None + + +def analyze_news_in_batches(news_df): + """将新闻按批次分析后汇总""" + if news_df is None or news_df.empty: + return None + + config = load_config() + openai_config = config['openai_api'] + + # 初始化 OpenAI 客户端 + client = OpenAI( + api_key=openai_config['api_key'], + base_url=openai_config['base_url'] + ) + + # 将新闻按每批200条进行分组 + batch_size = 200 + total_batches = math.ceil(len(news_df) / batch_size) + batch_results = [] + + print(f"将{len(news_df)}条新闻分成{total_batches}批进行分析,每批{batch_size}条") + + for i in range(total_batches): + start_idx = i * batch_size + end_idx = min((i + 1) * batch_size, len(news_df)) + batch_df = news_df.iloc[start_idx:end_idx] + + print(f"分析第{i + 1}/{total_batches}批新闻...") + + # 合并当前批次新闻内容 + batch_content = "" + for _, row in batch_df.iterrows(): + batch_content += f"{row['datetime']}: {row['content']}\n\n" + + # 对当前批次进行初步分析 + prompt = f""" +你是一位资深财经分析师,请从下面的新闻中提取重要的政策信息和可能对股市产生影响的信息: + +{batch_content} + +请总结以下两点: +1. 这批新闻中提到的重要政策(特别是中国颁布的法令等),如果没有相关政策可以直接说明 +2. 这些政策可能对哪些股市板块带来影响(利好/利空) + +请只提取重要信息,简明扼要地回答。 +""" + + try: + response = client.chat.completions.create( + messages=[ + { + "role": "user", + "content": prompt + } + ], + model=openai_config['model'], + temperature=0.2 + ) + + batch_analysis = response.choices[0].message.content + batch_results.append({ + "batch_number": i + 1, + "news_count": len(batch_df), + "analysis": batch_analysis + }) + + except Exception as e: + print(f"分析第{i + 1}批新闻时出错: {e}") + + # 汇总所有批次的分析结果 + if not batch_results: + return None + + # 合并所有分析结果,进行最终汇总分析 + all_batch_analyses = "" + for result in batch_results: + all_batch_analyses += f"第{result['batch_number']}批({result['news_count']}条新闻)分析结果:\n" + all_batch_analyses += result['analysis'] + "\n\n" + + # 最终的汇总分析 + final_prompt = f""" +你是一位资深财经分析师,现在我提供给你多批新闻的初步分析结果,请你进行最终的汇总和深入分析: + +{all_batch_analyses} + +基于以上所有批次的分析,请完成以下分析任务: + +1. 【政策要点提炼】 + - 提取3-5条最重要的国家层面政策(特别是中国的宏观经济政策) + - 每条政策用一句话概括核心内容 + - 标注政策发布部门/会议名称 + +2. 【板块影响分析】 + - 对每条重要政策,分析直接影响的相关行业板块 + - 明确标注"利好板块"和"利空板块" + - 简要说明影响逻辑(1-2句话) + +3. 【市场影响预判】 + - 综合分析今日政策组合对A股市场的整体影响 + - 预判短期(1周内)可能的市场反应 + - 指出需要重点关注的政策执行时间节点 + +请用以下结构化格式输出: + +### 一、今日核心政策摘要 +1. [政策部门/会议] 政策核心内容 + - 影响板块:利好:XXX;利空:XXX + - 影响逻辑:... + +2. [政策部门/会议] 政策核心内容 + - 影响板块:利好:XXX;利空:XXX + - 影响逻辑:... + +### 二、综合市场影响 +[整体分析,包含市场情绪预判和关键时间节点提醒] +""" + + try: + final_response = client.chat.completions.create( + messages=[ + { + "role": "user", + "content": final_prompt + } + ], + model=openai_config['model'], + temperature=0.2 + ) + + final_analysis = final_response.choices[0].message.content + + return { + "news_count": len(news_df), + "batch_count": total_batches, + "time_range": f"{news_df['datetime'].iloc[0]} 至 {news_df['datetime'].iloc[-1]}" if not news_df.empty else "", + "batch_results": batch_results, + "final_analysis": final_analysis + } + + except Exception as e: + print(f"生成最终分析时出错: {e}") + return None + + +def main(): + # 获取今日新闻,默认不强制更新 + news_df = get_today_news(force_update=False) + + # 分析新闻 + if news_df is not None: + print(f"获取到 {len(news_df)} 条新闻,正在分批分析...") + + # 使用分批分析方法 + analysis_result = analyze_news_in_batches(news_df) + + if analysis_result: + # 打印分析结果 + print("\n=== 今日新闻分析摘要 ===\n") + print(f"分析范围: {analysis_result['news_count']} 条新闻,分成 {analysis_result['batch_count']} 批处理") + print(f"时间范围: {analysis_result['time_range']}") + print("\n最终分析结果:") + print("-" * 80) + print(analysis_result['final_analysis']) + print("-" * 80) + + # 保存原始新闻和分析结果 + today = datetime.datetime.now().strftime('%Y%m%d') + + # 保存原始新闻 + news_df.to_csv(f"news_raw_{today}.csv", index=False, encoding='utf-8-sig') + + # 保存分析结果 + with open(f"news_analysis_{today}.txt", "w", encoding="utf-8") as f: + f.write( + f"分析范围: {analysis_result['news_count']} 条新闻,分成 {analysis_result['batch_count']} 批处理\n") + f.write(f"时间范围: {analysis_result['time_range']}\n\n") + + # 写入各批次分析结果 + f.write("各批次分析结果:\n") + for batch in analysis_result['batch_results']: + f.write(f"\n--- 第{batch['batch_number']}批({batch['news_count']}条新闻)---\n") + f.write(batch['analysis']) + f.write("\n") + + # 写入最终分析结果 + f.write("\n\n最终分析结果:\n") + f.write("-" * 80 + "\n") + f.write(analysis_result['final_analysis']) + f.write("\n" + "-" * 80 + "\n") + + print(f"原始新闻已保存到 news_raw_{today}.csv") + print(f"分析结果已保存到 news_analysis_{today}.txt") + else: + print("无法获取分析结果") + else: + print("无法获取新闻数据") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/utils.py b/utils.py index 2d466ed..15e8b59 100644 --- a/utils.py +++ b/utils.py @@ -3,7 +3,6 @@ import yaml import tushare as ts import pandas as pd from datetime import datetime, timedelta - from sqlalchemy import create_engine # 模块级单例 @@ -27,15 +26,21 @@ def load_config(): 'port': 3306, 'database': 'tushare', 'charset': 'utf8' + }, + 'openai_api': { + 'api_key': 'sk-your-api-key-here', + 'base_url': 'https://api.tu-zi.com/v1', + 'model': 'gpt-4' } } with open(config_path, 'w') as f: yaml.dump(config, f) - print(f"请在 {config_path} 中填入您的 tushare token 和 MySQL 连接信息") + print(f"请在 {config_path} 中填入您的 tushare token、MySQL 连接信息以及第三方 API 信息") exit(1) with open(config_path, 'r') as f: _config = yaml.safe_load(f) + return _config @@ -47,4 +52,5 @@ def get_engine(): mysql = config['mysql'] connection_string = f"mysql+pymysql://{mysql['user']}:{mysql['password']}@{mysql['host']}:{mysql['port']}/{mysql['database']}?charset={mysql['charset']}&use_unicode=1" _engine = create_engine(connection_string) + return _engine \ No newline at end of file