From 5e1f2b5fa7a6885e223b815d546194321cb9acb6 Mon Sep 17 00:00:00 2001 From: Qihang Zhang Date: Fri, 18 Apr 2025 19:50:37 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat(get=5Fkpl=5Flist):=20=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=E6=B6=A8=E5=81=9C=E6=9D=BF=E6=95=B0=E6=8D=AE=E5=88=86?= =?UTF-8?q?=E6=9E=90=E5=8A=9F=E8=83=BD=EF=BC=8C=E7=94=9F=E6=88=90=E6=9D=BF?= =?UTF-8?q?=E5=9D=97=E7=83=AD=E5=8A=9B=E5=9B=BEExcel=E6=8A=A5=E8=A1=A8?= =?UTF-8?q?=EF=BC=8C=E5=B9=B6=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81=E7=BB=93?= =?UTF-8?q?=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- get_kpl_list.py | 154 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 145 insertions(+), 9 deletions(-) diff --git a/get_kpl_list.py b/get_kpl_list.py index 4c7df92..e6d2ee5 100644 --- a/get_kpl_list.py +++ b/get_kpl_list.py @@ -1,9 +1,12 @@ -import pandas as pd -from datetime import datetime, timedelta -from tqdm import tqdm -import time import os -from utils import load_config, get_engine, get_trade_cal +import time + +import openpyxl +import pandas as pd +from openpyxl.styles import PatternFill +from tqdm import tqdm + +from utils import load_config, get_trade_cal # 加载配置并初始化tushare config = load_config() @@ -63,8 +66,6 @@ def get_kpl_data(start_date=None, end_date=None): if 'trade_date' in df.columns: df['trade_date'] = df['trade_date'].astype(str) new_data.append(df) - # 避免频繁请求导致API限制 - time.sleep(0.5) except Exception as e: print(f"获取 {trade_date} 数据时出错: {e}") time.sleep(1) # 出错时稍微多等待一下 @@ -98,9 +99,141 @@ def get_kpl_data(start_date=None, end_date=None): return existing_data +def analyze_kpl_data(): + """ + 分析涨停板数据,统计每日各板块涨停数量,并使用热力图风格展示 + 按照最近100个工作日的涨停总数对板块进行排序 + 删除100个工作日内没有涨停记录的板块 + """ + print("开始分析涨停板数据...") + + # 1. 从原始文件读取数据 + try: + ori_data = pd.read_excel('ori_kpl_list.xlsx') + print(f"成功读取原始数据,共 {len(ori_data)} 条记录") + except Exception as e: + print(f"读取数据失败: {e}") + return + + # 确保日期字段是字符串类型 + ori_data['trade_date'] = ori_data['trade_date'].astype(str) + + # 获取所有唯一的交易日期和板块 + all_dates = sorted(ori_data['trade_date'].unique(), reverse=True) # 降序排列日期 + + if 'lu_desc' not in ori_data.columns: + print("错误: 原始数据中没有板块信息字段 'lu_desc'") + return + + # 获取所有唯一的板块 + all_sectors = ori_data['lu_desc'].dropna().unique() + print(f"数据包含 {len(all_dates)} 个交易日和 {len(all_sectors)} 个板块") + + # 创建一个包含所有日期和板块的DataFrame用于统计 + temp_result = pd.DataFrame(0, index=all_dates, columns=all_sectors) + + # 按日期分组统计 + for date in all_dates: + # 获取当日数据 + daily_data = ori_data[ori_data['trade_date'] == date] + + # 统计各板块涨停数量 + sector_counts = daily_data.groupby('lu_desc').size() + + # 更新临时结果DataFrame + for sector, count in sector_counts.items(): + if sector in temp_result.columns: + temp_result.loc[date, sector] = count + + # 计算最近100个工作日(或所有可用天数)的各板块涨停总数 + recent_days = min(100, len(all_dates)) + recent_dates = all_dates[:recent_days] + + # 计算这些日期内每个板块的涨停总数 + sector_totals = temp_result.loc[recent_dates].sum() + + # 筛选出在100个工作日内有涨停记录的板块 + active_sectors = sector_totals[sector_totals > 0].index.tolist() + + # 按照涨停总数对活跃板块进行排序 + sorted_sectors = sector_totals[active_sectors].sort_values(ascending=False).index.tolist() + + print(f"已按最近{recent_days}个工作日的涨停总数对板块排序") + print( + f"共保留了{len(sorted_sectors)}个有涨停记录的板块,删除了{len(all_sectors) - len(sorted_sectors)}个无涨停记录的板块") + + if sorted_sectors: + print("涨停数量前10的板块:") + for i, sector in enumerate(sorted_sectors[:min(10, len(sorted_sectors))], 1): + print(f"{i}. {sector}: {sector_totals[sector]}只") + + # 如果没有活跃板块,提前返回 + if not sorted_sectors: + print("警告: 在指定时间段内没有板块有涨停记录") + return pd.DataFrame() + + # 创建最终结果DataFrame,只使用有涨停记录的排序后的板块 + result = pd.DataFrame("", index=all_dates, columns=sorted_sectors) + + # 填充数据,只填入非零值 + for date in all_dates: + for sector in sorted_sectors: + count = temp_result.loc[date, sector] + if count > 0: + result.loc[date, sector] = count + + # 保存结果到新的Excel文件 + output_file = 'sector_limit_up_analysis.xlsx' + result.to_excel(output_file) + + # 创建热力图色阶函数:从浅红色到深红色(FFFF0000) + def get_heatmap_color(value): + try: + value = int(value) + # 将值限制在0-20范围内 + value = min(max(value, 0), 20) + + # 计算颜色深度 - 值越大颜色越深 + # 红色固定为FF,绿色和蓝色从FF(浅)递减到00(深) + intensity = int(255 - (value / 20 * 255)) + intensity_hex = format(intensity, '02X') + + # 构建颜色代码: 红色固定为FF,绿色和蓝色根据值变化 + color_code = f"FF{intensity_hex}{intensity_hex}" + + return color_code + except: + return None + + # 使用openpyxl添加热力图风格 + print("正在添加热力图样式...") + workbook = openpyxl.load_workbook(output_file) + worksheet = workbook.active + + # 遍历所有数据单元格 + for row in range(2, worksheet.max_row + 1): # 跳过标题行 + for col in range(2, worksheet.max_column + 1): # 跳过索引列 + cell = worksheet.cell(row=row, column=col) + if cell.value and str(cell.value).strip(): # 只处理非空单元格 + # 获取相应的热力图颜色 + color_code = get_heatmap_color(cell.value) + if color_code: + # 应用背景色 + cell.fill = PatternFill(start_color=color_code, end_color=color_code, fill_type="solid") + + # 保存格式化后的Excel + workbook.save(output_file) + + print(f"分析完成,结果已保存到 {output_file}") + print(f"统计了 {len(result.columns)} 个活跃板块的涨停数据") + print(f"已使用红色热力图标记涨停数量:0-20对应从浅红到深红") + + return result + + if __name__ == "__main__": # 指定日期范围 - start_date = '20250101' + start_date = '20220101' end_date = None # 获取打板数据 @@ -111,4 +244,7 @@ if __name__ == "__main__": kpl_data.to_excel('ori_kpl_list.xlsx', index=False) print(f"数据已保存到 ori_kpl_list.xlsx, 共 {len(kpl_data)} 条记录") else: - print("没有数据可保存") \ No newline at end of file + print("没有数据可保存") + + # 执行分析 + analyze_kpl_data() \ No newline at end of file