本文详细讲解如何借助 Pandas 与 Numba 的协同配合,在大规模行情数据中实现多个并发多头仓位的完全向量化止盈与止损退出检测。该方法彻底摒弃逐行循环或 apply 操作,在千万级数据量下可带来 10 至 100 倍的性能提升,并确保时间复杂度处于 O(n×m) 的可控范围内。
在量化回测过程中,处理多个并发多头仓位的止盈与止损退出始终是一项经典挑战。尤其当策略信号极为密集时(如网格交易或高频信号频繁触发),传统做法往往依赖 apply() 或 iterrows() 对每个入场点逐一扫描后续 K 线,导致计算复杂度急剧攀升至 O(N²)。一旦面对 700 万行以上的分钟级数据,这种方式几乎无法运行。本文提出的解决方案,核心思路在于将 Pandas 的便捷性与 Numba 的编译加速能力有机结合,打造一套生产就绪且完全向量化的退出计算框架。
核心设计原则
- 零 Python 循环开销:关键的双重循环逻辑交由 Numba 的 @njit 编译为原生机器码执行,Python 层面仅负责数据准备与结果汇总。
- 内存连续访问:high、low、tp、sl 等核心数组统一转换为 NumPy float64 格式的一维数组,有效规避 Pandas 索引跳转带来的额外性能损耗。
- 时间索引兼容性:由于 datetime64 在 Numba 中无法直接使用,先将其转为 int64(纳秒时间戳),再转换为 float64 传入函数;返回结果后恢复为 datetime64 格式。
- 空值鲁棒处理:自动跳过 longTpPrice 或 longSlPrice 为 NaN 的行,仅对确实存在入场点的位置执行计算。
完整可运行代码
import pandas as pd
import numpy as np
import numba
# 示例数据(同问题中)
df = pd.DataFrame({
'open': {pd.Timestamp('2021-01-03 22:11:00'): 1.22319, pd.Timestamp('2021-01-03 22:12:00'): 1.22315,
pd.Timestamp('2021-01-03 22:15:00'): 1.22324, pd.Timestamp('2021-01-03 22:16:00'): 1.22355,
pd.Timestamp('2021-01-03 22:17:00'): 1.22357},
'high': {pd.Timestamp('2021-01-03 22:11:00'): 1.22319, pd.Timestamp('2021-01-03 22:12:00'): 1.22318,
pd.Timestamp('2021-01-03 22:15:00'): 1.22358, pd.Timestamp('2021-01-03 22:16:00'): 1.2236,
pd.Timestamp('2021-01-03 22:17:00'): 1.22361},
'low': {pd.Timestamp('2021-01-03 22:11:00'): 1.22317, pd.Timestamp('2021-01-03 22:12:00'): 1.22315,
pd.Timestamp('2021-01-03 22:15:00'): 1.22324, pd.Timestamp('2021-01-03 22:16:00'): 1.22352,
pd.Timestamp('2021-01-03 22:17:00'): 1.22355},
'close': {pd.Timestamp('2021-01-03 22:11:00'): 1.22317, pd.Timestamp('2021-01-03 22:12:00'): 1.22315,
pd.Timestamp('2021-01-03 22:15:00'): 1.22358, pd.Timestamp('2021-01-03 22:16:00'): 1.22352,
pd.Timestamp('2021-01-03 22:17:00'): 1.22356},
'longEntrySignal': {pd.Timestamp('2021-01-03 22:11:00'): False, pd.Timestamp('2021-01-03 22:12:00'): False,
pd.Timestamp('2021-01-03 22:15:00'): True, pd.Timestamp('2021-01-03 22:16:00'): False,
pd.Timestamp('2021-01-03 22:17:00'): False},
'longEntry': {pd.Timestamp('2021-01-03 22:11:00'): False, pd.Timestamp('2021-01-03 22:12:00'): False,
pd.Timestamp('2021-01-03 22:15:00'): False, pd.Timestamp('2021-01-03 22:16:00'): True,
pd.Timestamp('2021-01-03 22:17:00'): False},
'longEntryPrice': {pd.Timestamp('2021-01-03 22:11:00'): np.nan, pd.Timestamp('2021-01-03 22:12:00'): np.nan,
pd.Timestamp('2021-01-03 22:15:00'): np.nan, pd.Timestamp('2021-01-03 22:16:00'): 1.22355,
pd.Timestamp('2021-01-03 22:17:00'): np.nan},
'longTpPrice': {pd.Timestamp('2021-01-03 22:11:00'): np.nan, pd.Timestamp('2021-01-03 22:12:00'): np.nan,
pd.Timestamp('2021-01-03 22:15:00'): np.nan, pd.Timestamp('2021-01-03 22:16:00'): 1.2243451663854852,
pd.Timestamp('2021-01-03 22:17:00'): np.nan},
'longSlPrice': {pd.Timestamp('2021-01-03 22:11:00'): np.nan, pd.Timestamp('2021-01-03 22:12:00'): np.nan,
pd.Timestamp('2021-01-03 22:15:00'): np.nan, pd.Timestamp('2021-01-03 22:16:00'): 1.2227548336145146,
pd.Timestamp('2021-01-03 22:17:00'): np.nan}
})
# --- 步骤 1:预分配输出列 ---
df["exitPrice"] = np.nan
df["exitTime"] = pd.NaT # 使用 NaT 而非 NaN,语义更准确
# --- 步骤 2:定义 Numba 加速函数 ---
@numba.njit
def get_long_exit(
index_floats, # float64 时间戳数组(纳秒级)
high_vals, # float64 high 序列
low_vals, # float64 low 序列
tp_prices, # float64 TP 价格序列(对应入场点)
sl_prices, # float64 SL 价格序列(对应入场点)
out_exit_price, # 输出:触发价格
out_exit_time # 输出:触发时间(float64)
):
n = len(index_floats)
for i in range(n - 1): # 遍历每个入场点索引
if np.isnan(tp_prices[i]) or np.isnan(sl_prices[i]):
continue
tp, sl = tp_prices[i], sl_prices[i]
# 向后搜索首个满足条件的 K 线
for j in range(i + 1, n):
h, l = high_vals[j], low_vals[j]
# 注意:多头止盈需 price >= TP;止损需 price <= SL
if h >= tp:
out_exit_price[i] = tp
out_exit_time[i] = index_floats[j]
break
elif l <= sl:
out_exit_price[i] = sl
out_exit_time[i] = index_floats[j]
break
# --- 步骤 3:准备输入并调用 ---
index_as_float = df.index.astype("int64").values.astype("float64") # 纳秒时间戳 → float64
get_long_exit(
index_as_float,
df["high"].values,
df["low"].values,
df["longTpPrice"].values,
df["longSlPrice"].values,
df["exitPrice"].values,
df["exitTime"].values
)
# --- 步骤 4:还原 exitTime 为 datetime64 ---
df["exitTime"] = pd.to_datetime(df["exitTime"], unit="ns")
print(df[["open", "high", "low", "close", "longEntry", "longEntryPrice",
"longTpPrice", "longSlPrice", "exitPrice", "exitTime"]])
关键注意事项
- Numba 兼容性:datetime64 在 Numba 中无法直接使用,必须预先转换为 int64(纳秒)再转为 float64 传入;返回后通过 pd.to_datetime(..., unit='ns') 还原为原始格式。
- 触发优先级:代码中先判断 if h >= tp,再判断 elif l <= sl,这意味着同一根 K 线同时满足 TP 与 SL 条件时,止盈优先触发——这一设计符合多数交易系统的实际需求。如需止损优先,只需调整条件判断顺序即可。
- 性能优化建议:
- 如果序列极长(例如超过 100 万行),可考虑分块处理,例如每 5 万行划分为一个区块,避免单次内层循环过深。
- 如果入场点较为稀疏(例如每千行才出现一个),可以先使用 np.where(df['longEntry']) 提取有效索引,仅对这些位置执行搜索,从而实现进一步的剪枝优化。
- 扩展性:该框架天然支持多空混合场景以及动态 TP/SL 策略(只需替换 tp_prices/sl_prices 数组),同时不同参数组合可并行调用 get_long_exit 函数,非常适合批量回测需求。
总结
这套方案从根本上摒弃了 Pandas 中低效的 apply 与隐式循环操作,借助 Numba 将核心退出逻辑编译至接近 C 语言的执行速度,同时保留了 Pandas 数据结构出色的易用性。它不仅提供了“多仓位 TP/SL 向量化”的技术解法,更代表了构建高性能量化回测引擎的一种关键范式——计算下沉、数据扁平化、类型显式化以及内存友好设计。
