feat(news): 添加新闻数据获取和管理功能,支持智能增量更新和分批获取

This commit is contained in:
Qihang Zhang 2025-04-19 23:18:16 +08:00
parent 3253fd4951
commit ef7eabf3ce

View File

@ -196,6 +196,124 @@ class DataFetcher:
logger.info(f"更新API: {api}") logger.info(f"更新API: {api}")
DataFetcher.get_basic(api_name=api) DataFetcher.get_basic(api_name=api)
# 更新新闻数据
logger.info("更新新闻数据")
DataFetcher.get_news()
@staticmethod
def get_news(src='sina', fields=None):
"""
智能获取新闻数据只获取数据库中不存在的部分
参数
src (str): 新闻来源'sina''wallstreetcn'
fields (str): 需要获取的字段默认为'datetime,title,channels,content'
返回
bool: 是否成功获取数据
"""
table_name = 'news'
# 固定的历史起始点
HISTORY_START = '2025-04-15 00:00:00'
# 如果未指定字段,设置默认字段
if fields is None:
fields = 'datetime,title,channels,content'
# 获取当前时间作为结束时间
end_date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 检查表是否存在,确定起始时间
if not db_manager.table_exists(table_name):
logger.info(f"{table_name} 不存在,将获取从 {HISTORY_START} 至今的所有新闻数据")
# 从固定起始点获取所有数据
start_date = HISTORY_START
else:
# 表存在,查询最新的新闻时间
try:
newest_query = f"SELECT MAX(datetime) as max_time FROM {table_name}"
newest_result = db_manager.query(newest_query)
# 如果有数据并且值不为None
if not newest_result.empty and newest_result['max_time'].iloc[0] is not None:
newest_time = newest_result['max_time'].iloc[0]
logger.info(f"数据库中最新的新闻时间为: {newest_time}")
# 从最新时间开始获取新数据
start_date = newest_time
else:
# 数据库表存在但没有数据
logger.info(f"数据库表 {table_name} 存在但没有数据,将获取从 {HISTORY_START} 至今的所有新闻数据")
start_date = HISTORY_START
except Exception as e:
logger.error(f"查询最新新闻时间时出错: {e}")
logger.debug(f"完整的错误追踪信息:\n{traceback.format_exc()}")
# 出错时,从固定起始点获取
start_date = HISTORY_START
logger.info(f"开始获取 {start_date}{end_date} 的新闻数据")
return DataFetcher._fetch_news_recursive(src, start_date, end_date, fields, table_name)
@staticmethod
def _fetch_news_recursive(src, start_date, end_date, fields, table_name, batch_count=0):
"""
递归获取新闻数据处理API返回数据限制
参数
src (str): 新闻来源
start_date (str): 开始日期格式'YYYY-MM-DD HH:MM:SS'
end_date (str): 结束日期格式'YYYY-MM-DD HH:MM:SS'
fields (str): 需要获取的字段
table_name (str): 数据库表名
batch_count (int): 批次计数用于日志
返回
bool: 是否成功获取数据
"""
try:
logger.info(f"获取新闻数据批次 #{batch_count + 1}: {start_date}{end_date}")
# 调用Tushare API获取新闻数据
df = pro.news(src=src, start_date=start_date, end_date=end_date, fields=fields)
if df.empty:
logger.info(f"当前时间范围内无新闻数据")
return True
# 保存数据到数据库
db_manager.save_df_to_db(df, table_name=table_name, if_exists='append')
logger.info(f"成功保存 {len(df)} 条新闻数据")
# 如果返回的数据接近限制数量(1500条),可能还有更多数据
if len(df) >= 1400: # 接近限制,设置一个略小的值作为阈值
# 找到当前批次中最早的新闻时间
earliest_date = df['datetime'].min()
# 检查是否已经达到了期望的起始日期
original_start = datetime.strptime(start_date, '%Y-%m-%d %H:%M:%S')
current_earliest = datetime.strptime(earliest_date, '%Y-%m-%d %H:%M:%S')
if current_earliest <= original_start:
logger.info(f"已获取到指定开始日期的数据,获取完成")
return True
# 将最早日期作为新的结束日期,继续获取更早的数据
new_end_date = earliest_date
# 递归调用,获取更早的数据
logger.info(f"当前批次数据量接近API限制继续获取更早数据")
sleep(1) # 避免频繁调用API
return DataFetcher._fetch_news_recursive(src, start_date, new_end_date, fields,
table_name, batch_count + 1)
return True
except Exception as e:
logger.error(f"获取新闻数据时出错: {e}")
logger.debug(f"完整的错误追踪信息:\n{traceback.format_exc()}")
return False
class DataReader: class DataReader:
""" """