From 099e0ee71033eab5ec0154d97be64fb5039d7472 Mon Sep 17 00:00:00 2001
From: Qihang Zhang
Date: Fri, 18 Apr 2025 17:42:38 +0800
Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat(data):=20=E6=B7=BB=E5=8A=A0?=
 =?UTF-8?q?=E8=8E=B7=E5=8F=96=E6=B6=A8=E5=81=9C=E6=9D=BF=E5=88=97=E8=A1=A8?=
 =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=8A=9F=E8=83=BD=E5=8F=8A=E4=BA=A4=E6=98=93?=
 =?UTF-8?q?=E6=97=A5=E5=8E=86=E5=B7=A5=E5=85=B7=E5=87=BD=E6=95=B0?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
NOTE(review): get_trade_cal() uses `datetime`, `timedelta` and `ts`, but this
patch adds no imports to utils.py - confirm they are already imported there.

 get_kpl_list.py | 114 ++++++++++++++++++++++++++++++++++++++++++++++++
 utils.py        |  22 +++++++++-
 2 files changed, 135 insertions(+), 1 deletion(-)
 create mode 100644 get_kpl_list.py

diff --git a/get_kpl_list.py b/get_kpl_list.py
new file mode 100644
index 0000000..4c7df92
--- /dev/null
+++ b/get_kpl_list.py
@@ -0,0 +1,114 @@
+import pandas as pd
+from datetime import datetime, timedelta
+from tqdm import tqdm
+import time
+import os
+from utils import load_config, get_engine, get_trade_cal
+
+# Load the config and initialise tushare
+config = load_config()
+import tushare as ts
+
+ts.set_token(config['tushare_token'])
+pro = ts.pro_api()
+
+
+def get_kpl_data(start_date=None, end_date=None):
+    """
+    Fetch kpl_list limit-up (tag='涨停') records for the trading days in a date range.
+    Args:
+        start_date (str): Start date in 'YYYYMMDD' format; passed to get_trade_cal.
+        end_date (str): End date in 'YYYYMMDD' format; passed to get_trade_cal.
+    Returns:
+        pandas.DataFrame: Rows already in 'ori_kpl_list.xlsx' merged with newly fetched rows.
+    """
+    # Get the trading days in the requested range
+    all_trade_dates = get_trade_cal(start_date, end_date)
+
+    # Check whether previously saved data exists
+    existing_data = pd.DataFrame()
+    existing_dates = set()
+    output_file = 'ori_kpl_list.xlsx'
+
+    if os.path.exists(output_file):
+        try:
+            print(f"检测到已有数据文件: {output_file}")
+            existing_data = pd.read_excel(output_file)
+            if not existing_data.empty and 'trade_date' in existing_data.columns:
+                # Make sure trade_date is a string so it compares equal to the calendar dates
+                existing_data['trade_date'] = existing_data['trade_date'].astype(str)
+                # Collect the trade dates that are already present
+                existing_dates = set(existing_data['trade_date'].astype(str).unique())
+                print(f"已有数据包含 {len(existing_dates)} 个交易日")
+        except Exception as e:
+            print(f"读取现有数据时出错: {e}")
+
+    # Work out which dates still need to be fetched
+    dates_to_fetch = [date for date in all_trade_dates if date not in existing_dates]
+
+    if not dates_to_fetch:
+        print("所有数据均已存在,无需更新")
+        return existing_data
+
+    print(f"需要获取 {len(dates_to_fetch)} 个新交易日的数据")
+
+    # Fetch the new limit-up data, one request per trading day
+    new_data = []
+    for trade_date in tqdm(dates_to_fetch):
+        try:
+            # No fields argument is passed, so every returned field is kept
+            df = pro.kpl_list(trade_date=trade_date, tag='涨停')
+            if not df.empty:
+                # Make sure trade_date in the new data is a string as well
+                if 'trade_date' in df.columns:
+                    df['trade_date'] = df['trade_date'].astype(str)
+                new_data.append(df)
+            # Throttle requests to avoid hitting the API rate limit
+            time.sleep(0.5)
+        except Exception as e:
+            print(f"获取 {trade_date} 数据时出错: {e}")
+            time.sleep(1)  # wait a little longer after an error
+
+    # Combine everything that was fetched
+    if new_data:
+        new_result = pd.concat(new_data, ignore_index=True)
+        print(f"成功获取 {len(new_data)} 个交易日的新数据,共 {len(new_result)} 条记录")
+
+        # Merge existing and new data
+        if not existing_data.empty:
+            result = pd.concat([existing_data, new_result], ignore_index=True)
+            print(f"合并后共有 {len(result)} 条记录")
+        else:
+            result = new_result
+
+        # De-duplicate once on (ts_code, trade_date), just in case
+        if 'ts_code' in result.columns and 'trade_date' in result.columns:
+            result = result.drop_duplicates(subset=['ts_code', 'trade_date'], keep='last')
+            print(f"去重后共有 {len(result)} 条记录")
+
+        # Make sure trade_date is a string before sorting
+        if 'trade_date' in result.columns:
+            result['trade_date'] = result['trade_date'].astype(str)
+            result = result.sort_values(by='trade_date', ascending=False)  # descending, newest first
+            print("数据已按交易日期排序")
+
+        return result
+    else:
+        print("未获取到任何新数据")
+        return existing_data
+
+
+if __name__ == "__main__":
+    # Date range to fetch; end_date=None lets get_trade_cal pick its default
+    start_date = '20250101'
+    end_date = None
+
+    # Fetch the limit-up data
+    kpl_data = get_kpl_data(start_date, end_date)
+
+    # Save to Excel
+    if not kpl_data.empty:
+        kpl_data.to_excel('ori_kpl_list.xlsx', index=False)
+        print(f"数据已保存到 ori_kpl_list.xlsx, 共 {len(kpl_data)} 条记录")
+    else:
+        print("没有数据可保存")
\ No newline at end of file
diff --git a/utils.py b/utils.py
index 15e8b59..600a27d 100644
--- a/utils.py
+++ b/utils.py
@@ -53,4 +53,24 @@ def get_engine():
         connection_string = f"mysql+pymysql://{mysql['user']}:{mysql['password']}@{mysql['host']}:{mysql['port']}/{mysql['database']}?charset={mysql['charset']}&use_unicode=1"
         _engine = create_engine(connection_string)
 
-    return _engine
\ No newline at end of file
+    return _engine
+
+def get_trade_cal(start_date=None, end_date=None):
+    """
+    Return the open trading days within the given date range.
+
+    Args:
+        start_date (str): Start date, 'YYYYMMDD'; defaults to 30 days before now.
+        end_date (str): End date, 'YYYYMMDD'; defaults to today.
+
+    Returns:
+        list: The 'cal_date' values of the calendar rows whose 'is_open' equals 1.
+    """
+    if start_date is None:
+        start_date = (datetime.now() - timedelta(days=30)).strftime('%Y%m%d')
+    if end_date is None:
+        end_date = datetime.now().strftime('%Y%m%d')
+
+    pro = ts.pro_api()
+    trade_cal_df = pro.trade_cal(exchange='', start_date=start_date, end_date=end_date)
+    return trade_cal_df[trade_cal_df['is_open'] == 1]['cal_date'].tolist()
\ No newline at end of file