backtrader/money_flow_analysis.py
Qihang Zhang c5ea6c3333 feat(money_flow): 添加板块资金流向分析功能
♻️ refactor(utils): 将数据库从MySQL迁移至SQLite,简化配置加载,并移除MySQL相关模块和实现

🔧 chore(config): 添加配置模板文件
2025-04-19 13:11:41 +08:00

72 lines
2.4 KiB
Python

import pandas as pd
from tqdm import tqdm
from utils import load_config, get_trade_cal
from utils import save_df_to_db, load_df_from_db, get_existing_trade_dates
# 加载配置并初始化tushare
config = load_config()
import tushare as ts
ts.set_token(config['tushare_token'])
pro = ts.pro_api()
def get_sector_moneyflow_data(start_date=None, end_date=None):
"""
获取指定时间段内的板块资金流向数据,使用数据库缓存
参数:
start_date (str): 开始日期,格式'YYYYMMDD'
end_date (str): 结束日期,格式'YYYYMMDD'
返回:
pandas.DataFrame: 所有板块资金流向数据
"""
# 获取目标交易日历
all_trade_dates = get_trade_cal(start_date, end_date)
# 从数据库获取已有的交易日期,使用表键名
existing_dates = get_existing_trade_dates(table_key='moneyflow_ind_dc')
# 筛选出需要新获取的日期
new_dates = [date for date in all_trade_dates if date not in existing_dates]
if not new_dates:
print("所有数据已在数据库中,无需更新")
return load_df_from_db(table_key='moneyflow_ind_dc')
print(f"需要获取 {len(new_dates)} 个新交易日的数据")
# 获取新日期的数据
all_new_data = []
# 使用tqdm显示进度
for trade_date in tqdm(new_dates):
try:
# 从tushare获取当日板块资金流向数据
df = pro.moneyflow_ind_dc(trade_date=trade_date)
# 如果有数据,添加到列表
if not df.empty:
all_new_data.append(df)
else:
print(f"日期 {trade_date} 无数据")
except Exception as e:
print(f"获取 {trade_date} 的数据时出错: {e}")
# 如果有新数据,合并并保存到数据库
if all_new_data:
# 将所有新数据合并为一个DataFrame
new_df = pd.concat(all_new_data, ignore_index=True)
# 保存到数据库,使用表键名
save_df_to_db(new_df, table_key='moneyflow_ind_dc', if_exists='append')
print(f"已将 {len(new_df)} 条新记录保存到数据库")
else:
print("未获取到任何新数据")
return load_df_from_db(table_key='moneyflow_ind_dc')
if __name__ == "__main__":
# 指定日期范围
start_date = '20250401'
end_date = None
# 获取板块资金流向数据
get_sector_moneyflow_data(start_date, end_date)