484 lines
20 KiB
Python
484 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
tail_engine.py — 꼬리잡기 백테스트·실매매 공통 엔진
|
|
====================================================
|
|
백테스트(backtest_web)와 실매매(kis_short_ver3)가 동일한 진입/청산 계산식을 쓰도록 공통 로직만 둠.
|
|
|
|
■ 엔진 공통 로직 (백테·실매 동일)
|
|
매수: 당일 낙폭(drop) ≥ min_drop_rate, 당일 회복률(rec_day) ≥ min_recovery_ratio,
|
|
망치봉 꼬리(tail_ratio/tail_pct), 3분봉 회복(rec_3m) 구간, RSI < rsi_threshold,
|
|
고점추격 방지(high_chase_thr), 시간대/쿨다운/max_daily.
|
|
매도: 손절가(sl_pct), 익절가(tp_pct), 어깨컷(shoulder_min_high + shoulder_cut_pct), 장마감.
|
|
|
|
■ 엔진 밖(실매매 전용): 금액손실컷, ATR 기반 손절/목표가, 퀵프로핏, MA20/ML 필터 등.
|
|
"""
|
|
|
|
from datetime import datetime
|
|
from typing import List, Dict, Any, Optional
|
|
|
|
def get_tail_defaults_from_db(db=None) -> Dict[str, Any]:
|
|
"""
|
|
env_config 최신 행에서 꼬리잡기 관련 값 전부 로드.
|
|
백테스트·파라미터서치·실매매가 동일 DB 값을 쓰도록 단일 소스.
|
|
db: 기존 TradeDB 인스턴스. 주면 연결 생성/종료 없이 재사용 (봇 메인 루프에서 반복 호출 시 로그 스팸·연결 낭비 방지).
|
|
결과에 영향을 주는 env 키: MIN_DROP_RATE, MIN_RECOVERY_RATIO_SHORT, TAIL_RATIO_MIN, TAIL_PCT_MIN,
|
|
STOP_LOSS_PCT, TAKE_PROFIT_PCT, SHOULDER_MIN_HIGH_PCT, SHOULDER_CUT_PCT, RSI_PERIOD, RSI_OVERHEAT_THRESHOLD,
|
|
REENTRY_COOLDOWN_SEC, HIGH_PRICE_CHASE_THRESHOLD, MAX_RECOVERY_RATIO_3M, (시간/일일한도).
|
|
"""
|
|
own_db = None
|
|
try:
|
|
if db is None:
|
|
from database import TradeDB
|
|
own_db = TradeDB()
|
|
db = own_db
|
|
row = db.conn.execute("SELECT * FROM env_config ORDER BY id DESC LIMIT 1").fetchone()
|
|
if row:
|
|
r = dict(row)
|
|
min_drop = float(r.get("MIN_DROP_RATE") or 0.03)
|
|
min_rec = float(r.get("MIN_RECOVERY_RATIO_SHORT") or 0.5)
|
|
tail_ratio = float(r.get("TAIL_RATIO_MIN") or 1.5)
|
|
tail_pct = float(r.get("TAIL_PCT_MIN") or 0.003)
|
|
sl_pct = abs(float(r.get("STOP_LOSS_PCT") or -0.03))
|
|
tp_pct = float(r.get("TAKE_PROFIT_PCT") or 0.05)
|
|
shoulder_high = float(r.get("SHOULDER_MIN_HIGH_PCT") or 0.015)
|
|
shoulder_cut = float(r.get("SHOULDER_CUT_PCT") or 0.03)
|
|
cooldown_sec = int(float(r.get("REENTRY_COOLDOWN_SEC") or 900))
|
|
rsi_period = int(r.get("RSI_PERIOD") or 14)
|
|
rsi_threshold = float(r.get("RSI_OVERHEAT_THRESHOLD") or 78)
|
|
max_rec_3m = float(r.get("MAX_RECOVERY_RATIO_3M") or 0.8)
|
|
high_chase = float(r.get("HIGH_PRICE_CHASE_THRESHOLD") or 0.96)
|
|
time_start = int(r.get("TAIL_TIME_START") or r.get("TIME_START") or 930)
|
|
time_end = int(r.get("TAIL_TIME_END") or r.get("TIME_END") or 1500)
|
|
max_daily = int(r.get("MAX_DAILY_TAIL") or r.get("MAX_STOCKS") or 3)
|
|
else:
|
|
min_drop, min_rec = 0.03, 0.5
|
|
tail_ratio, tail_pct = 1.5, 0.003
|
|
sl_pct, tp_pct = 0.03, 0.05
|
|
shoulder_high, shoulder_cut = 0.015, 0.03
|
|
cooldown_sec, rsi_period, rsi_threshold = 900, 14, 78.0
|
|
max_rec_3m, high_chase = 0.8, 0.96
|
|
time_start, time_end, max_daily = 930, 1500, 3
|
|
except Exception:
|
|
min_drop, min_rec = 0.03, 0.5
|
|
tail_ratio, tail_pct = 1.5, 0.003
|
|
sl_pct, tp_pct = 0.03, 0.05
|
|
shoulder_high, shoulder_cut = 0.015, 0.03
|
|
cooldown_sec, rsi_period, rsi_threshold = 900, 14, 78.0
|
|
max_rec_3m, high_chase = 0.8, 0.96
|
|
time_start, time_end, max_daily = 930, 1500, 3
|
|
finally:
|
|
if own_db is not None:
|
|
try:
|
|
own_db.close()
|
|
except Exception:
|
|
pass
|
|
return {
|
|
"min_drop_rate": min_drop,
|
|
"min_recovery_ratio": min_rec,
|
|
"max_rec_3m": max_rec_3m,
|
|
"tail_ratio_min": tail_ratio,
|
|
"tail_pct_min": tail_pct,
|
|
"sl_pct": sl_pct,
|
|
"tp_pct": tp_pct,
|
|
"shoulder_min_high": shoulder_high,
|
|
"shoulder_cut_pct": shoulder_cut,
|
|
"rsi_period": rsi_period,
|
|
"rsi_threshold": rsi_threshold,
|
|
"high_chase_thr": high_chase,
|
|
"cooldown_min": cooldown_sec // 60,
|
|
"time_start_hm": time_start,
|
|
"time_end_hm": time_end,
|
|
"max_daily": max_daily,
|
|
}
|
|
|
|
|
|
def compute_rsi_series(closes: List[float], period: int = 14) -> List[Optional[float]]:
|
|
"""RSI 시리즈 (Wilder 스무딩). backtest_web과 동일."""
|
|
rsi_list: List[Optional[float]] = [None] * len(closes)
|
|
if len(closes) < period + 1:
|
|
return rsi_list
|
|
deltas = [closes[i] - closes[i - 1] for i in range(1, len(closes))]
|
|
gains = [max(d, 0) for d in deltas]
|
|
losses = [max(-d, 0) for d in deltas]
|
|
avg_gain = sum(gains[:period]) / period
|
|
avg_loss = sum(losses[:period]) / period
|
|
for i in range(period, len(closes)):
|
|
idx = i - 1
|
|
if i > period:
|
|
avg_gain = (avg_gain * (period - 1) + gains[idx]) / period
|
|
avg_loss = (avg_loss * (period - 1) + losses[idx]) / period
|
|
rs = avg_gain / avg_loss if avg_loss > 0 else float("inf")
|
|
rsi_val = 100 - (100 / (1 + rs)) if avg_loss > 0 else 100.0
|
|
rsi_list[i] = rsi_val
|
|
return rsi_list
|
|
|
|
|
|
def _t2dt(t: str) -> datetime:
|
|
"""candle_time 문자열 → datetime."""
|
|
return datetime.strptime(t, "%Y%m%d%H%M")
|
|
|
|
|
|
def check_buy_signal_live(
|
|
candles: List[Dict],
|
|
params: Dict[str, Any],
|
|
state: Dict[str, Any],
|
|
) -> tuple:
|
|
"""
|
|
실시간: 마지막 봉이 백테스트(run_tail_backtest)와 동일한 꼬리잡기 매수 조건을 만족하는지 판단.
|
|
로직 단일 소스: run_tail_backtest와 완전 동일한 조건·계산식.
|
|
|
|
candles: 3분봉 리스트 (candle_time, open, high, low, close, volume)
|
|
state: { "last_exit_dt": datetime|None, "daily_cnt": int }
|
|
|
|
반환: (reject_reason, reject_msg, signal_dict)
|
|
- 통과 시: (None, None, {"signal": True, "tail_ratio", "recovery_pos", "rsi_val"})
|
|
- 탈락 시: ("탈락-XXX", "상세메시지(숫자포함)", None) → 호출측에서 "🔍 [탈락-XXX] name code: 상세메시지" 로그
|
|
"""
|
|
if len(candles) < 10:
|
|
return ("탈락-데이터", "봉 수 부족 (len<10)", None)
|
|
i = len(candles) - 1
|
|
c = candles[i]
|
|
day = c["candle_time"][:8]
|
|
hm = int(c["candle_time"][8:12])
|
|
op = float(c["open"])
|
|
hi = float(c["high"])
|
|
lo = float(c["low"])
|
|
cl = float(c["close"])
|
|
|
|
min_drop_rate = float(params.get("min_drop_rate", 0.03))
|
|
min_recovery_ratio = float(params.get("min_recovery_ratio", 0.5))
|
|
max_rec_3m = float(params.get("max_rec_3m", 0.8))
|
|
tail_ratio_min = float(params.get("tail_ratio_min", 1.5))
|
|
tail_pct_min = float(params.get("tail_pct_min", 0.003))
|
|
rsi_period = int(params.get("rsi_period", 14))
|
|
rsi_threshold = float(params.get("rsi_threshold", 78))
|
|
high_chase_thr = float(params.get("high_chase_thr", 0.96))
|
|
time_start_hm = int(params.get("time_start_hm", 930))
|
|
time_end_hm = int(params.get("time_end_hm", 1500))
|
|
cooldown_min = float(params.get("cooldown_min", 15))
|
|
max_daily = int(params.get("max_daily", 3))
|
|
|
|
if hm < time_start_hm or hm > time_end_hm:
|
|
return (None, None, None) # 시간대 탈락 (로그 생략 가능)
|
|
if state.get("daily_cnt", 0) >= max_daily:
|
|
return (None, None, None)
|
|
last_exit_dt = state.get("last_exit_dt")
|
|
if last_exit_dt is not None:
|
|
elapsed = (_t2dt(c["candle_time"]) - last_exit_dt).total_seconds() / 60
|
|
if elapsed < cooldown_min:
|
|
return (None, None, None)
|
|
|
|
# 당일 누적 OHLC (run_tail_backtest와 동일)
|
|
running_open = op
|
|
running_high = hi
|
|
running_low = lo if lo > 0 else hi
|
|
for j in range(i - 1, -1, -1):
|
|
if candles[j]["candle_time"][:8] != day:
|
|
break
|
|
running_open = float(candles[j]["open"])
|
|
running_high = max(running_high, float(candles[j]["high"]))
|
|
lj = float(candles[j]["low"])
|
|
if lj > 0:
|
|
running_low = min(running_low, lj)
|
|
|
|
if cl <= 0 or running_open <= 0:
|
|
return (None, None, None)
|
|
|
|
drop = (running_open - running_low) / running_open
|
|
if drop < min_drop_rate:
|
|
return (
|
|
"탈락-낙폭",
|
|
f"낙폭 {drop*100:.2f}% < {min_drop_rate*100:.1f}% (당일 시가 {running_open:,.0f}원 → 저점 {running_low:,.0f}원)",
|
|
None,
|
|
)
|
|
day_range = running_high - running_low
|
|
rec_day = (cl - running_low) / day_range if day_range > 0 else 0
|
|
if rec_day < min_recovery_ratio:
|
|
return (
|
|
"탈락-회복률",
|
|
f"회복률 {rec_day*100:.1f}% < {min_recovery_ratio*100:.0f}% (저점 {running_low:,.0f}원 → 현재 {cl:,.0f}원 / 범위 {day_range:,.0f}원)",
|
|
None,
|
|
)
|
|
|
|
# 망치봉 꼬리 (run_tail_backtest와 동일: 최대 3봉 전까지 탐색)
|
|
body_top = max(op, cl)
|
|
body_bot = min(op, cl)
|
|
body_len = body_top - body_bot if body_top > body_bot else 1.0
|
|
tail_len = body_bot - lo if lo > 0 else 0.0
|
|
lo_use = lo
|
|
if tail_len <= 0:
|
|
for j in range(i - 1, max(i - 4, rsi_period), -1):
|
|
prev = candles[j]
|
|
o2 = float(prev["open"])
|
|
l2 = float(prev["low"])
|
|
c2 = float(prev["close"])
|
|
if l2 <= 0:
|
|
continue
|
|
bt2, bb2 = max(o2, c2), min(o2, c2)
|
|
bl2 = bt2 - bb2 if bt2 > bb2 else 1.0
|
|
tl2 = bb2 - l2
|
|
if tl2 > 0:
|
|
tail_len, body_len, lo_use = tl2, bl2, l2
|
|
break
|
|
tail_ratio = tail_len / body_len if body_len > 0 else 0
|
|
tail_pct = tail_len / lo_use if lo_use > 0 and tail_len > 0 else 0.0
|
|
if tail_ratio < tail_ratio_min or tail_pct < tail_pct_min:
|
|
return (
|
|
"탈락-꼬리",
|
|
f"꼬리비율 {tail_ratio:.2f} (기준 {tail_ratio_min}) 또는 꼬리% {tail_pct*100:.2f}% (기준 {tail_pct_min*100:.2f}%)",
|
|
None,
|
|
)
|
|
|
|
c_range = hi - lo if hi > lo else 0
|
|
rec_3m = (cl - lo) / c_range if c_range > 0 else 0
|
|
if not (min_recovery_ratio <= rec_3m <= max_rec_3m):
|
|
return (
|
|
"탈락-회복3분",
|
|
f"3분봉 회복률 {rec_3m*100:.1f}% (기준 {min_recovery_ratio*100:.0f}~{max_rec_3m*100:.0f}%)",
|
|
None,
|
|
)
|
|
|
|
closes = [float(x["close"]) for x in candles]
|
|
rsis = compute_rsi_series(closes, rsi_period)
|
|
rsi_val = rsis[i] if i < len(rsis) else None
|
|
if rsi_val is None or rsi_val >= rsi_threshold:
|
|
return (
|
|
"탈락-RSI",
|
|
(f"RSI {rsi_val:.1f}" if rsi_val is not None else "RSI None") + f" ≥ {rsi_threshold:.0f}",
|
|
None,
|
|
)
|
|
if cl >= running_high * high_chase_thr:
|
|
return (
|
|
"탈락-피뢰침 고점추격",
|
|
f"현재가 {cl:,.0f} ≥ 고점대비 {high_chase_thr*100:.0f}%",
|
|
None,
|
|
)
|
|
|
|
return (
|
|
None,
|
|
None,
|
|
{"signal": True, "tail_ratio": tail_ratio, "tail_pct": tail_pct, "recovery_pos": rec_3m, "rsi_val": rsi_val},
|
|
)
|
|
|
|
|
|
def check_sell_signal_live(
|
|
position: Dict[str, Any],
|
|
current_candle: Dict[str, Any],
|
|
params: Dict[str, Any],
|
|
is_eod: bool = False,
|
|
) -> Optional[tuple]:
|
|
"""
|
|
실시간: 백테스트와 동일한 청산 조건 (손절/익절/어깨컷/장마감).
|
|
position: entry_price, entry_time, stop, target, max_price
|
|
current_candle: high, low, close
|
|
반환: (reason_str, exit_price) 또는 None
|
|
"""
|
|
sl_pct = float(params.get("sl_pct", 0.03))
|
|
tp_pct = float(params.get("tp_pct", 0.05))
|
|
shoulder_min_high = float(params.get("shoulder_min_high", 0.015))
|
|
shoulder_cut_pct = float(params.get("shoulder_cut_pct", 0.03))
|
|
|
|
hi = float(current_candle.get("high", current_candle["close"]))
|
|
lo = float(current_candle.get("low", current_candle["close"]))
|
|
cl = float(current_candle["close"])
|
|
max_p = max(position["max_price"], hi)
|
|
ep = position["entry_price"]
|
|
stop = position.get("stop", ep * (1 - sl_pct))
|
|
target = position.get("target", ep * (1 + tp_pct))
|
|
|
|
reason = None
|
|
exit_price = cl
|
|
if lo > 0 and lo <= stop:
|
|
reason = "손절"
|
|
exit_price = stop
|
|
elif hi >= target:
|
|
reason = "익절"
|
|
exit_price = target
|
|
elif max_p >= ep * (1 + shoulder_min_high) and cl <= max_p * (1 - shoulder_cut_pct):
|
|
reason = "어깨컷"
|
|
exit_price = cl
|
|
if reason is None and is_eod:
|
|
reason = "장마감"
|
|
exit_price = cl
|
|
if reason:
|
|
return (reason, exit_price)
|
|
return None
|
|
|
|
|
|
def run_tail_backtest(
|
|
candles_by_code: Dict[str, List[Dict]],
|
|
params: Dict[str, Any],
|
|
) -> List[Dict]:
|
|
"""
|
|
백테스트 1회 실행 (backtest_web과 동일 로직). API에서 호출해 단일 소스 유지.
|
|
candles_by_code: code -> list of {candle_time, open, high, low, close, volume}
|
|
params: get_tail_defaults_from_db() + sl_pct, tp_pct 등
|
|
반환: all_trades 리스트
|
|
"""
|
|
min_drop_rate = float(params.get("min_drop_rate", 0.03))
|
|
min_recovery_ratio = float(params.get("min_recovery_ratio", 0.5))
|
|
max_rec_3m = float(params.get("max_rec_3m", 0.8))
|
|
tail_ratio_min = float(params.get("tail_ratio_min", 1.5))
|
|
tail_pct_min = float(params.get("tail_pct_min", 0.003))
|
|
sl_pct = float(params.get("sl_pct", 0.03))
|
|
tp_pct = float(params.get("tp_pct", 0.05))
|
|
shoulder_min_high = float(params.get("shoulder_min_high", 0.015))
|
|
shoulder_cut_pct = float(params.get("shoulder_cut_pct", 0.03))
|
|
rsi_period = int(params.get("rsi_period", 14))
|
|
rsi_threshold = float(params.get("rsi_threshold", 78))
|
|
high_chase_thr = float(params.get("high_chase_thr", 0.96))
|
|
time_start_hm = int(params.get("time_start_hm", 930))
|
|
time_end_hm = int(params.get("time_end_hm", 1500))
|
|
cooldown_min = float(params.get("cooldown_min", 15))
|
|
max_daily = int(params.get("max_daily", 3))
|
|
|
|
all_trades: List[Dict] = []
|
|
for code, candles in candles_by_code.items():
|
|
if len(candles) < rsi_period + 5:
|
|
continue
|
|
closes = [float(c["close"]) for c in candles]
|
|
rsis = compute_rsi_series(closes, rsi_period)
|
|
position = None
|
|
last_exit_dt: Dict[str, datetime] = {}
|
|
daily_cnt: Dict[str, int] = {}
|
|
cur_day = None
|
|
running_open, running_high, running_low = 0.0, 0.0, 0.0
|
|
|
|
i = rsi_period + 1
|
|
while i < len(candles):
|
|
c = candles[i]
|
|
day = c["candle_time"][:8]
|
|
hm = int(c["candle_time"][8:12])
|
|
op = float(c["open"])
|
|
hi = float(c["high"])
|
|
lo = float(c["low"])
|
|
cl = float(c["close"])
|
|
|
|
if day != cur_day:
|
|
cur_day = day
|
|
running_open = op
|
|
running_high = hi
|
|
running_low = lo if lo > 0 else hi
|
|
else:
|
|
running_high = max(running_high, hi)
|
|
if lo > 0:
|
|
running_low = min(running_low, lo)
|
|
|
|
is_eod = (i == len(candles) - 1) or (candles[i + 1]["candle_time"][:8] != day)
|
|
|
|
if position is not None:
|
|
max_p = max(position["max_price"], hi)
|
|
position["max_price"] = max_p
|
|
reason = None
|
|
exit_price = cl
|
|
if lo > 0 and lo <= position["stop"]:
|
|
reason = "손절"
|
|
exit_price = position["stop"]
|
|
elif hi >= position["target"]:
|
|
reason = "익절"
|
|
exit_price = position["target"]
|
|
elif max_p >= position["entry_price"] * (1 + shoulder_min_high) and cl <= max_p * (1 - shoulder_cut_pct):
|
|
reason = "어깨컷"
|
|
exit_price = cl
|
|
elif is_eod:
|
|
reason = "장마감"
|
|
exit_price = cl
|
|
if reason:
|
|
all_trades.append({
|
|
"code": code,
|
|
"entry_time": position["entry_time"],
|
|
"exit_time": c["candle_time"],
|
|
"entry": round(position["entry_price"]),
|
|
"exit": round(exit_price),
|
|
"pnl": 0,
|
|
"reason": reason,
|
|
"hold_min": 0,
|
|
})
|
|
last_exit_dt[day] = _t2dt(c["candle_time"])
|
|
daily_cnt[day] = daily_cnt.get(day, 0) + 1
|
|
position = None
|
|
i += 1
|
|
continue
|
|
|
|
if cl <= 0 or running_open <= 0:
|
|
i += 1
|
|
continue
|
|
if hm < time_start_hm or hm > time_end_hm:
|
|
i += 1
|
|
continue
|
|
if daily_cnt.get(day, 0) >= max_daily:
|
|
i += 1
|
|
continue
|
|
if day in last_exit_dt:
|
|
elapsed = (_t2dt(c["candle_time"]) - last_exit_dt[day]).total_seconds() / 60
|
|
if elapsed < cooldown_min:
|
|
i += 1
|
|
continue
|
|
|
|
drop = (running_open - running_low) / running_open
|
|
if drop < min_drop_rate:
|
|
i += 1
|
|
continue
|
|
day_range = running_high - running_low
|
|
rec_day = (cl - running_low) / day_range if day_range > 0 else 0
|
|
if rec_day < min_recovery_ratio:
|
|
i += 1
|
|
continue
|
|
|
|
body_top = max(op, cl)
|
|
body_bot = min(op, cl)
|
|
body_len = body_top - body_bot if body_top > body_bot else 1.0
|
|
tail_len = body_bot - lo if lo > 0 else 0.0
|
|
lo_use = lo
|
|
if tail_len <= 0:
|
|
for j in range(i - 1, max(i - 4, rsi_period), -1):
|
|
prev = candles[j]
|
|
o2, l2, c2 = float(prev["open"]), float(prev["low"]), float(prev["close"])
|
|
if l2 <= 0:
|
|
continue
|
|
bt2, bb2 = max(o2, c2), min(o2, c2)
|
|
bl2 = bt2 - bb2 if bt2 > bb2 else 1.0
|
|
tl2 = bb2 - l2
|
|
if tl2 > 0:
|
|
tail_len, body_len, lo_use = tl2, bl2, l2
|
|
break
|
|
tail_ratio = tail_len / body_len if body_len > 0 else 0
|
|
tail_pct = tail_len / lo_use if lo_use > 0 and tail_len > 0 else 0.0
|
|
if tail_ratio < tail_ratio_min or tail_pct < tail_pct_min:
|
|
i += 1
|
|
continue
|
|
c_range = hi - lo if hi > lo else 0
|
|
rec_3m = (cl - lo) / c_range if c_range > 0 else 0
|
|
if not (min_recovery_ratio <= rec_3m <= max_rec_3m):
|
|
i += 1
|
|
continue
|
|
rsi_val = rsis[i] if i < len(rsis) else None
|
|
if rsi_val is None or rsi_val >= rsi_threshold:
|
|
i += 1
|
|
continue
|
|
if cl >= running_high * high_chase_thr:
|
|
i += 1
|
|
continue
|
|
|
|
if i + 1 >= len(candles):
|
|
i += 1
|
|
continue
|
|
next_c = candles[i + 1]
|
|
if next_c["candle_time"][:8] != day:
|
|
i += 1
|
|
continue
|
|
entry_price = float(next_c["open"])
|
|
if entry_price <= 0:
|
|
entry_price = cl
|
|
position = {
|
|
"entry_price": entry_price,
|
|
"entry_time": next_c["candle_time"],
|
|
"stop": entry_price * (1 - sl_pct),
|
|
"target": entry_price * (1 + tp_pct),
|
|
"max_price": entry_price,
|
|
}
|
|
i += 1 # 진입 봉(next) 건너뜀 — 다음 봉부터 포지션 보유로 청산 체크
|
|
continue
|
|
|
|
return all_trades
|