From ef7eabf3ce9b3fd1f2a25b3901da99bc909bd42f Mon Sep 17 00:00:00 2001 From: Qihang Zhang Date: Sat, 19 Apr 2025 23:18:16 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat(news):=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E6=96=B0=E9=97=BB=E6=95=B0=E6=8D=AE=E8=8E=B7=E5=8F=96=E5=92=8C?= =?UTF-8?q?=E7=AE=A1=E7=90=86=E5=8A=9F=E8=83=BD=EF=BC=8C=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E6=99=BA=E8=83=BD=E5=A2=9E=E9=87=8F=E6=9B=B4=E6=96=B0=E5=92=8C?= =?UTF-8?q?=E5=88=86=E6=89=B9=E8=8E=B7=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- data_manager.py | 118 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 118 insertions(+) diff --git a/data_manager.py b/data_manager.py index 85b4724..72feb26 100644 --- a/data_manager.py +++ b/data_manager.py @@ -196,6 +196,124 @@ class DataFetcher: logger.info(f"更新API: {api}") DataFetcher.get_basic(api_name=api) + # 更新新闻数据 + logger.info("更新新闻数据") + DataFetcher.get_news() + + @staticmethod + def get_news(src='sina', fields=None): + """ + 智能获取新闻数据,只获取数据库中不存在的部分 + + 参数: + src (str): 新闻来源,如'sina'、'wallstreetcn'等 + fields (str): 需要获取的字段,默认为'datetime,title,channels,content' + + 返回: + bool: 是否成功获取数据 + """ + table_name = 'news' + + # 固定的历史起始点 + HISTORY_START = '2025-04-15 00:00:00' + + # 如果未指定字段,设置默认字段 + if fields is None: + fields = 'datetime,title,channels,content' + + # 获取当前时间作为结束时间 + end_date = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + + # 检查表是否存在,确定起始时间 + if not db_manager.table_exists(table_name): + logger.info(f"表 {table_name} 不存在,将获取从 {HISTORY_START} 至今的所有新闻数据") + # 从固定起始点获取所有数据 + start_date = HISTORY_START + else: + # 表存在,查询最新的新闻时间 + try: + newest_query = f"SELECT MAX(datetime) as max_time FROM {table_name}" + newest_result = db_manager.query(newest_query) + + # 如果有数据并且值不为None + if not newest_result.empty and newest_result['max_time'].iloc[0] is not None: + newest_time = newest_result['max_time'].iloc[0] + logger.info(f"数据库中最新的新闻时间为: {newest_time}") + + # 从最新时间开始获取新数据 + start_date = newest_time + else: + # 数据库表存在但没有数据 + logger.info(f"数据库表 {table_name} 存在但没有数据,将获取从 {HISTORY_START} 至今的所有新闻数据") + start_date = HISTORY_START + except Exception as e: + logger.error(f"查询最新新闻时间时出错: {e}") + logger.debug(f"完整的错误追踪信息:\n{traceback.format_exc()}") + # 出错时,从固定起始点获取 + start_date = HISTORY_START + + logger.info(f"开始获取 {start_date} 至 {end_date} 的新闻数据") + return DataFetcher._fetch_news_recursive(src, start_date, end_date, fields, table_name) + + @staticmethod + def _fetch_news_recursive(src, start_date, end_date, fields, table_name, batch_count=0): + """ + 递归获取新闻数据,处理API返回数据限制 + + 参数: + src (str): 新闻来源 + start_date (str): 开始日期,格式'YYYY-MM-DD HH:MM:SS' + end_date (str): 结束日期,格式'YYYY-MM-DD HH:MM:SS' + fields (str): 需要获取的字段 + table_name (str): 数据库表名 + batch_count (int): 批次计数,用于日志 + + 返回: + bool: 是否成功获取数据 + """ + try: + logger.info(f"获取新闻数据批次 #{batch_count + 1}: {start_date} 至 {end_date}") + + # 调用Tushare API获取新闻数据 + df = pro.news(src=src, start_date=start_date, end_date=end_date, fields=fields) + + if df.empty: + logger.info(f"当前时间范围内无新闻数据") + return True + + # 保存数据到数据库 + db_manager.save_df_to_db(df, table_name=table_name, if_exists='append') + logger.info(f"成功保存 {len(df)} 条新闻数据") + + # 如果返回的数据接近限制数量(1500条),可能还有更多数据 + if len(df) >= 1400: # 接近限制,设置一个略小的值作为阈值 + # 找到当前批次中最早的新闻时间 + earliest_date = df['datetime'].min() + + # 检查是否已经达到了期望的起始日期 + original_start = datetime.strptime(start_date, '%Y-%m-%d %H:%M:%S') + current_earliest = datetime.strptime(earliest_date, '%Y-%m-%d %H:%M:%S') + + if current_earliest <= original_start: + logger.info(f"已获取到指定开始日期的数据,获取完成") + return True + + # 将最早日期作为新的结束日期,继续获取更早的数据 + new_end_date = earliest_date + + # 递归调用,获取更早的数据 + logger.info(f"当前批次数据量接近API限制,继续获取更早数据") + sleep(1) # 避免频繁调用API + return DataFetcher._fetch_news_recursive(src, start_date, new_end_date, fields, + table_name, batch_count + 1) + + return True + + except Exception as e: + logger.error(f"获取新闻数据时出错: {e}") + logger.debug(f"完整的错误追踪信息:\n{traceback.format_exc()}") + return False + class DataReader: """