From 213df6aaff056867cec51bccf7d22f4c530ddac2 Mon Sep 17 00:00:00 2001 From: Qihang Zhang Date: Sat, 19 Apr 2025 14:35:28 +0800 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20refactor(data=5Ffetcher):?= =?UTF-8?q?=20=E5=B0=86=E7=A1=AC=E7=BC=96=E7=A0=81=E7=9A=84=E8=A1=A8?= =?UTF-8?q?=E5=90=8D=E6=8F=90=E5=8F=96=E4=B8=BA=E5=8F=98=E9=87=8F=E5=B9=B6?= =?UTF-8?q?=E7=BB=9F=E4=B8=80=E7=AE=A1=E7=90=86=EF=BC=8C=E6=8F=90=E9=AB=98?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E5=8F=AF=E7=BB=B4=E6=8A=A4=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- data_fetcher.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/data_fetcher.py b/data_fetcher.py index f140d17..a266a7f 100644 --- a/data_fetcher.py +++ b/data_fetcher.py @@ -60,11 +60,12 @@ class DataFetcher: """ # 获取目标交易日历 all_trade_dates = self.get_trade_cal(start_date, end_date) + table_key = 'moneyflow_ind_dc' # 确定需要获取的日期 if not force_update: # 从数据库获取已有的交易日期 - existing_dates = self.db_manager.get_existing_trade_dates(table_key='moneyflow_ind_dc') + existing_dates = self.db_manager.get_existing_trade_dates(table_key=table_key) # 筛选出需要新获取的日期 dates_to_fetch = [date for date in all_trade_dates if date not in existing_dates] else: @@ -74,7 +75,7 @@ class DataFetcher: if not dates_to_fetch: self.logger.info("所有数据已在数据库中,无需更新") - return self.db_manager.load_df_from_db(table_key='moneyflow_ind_dc') + return self.db_manager.load_df_from_db(table_key=table_key) self.logger.info(f"需要获取 {len(dates_to_fetch)} 个交易日的数据") @@ -98,19 +99,19 @@ class DataFetcher: new_df = pd.concat(all_new_data, ignore_index=True) if force_update: # 强制更新模式:需要删除已有的日期数据,然后重新插入 - existing_df = self.db_manager.load_df_from_db(table_key='moneyflow_ind_dc') + existing_df = self.db_manager.load_df_from_db(table_key=table_key) # 过滤掉需要更新的日期范围内的数据 filtered_df = existing_df[~existing_df['trade_date'].isin(dates_to_fetch)] # 拼接新数据 final_df = pd.concat([filtered_df, new_df], ignore_index=True) # 替换整个表 - self.db_manager.save_df_to_db(final_df, table_key='moneyflow_ind_dc', if_exists='replace') + self.db_manager.save_df_to_db(final_df, table_key=table_key, if_exists='replace') self.logger.info(f"已强制更新 {len(new_df)} 条记录到数据库") else: # 普通追加模式 - self.db_manager.save_df_to_db(new_df, table_key='moneyflow_ind_dc', if_exists='append') + self.db_manager.save_df_to_db(new_df, table_key=table_key, if_exists='append') self.logger.info(f"已将 {len(new_df)} 条新记录追加到数据库") else: self.logger.info("未获取到任何新数据") - return self.db_manager.load_df_from_db(table_key='moneyflow_ind_dc') \ No newline at end of file + return self.db_manager.load_df_from_db(table_key=table_key) \ No newline at end of file