import pandas as pd from datetime import datetime, timedelta from tqdm import tqdm import time import os from utils import load_config, get_engine, get_trade_cal # 加载配置并初始化tushare config = load_config() import tushare as ts ts.set_token(config['tushare_token']) pro = ts.pro_api() def get_kpl_data(start_date=None, end_date=None): """ 获取指定时间段内的打板数据 参数: start_date (str): 开始日期,格式'YYYYMMDD' end_date (str): 结束日期,格式'YYYYMMDD' 返回: pandas.DataFrame: 所有打板数据 """ # 获取目标交易日历 all_trade_dates = get_trade_cal(start_date, end_date) # 检查是否已有现有数据 existing_data = pd.DataFrame() existing_dates = set() output_file = 'ori_kpl_list.xlsx' if os.path.exists(output_file): try: print(f"检测到已有数据文件: {output_file}") existing_data = pd.read_excel(output_file) if not existing_data.empty and 'trade_date' in existing_data.columns: # 确保trade_date是字符串类型 existing_data['trade_date'] = existing_data['trade_date'].astype(str) # 提取已有数据的交易日期 existing_dates = set(existing_data['trade_date'].astype(str).unique()) print(f"已有数据包含 {len(existing_dates)} 个交易日") except Exception as e: print(f"读取现有数据时出错: {e}") # 确定需要获取的日期 dates_to_fetch = [date for date in all_trade_dates if date not in existing_dates] if not dates_to_fetch: print("所有数据均已存在,无需更新") return existing_data print(f"需要获取 {len(dates_to_fetch)} 个新交易日的数据") # 获取新的打板数据 new_data = [] for trade_date in tqdm(dates_to_fetch): try: # 不指定字段参数,获取所有返回的字段 df = pro.kpl_list(trade_date=trade_date, tag='涨停') if not df.empty: # 确保新数据的trade_date也是字符串类型 if 'trade_date' in df.columns: df['trade_date'] = df['trade_date'].astype(str) new_data.append(df) # 避免频繁请求导致API限制 time.sleep(0.5) except Exception as e: print(f"获取 {trade_date} 数据时出错: {e}") time.sleep(1) # 出错时稍微多等待一下 # 合并所有数据 if new_data: new_result = pd.concat(new_data, ignore_index=True) print(f"成功获取 {len(new_data)} 个交易日的新数据,共 {len(new_result)} 条记录") # 合并新旧数据 if not existing_data.empty: result = pd.concat([existing_data, new_result], ignore_index=True) print(f"合并后共有 {len(result)} 条记录") else: result = new_result # 进行一次去重操作,以防万一 if 'ts_code' in result.columns and 'trade_date' in result.columns: result = result.drop_duplicates(subset=['ts_code', 'trade_date'], keep='last') print(f"去重后共有 {len(result)} 条记录") # 确保trade_date是字符串类型后再排序 if 'trade_date' in result.columns: result['trade_date'] = result['trade_date'].astype(str) result = result.sort_values(by='trade_date', ascending=False) # 降序排列,最新的数据在前 print("数据已按交易日期排序") return result else: print("未获取到任何新数据") return existing_data if __name__ == "__main__": # 指定日期范围 start_date = '20250101' end_date = None # 获取打板数据 kpl_data = get_kpl_data(start_date, end_date) # 保存到Excel if not kpl_data.empty: kpl_data.to_excel('ori_kpl_list.xlsx', index=False) print(f"数据已保存到 ori_kpl_list.xlsx, 共 {len(kpl_data)} 条记录") else: print("没有数据可保存")